# Copyright (c) 2024 Elektrobit Automotive GmbH
#
# This program and the accompanying materials are made available under the
# terms of the Apache License, Version 2.0 which is available at
# https://www.apache.org/licenses/LICENSE-2.0.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# SPDX-License-Identifier: Apache-2.0
"""
This script defines the Workload and WorkloadBuilder classes for
creating and managing workloads.
Classes
--------
- Workload:
Represents a workload with various attributes and methods to update them.
- WorkloadBuilder:
A builder class to create a Workload object with a fluent interface.
Usage
------
- Create a workload using the WorkloadBuilder:
.. code-block:: python
workload = Workload.builder() \\
.workload_name("nginx") \\
.agent_name("agent_A") \\
.runtime("podman") \\
.restart_policy("NEVER") \\
.runtime_config("image: docker.io/library/nginx\\n"
+ "commandOptions: [\\"-p\\", \\"8080:80\\"]") \\
.add_dependency("other_workload", "ADD_COND_RUNNING") \\
.add_tag("key1", "value1") \\
.add_tag("key2", "value2") \\
.build()
- Update fields of the workload:
.. code-block:: python
workload.update_agent_name("agent_B")
- Update dependencies:
.. code-block:: python
deps = workload.get_dependencies()
deps["other_workload"] = "ADD_COND_SUCCEEDED"
workload.update_dependencies(deps)
- Update tags:
.. code-block:: python
tags = workload.get_tags()
tags.pop("key1")
workload.update_tags(tags)
- Print the updated workload:
.. code-block:: python
print(workload)
"""
__all__ = ["Workload", "WorkloadBuilder"]
from .._protos import _ank_base
from ..exceptions import WorkloadFieldException, WorkloadBuilderException
from ..utils import get_logger, WORKLOADS_PREFIX
# pylint: disable=too-many-public-methods
[docs]
class Workload:
"""
A class to represent a workload.
Attributes:
name (str): The workload name.
"""
[docs]
def __init__(self, name: str) -> None:
"""
Initialize a Workload object.
The Workload object should be created using the
Workload.builder() method.
Args:
name (str): The workload name.
"""
self._workload = _ank_base.Workload()
self.name = name
self._main_mask = f"{WORKLOADS_PREFIX}.{self.name}"
self.masks = [self._main_mask]
self.logger = get_logger()
[docs]
def __str__(self) -> str:
"""
Return a string representation of the Workload object.
Returns:
str: String representation of the Workload object.
"""
return str(self._to_proto())
[docs]
@staticmethod
def builder() -> "WorkloadBuilder":
"""
Return a WorkloadBuilder object.
Returns:
WorkloadBuilder: A builder object to create a Workload.
"""
return WorkloadBuilder()
[docs]
def update_workload_name(self, name: str) -> None:
"""
Set the workload name.
Args:
name (str): The workload name to update.
"""
self.name = name
self._add_mask(self._main_mask)
[docs]
def update_agent_name(self, agent_name: str) -> None:
"""
Set the agent name for the workload.
Args:
agent_name (str): The agent name to update.
"""
self._workload.agent = agent_name
self._add_mask(f"{self._main_mask}.agent")
[docs]
def update_runtime(self, runtime: str) -> None:
"""
Set the runtime for the workload.
Args:
runtime (str): The runtime to update.
"""
self._workload.runtime = runtime
self._add_mask(f"{self._main_mask}.runtime")
[docs]
def update_runtime_config(self, config: str) -> None:
"""
Set the runtime-specific configuration for the workload.
Args:
config (str): The runtime configuration to update.
"""
self._workload.runtimeConfig = config
self._add_mask(f"{self._main_mask}.runtimeConfig")
[docs]
def update_runtime_config_from_file(self, config_file: str) -> None:
"""
Set the runtime-specific configuration for the workload from a file.
Args:
config_file (str): The path to the configuration file.
"""
with open(config_file, "r", encoding="utf-8") as file:
self.update_runtime_config(file.read())
[docs]
def update_restart_policy(self, policy: str) -> None:
"""
Set the restart policy for the workload.
Supported values: `NEVER`, `ON_FAILURE`, `ALWAYS`.
Args:
policy (str): The restart policy to update.
Raises:
WorkloadFieldException: If an invalid restart policy is provided.
"""
if policy not in _ank_base.RestartPolicy.keys():
self.logger.error(
"Invalid restart policy provided.")
raise WorkloadFieldException(
"restart policy", policy, _ank_base.RestartPolicy.keys()
)
self._workload.restartPolicy = _ank_base.RestartPolicy.Value(policy)
self._add_mask(f"{self._main_mask}.restartPolicy")
[docs]
def get_dependencies(self) -> dict:
"""
Return the dependencies of the workload.
Returns:
dict: A dictionary of dependencies with workload names \
as keys and conditions as values.
"""
deps = dict(self._workload.dependencies.dependencies)
for dep in deps:
deps[dep] = _ank_base.AddCondition.Name(deps[dep])
return deps
[docs]
def update_dependencies(self, dependencies: dict[str, str]) -> None:
"""
Update the dependencies of the workload.
Supported conditions: `ADD_COND_RUNNING`, `ADD_COND_SUCCEEDED`,
`ADD_COND_FAILED`.
Args:
dependencies (dict): A dictionary of dependencies with
workload names and condition as values.
Raises:
WorkloadFieldException: If an invalid condition is provided.
"""
self._workload.dependencies.dependencies.clear()
for workload_name, condition in dependencies.items():
if condition not in _ank_base.AddCondition.keys():
self.logger.error(
"Invalid dependency condition provided.")
raise WorkloadFieldException(
"dependency condition", condition,
_ank_base.AddCondition.keys()
)
self._workload.dependencies.dependencies[workload_name] = \
_ank_base.AddCondition.Value(condition)
self._add_mask(f"{self._main_mask}.dependencies")
[docs]
def add_tag(self, key: str, value: str) -> None:
"""
Add a tag to the workload.
Args:
key (str): The key of the tag.
value (str): The value of the tag.
"""
tag = _ank_base.Tag(key=key, value=value)
self._workload.tags.tags.append(tag)
if f"{self._main_mask}.tags" not in self.masks:
self._add_mask(f"{self._main_mask}.tags.{key}")
def _generate_access_right_rule(self,
operation: str,
filter_masks: list[str]
) -> _ank_base.AccessRightsRule:
"""
Generate an access rights rule for the workload.
Args:
operation (str): The operation the rule allows.
filter_masks (list): The list of filter masks.
Returns:
_ank_base.AccessRightsRule: The access rights rule generated.
Raises:
WorkloadFieldException: If an invalid operation is provided.
"""
enum_mapper = {
"Nothing": _ank_base.ReadWriteEnum.RW_NOTHING,
"Write": _ank_base.ReadWriteEnum.RW_WRITE,
"Read": _ank_base.ReadWriteEnum.RW_READ,
"ReadWrite": _ank_base.ReadWriteEnum.RW_READ_WRITE,
}
if operation not in enum_mapper:
self.logger.error(
"Invalid rule operation provided.")
raise WorkloadFieldException(
"rule operation", operation, enum_mapper.keys()
)
return _ank_base.AccessRightsRule(
stateRule=_ank_base.StateRule(
operation=enum_mapper[operation],
filterMasks=filter_masks
)
)
def _access_right_rule_to_str(self, rule: _ank_base.AccessRightsRule
) -> tuple[str, list[str]]:
"""
Convert an access rights rule to a tuple.
Args:
rule (_ank_base.AccessRightsRule): The access
rights rule to convert.
Returns:
tuple: A tuple containing operation and filter masks.
"""
enum_mapper = {
_ank_base.ReadWriteEnum.RW_NOTHING: "Nothing",
_ank_base.ReadWriteEnum.RW_WRITE: "Write",
_ank_base.ReadWriteEnum.RW_READ: "Read",
_ank_base.ReadWriteEnum.RW_READ_WRITE: "ReadWrite",
}
return (
enum_mapper[rule.stateRule.operation],
rule.stateRule.filterMasks
)
[docs]
def get_allow_rules(self) -> list[tuple[str, list[str]]]:
"""
Return the allow rules of the workload.
Returns:
list: A list of tuples containing operation and filter masks.
"""
rules = []
for rule in self._workload.controlInterfaceAccess.allowRules:
rules.append(self._access_right_rule_to_str(rule))
return rules
[docs]
def update_allow_rules(self, rules: list[tuple[str, list[str]]]) -> None:
"""
Update the allow rules of the workload.
Supported values: `Nothing`, `Write`, `Read`, `ReadWrite`.
Args:
rules (list): A list of tuples containing
operation and filter masks.
Raises:
WorkloadFieldException: If an invalid operation is provided
"""
del self._workload.controlInterfaceAccess.allowRules[:]
for operation, filter_masks in rules:
self._workload.controlInterfaceAccess.allowRules.append(
self._generate_access_right_rule(operation, filter_masks)
)
self._add_mask(f"{self._main_mask}.controlInterfaceAccess.allowRules")
[docs]
def get_deny_rules(self) -> list[tuple[str, list[str]]]:
"""
Return the deny rules of the workload.
Returns:
list: A list of tuples containing operation and filter masks.
"""
rules = []
for rule in self._workload.controlInterfaceAccess.denyRules:
rules.append(self._access_right_rule_to_str(rule))
return rules
[docs]
def update_deny_rules(self, rules: list[tuple[str, list[str]]]) -> None:
"""
Update the deny rules of the workload.
Supported values: `Nothing`, `Write`, `Read`, `ReadWrite`.
Args:
rules (list): A list of tuples containing
operation and filter masks.
Raises:
WorkloadFieldException: If an invalid operation is provided
"""
del self._workload.controlInterfaceAccess.denyRules[:]
for operation, filter_masks in rules:
self._workload.controlInterfaceAccess.denyRules.append(
self._generate_access_right_rule(operation, filter_masks)
)
self._add_mask(f"{self._main_mask}.controlInterfaceAccess.denyRules")
[docs]
def add_config(self, alias: str, name: str) -> None:
"""
Link a configuration to the workload.
Args:
alias (str): The alias of the configuration.
name (str): The name of the configuration.
"""
self._workload.configs.configs[alias] = name
self._add_mask(f"{self._main_mask}.configs")
[docs]
def get_configs(self) -> dict[str, str]:
"""
Return the configurations linked to the workload.
Returns:
dict[str, str]: A dict containing the alias as key and name of the
configuration as value.
"""
config_mappings = {}
for alias, name in self._workload.configs.configs.items():
config_mappings[alias] = name
return config_mappings
[docs]
def update_configs(self, configs: dict[str, str]) -> None:
"""
Update the configurations linked to the workload.
Args:
configs (dict[str, str]): A tuple containing the alias and
name of the configurations.
"""
self._workload.configs.configs.clear()
for alias, name in configs.items():
self.add_config(alias, name)
def _add_mask(self, mask: str) -> None:
"""
Add a mask to the list of masks.
Args:
mask (str): The mask to add.
"""
if self._main_mask not in self.masks and mask not in self.masks:
self.masks.append(mask)
[docs]
def to_dict(self) -> dict:
"""
Convert the Workload object to a dictionary.
Returns:
dict: The dictionary representation of the Workload object.
"""
workload_dict = {}
if self._workload.agent:
workload_dict["agent"] = self._workload.agent
if self._workload.runtime:
workload_dict["runtime"] = self._workload.runtime
if self._workload.runtimeConfig:
workload_dict["runtimeConfig"] = self._workload.runtimeConfig
workload_dict["restartPolicy"] = _ank_base.RestartPolicy.Name(
self._workload.restartPolicy
)
workload_dict["dependencies"] = {}
if self._workload.dependencies:
for dep_key, dep_value in \
self._workload.dependencies.dependencies.items():
workload_dict["dependencies"][dep_key] = \
_ank_base.AddCondition.Name(dep_value)
workload_dict["tags"] = []
if self._workload.tags:
for tag in self._workload.tags.tags:
workload_dict["tags"].append(
{"key": tag.key, "value": tag.value}
)
workload_dict["controlInterfaceAccess"] = {}
if self._workload.controlInterfaceAccess:
workload_dict["controlInterfaceAccess"]["allowRules"] = []
for rule in self._workload.controlInterfaceAccess.allowRules:
operation, filter_masks = self._access_right_rule_to_str(rule)
workload_dict["controlInterfaceAccess"]["allowRules"].append({
"type": "StateRule",
"operation": operation,
"filterMask": [str(mask) for mask in filter_masks]}
)
workload_dict["controlInterfaceAccess"]["denyRules"] = []
for rule in self._workload.controlInterfaceAccess.denyRules:
operation, filter_masks = self._access_right_rule_to_str(rule)
workload_dict["controlInterfaceAccess"]["denyRules"].append({
"type": "StateRule",
"operation": operation,
"filterMask": [str(mask) for mask in filter_masks]}
)
workload_dict["configs"] = {}
for alias, name in self._workload.configs.configs.items():
workload_dict["configs"][alias] = name
return workload_dict
# pylint: disable=too-many-branches
@staticmethod
def _from_dict(workload_name: str, dict_workload: dict) -> "Workload":
"""
Convert a dictionary to a Workload object.
Args:
workload_name (str): The name of the workload.
dict_workload (dict): The dictionary to convert.
Returns:
Workload: The Workload object created from the dictionary.
"""
workload = Workload.builder().workload_name(workload_name)
if "agent" in dict_workload:
workload = workload.agent_name(dict_workload["agent"])
if "runtime" in dict_workload:
workload = workload.runtime(dict_workload["runtime"])
if "runtimeConfig" in dict_workload:
workload = workload.runtime_config(dict_workload["runtimeConfig"])
if "restartPolicy" in dict_workload:
workload = workload.restart_policy(dict_workload["restartPolicy"])
if "dependencies" in dict_workload:
for dep_key, dep_value in dict_workload["dependencies"].items():
workload = workload.add_dependency(dep_key, dep_value)
if "tags" in dict_workload:
for tag in dict_workload["tags"]:
workload = workload.add_tag(tag["key"], tag["value"])
if "controlInterfaceAccess" in dict_workload:
if "allowRules" in dict_workload["controlInterfaceAccess"]:
for rule in dict_workload[
"controlInterfaceAccess"][
"allowRules"
]:
workload = workload.add_allow_rule(
rule["operation"], rule["filterMask"]
)
if "denyRules" in dict_workload["controlInterfaceAccess"]:
for rule in dict_workload[
"controlInterfaceAccess"][
"denyRules"
]:
workload = workload.add_deny_rule(
rule["operation"], rule["filterMask"]
)
if "configs" in dict_workload:
for alias, name in dict_workload["configs"].items():
workload = workload.add_config(alias, name)
return workload.build()
def _to_proto(self) -> _ank_base.Workload:
"""
Convert the Workload object to a proto message.
Returns:
_ank_base.Workload: The proto message representation
of the Workload object.
"""
return self._workload
def _from_proto(self, proto: _ank_base.Workload) -> None:
"""
Convert the proto message to a Workload object.
Args:
proto (_ank_base.Workload): The proto message to convert.
"""
self._workload = proto
self.masks = []
# pylint: disable=too-many-instance-attributes
[docs]
class WorkloadBuilder:
"""
A builder class to create a Workload object.
Attributes:
wl_name (str): The workload name.
wl_agent_name (str): The agent name.
wl_runtime (str): The runtime.
wl_runtime_config (str): The runtime configuration.
wl_restart_policy (str): The restart policy.
dependencies (dict): The dependencies.
tags (list): The tags.
"""
[docs]
def __init__(self) -> None:
"""
Initialize a WorkloadBuilder object.
"""
self.wl_name = None
self.wl_agent_name = None
self.wl_runtime = None
self.wl_runtime_config = None
self.wl_restart_policy = None
self.dependencies = {}
self.tags = []
self.allow_rules = []
self.deny_rules = []
self.configs = {}
[docs]
def workload_name(self, workload_name: str) -> "WorkloadBuilder":
"""
Set the workload name.
Args:
workload_name (str): The workload name to set.
Returns:
WorkloadBuilder: The builder object.
"""
self.wl_name = workload_name
return self
[docs]
def agent_name(self, agent_name: str) -> "WorkloadBuilder":
"""
Set the agent name.
Args:
agent_name (str): The agent name to set.
Returns:
WorkloadBuilder: The builder object.
"""
self.wl_agent_name = agent_name
return self
[docs]
def runtime(self, runtime: str) -> "WorkloadBuilder":
"""
Set the runtime.
Args:
runtime (str): The runtime to set.
Returns:
WorkloadBuilder: The builder object.
"""
self.wl_runtime = runtime
return self
[docs]
def runtime_config(self, runtime_config: str) -> "WorkloadBuilder":
"""
Set the runtime configuration.
Args:
runtime_config (str): The runtime configuration to set.
Returns:
WorkloadBuilder: The builder object.
"""
self.wl_runtime_config = runtime_config
return self
[docs]
def runtime_config_from_file(
self, runtime_config_path: str
) -> "WorkloadBuilder":
"""
Set the runtime configuration using a file.
Args:
runtime_config_path (str): The path to the configuration file.
Returns:
WorkloadBuilder: The builder object.
"""
with open(runtime_config_path, "r", encoding="utf-8") as file:
self.wl_runtime_config = file.read()
return self
[docs]
def restart_policy(self, restart_policy: str) -> "WorkloadBuilder":
"""
Set the restart policy.
Args:
restart_policy (str): The restart policy to set.
Returns:
WorkloadBuilder: The builder object.
"""
self.wl_restart_policy = restart_policy
return self
[docs]
def add_dependency(
self, workload_name: str, condition: str
) -> "WorkloadBuilder":
"""
Add a dependency.
Args:
workload_name (str): The name of the dependent workload.
condition (str): The condition for the dependency.
Returns:
WorkloadBuilder: The builder object.
"""
self.dependencies[workload_name] = condition
return self
[docs]
def add_tag(self, key: str, value: str) -> "WorkloadBuilder":
"""
Add a tag.
Args:
key (str): The key of the tag.
value (str): The value of the tag.
Returns:
WorkloadBuilder: The builder object.
"""
self.tags.append((key, value))
return self
[docs]
def add_allow_rule(
self, operation: str, filter_masks: list[str]
) -> "WorkloadBuilder":
"""
Add an allow rule to the workload.
Args:
operation (str): The operation the rule allows.
filter_masks (list): The list of filter masks.
Returns:
WorkloadBuilder: The builder object.
"""
self.allow_rules.append((operation, filter_masks))
return self
[docs]
def add_deny_rule(
self, operation: str, filter_masks: list[str]
) -> "WorkloadBuilder":
"""
Add a deny rule to the workload.
Args:
operation (str): The operation the rule denies.
filter_masks (list): The list of filter masks.
Returns:
WorkloadBuilder: The builder object.
"""
self.deny_rules.append((operation, filter_masks))
return self
[docs]
def add_config(self, alias: str, name: str) -> "WorkloadBuilder":
"""
Link a configuration to the workload.
Args:
alias (str): The alias of the configuration.
name (str): The name of the configuration.
"""
self.configs[alias] = name
return self
[docs]
def build(self) -> Workload:
"""
Build the Workload object.
Required fields: workload name, agent name, runtime and
runtime configuration.
Returns:
Workload: The built Workload object.
Raises:
WorkloadBuilderException: If required fields are not set.
"""
if self.wl_name is None:
raise WorkloadBuilderException(
"Workload can not be built without a name.")
workload = Workload(self.wl_name)
if self.wl_agent_name is None:
raise WorkloadBuilderException(
"Workload can not be built without an agent name.")
if self.wl_runtime is None:
raise WorkloadBuilderException(
"Workload can not be built without a runtime.")
if self.wl_runtime_config is None:
raise WorkloadBuilderException(
"Workload can not be built without a runtime configuration.")
workload.update_agent_name(self.wl_agent_name)
workload.update_runtime(self.wl_runtime)
workload.update_runtime_config(self.wl_runtime_config)
if self.wl_restart_policy is not None:
workload.update_restart_policy(self.wl_restart_policy)
if len(self.dependencies) > 0:
workload.update_dependencies(self.dependencies)
if len(self.tags) > 0:
workload.update_tags(self.tags)
if len(self.allow_rules) > 0:
workload.update_allow_rules(self.allow_rules)
if len(self.deny_rules) > 0:
workload.update_deny_rules(self.deny_rules)
if len(self.configs) > 0:
workload.update_configs(self.configs)
return workload