Source code for ankaios_sdk._components.request

# Copyright (c) 2024 Elektrobit Automotive GmbH
#
# This program and the accompanying materials are made available under the
# terms of the Apache License, Version 2.0 which is available at
# https://www.apache.org/licenses/LICENSE-2.0.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# SPDX-License-Identifier: Apache-2.0

"""
This module defines the Request class for creating and handling
requests to the Ankaios system.

Classes
-------

- Request:
    Represents the base request to the Ankaios system. It is an abstract
    class that should be subclassed for specific request types.
- GetStateRequest:
    Represents a request to get the state of the Ankaios system.
- UpdateStateRequest:
    Represents a request to update the state of the Ankaios system.
- LogsRequest:
    Represents a request to get logs from the Ankaios system.
- LogsCancelRequest:
    Represents a request to stop the real-time log stream from the
    Ankaios system.

Usage
-----

- Create a Request for updating the state:
    .. code-block:: python

        complete_state = CompleteState()
        request = UpdateStateRequest(
            complete_state, masks=["desiredState.workloads"]
        )

- Create a Request for getting the state:
    .. code-block:: python

        request = GetStateRequest(masks=["desiredState.workloads"])

- Create a Request for getting logs for a workload:
    .. code-block:: python

        workload_name: WorkloadInstanceName = ...
        request = LogsRequest(workload_names=[workload_name])

- Create a Request for getting a continuous stream of logs:
    .. code-block:: python

        workload_name: WorkloadInstanceName = ...
        request = LogsRequest(workload_names=[workload_name], follow=True)

- Create a Request for stopping the log stream:
    .. code-block:: python

        request = LogsCancelRequest()

- Get the request ID:
    .. code-block:: python

        request_id = request.get_id()
"""

__all__ = [
    "Request",
    "GetStateRequest",
    "UpdateStateRequest",
    "LogsRequest",
    "LogsCancelRequest",
]

import uuid
from typing import Union
from datetime import datetime
from .._protos import _ank_base
from ..utils import get_logger
from .complete_state import CompleteState
from .workload_state import WorkloadInstanceName


[docs] class Request: """ Represents a request to the Ankaios system. """
[docs] def __init__(self, _id: str = None) -> None: """ Initializes a Request instance. Args: _id (str): The request ID. If None, a new UUID will be generated. Raises: TypeError: If the Request class is instantiated directly. """ if self.__class__ is Request: raise TypeError("Request cannot be instantiated directly.") self._request = _ank_base.Request() self._set_id(_id if _id else str(uuid.uuid4())) self.logger = get_logger()
[docs] def __str__(self) -> str: """ Returns the string representation of the request. Returns: str: The string representation of the request. """ return str(self._to_proto())
[docs] def get_id(self) -> str: """ Gets the request ID. Returns: str: The request ID. """ return self._request.requestId
def _set_id(self, request_id: str) -> None: """ Sets the request ID. Args: request_id (str): The request ID to set. """ self._request.requestId = request_id def _to_proto(self) -> _ank_base.Request: """ Converts the Request object to a proto message. Returns: _ank_base.Request: The protobuf message representing the request. """ return self._request
# pylint: disable=too-few-public-methods, dangerous-default-value
[docs] class GetStateRequest(Request): """ Represents a request for getting the state of the Ankaios system. This request includes an optional list of masks to specify which fields should be included in the response. """ def __init__(self, masks: list = []) -> None: """ Initializes a GetStateRequest instance. Args: masks (list): The masks to set for the request. """ super().__init__() self._request.completeStateRequest.fieldMask[:] = masks self.logger.debug( "Created request of type GetState with id %s", self._request.requestId, )
# pylint: disable=too-few-public-methods, dangerous-default-value
[docs] class UpdateStateRequest(Request): """ Represents a request for updating the state of the Ankaios system. This request includes the new state and an optional list of masks to specify which fields should be updated. """ def __init__( self, complete_state: CompleteState, masks: list = [] ) -> None: """ Initializes an UpdateStateRequest instance. Args: complete_state (CompleteState): The new state to set. masks (list): The masks to set for the request. """ super().__init__() self._request.updateStateRequest.updateMask[:] = masks self._request.updateStateRequest.newState.CopyFrom( complete_state._to_proto() ) self.logger.debug( "Created request of type UpdateState with id %s", self._request.requestId, )
# pylint: disable=too-few-public-methods, dangerous-default-value
[docs] class LogsRequest(Request): """ Represents a request for getting logs from the Ankaios system. """ # pylint: disable=too-many-arguments
[docs] def __init__( self, workload_names: list[WorkloadInstanceName], *, follow: bool = False, tail: int = -1, since: Union[str, datetime] = "", until: Union[str, datetime] = "", ) -> None: """ Initializes an LogsRequest instance. Args: workload_names (list[WorkloadInstanceName]): The workload instance names for which to get logs. follow (bool): If true, the logs will be continuously streamed. tail (int): The number of lines to display from the end of the logs. since (str / datetime): The start time for the logs. If string, it must be in the RFC3339 format. until (str / datetime): The end time for the logs. If string, it must be in the RFC3339 format. Raises: ValueError: If no workload names are provided. """ if len(workload_names) == 0: raise ValueError("At least one workload name must be provided.") super().__init__() self._request.logsRequest.CopyFrom( _ank_base.LogsRequest( workloadNames=[name._to_proto() for name in workload_names], follow=follow, tail=tail, ) ) if since: if isinstance(since, str): self._request.logsRequest.since = since else: self._request.logsRequest.since = since.isoformat() if until: if isinstance(until, str): self._request.logsRequest.until = until else: self._request.logsRequest.until = until.isoformat() self.logger.debug( "Created request of type LogsRequest with id %s", self._request.requestId, )
# pylint: disable=too-few-public-methods, dangerous-default-value
[docs] class LogsCancelRequest(Request): """ Represents a request for stopping the real-time log stream from the Ankaios system. """
[docs] def __init__(self, request_id: str) -> None: """ Initializes an LogsCancelRequest instance. Args: id (str): The request ID. """ super().__init__(_id=request_id) self._request.logsCancelRequest.CopyFrom(_ank_base.LogsCancelRequest()) self.logger.debug( "Created request of type LogsCancelRequest with id %s", self._request.requestId, )