Source code for ankaios_sdk._components.control_interface

# Copyright (c) 2024 Elektrobit Automotive GmbH
#
# This program and the accompanying materials are made available under the
# terms of the Apache License, Version 2.0 which is available at
# https://www.apache.org/licenses/LICENSE-2.0.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# SPDX-License-Identifier: Apache-2.0

"""
This script defines the ControlInterface class that handles the writing
and reading of data to and from the Ankaios control interface.

Classes
-------

- ControlInterface:
    Handles the interaction with the Ankaios control interface.

Enums
-----

- ControlInterfaceState:
    Represents the state of the control interface.

Usage
-----

- Create a Control Interface instance, connect and disconnect.
    .. code-block:: python

        ci = ControlInterface(<callbacks from Ankaios>)
        ci.connect()
        ...
        ci.disconnect()

- Change the state of the control interface.
    .. code-block:: python

        ci.change_state(ControlInterfaceState.CONNECTED)
"""


__all__ = ["ControlInterface", "ControlInterfaceState"]


import os
import select
import time
import threading
from typing import Callable
from enum import Enum
from google.protobuf.internal.encoder import _VarintBytes
from google.protobuf.internal.decoder import _DecodeVarint

from .._protos import _control_api
from .request import Request
from .response import Response, ResponseException
from ..exceptions import ControlInterfaceException, ConnectionClosedException
from ..utils import DEFAULT_CONTROL_INTERFACE_PATH, get_logger, ANKAIOS_VERSION


[docs] class ControlInterfaceState(Enum): """ The state of the control interface. """ INITIALIZED = 1 "(int): Connection established state." TERMINATED = 2 "(int): Connection stopped state." AGENT_DISCONNECTED = 3 "(int): Agent disconnected state." CONNECTION_CLOSED = 4 "(int): Connection closed state."
[docs] def __str__(self) -> str: """ Returns the string representation of the state. Returns: str: The state as a string. """ return self.name
# pylint: disable=too-many-instance-attributes
[docs] class ControlInterface: """ This class handles the interaction with the Ankaios control interface. It provides methods to send and receive data to and from the control interface pipes. Attributes: path (str): The path to the control interface. """ ANKAIOS_CONTROL_INTERFACE_BASE_PATH = "/run/ankaios/control_interface" "(str): The base path for the Ankaios control interface."
[docs] def __init__(self, add_response_callback: Callable, state_changed_callback: Callable ) -> None: """ Initialize the ControlInterface object. This is used to interact with the control interface. Args: add_response_callback (Callable): The callback function to add a response to the Ankaios class. state_changed_callback (Callable): The callback function to to call when the state of the control interface changes. """ self.path = DEFAULT_CONTROL_INTERFACE_PATH self._input_file = None self._output_file = None # The state of the control interface must not be changed directly. # Use the change_state method instead. self._state = ControlInterfaceState.TERMINATED self._read_thread = None self._disconnect_event = threading.Event() self._add_response_callback = add_response_callback self._state_changed_callback = state_changed_callback self._logger = get_logger()
@property def state(self) -> ControlInterfaceState: """ Returns the current state of the control interface. Returns: ControlInterfaceState: The state of the control interface. """ return self._state
[docs] def connect(self) -> None: """ Connect to the control interface by starting to read from the input fifo and opening the output fifo. Raises: AnkaiosConnectionException: If an error occurred. """ if self._state == ControlInterfaceState.INITIALIZED: raise ControlInterfaceException("Already connected.") if not os.path.exists( f"{self.ANKAIOS_CONTROL_INTERFACE_BASE_PATH}/input"): raise ControlInterfaceException( "Control interface input fifo does not exist." ) if not os.path.exists( f"{self.ANKAIOS_CONTROL_INTERFACE_BASE_PATH}/output"): raise ControlInterfaceException( "Control interface output fifo does not exist." ) # pylint: disable=consider-using-with try: self._output_file = open( f"{self.ANKAIOS_CONTROL_INTERFACE_BASE_PATH}/output", "ab" ) except Exception as e: self._logger.error("Error while opening output fifo: %s", e) raise ControlInterfaceException( "Error while opening output fifo." ) from e self._read_thread = threading.Thread( target=self._read_from_control_interface, daemon=True ) self._read_thread.start() self.change_state(ControlInterfaceState.INITIALIZED) self._send_initial_hello()
[docs] def disconnect(self) -> None: """ Disconnect from the control interface. """ if not self._state == ControlInterfaceState.INITIALIZED: self._logger.debug("Already disconnected.") return self._logger.debug("Disconnecting..") self._disconnect_event.set() if self._read_thread is not None: self._read_thread.join(timeout=2) if self._read_thread.is_alive(): self._logger.error("Read thread did not stop.") self._read_thread = None self._cleanup()
def _cleanup(self) -> None: """ Clean up the resources. """ self.change_state(ControlInterfaceState.TERMINATED) if self._output_file is not None: self._output_file.close() self._output_file = None # The input file will be closed by the reading thread. # If the thread gets terminated or it's stuck, the input file # will be closed here. No cover because it's an exceptional case. if self._input_file is not None: # pragma: no cover self._input_file.close() self._input_file = None self._logger.debug("Cleanup happened")
[docs] def change_state(self, state: ControlInterfaceState) -> None: """ Change the state of the control interface. Args: state (ControlInterfaceState): The new state. """ if state == self._state: self._logger.debug("State is already %s.", state) return self._state = state self._state_changed_callback(state)
# pylint: disable=too-many-statements, too-many-branches def _read_from_control_interface(self) -> None: """ Reads from the control interface input fifo. This is meant to be run in a separate thread. The responses are then sent to the Ankaios class to be handled. Raises: AnkaiosConnectionException: If an error occurs while reading the fifo. """ # The pragma: no cover is used on small checks that are not expected # to fail. This method is difficult to test and testing each check # would be redundant. # pylint: disable=invalid-name MOST_SIGNIFICANT_BIT_MASK = 0b10000000 # pylint: disable=consider-using-with try: self._input_file = open( f"{self.ANKAIOS_CONTROL_INTERFACE_BASE_PATH}/input", "rb" ) except Exception as e: self._logger.error("Error while opening input fifo: %s", e) self.disconnect() raise ControlInterfaceException( "Error while opening input fifo." ) from e os.set_blocking(self._input_file.fileno(), False) try: self._logger.info("Started reading from the input pipe.") while not self._disconnect_event.is_set(): # The loop continues when data is available or when the # timeout of 1 second is reached. ready, _, _ = select.select([self._input_file], [], [], 1) if not ready: # pragma: no cover continue # Buffer for reading in the byte size of the proto msg varint_buffer = bytearray() while not self._disconnect_event.is_set(): # Consume byte for byte next_byte = self._input_file.read(1) if not next_byte: # pragma: no cover break varint_buffer += next_byte # Check if we reached the last byte if next_byte[0] & MOST_SIGNIFICANT_BIT_MASK == 0: break if not varint_buffer: self.change_state( ControlInterfaceState.AGENT_DISCONNECTED) self._logger.warning( "Nothing to read from the input fifo pipe." ) self._agent_gone_routine() continue # Decode the varint and receive the proto msg length msg_len, _ = _DecodeVarint(varint_buffer, 0) # Buffer for the proto msg itself msg_buf = bytearray() for _ in range(msg_len): # Read the message according to the length next_byte = self._input_file.read(1) if not next_byte: # pragma: no cover break msg_buf += next_byte try: response = Response(bytes(msg_buf)) except ResponseException as e: # pragma: no cover self._logger.error("Error while reading: %s", e) continue except ConnectionClosedException as e: # pragma: no cover self._logger.error("Connection closed: %s", e) self.change_state(ControlInterfaceState.CONNECTION_CLOSED) break self._add_response_callback(response) except Exception as e: # pylint: disable=broad-exception-caught self._logger.error("Error while reading fifo file: %s", e) finally: self._input_file.close() self._input_file = None self._cleanup() def _agent_gone_routine(self) -> None: """ Method will be called when the agent is gone. It will attempt to write the hello message to the agent until the agent is connected. """ AGENT_RECONNECT_INTERVAL = 1 # seconds while self.state == ControlInterfaceState.AGENT_DISCONNECTED: try: self._send_initial_hello() except BrokenPipeError as _: self._logger.warning( "Waiting for the agent.." ) time.sleep(AGENT_RECONNECT_INTERVAL) else: self.change_state(ControlInterfaceState.INITIALIZED) break def _write_to_pipe(self, to_ankaios: _control_api.ToAnkaios) -> None: """ Writes the ToAnkaios proto message to the control interface output fifo. Args: to_ankaios (_control_api.ToAnkaios): The ToAnkaios proto message. Raises: AnkaiosConnectionException: If the output pipe is None. """ if self._output_file is None: self._logger.error( "Could not write to pipe, output file handler is None." ) raise ControlInterfaceException( "Could not write to pipe, output file handler is None." ) # Adds the byte length of the proto msg self._output_file.write(_VarintBytes(to_ankaios.ByteSize())) # Adds the proto msg itself self._output_file.write(to_ankaios.SerializeToString()) self._output_file.flush()
[docs] def write_request(self, request: Request) -> None: """ Writes the request into the control interface output fifo. Args: request (Request): The request object to be written. Raises: AnkaiosConnectionException: If not connected. """ if not self._state == ControlInterfaceState.INITIALIZED: raise ControlInterfaceException( "Could not write to pipe, not connected.") request_to_ankaios = _control_api.ToAnkaios( request=request._to_proto() ) self._write_to_pipe(request_to_ankaios)
def _send_initial_hello(self) -> None: """ Send an initial hello message with the version to the control interface. Raises: AnkaiosConnectionException: If an error occurred. """ initial_hello = _control_api.ToAnkaios( hello=_control_api.Hello( protocolVersion=str(ANKAIOS_VERSION) ) ) self._write_to_pipe(initial_hello) self._logger.debug("Sent initial hello message with the version %s", ANKAIOS_VERSION)