"""
component_python2.py
This module contains the SICComponent class, which is the base class for all components in the Social Interaction Cloud.
"""
import threading
import time
from abc import ABCMeta, abstractmethod
import six
from sic_framework.core.utils import is_sic_instance
from . import sic_logging, utils
from .message_python2 import (
SICConfMessage,
SICControlRequest,
SICPingRequest,
SICPongMessage,
)
[docs]
class SICComponent:
"""
Abstract class for Components that provide essential functions for Social Interaction Cloud applications.
:param ready_event: Threading event to signal when the component is ready. If None, creates a new Event.
:type ready_event: threading.Event, optional
:param stop_event: Threading event to signal when the component should stop. If None, creates a new Event.
:type stop_event: threading.Event, optional
:param conf: Configuration parameters for the component. If None, uses default configuration.
:type conf: dict, optional
"""
# 1. Class constants
# Make SICComponent an abstract class in Python 2.
# Ensures any subclass must implement all abstract methods denoted by @abstractmethod
__metaclass__ = ABCMeta
COMPONENT_STARTUP_TIMEOUT = 30
"""
Timeout in seconds for component startup.
This controls how long a SICConnector should wait when requesting a component to start.
Increase this value for components that need more time to initialize (e.g., robots
that need to stand up or models that need to load to GPU).
"""
COMPONENT_STOP_TIMEOUT = 2
"""
Timeout in seconds for a component stop.
This controls how long a SICConnector should wait when requesting a component to stop.
Increase this value for components that need more time to stop.
"""
# 2. Special methods
[docs]
def __init__(
self,
ready_event=None,
stop_event=None,
conf=None,
input_channel=None,
component_channel=None,
req_reply_channel=None,
client_id="",
endpoint="",
ip="",
redis=None
):
self.client_id = client_id
# Redis and logger initialization
try:
self._redis = redis
self.logger = sic_logging.get_sic_logger(
name=self.get_component_name(), client_id=self.client_id, redis=self._redis
)
self.logger.debug("Initialized Redis and logger")
except Exception as e:
raise e
self._ip = ip
self.component_endpoint = endpoint
# _ready_event is set once the component has started, signals to the component manager that the component is ready.
self._ready_event = ready_event if ready_event else threading.Event()
# _signal_to_stop is set when the component should stop
self._signal_to_stop = stop_event if stop_event else threading.Event()
# _stopped is set when the component has stopped
self._stopped = threading.Event()
# Components constrained to one input, request_reply, output channel
self.input_channel = input_channel
self.component_channel = component_channel
self.request_reply_channel = req_reply_channel
# Threads for the message and request handlers
self.message_handler_thread = None
self.request_handler_thread = None
self.params = None
self._threads = []
self.set_config(conf)
# 3. Class methods
[docs]
@classmethod
def get_component_name(cls):
"""
Get the display name of this component.
Returns the name of the subclass that implements this class (e.g. "DesktopCameraSensor")
:return: The component's display name (typically the class name)
:rtype: str
"""
return cls.__name__
# 4. Public instance methods
[docs]
def start(self):
"""
Start the component. This method registers a request handler, signals the component is ready,
and logs that the component has started.
Subclasses should call this method from their overridden start()
method to get the framework's default startup behavior.
"""
self.logger.debug("Registering request handler")
# register a request handler to handle requests
self.request_handler_thread = self._redis.register_request_handler(
self.request_reply_channel, self._handle_request, name="{}_request_handler".format(self.component_endpoint)
)
self.logger.debug("Request handler registered")
self.logger.debug("Registering message handler for input channel {}".format(self.input_channel))
# Create a closure for the message handler to register on the channel
def message_handler(message):
# Route through _handle_message to ensure type validation occurs
return self._handle_message(message)
self.message_handler_thread = self._redis.register_message_handler(
self.input_channel, message_handler, name="{}_message_handler".format(self.component_endpoint)
)
self.logger.debug("Message handler registered")
# communicate the service is set up and listening to its inputs
self._ready_event.set()
self.logger.info("Successfully started!")
[docs]
def stop(self, *args):
"""
Set the stop event to signal the component to stop.
Awaits for the component to stop and checks that the _stopped event is set.
"""
self._signal_to_stop.set()
if self._stopped.wait(timeout=self.COMPONENT_STOP_TIMEOUT):
self.logger.debug("Component's _stopped event set successfully")
else:
self.logger.warning("Component's _stopped event was not set within the specified timeout time")
[docs]
def set_config(self, new=None):
"""
Set the configuration for this component.
Calls _parse_conf() to parse the configuration message.
:param new: The new configuration. If None, uses the default configuration.
:type new: SICConfMessage, optional
"""
if new:
conf = new
else:
conf = self.get_conf()
self._parse_conf(conf)
[docs]
def on_request(self, request):
"""
Define the handler for Component specific requests. Must return a SICMessage as a reply to the request.
:param request: The request for this component.
:type request: SICRequest
:return: The reply
:rtype: SICMessage
"""
raise NotImplementedError("You need to define a request handler.")
[docs]
def on_message(self, message=""):
"""
Define the handler for input messages.
:param message: The message to handle.
:type message: SICMessage
:return: The reply
:rtype: SICMessage
"""
raise NotImplementedError("You need to define a message handler for component {}".format(self.component_endpoint))
[docs]
def output_message(self, message):
"""
Send a message on the output channel of this component.
Stores the component name in the message to allow for debugging.
:param message: The message to send.
:type message: SICMessage
"""
message._previous_component_name = self.get_component_name()
self._redis.send_message(self.component_channel, message)
[docs]
@staticmethod
@abstractmethod
def get_output():
"""
Define the output type of the component.
Must be implemented by the subclass.
:return: SIC message
:rtype: Type[SICMessage]
"""
raise NotImplementedError("You need to define service output.")
[docs]
@staticmethod
def get_conf():
"""
Define the expected configuration of the component using SICConfMessage.
:return: a SICConfMessage or None
:rtype: SICConfMessage
"""
return SICConfMessage()
# 5. Protected methods
[docs]
def _start(self):
"""
Wrapper for the user-implemented start method that provides error handling and logging.
This method calls the user's start() implementation and ensures any exceptions are
properly logged before being re-raised to the caller.
"""
try:
self.start()
except Exception as e:
self.logger.exception(e)
raise e
[docs]
def _handle_message(self, message):
"""
Handle incoming messages.
Validates the message against this Component's declared inputs (get_inputs) before
dispatching to the user-implemented on_message method. Messages that do not match
any declared input types are ignored.
:param message: The message to handle.
:type message: SICMessage
:return: The reply to the message, if any.
:rtype: SICMessage | None
"""
# First check if the message is of a valid type
try:
expected_inputs = self.get_inputs()
except Exception:
expected_inputs = []
# Normalize to list
if not isinstance(expected_inputs, (list, tuple)):
expected_inputs = [expected_inputs] if expected_inputs else []
# If no expected inputs declared, forward as-is
if expected_inputs:
is_valid = any(is_sic_instance(message, input_cls) for input_cls in expected_inputs)
if not is_valid:
# Ignore unexpected message types to prevent component crashes
self.logger.warning(
"Ignoring message of type {} not in expected inputs {}".format(
type(message).__name__, [c.__name__ for c in expected_inputs]
)
)
return None
return self.on_message(message)
[docs]
def _handle_request(self, request):
"""
Handle control requests such as SICPingRequests by calling generic Component methods.
Component specific requests are passed to the normal on_request handler.
:param request: The request to handle.
:type request: SICRequest
:return: The reply to the request.
:rtype: SICMessage
"""
self.logger.debug(
"Handling request {}".format(request.get_message_name())
)
if is_sic_instance(request, SICPingRequest):
return SICPongMessage()
if not is_sic_instance(request, SICControlRequest):
return self.on_request(request)
raise TypeError("Unknown request type {}".format(type(request)))
[docs]
def _parse_conf(self, conf):
"""
Parse configuration messages (SICConfMessage).
This method is called by set_config() to parse the configuration message.
:param conf: Configuration message to parse
:type conf: SICConfMessage
"""
assert is_sic_instance(conf, SICConfMessage), (
"Configuration message should be of type SICConfMessage, "
"is {type_conf}".format(type_conf=type(conf))
)
if conf == self.params:
self.logger.info("New configuration is identical to current configuration.")
return
self.params = conf
[docs]
def _get_timestamp(self):
"""
Get the current timestamp from the Redis server.
:return: The current timestamp
:rtype: float
"""
return self._redis.time()