# Copyright (c) 2024 Elektrobit Automotive GmbH
#
# This program and the accompanying materials are made available under the
# terms of the Apache License, Version 2.0 which is available at
# https://www.apache.org/licenses/LICENSE-2.0.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# SPDX-License-Identifier: Apache-2.0
"""
This script defines the Response class and its associated types for handling
responses from the control interface. It includes methods for parsing the
received messages and converting them into appropriate Python objects.
Classes
--------
- :class:`Response`:
Represents a response from the control interface.
- :class:`UpdateStateSuccess`:
Represents a response for a successful update state request.
- :class:`LogEntry`:
Represents a log entry from a workload instance.
- :class:`LogsStopResponse`:
Represents a response for marking the end of the log stream from a
workload instance.
- :class:`EventEntry`:
Represents an event.
Enums
-----
- :class:`ResponseType`:
Enumeration for the different types of response. It includes responses
like ERROR, CONNECTION_CLOSED, COMPLETE_STATE and so on.
Union Types
-----------
- :class:`LogResponse`:
Union type for log responses, which can be either :class:`LogEntry` or
:class:`LogsStopResponse`.
Usage
-----
- Get response content:
    .. code-block:: python

        response = Response(message_buffer)
        (content_type, content) = response.get_content()

- Check if the request_id matches:
    .. code-block:: python

        response = Response(message_buffer)
        if response.get_request_id() == "1234":
            print("Request ID matches")

- Convert the update state success to a dictionary:
.. code-block:: python
update_state_success = UpdateStateSuccess()
update_state_success.to_dict()
"""
# Public API of this module: the names exported by a star-import.
__all__ = [
"Response",
"ResponseType",
"UpdateStateSuccess",
"LogEntry",
"LogsStopResponse",
"LogResponse",
"EventEntry",
]
from dataclasses import dataclass
from typing import Any, Union
from enum import Enum
from .._protos import _ank_base, _control_api
from ..exceptions import ResponseException
from ..utils import get_logger
from .complete_state import CompleteState
from .workload_state import WorkloadInstanceName
# Module-level logger, obtained from the package's ``get_logger`` helper.
logger = get_logger()
[docs]
class Response:
    """
    Represents a response received from the Ankaios system.

    The message buffer is parsed during construction, so a successfully
    created object always carries a content type.

    :var bytes buffer:
        The received message buffer.
    :var ResponseType content_type:
        The type of the response content.
    :var content:
        The content of the response, which can store any
        response type.
    """

    def __init__(self, message_buffer: bytes) -> None:
        """
        Initializes the Response object with the received message buffer.

        :param message_buffer: The received message buffer.
        :type message_buffer: bytes

        :raises ResponseException:
            If the buffer cannot be parsed or does not hold a valid
            response.
        """
        self.buffer = message_buffer
        # Stays None for messages that are not wrapped in a "response"
        # field (control interface accepted / connection closed).
        self._response = None
        self.content_type: Union[ResponseType, None] = None
        self.content = None
        self._parse_response()

    def _parse_response(self) -> None:
        """
        Parses the received message buffer into a protobuf response message.

        :raises ResponseException:
            If there is an error parsing the message buffer or the
            message holds none of the supported fields.
        """
        from_ankaios = _control_api.FromAnkaios()
        try:
            # Deserialize the received proto msg
            from_ankaios.ParseFromString(self.buffer)
        except Exception as e:
            logger.error("Error parsing the received message: %s", e)
            raise ResponseException(f"Parsing error: '{e}'") from e

        if from_ankaios.HasField("response"):
            self._response = from_ankaios.response
            self._from_proto()
        elif from_ankaios.HasField("controlInterfaceAccepted"):
            self.content_type = ResponseType.CONTROL_INTERFACE_ACCEPTED
        elif from_ankaios.HasField("connectionClosed"):
            self.content_type = ResponseType.CONNECTION_CLOSED
            self.content = from_ankaios.connectionClosed.reason
        else:
            raise ResponseException(  # pragma: no cover
                "Invalid response type."
            )

        logger.debug(
            "Got response of type '%s' with request id '%s'",
            self.content_type,
            self.get_request_id(),
        )

    @staticmethod
    def _parse_instance_name(instance_name: str) -> "WorkloadInstanceName":
        """
        Converts a dotted instance name into a `WorkloadInstanceName`.

        The string is expected to consist of exactly three dot-separated
        parts in the order workload name, workload id, agent name.

        :param instance_name: The dotted instance name to convert.
        :type instance_name: str

        :returns: The converted instance name.
        :rtype: WorkloadInstanceName

        :raises ResponseException:
            If the string does not consist of exactly three parts.
        """
        parts = instance_name.split(".")
        if len(parts) != 3:
            # Report malformed names through the same exception type as
            # every other parsing failure instead of a bare ValueError.
            raise ResponseException(
                f"Invalid workload instance name: '{instance_name}'"
            )
        workload_name, workload_id, agent_name = parts
        return WorkloadInstanceName(agent_name, workload_name, workload_id)

    # pylint: disable=too-many-branches
    def _from_proto(self) -> None:
        """
        Converts the parsed protobuf message to a Response object.
        This can be an error, a complete state, an event, an update state
        success, log entries, a logs request acceptance, a logs stop
        response, or the acceptance of a logs / events cancel request.

        :raises ResponseException:
            If the response type is invalid or an update state success
            contains a malformed workload instance name.
        """
        if self._response.HasField("error"):
            self.content_type = ResponseType.ERROR
            self.content = self._response.error.message
        elif self._response.HasField("completeStateResponse"):
            # A complete state response that carries altered fields is
            # treated as an event, otherwise as a plain complete state.
            if self._response.completeStateResponse.HasField("alteredFields"):
                self.content_type = ResponseType.EVENT_RESPONSE
                self.content = EventEntry._from_response(
                    self._response.completeStateResponse
                )
            else:
                self.content_type = ResponseType.COMPLETE_STATE
                self.content = CompleteState(
                    _proto=self._response.completeStateResponse.completeState
                )
        elif self._response.HasField("UpdateStateSuccess"):
            update_state_msg = self._response.UpdateStateSuccess
            self.content_type = ResponseType.UPDATE_STATE_SUCCESS
            self.content = UpdateStateSuccess()
            self.content.added_workloads.extend(
                self._parse_instance_name(workload)
                for workload in update_state_msg.addedWorkloads
            )
            self.content.deleted_workloads.extend(
                self._parse_instance_name(workload)
                for workload in update_state_msg.deletedWorkloads
            )
        elif self._response.HasField("logEntriesResponse"):
            self.content_type = ResponseType.LOGS_ENTRY
            self.content = [
                LogEntry._from_entries(log_entry)
                for log_entry in self._response.logEntriesResponse.logEntries
            ]
        elif self._response.HasField("logsRequestAccepted"):
            self.content_type = ResponseType.LOGS_REQUEST_ACCEPTED
            workload_names = self._response.logsRequestAccepted.workloadNames
            self.content = [
                WorkloadInstanceName(
                    workload.agentName, workload.workloadName, workload.id
                )
                for workload in workload_names
            ]
        elif self._response.HasField("logsStopResponse"):
            self.content_type = ResponseType.LOGS_STOP_RESPONSE
            # Wrapped in a list, like the log entries content.
            self.content = [
                LogsStopResponse._from_stop_response(
                    self._response.logsStopResponse
                )
            ]
        elif self._response.HasField("logsCancelAccepted"):
            self.content_type = ResponseType.LOGS_CANCEL_ACCEPTED
            self.content = None
        elif self._response.HasField("eventsCancelAccepted"):
            self.content_type = ResponseType.EVENT_CANCEL_ACCEPTED
            self.content = None
        else:
            raise ResponseException("Invalid response type.")

    def get_request_id(self) -> Union[str, None]:
        """
        Gets the request id of the response.

        :returns: The request id of the response, or None for the
            response types that carry no request id (control interface
            accepted and connection closed).
        :rtype: str or None
        """
        if self.content_type in (
            ResponseType.CONTROL_INTERFACE_ACCEPTED,
            ResponseType.CONNECTION_CLOSED,
        ):
            return None
        return self._response.requestId

    def get_content(self) -> tuple["ResponseType", Any]:
        """
        Gets the content of the response. It can be either:

        - a string (error / connection closed)
        - a CompleteState object
        - an EventEntry object
        - an UpdateStateSuccess object
        - a list of log entries
        - a list of workload instance names (logs request accepted)
        - a list holding a single logs stop response
        - None (control interface accepted, logs / events cancel accepted)

        :returns: the content type and the content.
        :rtype: tuple[ResponseType, any]
        """
        return (self.content_type, self.content)
[docs]
class ResponseType(Enum):
    """The kinds of response that a :class:`Response` can carry."""

    #: (int): Got an error from Ankaios.
    ERROR = 1
    #: (int): Control interface connection accepted.
    CONTROL_INTERFACE_ACCEPTED = 2
    #: (int): Got the complete state.
    COMPLETE_STATE = 3
    #: (int): Got a successful update state response.
    UPDATE_STATE_SUCCESS = 4
    #: (int): Got logs entry.
    LOGS_ENTRY = 5
    #: (int): Logs request accepted, waiting for logs.
    LOGS_REQUEST_ACCEPTED = 6
    #: (int): Got logs stop response.
    LOGS_STOP_RESPONSE = 7
    #: (int): Logs cancel request accepted.
    LOGS_CANCEL_ACCEPTED = 8
    #: (int): Got an event response.
    EVENT_RESPONSE = 9
    #: (int): Event cancel request accepted.
    EVENT_CANCEL_ACCEPTED = 10
    #: (int): Connection closed by the server.
    CONNECTION_CLOSED = 20

    def __str__(self) -> str:
        """
        Renders the member as its name in lower case.

        :returns: The lower-case member name, e.g. ``complete_state``.
        :rtype: str
        """
        member_name = self.name
        return member_name.lower()
[docs]
class UpdateStateSuccess:
    """
    Holds the workloads that were added and deleted by a state update.
    An instance is produced for every successful update state response.

    :var list[`WorkloadInstanceName`] added_workloads:
        The list of added workloads.
    :var list[`WorkloadInstanceName`] deleted_workloads:
        The list of deleted workloads.
    """

    def __init__(self) -> None:
        """
        Creates an UpdateStateSuccess with two empty workload lists.
        """
        self.added_workloads: list = []
        self.deleted_workloads: list = []

    def to_dict(self) -> dict:
        """
        Builds a dictionary out of the added and deleted workloads.
        Every workload is converted through its own ``to_dict``.

        :returns: The dictionary representation.
        :rtype: dict
        """
        added = [workload.to_dict() for workload in self.added_workloads]
        deleted = [workload.to_dict() for workload in self.deleted_workloads]
        return {"added_workloads": added, "deleted_workloads": deleted}

    def __str__(self) -> str:
        """
        Builds a one-line summary of the added and deleted workloads.

        :returns: The string representation.
        :rtype: str
        """
        added = list(map(str, self.added_workloads))
        deleted = list(map(str, self.deleted_workloads))
        return f"Added workloads: {added}, Deleted workloads: {deleted}"
[docs]
@dataclass
class LogEntry:
    """
    A single log message together with the workload instance it came from.
    """

    #: The name of the workload instance from which the log entry was
    #: received.
    workload_instance_name: WorkloadInstanceName
    #: The log message.
    message: str

    def __str__(self) -> str:
        """
        Renders the entry as the dotted instance name followed by the
        message.

        :returns: The string representation of the LogEntry.
        :rtype: str
        """
        origin = self.workload_instance_name
        return (
            f"Log from {origin.workload_name}.{origin.workload_id}."
            f"{origin.agent_name}: {self.message}"
        )

    @staticmethod
    def _from_entries(log: _ank_base.LogEntry) -> "LogEntry":
        """
        Builds a `LogEntry` out of its proto counterpart.

        :param log: The proto `LogEntry` to be converted.
        :type log: `_ank_base.LogEntry`

        :returns: The converted `LogEntry`.
        :rtype: `LogEntry`
        """
        proto_name = log.workloadName
        instance_name = WorkloadInstanceName(
            proto_name.agentName, proto_name.workloadName, proto_name.id
        )
        return LogEntry(
            workload_instance_name=instance_name, message=log.message
        )
[docs]
@dataclass
class LogsStopResponse:
    """
    Marks the end of the log stream coming from one workload instance.
    """

    #: The name of the workload instance from which no more logs will be
    #: sent.
    workload_instance_name: WorkloadInstanceName

    def __str__(self) -> str:
        """
        Renders the response as a sentence naming the dotted instance name.

        :returns: The string representation of the LogsStopResponse.
        :rtype: str
        """
        origin = self.workload_instance_name
        return (
            f"Stopped receiving logs from {origin.workload_name}."
            f"{origin.workload_id}.{origin.agent_name}."
        )

    @staticmethod
    def _from_stop_response(
        log: _ank_base.LogsStopResponse,
    ) -> "LogsStopResponse":
        """
        Builds a `LogsStopResponse` out of its proto counterpart.

        :param log: The proto `LogsStopResponse` to be converted.
        :type log: `_ank_base.LogsStopResponse`

        :returns: The converted `LogsStopResponse`.
        :rtype: `LogsStopResponse`
        """
        proto_name = log.workloadName
        instance_name = WorkloadInstanceName(
            proto_name.agentName, proto_name.workloadName, proto_name.id
        )
        return LogsStopResponse(workload_instance_name=instance_name)
# Union type for log responses: either a log entry or the marker for the
# end of the log stream of a workload instance.
LogResponse = Union[LogEntry, LogsStopResponse]
[docs]
@dataclass
class EventEntry:
    """
    Represents an event notification.
    """

    #: The complete state of the event indicating the state changes.
    complete_state: CompleteState
    #: The list of added fields of the state.
    added_fields: list[str]
    #: The list of updated fields of the state.
    updated_fields: list[str]
    #: The list of removed fields of the state.
    removed_fields: list[str]

    def __str__(self) -> str:
        """
        Converts the EventEntry to a string. Only the non-empty field
        lists are included, one per line.

        :returns: The string representation of the EventEntry.
        :rtype: str
        """
        ret = "Event:\n"
        if self.added_fields:
            ret += f"  Added fields: {self.added_fields}\n"
        if self.updated_fields:
            ret += f"  Updated fields: {self.updated_fields}\n"
        if self.removed_fields:
            ret += f"  Deleted fields: {self.removed_fields}\n"
        return ret

    @staticmethod
    def _from_response(
        response: _ank_base.CompleteStateResponse,
    ) -> "EventEntry":
        """
        Creates an `EventEntry` from the proto alternative.

        :param response: The proto `CompleteStateResponse` to be converted.
        :type response: `_ank_base.CompleteStateResponse`

        :returns: The converted `EventEntry`.
        :rtype: `EventEntry`
        """
        altered = response.alteredFields
        # Copy the repeated proto fields into real lists so that the
        # fields match their declared type and do not alias the message.
        return EventEntry(
            CompleteState(_proto=response.completeState),
            list(altered.addedFields),
            list(altered.updatedFields),
            list(altered.removedFields),
        )