"""
connector.py
This module contains the SICConnector class, the user interface to connect to components.
"""
import logging
import time
from abc import ABCMeta
import six
import sys
from sic_framework.core.sensor_python2 import SICSensor
from sic_framework.core.service_python2 import SICService
from sic_framework.core.utils import is_sic_instance
from . import utils
from .component_manager_python2 import SICNotStartedMessage, SICStartComponentRequest
from .message_python2 import SICMessage, SICPingRequest, SICRequest, SICStopRequest
from . import sic_logging
from .sic_redis import SICRedis
class ComponentNotStartedError(Exception):
    """Raised to signal that a requested component failed to start."""
class SICConnector(object):
    """
    The user interface to connect to components wherever they are running.

    Constructing a connector asks the component manager listening on ``ip`` to
    start the component given by :attr:`component_class`, and then pings the
    component to make sure it is reachable.

    :param ip: The IP address of the component to connect to.
    :type ip: str, optional
    :param log_level: The logging level to use for the connector.
    :type log_level: logging.LOGLEVEL, optional
    :param conf: The configuration for the connector.
    :type conf: SICConfMessage, optional
    :param input_source: Connector whose output channel is used as the input
        channel of this component. If omitted, the component IP is used as the
        input channel.
    :type input_source: SICConnector, optional
    :raises RuntimeError: If the component could not be started or pinged.
    """

    # Python 2 style metaclass declaration (ignored by Python 3).
    __metaclass__ = ABCMeta

    # define how long an "instant" reply should take at most (ping sometimes takes more than 150ms)
    _PING_TIMEOUT = 1

    def __init__(self,
                 ip="localhost",
                 log_level=logging.INFO,
                 conf=None,
                 input_source=None):
        # Validate before opening any connection, so a bad argument cannot
        # leave a Redis connection behind.
        assert isinstance(ip, str), "IP must be string"

        # Give every attribute that stop()/__del__ reads a value up front, so
        # cleanup works even when initialization fails half-way.
        self._callback_threads = []
        self._request_reply_channel = None
        self._output_channel = None

        # connect to Redis (exactly once; a second SICRedis() would leak the first)
        self._redis = SICRedis()

        # client ID is the IP of whatever machine is running this connector
        self.client_id = utils.get_ip_adress()
        self._log_level = log_level
        self.name = "{component}Connector".format(component=self.__class__.__name__)
        self.logger = sic_logging.get_sic_logger(
            name=self.name, client_id=self.client_id, redis=self._redis
        )
        self._redis.parent_logger = self.logger

        # if the component is running on the same machine as the Connector
        if ip in ["localhost", "127.0.0.1"]:
            # get the ip address of the current machine on the network
            ip = utils.get_ip_adress()

        self.component_name = self.component_class.get_component_name()
        self.component_ip = ip
        self.component_id = self.component_name + ":" + self.component_ip

        # if no input source is provided, the component IP is used as the
        # input channel (i.e. Component is a Sensor)
        if input_source is None:
            self._input_channel = ip
        else:
            if not isinstance(input_source, SICConnector):
                self.logger.error("Input source must be a SICConnector")
                sys.exit(1)
            self._input_channel = input_source.get_output_channel()

        self._conf = conf

        # make sure we can start the component and ping it
        try:
            # sets self._output_channel and self._request_reply_channel
            self._start_component()
            self.logger.debug("Component started")
            # Explicit check instead of `assert`: an assert (including the
            # ping call inside it) is removed when Python runs with -O.
            if not self._ping():
                raise ComponentNotStartedError(
                    "Component {} was started but did not reply to ping".format(
                        self.component_id
                    )
                )
        except Exception as e:
            self.logger.error(e)
            raise RuntimeError(e)

        self.logger.debug("Component initialization complete")

    @property
    def component_class(self):
        """
        The component class this connector is for.

        Subclasses are expected to override this member.

        :return: The component class this connector is for
        :rtype: type[SICComponent]
        :raises NotImplementedError: If the subclass did not override it.
        """
        raise NotImplementedError("Abstract member component_class not set.")

    def send_message(self, message):
        """
        Send a message to the component over its input channel.

        :param message: The message to send.
        :type message: SICMessage
        """
        # Update the timestamp, as it should be set by the device of origin
        message._timestamp = self._get_timestamp()
        self._redis.send_message(self._input_channel, message)

    def register_callback(self, callback):
        """
        Subscribe a callback to be called when there is new data available.

        The handler is registered on the component's output channel.

        :param callback: the function to execute.
        :type callback: function
        """
        try:
            ct = self._redis.register_message_handler(self.get_output_channel(), callback)
        except Exception as e:
            self.logger.error("Error registering callback: {}".format(e))
            # bare raise keeps the original traceback
            raise
        self._callback_threads.append(ct)

    def request(self, request, timeout=100.0, block=True):
        """
        Send a request to the Component.

        Waits until the reply is received. If the reply takes longer than `timeout` seconds to arrive,
        a TimeoutError is raised. If block is set to false, the reply is ignored and the function
        returns immediately.

        :param request: The request to send to the component.
        :type request: SICRequest
        :param timeout: A timeout in case the action takes too long.
        :type timeout: float
        :param block: If false, immediately returns None after sending the request.
        :type block: bool
        :return: the SICMessage reply from the component, or None if blocking=False
        :rtype: SICMessage | None
        """
        self.logger.debug("Sending request: {} over channel: {}".format(request, self._request_reply_channel))

        # A class object instead of an instance is a common mistake; give a
        # hint before the type check below rejects it.
        if isinstance(request, type):
            self.logger.error(
                "You probably forgot to initiate the class. For example, use NaoRestRequest() instead of NaoRestRequest."
            )

        assert utils.is_sic_instance(request, SICRequest), (
            "Cannot send requests that do not inherit from "
            "SICRequest (type: {req})".format(req=type(request))
        )

        # Update the timestamp, as it is not yet set (normally be set by the device of origin, e.g a camera)
        request._timestamp = self._get_timestamp()

        return self._redis.request(
            self._request_reply_channel, request, timeout=timeout, block=block
        )

    def stop(self):
        """
        Send a stop request to the component and close the redis connection.

        Safe to call on a connector whose initialization failed: nothing is
        done when no Redis connection exists, the stop request is skipped when
        the component never reported a request/reply channel, and the
        connection is closed even if sending the stop request fails.
        """
        redis = getattr(self, "_redis", None)
        if redis is None:
            # __init__ failed before the Redis connection was created
            return

        logger = getattr(self, "logger", None)
        channel = getattr(self, "_request_reply_channel", None)
        try:
            if channel is not None:
                if logger is not None:
                    logger.debug("Sending StopRequest to component")
                redis.send_message(channel, SICStopRequest())
        finally:
            redis.close()

    def get_output_channel(self):
        """
        Get the output channel of the component.

        :return: The output channel, or None if the component was not started yet.
        """
        return self._output_channel

    def _ping(self):
        """
        Ping the component to check if it is alive.

        :return: True if the component is alive, False otherwise.
        :rtype: bool
        """
        try:
            self.request(SICPingRequest(), timeout=self._PING_TIMEOUT)
            self.logger.debug("Received ping response")
            return True
        except TimeoutError:
            self.logger.error("Timeout error when trying to ping component {}".format(self.component_class.get_component_name()))
            return False

    def _start_component(self):
        """
        Request the component to be started by the component manager.

        On success the output channel and the request/reply channel reported
        by the component manager are stored on the connector.

        :raises TimeoutError: If the component manager did not reply in time.
        :raises ComponentNotStartedError: If the manager reports that the
            component did not start.
        """
        self.logger.info(
            "Component is not already alive, requesting {} from manager {}".format(
                self.component_class.get_component_name(),
                self.component_ip,
            ),
        )

        component_request = SICStartComponentRequest(
            component_name=self.component_class.get_component_name(),
            log_level=self._log_level,
            input_channel=self._input_channel,
            client_id=self.client_id,
            conf=self._conf,
        )

        try:
            # if successful, the component manager will send a SICComponentStartedMessage,
            # which contains the ID of the output and req/reply channel
            return_message = self._redis.request(
                self.component_ip,
                component_request,
                timeout=self.component_class.COMPONENT_STARTUP_TIMEOUT,
            )
        except TimeoutError:
            # six.raise_from(exc, None) is the Python 2/3 compatible spelling
            # of `raise exc from None`: it hides the low-level timeout context
            # and shows only the user-facing message.
            six.raise_from(
                TimeoutError(
                    "Could not connect to {}. Is SIC running on the device (ip:{})?".format(
                        self.component_class.get_component_name(), self.component_ip
                    )
                ),
                None,
            )
        except Exception as e:
            self.logger.error("Unknown exception occured while trying to start {name} component: {e}".format(name=self.component_class.get_component_name(), e=e))
            # Re-raise: swallowing the error here would let __init__ continue
            # with no channels set and hide the real cause.
            raise

        # Checked outside the try block so this error is not caught by the
        # generic handler above.
        if is_sic_instance(return_message, SICNotStartedMessage):
            raise ComponentNotStartedError(
                "\n\nComponent did not start, error should be logged above. ({})".format(
                    return_message.message
                )
            )

        # set the output and request/reply channels
        self._output_channel = return_message.output_channel
        self._request_reply_channel = return_message.request_reply_channel

    def _get_timestamp(self):
        """
        Return the current local time in seconds since the epoch.
        """
        # TODO this needs to be synchronized with all devices, because if a nao is off by a second or two
        # its data will align wrong with other sources
        # possible solution: do redis.time, and use a custom get time functions that is aware of the offset
        return time.time()

    # TODO: maybe put this in constructor to do a graceful exit on crash?
    # register cleanup to disconnect redis if an exception occurs anywhere during execution
    # TODO FIX cannot register multiple excepthooks
    # sys.excepthook = self.cleanup_after_except
    #
    # def cleanup_after_except(self, *args):
    #     self.stop()
    #     # call original except hook after stopping
    #     sys.__excepthook__(*args)

    # TODO: maybe also helps for a graceful exit?
    def __del__(self):
        """
        Call stop() on the connector when it is deleted.

        Errors are logged rather than raised; if the logger was never created
        (initialization failed early) they are dropped silently.
        """
        try:
            self.stop()
        except Exception as e:
            logger = getattr(self, "logger", None)
            if logger is not None:
                logger.error("Error in clean shutdown: {}".format(e))