# Copyright (c) 2024 Elektrobit Automotive GmbH
#
# This program and the accompanying materials are made available under the
# terms of the Apache License, Version 2.0 which is available at
# https://www.apache.org/licenses/LICENSE-2.0.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# SPDX-License-Identifier: Apache-2.0
"""
This script defines the CompleteState class for managing
the state of the Ankaios cluster.
Classes
-------
- CompleteState:
Represents the complete state of the Ankaios cluster.
Usage
-----
- Create a CompleteState instance:
.. code-block:: python
complete_state = CompleteState()
- Create a CompleteState instance from a Manifest:
.. code-block:: python
manifest = Manifest()
complete_state = CompleteState(manifest=manifest)
- Get the API version of the complete state:
.. code-block:: python
api_version = complete_state.get_api_version()
- Get a workload from the complete state:
.. code-block:: python
workload = complete_state.get_workload("nginx")
- Get the entire list of workloads from the complete state:
.. code-block:: python
workloads = complete_state.get_workloads()
- Get the connected agents:
.. code-block:: python
agents = complete_state.get_agents()
- Get the workload states:
.. code-block:: python
workload_states = complete_state.get_workload_states()
"""
__all__ = ["CompleteState"]
from typing import Union
from .._protos import _ank_base
from .workload import Workload
from .workload_state import WorkloadStateCollection
from .manifest import Manifest
from ..utils import SUPPORTED_API_VERSION, _to_config_item, get_logger
logger = get_logger()
[docs]
class CompleteState:
"""
A class to represent the complete state.
"""
[docs]
def __init__(
self,
manifest: Manifest = None,
configs: dict = None,
workloads: list[Workload] = None,
_proto: _ank_base.CompleteState = None,
) -> None:
"""
Initializes a CompleteState instance with the provided data.
Args:
manifest (Manifest): The manifest to initialize the complete state.
configs (dict): The configurations to set in the complete state.
workloads (list[Workload]): The workloads to set
in the complete state.
_proto (_ank_base.CompleteState): The proto message to initialize
the complete state.
"""
self._complete_state = _ank_base.CompleteState()
self._set_api_version(SUPPORTED_API_VERSION)
if _proto:
self._complete_state = _proto
logger.debug("CompleteState initialized from proto message")
return
if manifest:
self._complete_state.desiredState.CopyFrom(
manifest._to_desired_state()
)
logger.debug("CompleteState initialized from manifest")
return
if configs:
self.set_configs(configs)
logger.debug("CompleteState initialized from configs")
return
if workloads:
self._complete_state.desiredState.workloads.workloads.clear()
for workload in workloads:
self._complete_state.desiredState.workloads.workloads[
workload.name
].CopyFrom(workload._to_proto())
logger.debug("CompleteState initialized from workloads")
return
[docs]
def __str__(self) -> str:
"""
Returns the string representation of the complete state.
Returns:
str: The string representation of the complete state.
"""
return str(self._to_proto())
def _set_api_version(self, version: str) -> None:
"""
Sets the API version for the complete state.
Args:
version (str): The API version to set.
"""
self._complete_state.desiredState.apiVersion = version
[docs]
def get_api_version(self) -> str:
"""
Gets the API version of the complete state.
Returns:
str: The API version of the complete state.
"""
return str(self._complete_state.desiredState.apiVersion)
[docs]
def get_workload(self, workload_name: str) -> Workload:
"""
Gets a workload from the complete state by its name.
Args:
workload_name (str): The name of the workload to retrieve.
Returns:
Workload: The workload with the specified name,
or None if not found.
"""
if (
workload_name
in self._complete_state.desiredState.workloads.workloads.keys()
):
proto_workload = (
self._complete_state.desiredState.workloads.workloads[
workload_name
]
)
workload = Workload(workload_name)
workload._from_proto(proto_workload)
return workload
return None
[docs]
def get_workloads(self) -> list[Workload]:
"""
Gets a list of workloads from the complete state.
Returns:
list[Workload]: A list of workloads in the complete state.
"""
workloads = []
for (
wl_name,
proto_workload,
) in self._complete_state.desiredState.workloads.workloads.items():
workload = Workload(wl_name)
workload._from_proto(proto_workload)
workloads.append(workload)
return workloads
[docs]
def get_workload_states(self) -> WorkloadStateCollection:
"""
Gets the workload states.
Returns:
WorkloadStateCollection: The collection of workload states.
"""
workload_state_collection = WorkloadStateCollection()
workload_state_collection._from_proto(
self._complete_state.workloadStates
)
return workload_state_collection
[docs]
def get_agents(self) -> dict[str, dict]:
"""
Gets the connected agents and their attributes.
Returns:
dict[str, dict]: A dict with the agents and their attributes.
"""
agents = {}
for name, attributes in self._complete_state.agents.agents.items():
agents[name] = {
"cpu_usage": int(attributes.cpu_usage.cpu_usage),
"free_memory": attributes.free_memory.free_memory,
}
return agents
[docs]
def set_configs(self, configs: dict) -> None:
"""
Sets the configurations in the complete state.
Args:
configs (dict): The configurations to set in the complete state.
"""
self._complete_state.desiredState.configs.configs.clear()
for key, value in configs.items():
self._complete_state.desiredState.configs.configs[key].CopyFrom(
_to_config_item(value)
)
[docs]
def get_configs(self) -> dict:
"""
Gets the configurations from the complete state.
Returns:
dict: The configurations from the complete state
"""
def _from_config_item(
item: _ank_base.ConfigItem,
) -> Union[str, list, dict]:
if item.HasField("String"):
return item.String
if item.HasField("array"):
return [
_from_config_item(value) for value in item.array.values
]
if item.HasField("object"):
return {
key: _from_config_item(value)
for key, value in item.object.fields.items()
}
return None # pragma: no cover
configs = {}
for (
key,
value,
) in self._complete_state.desiredState.configs.configs.items():
configs[key] = _from_config_item(value)
return configs
[docs]
def to_dict(self) -> dict:
"""
Returns the CompleteState as a dictionary
Returns:
dict: The CompleteState as a dictionary.
"""
data = {
"desired_state": {
"api_version": self.get_api_version(),
"workloads": {},
"configs": self.get_configs(),
},
"workload_states": {},
"agents": {},
}
for workload in self.get_workloads():
data["desired_state"]["workloads"][
workload.name
] = workload.to_dict()
wl_states = self.get_workload_states().get_as_dict()
for agent_name, exec_states in wl_states.items():
data["workload_states"][agent_name] = {}
for workload_name, exec_states_id in exec_states.items():
data["workload_states"][agent_name][workload_name] = {}
for workload_id, exec_state in exec_states_id.items():
data["workload_states"][agent_name][workload_name][
workload_id
] = exec_state.to_dict()
data["agents"] = self.get_agents()
return data
def _to_proto(self) -> _ank_base.CompleteState:
"""
Returns the CompleteState as a proto message.
Returns:
_ank_base.CompleteState: The protobuf message representing
the complete state.
"""
return self._complete_state