Source code for sic_framework.devices.naoqi_shared

from __future__ import print_function

from abc import ABCMeta, abstractmethod

from sic_framework.core import sic_redis, utils
from sic_framework.core.message_python2 import SICPingRequest, SICPongMessage
from sic_framework.core.utils import MAGIC_STARTED_COMPONENT_MANAGER_TEXT
from sic_framework.devices.common_naoqi.nao_motion_streamer import *
from sic_framework.devices.common_naoqi.naoqi_autonomous import *
from sic_framework.devices.common_naoqi.naoqi_button import (
    NaoqiButton,
    NaoqiButtonSensor,
)
from sic_framework.devices.common_naoqi.naoqi_camera import *
from sic_framework.devices.common_naoqi.naoqi_leds import *
from sic_framework.devices.common_naoqi.naoqi_lookat import (
    NaoqiLookAt,
    NaoqiLookAtComponent,
)
from sic_framework.devices.common_naoqi.naoqi_microphone import *
from sic_framework.devices.common_naoqi.naoqi_motion import *
from sic_framework.devices.common_naoqi.naoqi_motion_recorder import *
from sic_framework.devices.common_naoqi.naoqi_speakers import *
from sic_framework.devices.common_naoqi.naoqi_stiffness import *
from sic_framework.devices.common_naoqi.naoqi_text_to_speech import *
from sic_framework.devices.common_naoqi.naoqi_tracker import (
    NaoqiTracker,
    NaoqiTrackerActuator,
)
from sic_framework.devices.device import SICDevice

shared_naoqi_components = [
    NaoqiTopCameraSensor,
    NaoqiBottomCameraSensor,
    NaoqiMicrophoneSensor,
    NaoqiMotionActuator,
    NaoqiTextToSpeechActuator,
    NaoqiMotionRecorderActuator,
    NaoqiStiffnessActuator,
    NaoqiAutonomousActuator,
    NaoqiLEDsActuator,
    NaoqiSpeakerComponent,
    NaoqiButtonSensor,
    NaoqiTrackerActuator,
    NaoqiLookAtComponent,
]


[docs] class Naoqi(SICDevice): __metaclass__ = ABCMeta
[docs] def __init__( self, ip, robot_type, venv, device_path, sic_version=None, dev_test=False, test_device_path="", test_repo=None, bypass_install=False, top_camera_conf=None, bottom_camera_conf=None, mic_conf=None, motion_conf=None, tts_conf=None, motion_record_conf=None, motion_stream_conf=None, stiffness_conf=None, speaker_conf=None, lookat_conf=None, username=None, passwords=None, ): super().__init__( ip, sic_version=sic_version, username=username, passwords=passwords, ) # Set the component configs self.configs[NaoqiTopCamera] = top_camera_conf self.configs[NaoqiBottomCamera] = bottom_camera_conf self.configs[NaoqiMicrophone] = mic_conf self.configs[NaoqiMotion] = motion_conf self.configs[NaoqiTextToSpeech] = tts_conf self.configs[NaoqiMotionRecorder] = motion_record_conf self.configs[NaoqiMotionStreamer] = motion_stream_conf self.configs[NaoqiStiffness] = stiffness_conf self.configs[NaoqiSpeaker] = speaker_conf self.configs[NaoqiLookAt] = lookat_conf self.robot_type = robot_type self.dev_test = dev_test self.test_repo = test_repo self.bypass_install = bypass_install assert robot_type in [ "nao", "pepper", ], "Robot type must be either 'nao' or 'pepper'" redis_hostname, _ = sic_redis.get_redis_db_ip_password() if redis_hostname == "127.0.0.1" or redis_hostname == "localhost": # get own public ip address for the device to use redis_hostname = utils.get_ip_adress() # set start and stop scripts if dev_test: robot_wrapper_file = test_device_path + "/" + robot_type else: robot_wrapper_file = device_path + "/" + robot_type self.start_cmd = """ # export environment variables so that it can find the naoqi library export PYTHONPATH=/opt/aldebaran/lib/python2.7/site-packages; export LD_LIBRARY_PATH=/opt/aldebaran/lib/naoqi; python2 {robot_wrapper_file}.py --redis_ip={redis_host} --client_id={client_id}; """.format( robot_wrapper_file=robot_wrapper_file, redis_host=redis_hostname, client_id=self._client_id ) # if this robot is expected to have a virtual environment, activate it if dev_test and venv: self.start_cmd = ( """ source ~/.test_venv/bin/activate; """ + self.start_cmd ) elif venv: self.start_cmd = ( """ source ~/.venv_sic/bin/activate; """ + self.start_cmd ) self.stop_cmd = """ echo 'Killing all previous robot wrapper processes'; pkill -f "python2 {robot_wrapper_file}.py" """.format( robot_wrapper_file=robot_wrapper_file ) # stop SIC self.ssh_command(self.stop_cmd) time.sleep(0.1) self.logger.info("Checking to see if SIC is installed on remote device...") # make sure SIC is installed if self.dev_test: self.create_test_environment() elif self.bypass_install or self.check_sic_install(): self.logger.info( "SIC is already installed on Naoqi device {}! starting SIC...".format( self.device_ip ) ) else: self.logger.info( "SIC is not installed on Naoqi device {}, installing now".format( self.device_ip ) ) self.sic_install() # start SIC self.logger.info( "Starting SIC on {} with redis ip {}".format( self.robot_type, redis_hostname ) ) self.run_sic()
[docs] @abstractmethod def check_sic_install(): """ Naos and Peppers have different ways of verifying SIC is installed. """ pass
[docs] @abstractmethod def sic_install(): """ Naos and Peppers have different ways of installing SIC. """ pass
[docs] def run_sic(self): """ Starts SIC on the device. """ self.ssh_command(self.start_cmd, create_thread=True, get_pty=False) self.logger.debug( "Attempting to ping remote ComponentManager to see if it has started" ) # try to ping remote ComponentManager to see if it has started ping_tries = 3 for i in range(ping_tries): try: response = self._redis.request( self.device_ip, SICPingRequest(), timeout=self._PING_TIMEOUT, block=True ) if response == SICPongMessage(): break except TimeoutError: self.logger.debug( "ComponentManager on ip {} hasn't started yet... retrying ping {} more times".format( self.device_ip, ping_tries - 1 - i ) ) else: raise RuntimeError( "Could not start SIC on remote device\nSee sic.log for details" ) self.logger.debug("ComponentManager on ip {} has started!".format(self.device_ip))
[docs] def stop(self): for connector in self.connectors.values(): connector.stop() self.stop_event.set() self.ssh_command(self.stop_cmd)
@property def top_camera(self): return self._get_connector(NaoqiTopCamera) @property def bottom_camera(self): return self._get_connector(NaoqiBottomCamera) @property def mic(self): return self._get_connector(NaoqiMicrophone) @property def motion(self): return self._get_connector(NaoqiMotion) @property def tts(self): return self._get_connector(NaoqiTextToSpeech) @property def motion_record(self): return self._get_connector(NaoqiMotionRecorder) @property def stiffness(self): return self._get_connector(NaoqiStiffness) @property def autonomous(self): return self._get_connector(NaoqiAutonomous) @property def leds(self): return self._get_connector(NaoqiLEDs) @property def speaker(self): return self._get_connector(NaoqiSpeaker) @property def buttons(self): return self._get_connector(NaoqiButton) @property def tracker(self): return self._get_connector(NaoqiTracker) @property def look_at(self): return self._get_connector(NaoqiLookAt) def __del__(self): if hasattr(self, "logfile"): self.logfile.close()
if __name__ == "__main__": pass