from __future__ import print_function
from abc import ABCMeta, abstractmethod
import os
import posixpath
import shlex
from sic_framework.core import sic_redis, utils
from sic_framework.core.message_python2 import SICPingRequest, SICPongMessage, SICStopServerRequest
from sic_framework.core.utils import MAGIC_STARTED_COMPONENT_MANAGER_TEXT
from sic_framework.devices.common_naoqi.nao_motion_streamer import *
from sic_framework.devices.common_naoqi.naoqi_autonomous import *
from sic_framework.devices.common_naoqi.naoqi_button import (
NaoqiButton,
NaoqiButtonSensor,
)
from sic_framework.devices.common_naoqi.naoqi_camera import *
from sic_framework.devices.common_naoqi.naoqi_leds import *
from sic_framework.devices.common_naoqi.naoqi_lookat import (
NaoqiLookAt,
NaoqiLookAtComponent,
)
from sic_framework.devices.common_naoqi.naoqi_microphone import *
from sic_framework.devices.common_naoqi.naoqi_motion import *
from sic_framework.devices.common_naoqi.naoqi_motion_recorder import *
from sic_framework.devices.common_naoqi.naoqi_speakers import *
from sic_framework.devices.common_naoqi.naoqi_stiffness import *
from sic_framework.devices.common_naoqi.naoqi_text_to_speech import *
from sic_framework.devices.common_naoqi.naoqi_tracker import (
NaoqiTracker,
NaoqiTrackerActuator,
)
from sic_framework.devices.device import SICDeviceManager
shared_naoqi_components = [
NaoqiTopCameraSensor,
NaoqiBottomCameraSensor,
NaoqiMicrophoneSensor,
NaoqiMotionActuator,
NaoqiTextToSpeechActuator,
NaoqiMotionRecorderActuator,
NaoqiStiffnessActuator,
NaoqiAutonomousActuator,
NaoqiLEDsActuator,
NaoqiSpeakerComponent,
NaoqiButtonSensor,
NaoqiTrackerActuator,
NaoqiLookAtComponent,
]
[docs]
class Naoqi(SICDeviceManager):
__metaclass__ = ABCMeta
[docs]
def __init__(
self,
ip,
robot_type,
venv,
device_path,
sic_version=None,
dev_test=False,
test_device_path="",
test_repo=None,
bypass_install=False,
top_camera_conf=None,
bottom_camera_conf=None,
mic_conf=None,
motion_conf=None,
tts_conf=None,
motion_record_conf=None,
motion_stream_conf=None,
stiffness_conf=None,
speaker_conf=None,
lookat_conf=None,
username=None,
passwords=None,
):
super().__init__(
ip,
sic_version=sic_version,
username=username,
passwords=passwords,
)
# Set the component configs
self.configs[NaoqiTopCamera] = top_camera_conf
self.configs[NaoqiBottomCamera] = bottom_camera_conf
self.configs[NaoqiMicrophone] = mic_conf
self.configs[NaoqiMotion] = motion_conf
self.configs[NaoqiTextToSpeech] = tts_conf
self.configs[NaoqiMotionRecorder] = motion_record_conf
self.configs[NaoqiMotionStreamer] = motion_stream_conf
self.configs[NaoqiStiffness] = stiffness_conf
self.configs[NaoqiSpeaker] = speaker_conf
self.configs[NaoqiLookAt] = lookat_conf
self.robot_type = robot_type
self.dev_test = dev_test
self.test_repo = test_repo
self.bypass_install = bypass_install
assert robot_type in [
"nao",
"pepper",
], "Robot type must be either 'nao' or 'pepper'"
redis_hostname, _ = sic_redis.get_redis_db_ip_password()
if redis_hostname == "127.0.0.1" or redis_hostname == "localhost":
# get own public ip address for the device to use
redis_hostname = utils.get_ip_adress()
# set start and stop scripts
if dev_test:
robot_wrapper_file = test_device_path + "/" + robot_type
else:
robot_wrapper_file = device_path + "/" + robot_type
self.start_cmd = """
# export environment variables so that it can find the naoqi library
export PYTHONPATH=/opt/aldebaran/lib/python2.7/site-packages;
export LD_LIBRARY_PATH=/opt/aldebaran/lib/naoqi;
python2 {robot_wrapper_file}.py --redis_ip={redis_host} --client_id={client_id};
""".format(
robot_wrapper_file=robot_wrapper_file, redis_host=redis_hostname, client_id=self._client_id
)
# if this robot is expected to have a virtual environment, activate it
if dev_test and venv:
self.start_cmd = (
"""
source ~/.test_venv/bin/activate;
"""
+ self.start_cmd
)
elif venv:
self.start_cmd = (
"""
source ~/.venv_sic/bin/activate;
"""
+ self.start_cmd
)
self.stop_cmd = """
echo 'Killing all previous robot wrapper processes';
# pkill returns 1 when no process matched; treat that as success.
pkill -f "python2 {robot_wrapper_file}.py" || true
""".format(
robot_wrapper_file=robot_wrapper_file
)
# stop SIC
self.ssh_command(self.stop_cmd)
time.sleep(0.1)
self.logger.info("Checking to see if SIC is installed on remote device...")
# make sure SIC is installed
if self.dev_test:
self.create_test_environment()
elif self.bypass_install or self.check_sic_install():
self.logger.info(
"SIC is already installed on Naoqi device {}! starting SIC...".format(
self.device_ip
)
)
else:
self.logger.info(
"SIC is not installed on Naoqi device {}, installing now".format(
self.device_ip
)
)
self.sic_install()
# start SIC
self.logger.info(
"Starting SIC on {} with redis ip {}".format(
self.robot_type, redis_hostname
)
)
self.run_sic()
[docs]
@abstractmethod
def check_sic_install():
"""
Naos and Peppers have different ways of verifying SIC is installed.
"""
pass
[docs]
@abstractmethod
def sic_install():
"""
Naos and Peppers have different ways of installing SIC.
"""
pass
[docs]
def run_sic(self):
"""
Starts SIC on the device.
"""
self.ssh_command(self.start_cmd, create_thread=True, get_pty=False)
self.logger.debug(
"Attempting to ping remote ComponentManager to see if it has started"
)
# try to ping remote ComponentManager to see if it has started
ping_tries = 3
for i in range(ping_tries):
try:
response = self._redis.request(
self.device_ip, SICPingRequest(), timeout=self._PING_TIMEOUT, block=True
)
if response == SICPongMessage():
break
except TimeoutError:
self.logger.debug(
"ComponentManager on ip {} hasn't started yet... retrying ping {} more times".format(
self.device_ip, ping_tries - 1 - i
)
)
else:
raise RuntimeError(
"Could not start SIC on remote device\nSee sic.log for details"
)
self.logger.debug("ComponentManager on ip {} has started!".format(self.device_ip))
[docs]
def stop_device(self):
"""
Stops the device and all its components.
Makes sure the process is killed and the device is stopped.
"""
# Mark that we're intentionally stopping the remote process so the
# SSH monitor thread doesn't report it as "unexpected".
try:
self.stop_event.set()
except Exception:
pass
# send StopRequest to ComponentManager
self._redis.request(self.device_ip, SICStopServerRequest())
# make sure the process is killed
stdin, stdout, stderr, status = self.ssh_command(self.stop_cmd)
# Some SSH servers/channels may not provide an exit status (-1). If there is
# no stderr output, treat it as a best-effort stop during shutdown.
err = ""
try:
err = stderr.read().decode("utf-8")
except Exception:
err = ""
if status not in (0, 1) and not (status == -1 and not err.strip()):
self.logger.error("Failed to stop device, exit code: {status}".format(status=status))
self.logger.error(err)
[docs]
def upload_file(self, local_path, remote_path):
"""
Upload a file to the Naoqi device using SCP.
:param local_path: Path to the local file to upload.
:type local_path: str
:param remote_path: Destination path on the robot. Must be under /home/nao.
:type remote_path: str
:raises ValueError: If the local file does not exist or the remote path is invalid.
:raises RuntimeError: If SCP is unavailable, the SSH connection is missing, or the upload fails.
"""
if not local_path or not isinstance(local_path, str):
raise ValueError("A valid local_path string is required.")
if not remote_path or not isinstance(remote_path, str):
raise ValueError("A valid remote_path string is required.")
if not os.path.isfile(local_path):
raise ValueError("Local path '{}' does not exist or is not a file.".format(local_path))
if not remote_path.startswith("/home/nao"):
raise ValueError("Destination path must start with '/home/nao'. Provided: '{}'".format(remote_path))
if not self.SCPClient:
raise RuntimeError("SCPClient is not available. Cannot upload file.")
if not hasattr(self, "ssh"):
raise RuntimeError("SSH connection has not been initialized. Cannot upload file.")
remote_is_dir = remote_path.endswith("/") or remote_path == "/home/nao"
if remote_is_dir:
remote_dir = remote_path
remote_file_path = posixpath.join(remote_dir, os.path.basename(local_path))
else:
remote_dir = posixpath.dirname(remote_path) or "/home/nao"
remote_file_path = remote_path
if not remote_dir.startswith("/home/nao"):
raise ValueError("Destination directory must remain within '/home/nao'. Computed: '{}'".format(remote_dir))
mkdir_cmd = "mkdir -p {}".format(shlex.quote(remote_dir))
_, _, stderr, status = self.ssh_command(mkdir_cmd)
if status != 0:
error_output = stderr.read().decode("utf-8")
raise RuntimeError(
"Failed to create remote directory '{}': {}".format(remote_dir, error_output.strip())
)
# ensure destination file does not already exist
check_cmd = "test -e {}".format(shlex.quote(remote_file_path))
_, _, _, status = self.ssh_command(check_cmd)
if status == 0:
self.logger.info(
"Skipping upload: destination file '%s' already exists on the Naoqi device.",
remote_file_path,
)
return
try:
with self.SCPClient(self.ssh.get_transport()) as scp:
destination = remote_dir if remote_is_dir else remote_file_path
scp.put(local_path, destination)
except Exception as exc:
raise RuntimeError(
"Failed to upload '{}' to '{}': {}".format(local_path, remote_file_path, exc)
)
self.logger.info("Uploaded '{}' to '{}' on the Naoqi device.".format(local_path, remote_file_path))
@property
def top_camera(self):
return self._get_connector(NaoqiTopCamera)
@property
def bottom_camera(self):
return self._get_connector(NaoqiBottomCamera)
@property
def mic(self):
return self._get_connector(NaoqiMicrophone)
@property
def motion(self):
return self._get_connector(NaoqiMotion)
@property
def tts(self):
return self._get_connector(NaoqiTextToSpeech)
@property
def motion_record(self):
return self._get_connector(NaoqiMotionRecorder)
@property
def stiffness(self):
return self._get_connector(NaoqiStiffness)
@property
def autonomous(self):
return self._get_connector(NaoqiAutonomous)
@property
def leds(self):
return self._get_connector(NaoqiLEDs)
@property
def speaker(self):
return self._get_connector(NaoqiSpeaker)
@property
def buttons(self):
return self._get_connector(NaoqiButton)
@property
def tracker(self):
return self._get_connector(NaoqiTracker)
@property
def look_at(self):
return self._get_connector(NaoqiLookAt)
def __del__(self):
if hasattr(self, "logfile"):
self.logfile.close()
if __name__ == "__main__":
pass