# Copyright (c) 2024 Elektrobit Automotive GmbH
#
# This program and the accompanying materials are made available under the
# terms of the Apache License, Version 2.0 which is available at
# https://www.apache.org/licenses/LICENSE-2.0.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# SPDX-License-Identifier: Apache-2.0
"""
This script defines the CompleteState class for managing
the state of the Ankaios cluster.
Classes
-------
- CompleteState:
Represents the complete state of the Ankaios cluster.
Usage
-----
- Create a CompleteState instance:
.. code-block:: python
complete_state = CompleteState()
- Get the API version of the complete state:
.. code-block:: python
api_version = complete_state.get_api_version()
- Add a workload to the complete state:
.. code-block:: python
complete_state.add_workload(workload)
- Get a workload from the complete state:
.. code-block:: python
workload = complete_state.get_workload("nginx")
- Get the entire list of workloads from the complete state:
.. code-block:: python
workloads = complete_state.get_workloads()
- Get the connected agents:
.. code-block:: python
agents = complete_state.get_agents()
- Get the workload states:
.. code-block:: python
workload_states = complete_state.get_workload_states()
- Create a CompleteState instance from a Manifest:
.. code-block:: python
complete_state = CompleteState.from_manifest(manifest)
"""
__all__ = ["CompleteState"]
from typing import Union
from .._protos import _ank_base
from .workload import Workload
from .workload_state import WorkloadStateCollection
from .manifest import Manifest
from ..utils import SUPPORTED_API_VERSION
[docs]
class CompleteState:
"""
A class to represent the complete state.
"""
[docs]
def __init__(self) -> None:
"""
Initializes an empty CompleteState instance with the given API version.
"""
self._complete_state = _ank_base.CompleteState()
self._set_api_version(SUPPORTED_API_VERSION)
self._workloads: list[Workload] = []
self._workload_state_collection = WorkloadStateCollection()
self._configs = {}
[docs]
def __str__(self) -> str:
"""
Returns the string representation of the complete state.
Returns:
str: The string representation of the complete state.
"""
return str(self._to_proto())
def _set_api_version(self, version: str) -> None:
"""
Sets the API version for the complete state.
Args:
version (str): The API version to set.
"""
self._complete_state.desiredState.apiVersion = version
[docs]
def get_api_version(self) -> str:
"""
Gets the API version of the complete state.
Returns:
str: The API version of the complete state.
"""
return str(self._complete_state.desiredState.apiVersion)
[docs]
def add_workload(self, workload: Workload) -> None:
"""
Adds a workload to the complete state.
Args:
workload (Workload): The workload to add.
"""
self._workloads.append(workload)
self._complete_state.desiredState.workloads.\
workloads[workload.name].CopyFrom(workload._to_proto())
[docs]
def get_workload(self, workload_name: str) -> Workload:
"""
Gets a workload from the complete state by its name.
Args:
workload_name (str): The name of the workload to retrieve.
Returns:
Workload: The workload with the specified name,
or None if not found.
"""
for wl in self._workloads:
if wl.name == workload_name:
return wl
return None
[docs]
def get_workloads(self) -> list[Workload]:
"""
Gets a list of workloads from the complete state.
Returns:
list[Workload]: A list of workloads in the complete state.
"""
return self._workloads
[docs]
def get_workload_states(self) -> WorkloadStateCollection:
"""
Gets the workload states.
Returns:
WorkloadStateCollection: The collection of workload states.
"""
return self._workload_state_collection
[docs]
def get_agents(self) -> dict[str, dict]:
"""
Gets the connected agents and their attributes.
Returns:
dict[str, dict]: A dict with the agents and their attributes.
"""
agents = {}
for name, attributes in self._complete_state.agents.agents.items():
agents[name] = {
"cpu_usage": int(attributes.cpu_usage.cpu_usage),
"free_memory": attributes.free_memory.free_memory,
}
return agents
[docs]
def set_configs(self, configs: dict) -> None:
"""
Sets the configurations in the complete state.
Args:
configs (dict): The configurations to set in the complete state.
"""
def _to_config_item(item: Union[str, list, dict]
) -> _ank_base.ConfigItem:
config_item = _ank_base.ConfigItem()
if isinstance(item, str):
config_item.String = item
elif isinstance(item, list):
for value in [_to_config_item(value) for value in item]:
config_item.array.values.append(value)
elif isinstance(item, dict):
for key, value in item.items():
config_item.object.fields[key]. \
CopyFrom(_to_config_item(value))
return config_item
self._configs = configs
self._complete_state.desiredState.configs.configs.clear()
for key, value in self._configs.items():
self._complete_state.desiredState.configs.configs[key].CopyFrom(
_to_config_item(value)
)
[docs]
def get_configs(self) -> dict:
"""
Gets the configurations from the complete state.
Returns:
dict: The configurations from the complete state
"""
return self._configs
[docs]
@staticmethod
def from_manifest(manifest: Manifest) -> 'CompleteState':
"""
Creates a CompleteState instance from a Manifest.
Args:
manifest (Manifest): The manifest to create the
complete state from.
"""
state = CompleteState()
state._complete_state = _ank_base.CompleteState()
dict_state = manifest._manifest
state._set_api_version(
dict_state.get("apiVersion", state.get_api_version())
)
state._workloads = []
if dict_state.get("workloads") is not None:
for workload_name, workload_dict in \
dict_state.get("workloads").items():
state.add_workload(
Workload._from_dict(workload_name, workload_dict)
)
if dict_state.get("configs") is not None:
state.set_configs(dict_state.get("configs"))
return state
[docs]
def to_dict(self) -> dict:
"""
Returns the CompleteState as a dictionary
Returns:
dict: The CompleteState as a dictionary.
"""
data = {
"desired_state": {
"api_version": self.get_api_version(),
"workloads": {},
"configs": self._configs
},
"workload_states": {},
"agents": {}
}
for wl in self._workloads:
data["desired_state"]["workloads"][wl.name] = \
wl.to_dict()
wl_states = self._workload_state_collection.get_as_dict()
for agent_name, exec_states in wl_states.items():
data["workload_states"][agent_name] = {}
for workload_name, exec_states_id in exec_states.items():
data["workload_states"][agent_name][workload_name] = {}
for workload_id, exec_state in exec_states_id.items():
data["workload_states"][agent_name][workload_name][
workload_id] = exec_state.to_dict()
data["agents"] = self.get_agents()
return data
def _to_proto(self) -> _ank_base.CompleteState:
"""
Returns the CompleteState as a proto message.
Returns:
_ank_base.CompleteState: The protobuf message representing
the complete state.
"""
return self._complete_state
def _from_proto(self, proto: _ank_base.CompleteState) -> None:
"""
Converts the proto message to a CompleteState object.
Args:
proto (_ank_base.CompleteState): The protobuf message representing
the complete state.
"""
def _from_config_item(item: _ank_base.ConfigItem
) -> Union[str, list, dict]:
if item.HasField("String"):
return item.String
if item.HasField("array"):
return [_from_config_item(value)
for value in item.array.values]
if item.HasField("object"):
return {key: _from_config_item(value)
for key, value in item.object.fields.items()}
return None # pragma: no cover
self._complete_state = proto
self._workloads = []
for workload_name, proto_workload in self._complete_state. \
desiredState.workloads.workloads.items():
workload = Workload(workload_name)
workload._from_proto(proto_workload)
self._workloads.append(workload)
self._workload_state_collection._from_proto(
self._complete_state.workloadStates
)
configs = {}
for key, value in self._complete_state.desiredState. \
configs.configs.items():
configs[key] = _from_config_item(value)
self.set_configs(configs)