# Copyright (c) 2024 Elektrobit Automotive GmbH
#
# This program and the accompanying materials are made available under the
# terms of the Apache License, Version 2.0 which is available at
# https://www.apache.org/licenses/LICENSE-2.0.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# SPDX-License-Identifier: Apache-2.0
"""
This script defines the Workload class for creating and managing workloads and
the AccessRightRule class for managing access rights.
Classes
--------
- :class:`Workload`:
Represents a workload with various attributes and methods to update them.
- :class:`AccessRightRule`:
Represents an access right rule for a workload. It can be either a
state rule or a log rule.
Usage
------
- Create a workload using the WorkloadBuilder:
.. code-block:: python
workload = Workload.builder().build()
- Update fields of the workload:
.. code-block:: python
workload.update_agent_name("agent_B")
- Update dependencies:
.. code-block:: python
deps = workload.get_dependencies()
deps["other_workload"] = "ADD_COND_SUCCEEDED"
workload.update_dependencies(deps)
- Update tags:
.. code-block:: python
tags = workload.get_tags()
del tags["key1"]
workload.update_tags(tags)
- Print the updated workload:
.. code-block:: python
print(workload)
- Create an access state rule:
.. code-block:: python
rule = AccessRightRule.state_rule("Read", ["*"])
- Create an access log rule:
.. code-block:: python
rule = AccessRightRule.log_rule(['workload_A'])
"""
__all__ = ["Workload", "AccessRightRule"]
from typing import TYPE_CHECKING
from .._protos import _ank_base
from ..exceptions import WorkloadFieldException
from ..utils import get_logger, WORKLOADS_PREFIX
from .._components.file import File
logger = get_logger()
if TYPE_CHECKING:
from .workload_builder import WorkloadBuilder # pragma: no cover
# pylint: disable=too-many-public-methods
class Workload:
    """
    A class to represent a workload.

    The object wraps an ``_ank_base.Workload`` proto message and keeps a
    list of mask strings (``_masks``) that the ``update_*`` / ``add_*``
    methods extend whenever they modify a field.

    :var str name:
        The workload name.
    """

    def __init__(self, name: str) -> None:
        """
        Initialize a Workload object.

        The Workload object should be created using the
        Workload.builder() method.

        :param name: The workload name.
        :type name: str
        """
        self._workload = _ank_base.Workload()
        self.name = name
        self._main_mask = f"{WORKLOADS_PREFIX}.{self.name}"
        # A freshly created workload starts with the main mask only.
        self._masks = [self._main_mask]

    def __str__(self) -> str:
        """
        Return a string representation of the Workload object.

        :returns: String representation of the Workload object.
        :rtype: str
        """
        return str(self._to_proto())

    @staticmethod
    def builder() -> "WorkloadBuilder":
        """
        Return a WorkloadBuilder object.

        :returns: A builder object to create a Workload.
        :rtype: WorkloadBuilder
        """
        # Imported lazily: at module level the builder is only imported
        # under TYPE_CHECKING.
        # pylint: disable=import-outside-toplevel
        from .workload_builder import WorkloadBuilder

        return WorkloadBuilder()

    def update_workload_name(self, name: str) -> None:
        """
        Set the workload name.

        :param name: The workload name to update.
        :type name: str
        """
        self.name = name
        # NOTE(review): ``_main_mask`` was built from the name passed to
        # ``__init__`` and is not recomputed here, so the recorded masks
        # keep the previous name - confirm this is intended.
        self._add_mask(self._main_mask)

    def update_agent_name(self, agent_name: str) -> None:
        """
        Set the agent name for the workload.

        :param agent_name: The agent name to update.
        :type agent_name: str
        """
        self._workload.agent = agent_name
        self._add_mask(f"{self._main_mask}.agent")

    def update_runtime(self, runtime: str) -> None:
        """
        Set the runtime for the workload.

        :param runtime: The runtime to update.
        :type runtime: str
        """
        self._workload.runtime = runtime
        self._add_mask(f"{self._main_mask}.runtime")

    def update_runtime_config(self, config: str) -> None:
        """
        Set the runtime-specific configuration for the workload.

        :param config: The runtime configuration to update.
        :type config: str
        """
        self._workload.runtimeConfig = config
        self._add_mask(f"{self._main_mask}.runtimeConfig")

    def update_runtime_config_from_file(self, config_file: str) -> None:
        """
        Set the runtime-specific configuration for the workload from a file.

        The file is read as UTF-8 text and its whole content is used as
        the runtime configuration.

        :param config_file: The path to the configuration file.
        :type config_file: str
        """
        with open(config_file, "r", encoding="utf-8") as file:
            self.update_runtime_config(file.read())

    def update_restart_policy(self, policy: str) -> None:
        """
        Set the restart policy for the workload.

        Supported values: `NEVER`, `ON_FAILURE`, `ALWAYS`.

        :param policy: The restart policy to update.
        :type policy: str
        :raises WorkloadFieldException:
            If an invalid restart policy is provided.
        """
        valid_policies = _ank_base.RestartPolicy.keys()
        if policy not in valid_policies:
            logger.error("Invalid restart policy provided.")
            raise WorkloadFieldException(
                "restart policy", policy, valid_policies
            )
        self._workload.restartPolicy = _ank_base.RestartPolicy.Value(policy)
        self._add_mask(f"{self._main_mask}.restartPolicy")

    def get_dependencies(self) -> dict:
        """
        Return the dependencies of the workload.

        The returned dictionary is a copy; changing it does not affect
        the workload until it is passed to :meth:`update_dependencies`.

        :returns: A dictionary of dependencies with workload names
            as keys and conditions as values.
        :rtype: dict
        """
        return {
            workload_name: _ank_base.AddCondition.Name(condition)
            for workload_name, condition in (
                self._workload.dependencies.dependencies.items()
            )
        }

    def update_dependencies(self, dependencies: dict[str, str]) -> None:
        """
        Update the dependencies of the workload.

        Supported conditions: `ADD_COND_RUNNING`, `ADD_COND_SUCCEEDED`,
        `ADD_COND_FAILED`.

        All conditions are validated before the existing dependencies are
        touched, so a failed call leaves the workload unchanged.

        :param dependencies: A dictionary of dependencies with
            workload names as keys and conditions as values.
        :type dependencies: dict
        :raises WorkloadFieldException: If an invalid condition is provided.
        """
        valid_conditions = _ank_base.AddCondition.keys()
        for condition in dependencies.values():
            if condition not in valid_conditions:
                logger.error("Invalid dependency condition provided.")
                raise WorkloadFieldException(
                    "dependency condition",
                    condition,
                    valid_conditions,
                )

        proto_dependencies = self._workload.dependencies.dependencies
        proto_dependencies.clear()
        for workload_name, condition in dependencies.items():
            proto_dependencies[workload_name] = (
                _ank_base.AddCondition.Value(condition)
            )
        self._add_mask(f"{self._main_mask}.dependencies")

    def add_tag(self, key: str, value: str) -> None:
        """
        Add a tag to the workload.

        :param key: The key of the tag.
        :type key: str
        :param value: The value of the tag.
        :type value: str
        """
        self._workload.tags.tags[key] = value
        # A per-key mask is only recorded while the mask for the whole
        # tags field is absent.
        if f"{self._main_mask}.tags" not in self._masks:
            self._add_mask(f"{self._main_mask}.tags.{key}")

    def get_tags(self) -> dict[str, str]:
        """
        Return the tags of the workload.

        The returned dictionary is a copy; changing it does not affect
        the workload until it is passed to :meth:`update_tags`.

        :returns: A dictionary with the tag keys and their values.
        :rtype: dict[str, str]
        """
        return dict(self._workload.tags.tags)

    def update_tags(self, tags: dict[str, str]) -> None:
        """
        Replace the tags of the workload.

        :param tags: A dictionary with the tag keys and their values.
        :type tags: dict[str, str]
        """
        proto_tags = self._workload.tags.tags
        proto_tags.clear()
        for key, value in tags.items():
            proto_tags[key] = value

        tags_mask = f"{self._main_mask}.tags"
        # Per-key masks written by add_tag are dropped in favour of the
        # single mask for the whole tags field.
        self._masks = [
            mask
            for mask in self._masks
            if not mask.startswith(f"{tags_mask}.")
        ]
        self._add_mask(tags_mask)

    def get_allow_rules(self) -> list["AccessRightRule"]:
        """
        Return the allow rules of the workload.

        :returns: A list of AccessRightRules
        :rtype: list
        """
        return [
            AccessRightRule(rule)
            for rule in self._workload.controlInterfaceAccess.allowRules
        ]

    def update_allow_rules(self, rules: list["AccessRightRule"]) -> None:
        """
        Update the allow rules of the workload.

        :param rules: A list of AccessRightRules.
        :type rules: list
        """
        proto_rules = self._workload.controlInterfaceAccess.allowRules
        del proto_rules[:]
        for rule in rules:
            proto_rules.append(rule._to_proto())
        self._add_mask(f"{self._main_mask}.controlInterfaceAccess.allowRules")

    def get_deny_rules(self) -> list["AccessRightRule"]:
        """
        Return the deny rules of the workload.

        :returns: A list of AccessRightRules
        :rtype: list
        """
        return [
            AccessRightRule(rule)
            for rule in self._workload.controlInterfaceAccess.denyRules
        ]

    def update_deny_rules(self, rules: list["AccessRightRule"]) -> None:
        """
        Update the deny rules of the workload.

        :param rules: A list of AccessRightRules.
        :type rules: list
        """
        proto_rules = self._workload.controlInterfaceAccess.denyRules
        del proto_rules[:]
        for rule in rules:
            proto_rules.append(rule._to_proto())
        self._add_mask(f"{self._main_mask}.controlInterfaceAccess.denyRules")

    def add_config(self, alias: str, name: str) -> None:
        """
        Link a configuration to the workload.

        :param alias: The alias of the configuration.
        :type alias: str
        :param name: The name of the configuration.
        :type name: str
        """
        self._workload.configs.configs[alias] = name
        self._add_mask(f"{self._main_mask}.configs")

    def get_configs(self) -> dict[str, str]:
        """
        Return the configurations linked to the workload.

        :returns: A dict containing the alias as key and name of the
            configuration as value.
        :rtype: dict[str, str]
        """
        return dict(self._workload.configs.configs.items())

    def update_configs(self, configs: dict[str, str]) -> None:
        """
        Update the configurations linked to the workload.

        :param configs: A dict containing the alias as key and name of
            the configuration as value.
        :type configs: dict[str, str]
        """
        self._workload.configs.configs.clear()
        for alias, name in configs.items():
            self.add_config(alias, name)
        # add_config records the mask only when it is called; record it
        # here as well so that replacing the configs with an empty dict
        # is tracked too.
        self._add_mask(f"{self._main_mask}.configs")

    def add_file(self, file: File) -> None:
        """
        Link a workload file to the workload.

        :param file: The File object to mount to the workload.
        :type file: File
        """
        self._workload.files.files.append(file._to_proto())
        self._add_mask(f"{self._main_mask}.files")

    def get_files(self) -> list[File]:
        """
        Return the files linked to the workload.

        :returns: A list of File objects mounted to the workload.
        :rtype: list[File]
        """
        return [File._from_proto(file) for file in self._workload.files.files]

    def update_files(self, files: list[File]) -> None:
        """
        Update the files linked to the workload.

        :param files: List of File objects mounted to the workload.
        :type files: list[File]
        """
        del self._workload.files.files[:]
        for file in files:
            self.add_file(file)
        # add_file records the mask only when it is called; record it
        # here as well so that replacing the files with an empty list
        # is tracked too.
        self._add_mask(f"{self._main_mask}.files")

    def _add_mask(self, mask: str) -> None:
        """
        Add a mask to the list of masks.

        The mask is appended only if neither the main mask nor the given
        mask is already in the list (presumably because the main mask
        already covers every field of the workload - TODO confirm).

        :param mask: The mask to add.
        :type mask: str
        """
        if self._main_mask not in self._masks and mask not in self._masks:
            self._masks.append(mask)

    def to_dict(self) -> dict:
        """
        Convert the Workload object to a dictionary.

        ``agent``, ``runtime`` and ``runtimeConfig`` are only emitted when
        they are non-empty; all other keys are always present.

        :returns: The dictionary representation of the Workload object.
        :rtype: dict
        """
        proto = self._workload
        workload_dict = {}
        if proto.agent:
            workload_dict["agent"] = proto.agent
        if proto.runtime:
            workload_dict["runtime"] = proto.runtime
        if proto.runtimeConfig:
            workload_dict["runtimeConfig"] = proto.runtimeConfig
        workload_dict["restartPolicy"] = _ank_base.RestartPolicy.Name(
            proto.restartPolicy
        )
        workload_dict["dependencies"] = {
            dep_key: _ank_base.AddCondition.Name(dep_value)
            for dep_key, dep_value in proto.dependencies.dependencies.items()
        }
        workload_dict["tags"] = dict(proto.tags.tags.items())

        access_dict = {}
        access = proto.controlInterfaceAccess
        if access:
            access_dict["allowRules"] = [
                AccessRightRule(rule).to_dict() for rule in access.allowRules
            ]
            access_dict["denyRules"] = [
                AccessRightRule(rule).to_dict() for rule in access.denyRules
            ]
        workload_dict["controlInterfaceAccess"] = access_dict

        workload_dict["configs"] = dict(proto.configs.configs.items())
        workload_dict["files"] = [
            File._from_proto(file).to_dict() for file in proto.files.files
        ]
        return workload_dict

    @staticmethod
    def _rule_from_dict(rule: dict) -> "AccessRightRule":
        """
        Build an AccessRightRule from its dictionary representation.

        :param rule: A rule dictionary as produced by
            ``AccessRightRule.to_dict``.
        :type rule: dict
        :returns: A log rule if ``rule["type"]`` is ``"LogRule"``,
            otherwise a state rule.
        :rtype: AccessRightRule
        """
        if rule.get("type") == "LogRule":
            return AccessRightRule.log_rule(rule["workloadNames"])
        return AccessRightRule.state_rule(
            rule["operation"], rule["filterMask"]
        )

    # pylint: disable=too-many-branches
    @staticmethod
    def _from_dict(workload_name: str, dict_workload: dict) -> "Workload":
        """
        Convert a dictionary to a Workload object.

        Accepts the dictionaries produced by :meth:`to_dict`, including
        access rules of type ``LogRule``.

        :param workload_name: The name of the workload.
        :type workload_name: str
        :param dict_workload: The dictionary to convert.
        :type dict_workload: dict
        :returns: The Workload object created from the dictionary.
        :rtype: Workload
        """
        builder = Workload.builder().workload_name(workload_name)
        if "agent" in dict_workload:
            builder = builder.agent_name(dict_workload["agent"])
        if "runtime" in dict_workload:
            builder = builder.runtime(dict_workload["runtime"])
        if "runtimeConfig" in dict_workload:
            builder = builder.runtime_config(dict_workload["runtimeConfig"])
        if "restartPolicy" in dict_workload:
            builder = builder.restart_policy(dict_workload["restartPolicy"])
        for dep_key, dep_value in dict_workload.get(
            "dependencies", {}
        ).items():
            builder = builder.add_dependency(dep_key, dep_value)
        for key, value in dict_workload.get("tags", {}).items():
            builder = builder.add_tag(key, value)

        access = dict_workload.get("controlInterfaceAccess") or {}
        allow_rules = list(access.get("allowRules", []))
        deny_rules = list(access.get("denyRules", []))
        # Only state rules go through the builder; log rules carry no
        # "operation"/"filterMask" keys and are installed after build().
        for rule in allow_rules:
            if rule.get("type") != "LogRule":
                builder = builder.add_allow_state_rule(
                    rule["operation"], rule["filterMask"]
                )
        for rule in deny_rules:
            if rule.get("type") != "LogRule":
                builder = builder.add_deny_state_rule(
                    rule["operation"], rule["filterMask"]
                )

        for alias, name in dict_workload.get("configs", {}).items():
            builder = builder.add_config(alias, name)
        for file in dict_workload.get("files", []):
            builder = builder.add_file(File._from_dict(file))

        workload = builder.build()

        # When log rules are present, rewrite the complete rule list so
        # the original order of state and log rules is kept. The proto is
        # written directly so the masks set up by the builder stay as is.
        proto_access = workload._workload.controlInterfaceAccess
        for rule_dicts, proto_rules in (
            (allow_rules, proto_access.allowRules),
            (deny_rules, proto_access.denyRules),
        ):
            if any(rule.get("type") == "LogRule" for rule in rule_dicts):
                del proto_rules[:]
                for rule in rule_dicts:
                    proto_rules.append(
                        Workload._rule_from_dict(rule)._to_proto()
                    )
        return workload

    def _to_proto(self) -> _ank_base.Workload:
        """
        Convert the Workload object to a proto message.

        The internal message itself is returned, not a copy.

        :returns: The proto message representation
            of the Workload object.
        :rtype: _ank_base.Workload
        """
        return self._workload

    def _from_proto(self, proto: _ank_base.Workload) -> None:
        """
        Convert the proto message to a Workload object.

        The given message replaces the internal one and the list of
        masks is reset to empty.

        :param proto: The proto message to convert.
        :type proto: _ank_base.Workload
        """
        self._workload = proto
        self._masks = []
class AccessRightRule:
    """
    Access right rule of a workload.

    Wraps an ``_ank_base.AccessRightsRule`` proto message that holds
    either a state rule or a log rule.
    """

    def __init__(self, rule: _ank_base.AccessRightsRule) -> None:
        """
        Wrap an existing proto rule.

        Prefer the static constructors `state_rule` and `log_rule`
        to create a new rule of the desired type.

        :param rule: The access right rule.
        :type rule: _ank_base.AccessRightsRule
        """
        self._rule = rule

    def __str__(self) -> str:
        """
        Build the string representation of the access right rule.

        :returns: The string representation of the access right rule.
        :rtype: str
        """
        kind = self.type
        if kind == "LogRule":
            return f"LogRule: {self._rule.logRule.workloadNames}"
        if kind == "StateRule":
            operation, filter_masks = self._state_rule_to_str(
                self._rule.stateRule
            )
            return f"StateRule: {operation}, {filter_masks}"
        return "Unknown rule"

    @property
    def type(self) -> str:
        """
        Tell which kind of rule is stored.

        :returns: ``"StateRule"``, ``"LogRule"`` or ``"Unknown"`` when
            neither field is set on the proto message.
        :rtype: str
        """
        known_fields = (("stateRule", "StateRule"), ("logRule", "LogRule"))
        for field_name, label in known_fields:
            if self._rule.HasField(field_name):
                return label
        return "Unknown"

    @staticmethod
    def state_rule(
        operation: str, filter_masks: list[str]
    ) -> "AccessRightRule":
        """
        Create an access state rule for a workload.

        Supported operations: `Nothing`, `Write`, `Read`, `ReadWrite`.

        :param operation: The operation the rule allows.
        :type operation: str
        :param filter_masks: The list of filter masks.
        :type filter_masks: list
        :returns: The access right rule object.
        :rtype: AccessRightRule
        :raises WorkloadFieldException: If an invalid operation is provided.
        """
        proto_state_rule = AccessRightRule._generate_state_rule(
            operation, filter_masks
        )
        proto_rule = _ank_base.AccessRightsRule(stateRule=proto_state_rule)
        return AccessRightRule(proto_rule)

    @staticmethod
    def log_rule(workload_names: list[str]) -> "AccessRightRule":
        """
        Create an access log rule for a workload.

        :param workload_names: The list of workload names.
        :type workload_names: list
        :returns: The access right rule object.
        :rtype: AccessRightRule
        """
        proto_log_rule = _ank_base.LogRule(workloadNames=workload_names)
        proto_rule = _ank_base.AccessRightsRule(logRule=proto_log_rule)
        return AccessRightRule(proto_rule)

    def _to_proto(self) -> _ank_base.AccessRightsRule:
        """
        Give access to the wrapped proto message.

        :returns: The proto message representation
            of the AccessRightRule object.
        :rtype: _ank_base.AccessRightsRule
        """
        return self._rule

    def to_dict(self) -> dict:
        """
        Convert the AccessRightRule object to a dictionary.

        :returns: The dictionary representation of the AccessRightRule
            object; it always contains a ``"type"`` key.
        :rtype: dict
        """
        kind = self.type
        if kind == "LogRule":
            workload_names = self._rule.logRule.workloadNames
            return {
                "type": "LogRule",
                "workloadNames": [str(name) for name in workload_names],
            }
        if kind == "StateRule":
            operation, filter_masks = self._state_rule_to_str(
                self._rule.stateRule
            )
            return {
                "type": "StateRule",
                "operation": operation,
                "filterMask": [str(mask) for mask in filter_masks],
            }
        return {"type": "Unknown"}

    @staticmethod
    def _generate_state_rule(
        operation: str, filter_masks: list[str]
    ) -> _ank_base.StateRule:
        """
        Generate the proto state rule for the given operation and masks.

        :param operation: The operation the rule allows.
        :type operation: str
        :param filter_masks: The list of filter masks.
        :type filter_masks: list
        :returns: The state rule generated.
        :rtype: _ank_base.StateRule
        :raises WorkloadFieldException: If an invalid operation is provided.
        """
        enum_mapper = {
            "Nothing": _ank_base.ReadWriteEnum.RW_NOTHING,
            "Write": _ank_base.ReadWriteEnum.RW_WRITE,
            "Read": _ank_base.ReadWriteEnum.RW_READ,
            "ReadWrite": _ank_base.ReadWriteEnum.RW_READ_WRITE,
        }
        if operation in enum_mapper:
            return _ank_base.StateRule(
                operation=enum_mapper[operation], filterMasks=filter_masks
            )
        logger.error("Invalid state rule operation provided.")
        raise WorkloadFieldException(
            "state rule operation", operation, enum_mapper.keys()
        )

    @staticmethod
    def _state_rule_to_str(rule: _ank_base.StateRule) -> tuple[str, list[str]]:
        """
        Split a proto state rule into operation name and filter masks.

        :param rule: The state rule to convert.
        :type rule: _ank_base.StateRule
        :returns: A tuple containing operation and filter masks.
        :rtype: tuple
        """
        enum_mapper = {
            _ank_base.ReadWriteEnum.RW_NOTHING: "Nothing",
            _ank_base.ReadWriteEnum.RW_WRITE: "Write",
            _ank_base.ReadWriteEnum.RW_READ: "Read",
            _ank_base.ReadWriteEnum.RW_READ_WRITE: "ReadWrite",
        }
        operation_name = enum_mapper[rule.operation]
        return (operation_name, rule.filterMasks)