# Source code for ankaios_sdk._components.workload_state

# Copyright (c) 2024 Elektrobit Automotive GmbH
#
# This program and the accompanying materials are made available under the
# terms of the Apache License, Version 2.0 which is available at
# https://www.apache.org/licenses/LICENSE-2.0.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# SPDX-License-Identifier: Apache-2.0

"""
This module defines various classes and enumerations related to the state
of workloads. It provides functionality to interpret and manage the states
and sub-states of workloads, including converting between different
representations and handling collections of workload states.

Classes
-------

- WorkloadExecutionState:
    Represents the execution state and sub-state of a workload.
- WorkloadInstanceName:
    Represents the name of a workload instance.
- WorkloadState:
    Represents the state of a workload (execution state and name).
- WorkloadStateCollection:
    A collection of workload states.

Enums
-----

- WorkloadStateEnum:
    Enumeration for different states of a workload.
- WorkloadSubStateEnum:
    Enumeration for different sub-states of a workload.

Usage
-----

- Get all workload states:
    .. code-block:: python

        workload_state_collection = WorkloadStateCollection()
        list_of_workload_states = workload_state_collection.get_as_list()
        dict_of_workload_states = workload_state_collection.get_as_dict()

- Unpack a workload state:
    .. code-block:: python

        workload_state = WorkloadState()
        agent_name = workload_state.workload_instance_name.agent_name
        workload_name = workload_state.workload_instance_name.workload_name
        state = workload_state.execution_state.state
        substate = workload_state.execution_state.substate
        info = workload_state.execution_state.additional_info

- Get the workload instance name as a dictionary:
    .. code-block:: python

        workload_instance_name = WorkloadInstanceName(
            "agent_A", "workload_A", "1234"
        )
        instance_name_dict = workload_instance_name.to_dict()
        json_instance_name = json.dumps(instance_name_dict)
"""

# Public API of this module: the names exported by a wildcard import.
__all__ = ["WorkloadStateCollection", "WorkloadState",
           "WorkloadInstanceName", "WorkloadExecutionState",
           "WorkloadStateEnum", "WorkloadSubStateEnum"]

from typing import Optional, Union
from enum import Enum
from .._protos import _ank_base


class WorkloadStateEnum(Enum):
    """
    Enumeration of the top-level states a workload can be in.
    """
    AGENT_DISCONNECTED: int = 0
    "(int): The agent is disconnected."
    PENDING: int = 1
    "(int): The workload is pending."
    RUNNING: int = 2
    "(int): The workload is running."
    STOPPING: int = 3
    "(int): The workload is stopping."
    SUCCEEDED: int = 4
    "(int): The workload has succeeded."
    FAILED: int = 5
    "(int): The workload has failed."
    NOT_SCHEDULED: int = 6
    "(int): The workload is not scheduled."
    REMOVED: int = 7
    "(int): The workload has been removed."

    def __str__(self) -> str:
        """
        Render the member as its bare name (e.g. ``RUNNING``).

        Returns:
            str: The name of the enumeration member.
        """
        return self.name

    @staticmethod
    def _get(field: str) -> "WorkloadStateEnum":
        """
        Resolve a field name to its enumeration member.

        The two multi-word camelCase names are mapped explicitly; every
        other name is upper-cased and looked up directly.

        Args:
            field (str): The field name to look up.

        Returns:
            WorkloadStateEnum: The member matching the field name.

        Raises:
            KeyError: If the field name does not correspond to any
                enumeration member.
        """
        # camelCase names that plain upper-casing would not turn into
        # the SNAKE_CASE member name.
        camel_case_names = {
            "agentDisconnected": "AGENT_DISCONNECTED",
            "notScheduled": "NOT_SCHEDULED",
        }
        member_name = camel_case_names.get(field, field.upper())
        return WorkloadStateEnum[member_name]
class WorkloadSubStateEnum(Enum):
    """
    Enumeration of the sub-states a workload can be in.
    """
    AGENT_DISCONNECTED: int = 0
    "(int): The agent is disconnected."
    PENDING_INITIAL: int = 1
    "(int): The workload is in the initial pending state."
    PENDING_WAITING_TO_START: int = 2
    "(int): The workload is waiting to start."
    PENDING_STARTING: int = 3
    "(int): The workload is starting."
    PENDING_STARTING_FAILED: int = 4
    "(int): The workload failed to start."
    RUNNING_OK: int = 5
    "(int): The workload is running successfully."
    STOPPING: int = 6
    "(int): The workload is stopping."
    STOPPING_WAITING_TO_STOP: int = 7
    "(int): The workload is waiting to stop."
    STOPPING_REQUESTED_AT_RUNTIME: int = 8
    "(int): The workload stop was requested at runtime."
    STOPPING_DELETE_FAILED: int = 9
    "(int): The workload stop failed to delete."
    SUCCEEDED_OK: int = 10
    "(int): The workload succeeded successfully."
    FAILED_EXEC_FAILED: int = 11
    "(int): The workload failed due to execution failure."
    FAILED_UNKNOWN: int = 12
    "(int): The workload failed due to an unknown reason."
    FAILED_LOST: int = 13
    "(int): The workload failed because it was lost."
    NOT_SCHEDULED: int = 14
    "(int): The workload is not scheduled."
    REMOVED: int = 15
    "(int): The workload has been removed."

    def __str__(self) -> str:
        """
        Render the member as its bare name (e.g. ``RUNNING_OK``).

        Returns:
            str: The name of the enumeration member.
        """
        return self.name

    @staticmethod
    def _get(state: WorkloadStateEnum,
             field: _ank_base) -> "WorkloadSubStateEnum":
        """
        Resolve a state and a raw sub-state value to a member.

        The state name is converted to CamelCase and used to fetch the
        attribute of that name from ``_ank_base``; that attribute's
        ``values()`` and ``Name()`` are then used to validate and name
        the given field.

        Args:
            state (WorkloadStateEnum): The state of the workload.
            field (_ank_base): The field to look up.

        Returns:
            WorkloadSubStateEnum: The member corresponding to the state
                and field.

        Raises:
            ValueError: If the field does not correspond to any
                enumeration member.
        """
        # SNAKE_CASE to CamelCase
        state_name = "".join(part.title() for part in state.name.split("_"))
        proto_enum = getattr(_ank_base, state_name)
        if field in proto_enum.values():
            return WorkloadSubStateEnum[proto_enum.Name(field)]
        raise ValueError("No corresponding WorkloadSubStateEnum "
                         + f"value for enum: {field}")

    def _sub_state2ank_base(self) -> _ank_base:
        """
        Look up the ``_ank_base`` attribute named like this member.

        Returns:
            _ank_base: The corresponding _ank_base value.

        Raises:
            ValueError: If ``_ank_base`` has no attribute with this
                member's name.
        """
        try:
            return getattr(_ank_base, self.name)
        except AttributeError as err:  # pragma: no cover
            raise ValueError("No corresponding ank_base value "
                             + f"for enum: {self.name}") from err
# pylint: disable=too-few-public-methods
class WorkloadExecutionState:
    """
    Holds the interpreted execution state of a workload.

    Attributes:
        state (WorkloadStateEnum): The state of the workload.
        substate (WorkloadSubStateEnum): The sub-state of the workload.
        additional_info (str): Additional information about the
            workload state.
    """

    def __init__(self, state: _ank_base.ExecutionState) -> None:
        """
        Create the instance and immediately interpret the given state.

        Args:
            state (_ank_base.ExecutionState): The execution state to
                interpret.
        """
        # Placeholders; all three are filled in by _interpret_state.
        self.state: WorkloadStateEnum = None
        self.substate: WorkloadSubStateEnum = None
        self.additional_info: str = None
        self._interpret_state(state)

    def __str__(self) -> str:
        """
        Format the execution state as ``STATE (SUBSTATE):info``.

        Returns:
            str: The string representation of the workload execution
                state.
        """
        return f"{self.state.name} ({self.substate.name}):{self.additional_info}"

    def _interpret_state(self, exec_state: _ank_base.ExecutionState
                         ) -> None:
        """
        Fill ``state``, ``substate`` and ``additional_info`` from the
        given execution state.

        Args:
            exec_state (_ank_base.ExecutionState): The execution state
                to interpret.

        Raises:
            ValueError: If no field of the "ExecutionStateEnum" oneof
                is set.
        """
        self.additional_info = str(exec_state.additionalInfo)

        oneof_name = exec_state.WhichOneof("ExecutionStateEnum")
        if oneof_name is None:
            raise ValueError("Invalid state for workload.")

        # The name of the set oneof field selects the state; the value
        # stored in that field selects the sub-state.
        self.state = WorkloadStateEnum._get(oneof_name)
        raw_substate = getattr(exec_state, oneof_name)
        self.substate = WorkloadSubStateEnum._get(self.state, raw_substate)

    def to_dict(self) -> dict:
        """
        Export the execution state as a plain dictionary.

        Returns:
            dict: The keys "state", "substate" (both as strings) and
                "additional_info".
        """
        state_text = str(self.state)
        substate_text = str(self.substate)
        return {
            "state": state_text,
            "substate": substate_text,
            "additional_info": self.additional_info
        }
# pylint: disable=too-few-public-methods
class WorkloadInstanceName:
    """
    Represents the name of a workload instance.

    Instances compare equal when all three attributes are equal, and
    are hashable consistently with that equality, so they can be used
    as dictionary keys or set members.

    Attributes:
        agent_name (str): The name of the agent.
        workload_name (str): The name of the workload.
        workload_id (str): The ID of the workload.
    """

    def __init__(self, agent_name: str, workload_name: str,
                 workload_id: str) -> None:
        """
        Initializes a WorkloadInstanceName instance.

        Args:
            agent_name (str): The name of the agent.
            workload_name (str): The name of the workload.
            workload_id (str): The ID of the workload.
        """
        self.agent_name = agent_name
        self.workload_name = workload_name
        self.workload_id = workload_id

    def __eq__(self, other: object) -> bool:
        """
        Checks if two workload instance names are equal.

        Args:
            other (object): The object to compare with.

        Returns:
            bool: True if the workload instance names are equal,
                False otherwise. ``NotImplemented`` is returned for
                objects that are not a WorkloadInstanceName, so Python
                falls back to its default comparison.
        """
        if isinstance(other, WorkloadInstanceName):
            return (self.agent_name == other.agent_name
                    and self.workload_name == other.workload_name
                    and self.workload_id == other.workload_id)
        return NotImplemented

    def __hash__(self) -> int:
        """
        Returns a hash consistent with ``__eq__``.

        Defining ``__eq__`` alone would make the class unhashable, so
        the hash is derived from the same three attributes that are
        compared. The attributes are mutable: do not change them while
        the instance is stored in a set or used as a dictionary key.

        Returns:
            int: The hash of the (agent, workload, id) triple.
        """
        return hash((self.agent_name, self.workload_name,
                     self.workload_id))

    def __str__(self) -> str:
        """
        Returns the string representation of the workload instance name.

        Returns:
            str: The name formatted as
                ``<workload_name>.<workload_id>.<agent_name>``.
        """
        return f"{self.workload_name}.{self.workload_id}.{self.agent_name}"

    def to_dict(self) -> dict:
        """
        Returns the workload instance name as a dictionary.

        Returns:
            dict: The keys "agent_name", "workload_name" and
                "workload_id".
        """
        return {
            "agent_name": self.agent_name,
            "workload_name": self.workload_name,
            "workload_id": self.workload_id
        }

    def get_filter_mask(self) -> str:
        """
        Returns the filter mask for the workload instance name.

        Returns:
            str: The mask formatted as
                ``workloadStates.<agent_name>.<workload_name>.<workload_id>``.
        """
        return f"workloadStates.{self.agent_name}." \
            + f"{self.workload_name}.{self.workload_id}"
# pylint: disable=too-few-public-methods
class WorkloadState:
    """
    Pairs a workload's execution state with its instance name.

    Attributes:
        execution_state (WorkloadExecutionState): The execution state
            of the workload.
        workload_instance_name (WorkloadInstanceName): The name of the
            workload instance.
    """

    def __init__(self, agent_name: str, workload_name: str,
                 workload_id: str,
                 state: Union[WorkloadExecutionState,
                              _ank_base.ExecutionState]
                 ) -> None:
        """
        Build a WorkloadState from its name parts and a state.

        Args:
            agent_name (str): The name of the agent.
            workload_name (str): The name of the workload.
            workload_id (str): The ID of the workload.
            state (Union[WorkloadExecutionState,
                _ank_base.ExecutionState]): The execution state. An
                ``_ank_base.ExecutionState`` is wrapped in a
                WorkloadExecutionState; anything else is stored as
                given.
        """
        self.execution_state = (
            WorkloadExecutionState(state)
            if isinstance(state, _ank_base.ExecutionState)
            else state
        )
        self.workload_instance_name = WorkloadInstanceName(
            agent_name, workload_name, workload_id
        )

    def __str__(self) -> str:
        """
        Format the workload state as ``<instance name>: <state>``.

        Returns:
            str: The string representation of the workload state.
        """
        instance_name = self.workload_instance_name
        exec_state = self.execution_state
        return f"{instance_name}: {exec_state}"
class WorkloadStateCollection:
    """
    A collection of workload states, stored as nested dictionaries
    keyed by agent name, then workload name, then workload id.
    """
    # workload id -> execution state
    ExecutionsStatesForId = dict[str, WorkloadExecutionState]
    # workload name -> (workload id -> execution state)
    ExecutionsStatesOfWorkload = dict[str, ExecutionsStatesForId]
    # agent name -> (workload name -> (workload id -> execution state))
    WorkloadStatesMap = dict[str, ExecutionsStatesOfWorkload]

    def __init__(self) -> None:
        """
        Create an empty WorkloadStateCollection.
        """
        self._workload_states: dict = {}

    def add_workload_state(self, state: WorkloadState) -> None:
        """
        Store a workload state, creating the nested levels on demand.

        An existing entry with the same agent name, workload name and
        workload id is overwritten.

        Args:
            state (WorkloadState): The workload state to add.
        """
        name = state.workload_instance_name
        agent_name = name.agent_name
        workload_name = name.workload_name
        workload_id = name.workload_id

        states_of_agent = self._workload_states.setdefault(agent_name, {})
        states_of_workload = states_of_agent.setdefault(workload_name, {})
        states_of_workload[workload_id] = state.execution_state

    def get_as_dict(self) -> WorkloadStatesMap:
        """
        Return the workload states as a nested dict.

        The internal dictionary itself is returned, not a copy.

        Returns:
            WorkloadStatesMap: A dict of workload states.
        """
        return self._workload_states

    def get_as_list(self) -> list[WorkloadState]:
        """
        Return the workload states flattened into a list.

        Returns:
            list[WorkloadState]: One newly built WorkloadState per
                stored (agent, workload, id) entry.
        """
        return [
            WorkloadState(agent_name, workload_name, workload_id, exec_state)
            for agent_name, workloads in self._workload_states.items()
            for workload_name, workload_ids in workloads.items()
            for workload_id, exec_state in workload_ids.items()
        ]

    def get_for_instance_name(self, instance_name: WorkloadInstanceName
                              ) -> Optional[WorkloadState]:
        """
        Look up the workload state for a workload instance name.

        Args:
            instance_name (WorkloadInstanceName): The workload instance
                name to look up.

        Returns:
            WorkloadState: The workload state for the given instance
                name.
            None: If no workload state was found.
        """
        try:
            exec_state = self._workload_states[
                instance_name.agent_name
            ][instance_name.workload_name][instance_name.workload_id]
        except KeyError:
            return None
        return WorkloadState(
            instance_name.agent_name,
            instance_name.workload_name,
            instance_name.workload_id,
            exec_state
        )

    def _from_proto(self, state: _ank_base.WorkloadStatesMap) -> None:
        """
        Add every workload state found in a proto message.

        Walks ``agentStateMap`` -> ``wlNameStateMap`` -> ``idStateMap``
        and adds one WorkloadState per innermost entry.

        Args:
            state (_ank_base.WorkloadStatesMap): The proto message to
                interpret.
        """
        agents = state.agentStateMap
        for agent_name in agents:
            workloads = agents[agent_name].wlNameStateMap
            for workload_name in workloads:
                states_by_id = workloads[workload_name].idStateMap
                for workload_id in states_by_id:
                    self.add_workload_state(WorkloadState(
                        agent_name, workload_name, workload_id,
                        states_by_id[workload_id]
                    ))