Source code for ankaios_sdk._components.response

# Copyright (c) 2024 Elektrobit Automotive GmbH
#
# This program and the accompanying materials are made available under the
# terms of the Apache License, Version 2.0 which is available at
# https://www.apache.org/licenses/LICENSE-2.0.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# SPDX-License-Identifier: Apache-2.0

"""
This script defines the Response class and its associated types for handling
responses from the control interface. It includes methods for parsing the
received messages and converting them into appropriate Python objects.

Classes
--------

- :class:`Response`:
    Represents a response from the control interface.
- :class:`UpdateStateSuccess`:
    Represents a response for a successful update state request.
- :class:`LogEntry`:
    Represents a log entry from a workload instance.
- :class:`LogsStopResponse`:
    Represents a response for marking the end of the log stream from a
    workload instance.
- :class:`EventEntry`:
    Represents an event.

Enums
-----

- :class:`ResponseType`:
    Enumeration for the different types of response. It includes responses
    like ERROR, CONNECTION_CLOSED, COMPLETE_STATE and so on.

Union Types
-----------

- :class:`LogResponse`:
    Union type for log responses, which can be either :class:`LogEntry` or
    :class:`LogsStopResponse`.

Usage
-----

- Get response content:
    .. code-block:: python

        response = Response()
        (content_type, content) = response.get_content()

- Check if the request_id matches:
    .. code-block:: python

        response = Response()
        if response.check_request_id("1234"):
            print("Request ID matches")

- Convert the update state success to a dictionary:
    .. code-block:: python

        update_state_success = UpdateStateSuccess()
        update_state_success.to_dict()
"""

__all__ = [
    "Response",
    "ResponseType",
    "UpdateStateSuccess",
    "LogEntry",
    "LogsStopResponse",
    "LogResponse",
    "EventEntry",
]

from dataclasses import dataclass
from typing import Any, Union
from enum import Enum
from .._protos import _ank_base, _control_api
from ..exceptions import ResponseException
from ..utils import get_logger
from .complete_state import CompleteState
from .workload_state import WorkloadInstanceName


logger = get_logger()


[docs] class Response: """ Represents a response received from the Ankaios system. :var bytes buffer: The received message buffer. :var ResponseType content_type: The type of the response content. :var content: The content of the response, which can store any response type. """
[docs] def __init__(self, message_buffer: bytes) -> None: """ Initializes the Response object with the received message buffer. :param message_buffer: The received message buffer. :type message_buffer: bytes """ self.buffer = message_buffer self._response = None self.content_type: ResponseType = None self.content = None self._parse_response()
def _parse_response(self) -> None: """ Parses the received message buffer into a protobuf response message. :raises ResponseException: If there is an error parsing the message buffer. """ from_ankaios = _control_api.FromAnkaios() try: # Deserialize the received proto msg from_ankaios.ParseFromString(self.buffer) except Exception as e: logger.error("Error parsing the received message: %s", e) raise ResponseException(f"Parsing error: '{e}'") from e if from_ankaios.HasField("response"): self._response = from_ankaios.response self._from_proto() elif from_ankaios.HasField("controlInterfaceAccepted"): self.content_type = ResponseType.CONTROL_INTERFACE_ACCEPTED elif from_ankaios.HasField("connectionClosed"): self.content_type = ResponseType.CONNECTION_CLOSED self.content = from_ankaios.connectionClosed.reason else: raise ResponseException( # pragma: no cover "Invalid response type." ) logger.debug( "Got response of type '%s' with request id '%s'", self.content_type, self.get_request_id(), ) # pylint: disable=too-many-branches def _from_proto(self) -> None: """ Converts the parsed protobuf message to a Response object. This can be either an error, a complete state, or an update state success. :raises ResponseException: If the response type is invalid. """ if self._response.HasField("error"): self.content_type = ResponseType.ERROR self.content = self._response.error.message elif self._response.HasField("completeStateResponse"): if self._response.completeStateResponse.HasField("alteredFields"): self.content_type = ResponseType.EVENT_RESPONSE self.content = EventEntry._from_response( self._response.completeStateResponse ) else: self.content_type = ResponseType.COMPLETE_STATE self.content = CompleteState( _proto=self._response.completeStateResponse.completeState ) elif self._response.HasField("UpdateStateSuccess"): update_state_msg = self._response.UpdateStateSuccess self.content_type = ResponseType.UPDATE_STATE_SUCCESS self.content = UpdateStateSuccess() for workload in update_state_msg.addedWorkloads: workload_name, workload_id, agent_name = workload.split(".") self.content.added_workloads.append( WorkloadInstanceName( agent_name, workload_name, workload_id ) ) for workload in update_state_msg.deletedWorkloads: workload_name, workload_id, agent_name = workload.split(".") self.content.deleted_workloads.append( WorkloadInstanceName( agent_name, workload_name, workload_id ) ) elif self._response.HasField("logEntriesResponse"): self.content_type = ResponseType.LOGS_ENTRY self.content = [] for log_entry in self._response.logEntriesResponse.logEntries: self.content.append(LogEntry._from_entries(log_entry)) elif self._response.HasField("logsRequestAccepted"): self.content_type = ResponseType.LOGS_REQUEST_ACCEPTED workload_names = self._response.logsRequestAccepted.workloadNames self.content = [ WorkloadInstanceName( workload.agentName, workload.workloadName, workload.id ) for workload in workload_names ] elif self._response.HasField("logsStopResponse"): self.content_type = ResponseType.LOGS_STOP_RESPONSE self.content = [ LogsStopResponse._from_stop_response( self._response.logsStopResponse ) ] elif self._response.HasField("logsCancelAccepted"): self.content_type = ResponseType.LOGS_CANCEL_ACCEPTED self.content = None elif self._response.HasField("eventsCancelAccepted"): self.content_type = ResponseType.EVENT_CANCEL_ACCEPTED self.content = None else: raise ResponseException("Invalid response type.")
[docs] def get_request_id(self) -> str: """ Gets the request id of the response. :returns: The request id of the response. :rtype: str """ if self.content_type in [ ResponseType.CONTROL_INTERFACE_ACCEPTED, ResponseType.CONNECTION_CLOSED, ]: return None return self._response.requestId
[docs] def get_content(self) -> tuple["ResponseType", Any]: """ Gets the content of the response. It can be either: - a string (error / connection closed) - a CompleteState object - an UpdateStateSuccess object - a list of log entires - a log stop response :returns: the content type and the content. :rtype: tuple[ResponseType, any] """ return (self.content_type, self.content)
[docs] class ResponseType(Enum): """Enumeration for the different types of response.""" ERROR = 1 "(int): Got an error from Ankaios." CONTROL_INTERFACE_ACCEPTED = 2 "(int): Control interface connection accepted." COMPLETE_STATE = 3 "(int): Got the complete state." UPDATE_STATE_SUCCESS = 4 "(int): Got a successful update state response." LOGS_ENTRY = 5 "(int): Got logs entry." LOGS_REQUEST_ACCEPTED = 6 "(int): Logs request accepted, waiting for logs." LOGS_STOP_RESPONSE = 7 "(int): Got logs stop response." LOGS_CANCEL_ACCEPTED = 8 "(int): Logs cancel request accepted." EVENT_RESPONSE = 9 "(int): Got an event response." EVENT_CANCEL_ACCEPTED = 10 "(int): Event cancel request accepted." CONNECTION_CLOSED = 20 "(int): Connection closed by the server."
[docs] def __str__(self) -> str: """ Converts the ResponseType to a string. :returns: The string representation of the ResponseType. :rtype: str """ return self.name.lower()
[docs] class UpdateStateSuccess: """ Represents an object that holds the added and deleted workloads. This is automatically returned whenever a state update is successful. :var list[`WorkloadInstanceName`] added_workloads: The list of added workloads. :var list[`WorkloadInstanceName`] deleted_workloads: The list of deleted workloads. """
[docs] def __init__(self) -> None: """ Initializes the UpdateStateSuccess. """ self.added_workloads = [] self.deleted_workloads = []
[docs] def to_dict(self) -> dict: """ Converts the UpdateStateSuccess to a dictionary. :returns: The dictionary representation. :rtype: dict """ return { "added_workloads": [ instance_name.to_dict() for instance_name in self.added_workloads ], "deleted_workloads": [ instance_name.to_dict() for instance_name in self.deleted_workloads ], }
[docs] def __str__(self) -> str: """ Converts the UpdateStateSuccess to a string. :returns: The string representation. :rtype: str """ added_workloads = [ str(instance_name) for instance_name in self.added_workloads ] deleted_workloads = [ str(instance_name) for instance_name in self.deleted_workloads ] return ( f"Added workloads: {added_workloads}, " f"Deleted workloads: {deleted_workloads}" )
@dataclass class LogEntry: """ Represents a log entry from a workload instance. """ workload_instance_name: WorkloadInstanceName """ The name of the workload instance from which the log entry was received. """ message: str """The log message.""" def __str__(self) -> str: """ Converts the LogEntry to a string. :returns: The string representation of the LogEntry. :rtype: str """ return ( f"Log from {self.workload_instance_name.workload_name}." f"{self.workload_instance_name.workload_id}." f"{self.workload_instance_name.agent_name}: " f"{self.message}" ) @staticmethod def _from_entries(log: _ank_base.LogEntry) -> "LogEntry": """ Creates a `LogEntry` from the proto alternative. :param log: The proto `LogEntry` to be converted. :type log: `_ank_base.LogEntry` :returns: The converted `LogEntry`. :rtype: `LogEntry` """ return LogEntry( WorkloadInstanceName( log.workloadName.agentName, log.workloadName.workloadName, log.workloadName.id, ), log.message, ) @dataclass class LogsStopResponse: """ Represents a response for marking the end of the log stream from a workload instance. """ workload_instance_name: WorkloadInstanceName """ The name of the workload instance from which no more logs will be sent. """ def __str__(self) -> str: """ Converts the LogsStopResponse to a string. :returns: The string representation of the LogsStopResponse. :rtype: str """ return ( f"Stopped receiving logs from " f"{self.workload_instance_name.workload_name}." f"{self.workload_instance_name.workload_id}." f"{self.workload_instance_name.agent_name}." ) @staticmethod def _from_stop_response( log: _ank_base.LogsStopResponse, ) -> "LogsStopResponse": """ Creates a `LogsStopResponse` from the proto alternative. :param log: The proto `LogsStopResponse` to be converted. :type log: `_ank_base.LogsStopResponse` :returns: The converted `LogsStopResponse`. :rtype: `LogsStopResponse` """ return LogsStopResponse( WorkloadInstanceName( log.workloadName.agentName, log.workloadName.workloadName, log.workloadName.id, ) ) LogResponse = Union[LogEntry, LogsStopResponse] @dataclass class EventEntry: """ Represents an event notification. """ complete_state: CompleteState """The complete state of the event indicating the state changes.""" added_fields: list[str] """The list of added fields of the state.""" updated_fields: list[str] """The list of updated fields of the state.""" removed_fields: list[str] """The list of removed fields of the state.""" def __str__(self) -> str: """ Converts the EventEntry to a string. :returns: The string representation of the EventEntry. :rtype: str """ ret = "Event:\n" if self.added_fields: ret += f" Added fields: {self.added_fields}\n" if self.updated_fields: ret += f" Updated fields: {self.updated_fields}\n" if self.removed_fields: ret += f" Deleted fields: {self.removed_fields}\n" return ret @staticmethod def _from_response( response: _ank_base.CompleteStateResponse, ) -> "EventEntry": """ Creates a `EventEntry` from the proto alternative. :param response: The proto `EventEntry` to be converted. :type response: `_ank_base.EventEntry` :returns: The converted `EventEntry`. :rtype: `EventEntry` """ return EventEntry( CompleteState(_proto=response.completeState), response.alteredFields.addedFields, response.alteredFields.updatedFields, response.alteredFields.removedFields, )