"""
connector.py
This module contains the SICConnector class, the user interface to connect to components.
"""
import logging
import time
from abc import ABCMeta
import six
import sys
from sic_framework.core.sensor_python2 import SICSensor
from sic_framework.core.service_python2 import SICService
from sic_framework.core.utils import is_sic_instance, create_data_stream_id
from . import utils
from .component_manager_python2 import (
SICNotStartedMessage,
SICStartComponentRequest,
SICStopComponentRequest,
SICComponentStartedMessage
)
from .message_python2 import SICMessage, SICPingRequest, SICRequest, SICSuccessMessage
from . import sic_logging
from .sic_redis import SICRedisConnection
from sic_framework.core.sic_application import SICApplication
[docs]
class ComponentNotStartedError(Exception):
    """
    Signals that a requested component could not be started.

    Carries no state of its own beyond the message passed to the constructor.
    """
[docs]
class SICConnector(object):
    """
    The user interface to connect to components wherever they are running.

    Constructing a connector asks the component manager listening on the channel named after the
    component's IP to start the component given by :attr:`component_class`, pings the component,
    and registers the connector with the :class:`SICApplication`.

    :param ip: The IP address of the component to connect to. ``"localhost"`` and ``"127.0.0.1"``
        are replaced by the client IP reported by the application.
    :type ip: str, optional
    :param conf: The configuration for the connector; forwarded unchanged in the start request.
    :type conf: SICConfMessage, optional
    :param input_source: Connector whose component channel is used as this component's input
        channel. When omitted, an input channel derived from the client IP is used.
    :type input_source: SICConnector, optional
    """

    __metaclass__ = ABCMeta

    # define how long an "instant" reply should take at most (ping sometimes takes more than 150ms)
    _PING_TIMEOUT = 1

    def __init__(self,
                 ip="localhost",
                 conf=None,
                 input_source=None):
        assert isinstance(ip, str), "IP must be string"

        self.app = SICApplication()

        # connect to Redis
        self._redis = self.app.get_redis_instance()

        self.name = "{component}Connector".format(component=self.__class__.__name__)
        self.logger = sic_logging.get_sic_logger(
            name=self.name, client_id=self.app.client_ip, redis=self._redis
        )

        # if the component is running on the same machine as the Connector
        if ip in ["localhost", "127.0.0.1"]:
            # get the ip address of the current machine on the network
            ip = self.app.client_ip

        self.component_name = self.component_class.get_component_name()
        self.component_ip = ip
        self.component_endpoint = self.component_name + ":" + self.component_ip

        if input_source is None:
            # If an input source is not provided, assume the client ID (IP address) is the input
            # channel (i.e. Component is a Sensor/Actuator).
            # First we create a component channel with the client ID as the input source.
            # Then we derive the input channel from the component channel, so that the input
            # channel is unique to the component.
            self.component_channel = create_data_stream_id(self.component_endpoint, self.app.client_ip)
            self.input_channel = self.component_channel + ":input"
        else:
            # if the input channel is provided, assume the input source is a SICConnector
            if not isinstance(input_source, SICConnector):
                self.logger.error("Input source must be a SICConnector")
                self.app.shutdown()
                # Do not fall through and dereference an object of the wrong type in case
                # shutdown() returns instead of terminating the process.
                raise TypeError("Input source must be a SICConnector")
            # The component channel of the input source is already unique to that component,
            # so it is used directly as our input channel.
            self.input_channel = input_source.get_component_channel()
            self.component_channel = create_data_stream_id(self.component_endpoint, self.input_channel)

        self.request_reply_channel = self.component_channel + ":request_reply"

        # handles returned by register_message_handler, unregistered again in stop_component()
        self._callback_threads = []
        self._conf = conf

        # make sure we can start the component and ping it
        try:
            self._start_component()
            # Explicit check instead of `assert self._ping()`: an assert statement is removed
            # under `python -O`, which would skip the ping altogether.
            if not self._ping():
                raise AssertionError("Component failed to ping")
        except Exception as e:
            self.logger.error(e)
            raise RuntimeError(e)

        self.app.register_connector(self)
        self.logger.info("Component initialization complete")

    @property
    def component_class(self):
        """
        The component class this connector is for.

        Subclasses must override this member; the base implementation always raises.

        :return: The component class this connector is for
        :rtype: type[SICComponent]
        :raises NotImplementedError: if the subclass did not set ``component_class``.
        """
        raise NotImplementedError("Abstract member component_class not set.")

    def send_message(self, message):
        """
        Send a message to the component over its input channel.

        :param message: The message to send.
        :type message: SICMessage
        """
        # Update the timestamp, as it should be set by the device of origin
        message._timestamp = self._get_timestamp()
        self._redis.send_message(self.input_channel, message)

    def register_callback(self, callback):
        """
        Subscribe a callback to be called when there is new data available on the component channel.

        The handle returned by Redis is remembered so that :meth:`stop_component` can unregister it.

        :param callback: the function to execute.
        :type callback: function
        :raises Exception: any error raised while registering the handler is logged and re-raised.
        """
        try:
            ct = self._redis.register_message_handler(
                self.get_component_channel(),
                callback,
                name="{}_callback".format(self.component_endpoint),
            )
        except Exception as e:
            self.logger.error("Error registering callback: {}".format(e))
            raise e

        self._callback_threads.append(ct)

    def request(self, request, timeout=100.0, block=True):
        """
        Send a request to the Component.

        Waits until the reply is received. If the reply takes longer than `timeout` seconds to arrive,
        a TimeoutError is raised. If block is set to false, the reply is ignored and the function
        returns immediately.

        :param request: The request to send to the component.
        :type request: SICRequest
        :param timeout: A timeout in case the action takes too long.
        :type timeout: float
        :param block: If false, immediately returns None after sending the request.
        :type block: bool
        :return: the SICMessage reply from the component, or None if block=False
        :rtype: SICMessage | None
        :raises AssertionError: if ``request`` does not inherit from SICRequest.
        """
        self.logger.debug("Sending request: {} over channel: {}".format(request, self.request_reply_channel))

        # A class object instead of an instance is a common mistake; give a targeted hint
        # before the generic type check below rejects it.
        if isinstance(request, type):
            self.logger.error(
                "You probably forgot to initiate the class. For example, use NaoRestRequest() instead of NaoRestRequest."
            )

        assert utils.is_sic_instance(request, SICRequest), (
            "Cannot send requests that do not inherit from "
            "SICRequest (type: {req})".format(req=type(request))
        )

        # Update the timestamp, as it is not yet set (normally be set by the device of origin, e.g a camera)
        request._timestamp = self._get_timestamp()

        return self._redis.request(
            self.request_reply_channel, request, timeout=timeout, block=block
        )

    def stop_component(self):
        """
        Send a StopComponentRequest to the respective ComponentManager, called on exit.

        After the manager confirms the stop, every callback registered through
        :meth:`register_callback` is unregistered and forgotten, so calling this method again
        does not unregister the same handle twice.

        :raises TimeoutError: if the manager returns no reply to the stop request.
        :raises RuntimeError: if the reply is not a SICSuccessMessage.
        """
        self.logger.debug("Connector sending StopComponentRequest to ComponentManager")
        stop_result = self._redis.request(
            self.component_ip,
            SICStopComponentRequest(self.component_channel, self.component_name),
            timeout=5,
        )

        if stop_result is None:
            self.logger.error("Stop request timed out")
            raise TimeoutError("Stop request timed out")
        if not is_sic_instance(stop_result, SICSuccessMessage):
            self.logger.error("Stop request failed")
            raise RuntimeError("Stop request failed")

        # close callback threads
        self.logger.debug("Closing callback threads")
        # iterate over a copy because the list is modified inside the loop
        for ct in self._callback_threads[:]:
            self._redis.unregister_callback(ct)
            self._callback_threads.remove(ct)

    def get_component_channel(self):
        """
        Get the output channel of the component.

        :return: the component channel identifier computed in ``__init__``.
        """
        return self.component_channel

    def _ping(self):
        """
        Ping the component to check if it is alive.

        :return: True if the ping request completed within ``_PING_TIMEOUT`` seconds,
            False if it raised a TimeoutError.
        :rtype: bool
        """
        try:
            self.request(SICPingRequest(), timeout=self._PING_TIMEOUT)
            self.logger.debug("Received ping response")
            return True
        except TimeoutError:
            self.logger.error("Timeout error when trying to ping component {}".format(self.component_class.get_component_name()))
            return False

    def _start_component(self):
        """
        Request the component to be started by the component manager.

        Returns nothing. Start-up failures are reported by raising, so that the caller never
        mistakes a failed start for a successful one.

        :raises TimeoutError: if the manager does not answer within the component's
            ``COMPONENT_STARTUP_TIMEOUT``.
        :raises ComponentNotStartedError: if the manager answers with a SICNotStartedMessage.
        :raises Exception: any other error raised by the start request is logged and re-raised.
        """
        component_name = self.component_class.get_component_name()

        self.logger.info(
            "Component is not already alive, requesting {} from manager {}".format(
                component_name,
                self.component_ip,
            ),
        )

        component_request = SICStartComponentRequest(
            component_name=component_name,
            endpoint=self.component_endpoint,
            input_channel=self.input_channel,
            component_channel=self.component_channel,
            request_reply_channel=self.request_reply_channel,
            client_id=self.app.client_ip,
            conf=self._conf,
        )

        # Only the request itself is guarded, so the deliberate ComponentNotStartedError
        # raised further down is never caught by the catch-all handler.
        try:
            # if successful, the component manager will send a SICComponentStartedMessage,
            # which contains the ID of the output and req/reply channel
            return_message = self._redis.request(
                self.component_ip,
                component_request,
                timeout=self.component_class.COMPONENT_STARTUP_TIMEOUT,
            )
        except TimeoutError:
            # six.raise_from(..., None) is the Python 2/3 compatible spelling of
            # `raise ... from None`: it replaces the low-level timeout with a more helpful
            # message without chaining the original exception.
            six.raise_from(
                TimeoutError(
                    "Could not connect to {}. Is SIC running on the device (ip:{})?".format(
                        component_name, self.component_ip
                    )
                ),
                None,
            )
        except Exception as e:
            self.logger.error("Unknown exception occured while trying to start {name} component: {e}".format(name=component_name, e=e))
            # Propagate instead of swallowing: the component was not started.
            raise

        if is_sic_instance(return_message, SICNotStartedMessage):
            raise ComponentNotStartedError(
                "\n\nComponent did not start, error should be logged above. ({})".format(
                    return_message.message
                )
            )

        if is_sic_instance(return_message, SICComponentStartedMessage):
            self.logger.debug("Received SICComponentStartedMessage, component successfully started")
            return

        self.logger.critical("Received unknown message type from component manager: {}".format(type(return_message)))
        self.app.shutdown()

    def _get_timestamp(self):
        """
        Return the current time as reported by the Redis connection.
        """
        return self._redis.time()