Source code for ankaios_sdk._components.workload

# Copyright (c) 2024 Elektrobit Automotive GmbH
#
# This program and the accompanying materials are made available under the
# terms of the Apache License, Version 2.0 which is available at
# https://www.apache.org/licenses/LICENSE-2.0.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# SPDX-License-Identifier: Apache-2.0

"""
This script defines the Workload class for creating and managing workloads and
the AccessRightRule class for managing access rights.

Classes
--------

- Workload:
    Represents a workload with various attributes and methods to update them.
- AccessRightRule:
    Represents an access right rule for a workload. It can be either a
    state rule or a log rule.

Usage
------

- Create a workload using the WorkloadBuilder:
    .. code-block:: python

        workload = Workload.builder().build()

- Update fields of the workload:
    .. code-block:: python

        workload.update_agent_name("agent_B")

- Update dependencies:
    .. code-block:: python

        deps = workload.get_dependencies()
        deps["other_workload"] = "ADD_COND_SUCCEEDED"
        workload.update_dependencies(deps)

- Update tags:
    .. code-block:: python

        tags = workload.get_tags()
        tags.pop("key1")
        workload.update_tags(tags)

- Print the updated workload:
    .. code-block:: python

        print(workload)

- Create an access state rule:
    .. code-block:: python

        rule = AccessRightRule.state_rule("Read", ["*"])

- Create an access log rule:
    .. code-block:: python

        rule = AccessRightRule.log_rule(['workload_A'])
"""


__all__ = ["Workload", "AccessRightRule"]

from typing import TYPE_CHECKING
from .._protos import _ank_base
from ..exceptions import WorkloadFieldException
from ..utils import get_logger, WORKLOADS_PREFIX
from .._components.file import File


logger = get_logger()
if TYPE_CHECKING:
    from .workload_builder import WorkloadBuilder  # pragma: no cover


# pylint: disable=too-many-public-methods
[docs] class Workload: """ A class to represent a workload. Attributes: name (str): The workload name. """
[docs] def __init__(self, name: str) -> None: """ Initialize a Workload object. The Workload object should be created using the Workload.builder() method. Args: name (str): The workload name. """ self._workload = _ank_base.Workload() self.name = name self._main_mask = f"{WORKLOADS_PREFIX}.{self.name}" self.masks = [self._main_mask]
[docs] def __str__(self) -> str: """ Return a string representation of the Workload object. Returns: str: String representation of the Workload object. """ return str(self._to_proto())
[docs] @staticmethod def builder() -> "WorkloadBuilder": """ Return a WorkloadBuilder object. Returns: WorkloadBuilder: A builder object to create a Workload. """ # pylint: disable=import-outside-toplevel from .workload_builder import WorkloadBuilder return WorkloadBuilder()
[docs] def update_workload_name(self, name: str) -> None: """ Set the workload name. Args: name (str): The workload name to update. """ self.name = name self._add_mask(self._main_mask)
[docs] def update_agent_name(self, agent_name: str) -> None: """ Set the agent name for the workload. Args: agent_name (str): The agent name to update. """ self._workload.agent = agent_name self._add_mask(f"{self._main_mask}.agent")
[docs] def update_runtime(self, runtime: str) -> None: """ Set the runtime for the workload. Args: runtime (str): The runtime to update. """ self._workload.runtime = runtime self._add_mask(f"{self._main_mask}.runtime")
[docs] def update_runtime_config(self, config: str) -> None: """ Set the runtime-specific configuration for the workload. Args: config (str): The runtime configuration to update. """ self._workload.runtimeConfig = config self._add_mask(f"{self._main_mask}.runtimeConfig")
[docs] def update_runtime_config_from_file(self, config_file: str) -> None: """ Set the runtime-specific configuration for the workload from a file. Args: config_file (str): The path to the configuration file. """ with open(config_file, "r", encoding="utf-8") as file: self.update_runtime_config(file.read())
[docs] def update_restart_policy(self, policy: str) -> None: """ Set the restart policy for the workload. Supported values: `NEVER`, `ON_FAILURE`, `ALWAYS`. Args: policy (str): The restart policy to update. Raises: WorkloadFieldException: If an invalid restart policy is provided. """ if policy not in _ank_base.RestartPolicy.keys(): logger.error("Invalid restart policy provided.") raise WorkloadFieldException( "restart policy", policy, _ank_base.RestartPolicy.keys() ) self._workload.restartPolicy = _ank_base.RestartPolicy.Value(policy) self._add_mask(f"{self._main_mask}.restartPolicy")
[docs] def get_dependencies(self) -> dict: """ Return the dependencies of the workload. Returns: dict: A dictionary of dependencies with workload names \ as keys and conditions as values. """ deps = dict(self._workload.dependencies.dependencies) for dep in deps: deps[dep] = _ank_base.AddCondition.Name(deps[dep]) return deps
[docs] def update_dependencies(self, dependencies: dict[str, str]) -> None: """ Update the dependencies of the workload. Supported conditions: `ADD_COND_RUNNING`, `ADD_COND_SUCCEEDED`, `ADD_COND_FAILED`. Args: dependencies (dict): A dictionary of dependencies with workload names and condition as values. Raises: WorkloadFieldException: If an invalid condition is provided. """ self._workload.dependencies.dependencies.clear() for workload_name, condition in dependencies.items(): if condition not in _ank_base.AddCondition.keys(): logger.error("Invalid dependency condition provided.") raise WorkloadFieldException( "dependency condition", condition, _ank_base.AddCondition.keys(), ) self._workload.dependencies.dependencies[workload_name] = ( _ank_base.AddCondition.Value(condition) ) self._add_mask(f"{self._main_mask}.dependencies")
[docs] def add_tag(self, key: str, value: str) -> None: """ Add a tag to the workload. Args: key (str): The key of the tag. value (str): The value of the tag. """ tag = _ank_base.Tag(key=key, value=value) self._workload.tags.tags.append(tag) if f"{self._main_mask}.tags" not in self.masks: self._add_mask(f"{self._main_mask}.tags.{key}")
[docs] def get_tags(self) -> list[tuple[str, str]]: """ Return the tags of the workload. Returns: list: A list of tuples containing tag keys and values. """ tags = [] for tag in self._workload.tags.tags: tags.append((tag.key, tag.value)) return tags
[docs] def update_tags(self, tags: list) -> None: """ Update the tags of the workload. Args: tags (list): A list of tuples containing tag keys and values. """ del self._workload.tags.tags[:] for key, value in tags: tag = _ank_base.Tag(key=key, value=value) self._workload.tags.tags.append(tag) self.masks = [ mask for mask in self.masks if not mask.startswith(f"{self._main_mask}.tags") ] self._add_mask(f"{self._main_mask}.tags")
[docs] def get_allow_rules(self) -> list["AccessRightRule"]: """ Return the allow rules of the workload. Returns: list: A list of AccessRightRules """ rules = [] for rule in self._workload.controlInterfaceAccess.allowRules: rules.append(AccessRightRule(rule)) return rules
[docs] def update_allow_rules(self, rules: list["AccessRightRule"]) -> None: """ Update the allow rules of the workload. Args: rules (list): A list of AccessRightRules. """ del self._workload.controlInterfaceAccess.allowRules[:] for rule in rules: self._workload.controlInterfaceAccess.allowRules.append( rule._to_proto() ) self._add_mask(f"{self._main_mask}.controlInterfaceAccess.allowRules")
[docs] def get_deny_rules(self) -> list["AccessRightRule"]: """ Return the deny rules of the workload. Returns: list: A list of AccessRightRules """ rules = [] for rule in self._workload.controlInterfaceAccess.denyRules: rules.append(AccessRightRule(rule)) return rules
[docs] def update_deny_rules(self, rules: list["AccessRightRule"]) -> None: """ Update the deny rules of the workload. Args: rules (list): A list of AccessRightRules. """ del self._workload.controlInterfaceAccess.denyRules[:] for rule in rules: self._workload.controlInterfaceAccess.denyRules.append( rule._to_proto() ) self._add_mask(f"{self._main_mask}.controlInterfaceAccess.denyRules")
[docs] def add_config(self, alias: str, name: str) -> None: """ Link a configuration to the workload. Args: alias (str): The alias of the configuration. name (str): The name of the configuration. """ self._workload.configs.configs[alias] = name self._add_mask(f"{self._main_mask}.configs")
[docs] def get_configs(self) -> dict[str, str]: """ Return the configurations linked to the workload. Returns: dict[str, str]: A dict containing the alias as key and name of the configuration as value. """ config_mappings = {} for alias, name in self._workload.configs.configs.items(): config_mappings[alias] = name return config_mappings
[docs] def update_configs(self, configs: dict[str, str]) -> None: """ Update the configurations linked to the workload. Args: configs (dict[str, str]): A tuple containing the alias and name of the configurations. """ self._workload.configs.configs.clear() for alias, name in configs.items(): self.add_config(alias, name)
[docs] def add_file(self, file: File) -> None: """ Link a workload file to the workload. Args: file (File): The File object to mount to the workload. """ self._workload.files.files.append(file._to_proto()) self._add_mask(f"{self._main_mask}.files")
[docs] def get_files(self) -> list[File]: """ Return the files linked to the workload. Returns: list[File]: A list of File objects mounted to the workload. """ return [File._from_proto(file) for file in self._workload.files.files]
[docs] def update_files(self, files: list[File]) -> None: """ Update the files linked to the workload. Args: files (list[File]): List of File objects mounted to the workload. """ del self._workload.files.files[:] for file in files: self.add_file(file)
def _add_mask(self, mask: str) -> None: """ Add a mask to the list of masks. Args: mask (str): The mask to add. """ if self._main_mask not in self.masks and mask not in self.masks: self.masks.append(mask) # pylint: disable=too-many-branches
[docs] def to_dict(self) -> dict: """ Convert the Workload object to a dictionary. Returns: dict: The dictionary representation of the Workload object. """ workload_dict = {} if self._workload.agent: workload_dict["agent"] = self._workload.agent if self._workload.runtime: workload_dict["runtime"] = self._workload.runtime if self._workload.runtimeConfig: workload_dict["runtimeConfig"] = self._workload.runtimeConfig workload_dict["restartPolicy"] = _ank_base.RestartPolicy.Name( self._workload.restartPolicy ) workload_dict["dependencies"] = {} if self._workload.dependencies: for ( dep_key, dep_value, ) in self._workload.dependencies.dependencies.items(): workload_dict["dependencies"][dep_key] = ( _ank_base.AddCondition.Name(dep_value) ) workload_dict["tags"] = [] if self._workload.tags: for tag in self._workload.tags.tags: workload_dict["tags"].append( {"key": tag.key, "value": tag.value} ) workload_dict["controlInterfaceAccess"] = {} if self._workload.controlInterfaceAccess: workload_dict["controlInterfaceAccess"]["allowRules"] = [] for rule in self._workload.controlInterfaceAccess.allowRules: access_rule = AccessRightRule(rule) workload_dict["controlInterfaceAccess"]["allowRules"].append( access_rule.to_dict() ) workload_dict["controlInterfaceAccess"]["denyRules"] = [] for rule in self._workload.controlInterfaceAccess.denyRules: access_rule = AccessRightRule(rule) workload_dict["controlInterfaceAccess"]["denyRules"].append( access_rule.to_dict() ) workload_dict["configs"] = {} for alias, name in self._workload.configs.configs.items(): workload_dict["configs"][alias] = name workload_dict["files"] = [] for file in self._workload.files.files: workload_dict["files"].append(File._from_proto(file).to_dict()) return workload_dict
# pylint: disable=too-many-branches @staticmethod def _from_dict(workload_name: str, dict_workload: dict) -> "Workload": """ Convert a dictionary to a Workload object. Args: workload_name (str): The name of the workload. dict_workload (dict): The dictionary to convert. Returns: Workload: The Workload object created from the dictionary. """ workload = Workload.builder().workload_name(workload_name) if "agent" in dict_workload: workload = workload.agent_name(dict_workload["agent"]) if "runtime" in dict_workload: workload = workload.runtime(dict_workload["runtime"]) if "runtimeConfig" in dict_workload: workload = workload.runtime_config(dict_workload["runtimeConfig"]) if "restartPolicy" in dict_workload: workload = workload.restart_policy(dict_workload["restartPolicy"]) if "dependencies" in dict_workload: for dep_key, dep_value in dict_workload["dependencies"].items(): workload = workload.add_dependency(dep_key, dep_value) if "tags" in dict_workload: for tag in dict_workload["tags"]: workload = workload.add_tag(tag["key"], tag["value"]) if "controlInterfaceAccess" in dict_workload: if "allowRules" in dict_workload["controlInterfaceAccess"]: for rule in dict_workload["controlInterfaceAccess"][ "allowRules" ]: workload = workload.add_allow_state_rule( rule["operation"], rule["filterMask"] ) if "denyRules" in dict_workload["controlInterfaceAccess"]: for rule in dict_workload["controlInterfaceAccess"][ "denyRules" ]: workload = workload.add_deny_state_rule( rule["operation"], rule["filterMask"] ) if "configs" in dict_workload: for alias, name in dict_workload["configs"].items(): workload = workload.add_config(alias, name) if "files" in dict_workload: for file in dict_workload["files"]: workload = workload.add_file(File._from_dict(file)) return workload.build() def _to_proto(self) -> _ank_base.Workload: """ Convert the Workload object to a proto message. Returns: _ank_base.Workload: The proto message representation of the Workload object. """ return self._workload def _from_proto(self, proto: _ank_base.Workload) -> None: """ Convert the proto message to a Workload object. Args: proto (_ank_base.Workload): The proto message to convert. """ self._workload = proto self.masks = []
[docs] class AccessRightRule: """ Represents an access right rule for a workload. It can be either a state rule or a log rule. """
[docs] def __init__(self, rule: _ank_base.AccessRightsRule) -> None: """ Initializes the AccessRightRule. For initialization, use the static methods `state_rule` or `log_rule`, depending on the type of rule you want to create. Args: rule (_ank_base.AccessRightsRule): The access right rule. """ self._rule = rule
[docs] def __str__(self) -> str: """ Returns the string representation of the access right rule. Returns: str: The string representation of the access right rule. """ if self.type == "StateRule": operation, filter_masks = self._state_rule_to_str( self._rule.stateRule ) return f"StateRule: {operation}, {filter_masks}" if self.type == "LogRule": return f"LogRule: {self._rule.logRule.workloadNames}" return "Unknown rule"
@property def type(self) -> str: """ Returns the type of the access right rule. Returns: str: The type of the access right rule. """ if self._rule.HasField("stateRule"): return "StateRule" if self._rule.HasField("logRule"): return "LogRule" return "Unknown"
[docs] @staticmethod def state_rule( operation: str, filter_masks: list[str] ) -> "AccessRightRule": """ Create an access state rule for a workload. Supported operations: `Nothing`, `Write`, `Read`, `ReadWrite`. Args: operation (str): The operation the rule allows. filter_masks (list): The list of filter masks. Returns: AccessRightRule: The access right rule object. Raises: WorkloadFieldException: If an invalid operation is provided. """ return AccessRightRule( _ank_base.AccessRightsRule( stateRule=AccessRightRule._generate_state_rule( operation, filter_masks ) ) )
[docs] @staticmethod def log_rule(workload_names: list[str]) -> "AccessRightRule": """ Create an access log rule for a workload. Args: workload_names (list): The list of workload names. Returns: AccessRightRule: The access right rule object. """ return AccessRightRule( _ank_base.AccessRightsRule( logRule=_ank_base.LogRule(workloadNames=workload_names) ) )
def _to_proto(self) -> _ank_base.AccessRightsRule: """ Convert the AccessRightRule object to a proto message. Returns: _ank_base.AccessRightsRule: The proto message representation of the AccessRightRule object. """ return self._rule
[docs] def to_dict(self) -> dict: """ Convert the AccessRightRule object to a dictionary. Returns: dict: The dictionary representation of the AccessRightRule object. """ if self.type == "StateRule": operation, filter_masks = self._state_rule_to_str( self._rule.stateRule ) return { "type": "StateRule", "operation": operation, "filterMask": [str(mask) for mask in filter_masks], } if self.type == "LogRule": return { "type": "LogRule", "workloadNames": [ str(name) for name in self._rule.logRule.workloadNames ], } return {"type": "Unknown"}
@staticmethod def _generate_state_rule( operation: str, filter_masks: list[str] ) -> _ank_base.StateRule: """ Generate an access rights rule for the workload. Args: operation (str): The operation the rule allows. filter_masks (list): The list of filter masks. Returns: _ank_base.StateRule: The state rule generated. Raises: WorkloadFieldException: If an invalid operation is provided. """ enum_mapper = { "Nothing": _ank_base.ReadWriteEnum.RW_NOTHING, "Write": _ank_base.ReadWriteEnum.RW_WRITE, "Read": _ank_base.ReadWriteEnum.RW_READ, "ReadWrite": _ank_base.ReadWriteEnum.RW_READ_WRITE, } if operation not in enum_mapper: logger.error("Invalid state rule operation provided.") raise WorkloadFieldException( "state rule operation", operation, enum_mapper.keys() ) return _ank_base.StateRule( operation=enum_mapper[operation], filterMasks=filter_masks ) @staticmethod def _state_rule_to_str(rule: _ank_base.StateRule) -> tuple[str, list[str]]: """ Convert an access rights rule to a tuple. Args: rule (_ank_base.StateRule): The state rule to convert. Returns: tuple: A tuple containing operation and filter masks. """ enum_mapper = { _ank_base.ReadWriteEnum.RW_NOTHING: "Nothing", _ank_base.ReadWriteEnum.RW_WRITE: "Write", _ank_base.ReadWriteEnum.RW_READ: "Read", _ank_base.ReadWriteEnum.RW_READ_WRITE: "ReadWrite", } return (enum_mapper[rule.operation], rule.filterMasks)