# Source code for sic_framework.devices.pepper

from __future__ import print_function

import argparse
import os
import subprocess

from sic_framework.core.component_manager_python2 import SICComponentManager
from sic_framework.devices.common_naoqi.naoqi_camera import (
    DepthPepperCamera,
    DepthPepperCameraSensor,
    StereoPepperCamera,
    StereoPepperCameraSensor,
)
from sic_framework.devices.common_naoqi.pepper_motion_streamer import (
    PepperMotionStreamer,
    PepperMotionStreamerService,
)
from sic_framework.devices.common_naoqi.pepper_tablet import (
    NaoqiTablet,
    NaoqiTabletComponent,
)
from sic_framework.devices.common_naoqi.pepper_top_tactile_sensor import (
    PepperTopTactile,
    PepperTopTactileSensor,
)
from sic_framework.devices.device import SICLibrary
from sic_framework.devices.naoqi_shared import *

# this is where dependency binaries are downloaded to on the Pepper machine
_LIB_DIRECTORY = "/home/nao/sic_framework_2/social-interaction-cloud-main/lib"

_LIBS_TO_INSTALL = [
    SICLibrary(
        "redis",
        lib_path="/home/nao/sic_framework_2/social-interaction-cloud-main/lib/redis",
        lib_install_cmd="pip install --user redis-3.5.3-py2.py3-none-any.whl",
    ),
    SICLibrary(
        "PyTurboJPEG",
        lib_path="/home/nao/sic_framework_2/social-interaction-cloud-main/lib/libturbojpeg/PyTurboJPEG-master",
        lib_install_cmd="pip install --user .",
    ),
    SICLibrary(
        "Pillow",
        download_cmd="curl -O https://files.pythonhosted.org/packages/3a/ec/82d468c17ead94734435c7801ec77069926f337b6aeae1be0a07a24bb024/Pillow-6.2.2-cp27-cp27mu-manylinux1_i686.whl",
        lib_path=_LIB_DIRECTORY,
        lib_install_cmd="pip install --user Pillow-6.2.2-cp27-cp27mu-manylinux1_i686.whl",
    ),
    SICLibrary(
        "six",
        download_cmd="curl -O https://files.pythonhosted.org/packages/b7/ce/149a00dd41f10bc29e5921b496af8b574d8413afcd5e30dfa0ed46c2cc5e/six-1.17.0-py2.py3-none-any.whl",
        lib_path=_LIB_DIRECTORY,
        lib_install_cmd="pip install --user six-1.17.0-py2.py3-none-any.whl",
    ),
    SICLibrary(
        "numpy",
        download_cmd="curl -O https://files.pythonhosted.org/packages/fd/54/aee23cfc1cdca5064f9951eefd3c5b51cff0cecb37965d4910779ef6b792/numpy-1.16.6-cp27-cp27mu-manylinux1_i686.whl",
        req_version="1.16",
        lib_path=_LIB_DIRECTORY,
        lib_install_cmd="pip install --user numpy-1.16.6-cp27-cp27mu-manylinux1_i686.whl",
    ),
]


class Pepper(Naoqi):
    """
    Wrapper for Pepper device to easily access its components (connectors)
    """

    def __init__(
        self,
        ip,
        sic_version=None,
        stereo_camera_conf=None,
        depth_camera_conf=None,
        pepper_motion_conf=None,
        **kwargs
    ):
        """
        Set up the Pepper wrapper and register the Pepper-specific configs.

        :param ip: address of the Pepper; forwarded to the Naoqi base class.
        :param sic_version: SIC version forwarded to the base class
            (None by default).
        :param stereo_camera_conf: configuration stored for StereoPepperCamera.
        :param depth_camera_conf: configuration stored for DepthPepperCamera.
        :param pepper_motion_conf: configuration stored for PepperMotionStreamer.
        :param kwargs: any further keyword arguments, passed through unchanged
            to the Naoqi base class.
        """
        super().__init__(
            ip,
            robot_type="pepper",
            venv=False,
            username="nao",
            passwords=["pepper", "nao"],
            # device path is where this script is located on the actual Pepper machine
            device_path="/home/nao/sic_framework_2/social-interaction-cloud-main/sic_framework/devices",
            test_device_path="/home/nao/sic_in_test/social-interaction-cloud/sic_framework/devices",
            sic_version=sic_version,
            **kwargs
        )

        self.configs[StereoPepperCamera] = stereo_camera_conf
        self.configs[DepthPepperCamera] = depth_camera_conf
        self.configs[PepperMotionStreamer] = pepper_motion_conf

    def check_sic_install(self):
        """
        Check whether a matching, non-test version of SIC is installed on Pepper.

        Runs ``pip show social-interaction-cloud`` over SSH and inspects the
        result.

        :return: True only when SIC is installed, is not the ``sic_in_test``
            version, and its reported version equals ``self.sic_version``;
            False in every other case (the caller is expected to reinstall).
        """
        _, stdout, _, exit_status = self.ssh_command(
            """
            pip show social-interaction-cloud;
            """
        )
        remote_stdout = stdout.read().decode()

        if exit_status != 0:
            # sic is not installed
            return False

        if "sic_in_test" in remote_stdout:
            # test version of SIC is installed
            self.logger.info(
                "Test version of SIC is installed, uninstalling and reinstalling latest version"
            )
            return False

        self.logger.info("SIC is already installed, checking versions")

        # get the version of SIC installed on Pepper
        pepper_version = ""
        for line in remote_stdout.splitlines():
            # each inspected line of the `pip show` output is echoed to stdout
            print(line)
            if line.startswith("Version:"):
                # split only on the first colon so the full value is kept
                pepper_version = line.split(":", 1)[1].strip()
                break

        self.logger.info("SIC version on Pepper: {}".format(pepper_version))
        # if you don't pass sic_version, it will grab your local version
        self.logger.info("SIC local version: {}".format(self.sic_version))

        if pepper_version == self.sic_version:
            self.logger.info("SIC already installed on Pepper and versions match")
            return True

        self.logger.warning(
            "SIC is installed on Pepper but does not match the local version or the version you passed! Reinstalling SIC on Pepper"
        )
        self.logger.warning(
            "(Check to make sure you also have the latest version of SIC installed!)"
        )
        return False

    def sic_install(self):
        """
        Installs SIC on the Pepper

        This function:
        1. gets rid of old directories for clean install
        2. curl github repository
        3. pip install --no-deps git repo
        4. install dependencies from _LIBS_TO_INSTALL

        :raises Exception: when the remote install script does not print its
            success marker; the message carries the remote stderr.
        """
        _, stdout, stderr, _ = self.ssh_command(
            """
            rm -rf /home/nao/framework;
            if [ -d /home/nao/sic_framework_2 ]; then
                rm -rf /home/nao/sic_framework_2;
            fi;

            mkdir /home/nao/sic_framework_2;
            cd /home/nao/sic_framework_2;
            curl -L -o sic_repo.zip https://github.com/Social-AI-VU/social-interaction-cloud/archive/refs/tags/v{version}.zip;
            unzip sic_repo.zip;
            mv social-interaction-cloud-{version} social-interaction-cloud-main;
            cd social-interaction-cloud-main;
            pip install --user -e . --no-deps;

            if pip list | grep -w 'social-interaction-cloud' > /dev/null 2>&1 ; then
                echo "SIC successfully installed"
            fi;
            """.format(
                version=self.sic_version
            )
        )

        # The script echoes this marker only if pip lists the package afterwards.
        if "SIC successfully installed" not in stdout.read().decode():
            raise Exception(
                "Failed to install sic. Standard error stream from install command: {}".format(
                    stderr.read().decode()
                )
            )

        self.logger.info("Installing package dependencies...")

        # One `pip freeze` snapshot is reused for every library check below.
        _, stdout_pip_freeze, _, _ = self.ssh_command("pip freeze")
        installed_libs = stdout_pip_freeze.readlines()

        for lib in _LIBS_TO_INSTALL:
            self.logger.info("Checking if library {} is installed...".format(lib.name))
            if not self.check_if_lib_installed(installed_libs, lib):
                self.logger.info(
                    "Library {} is NOT installed, installing now...".format(lib.name)
                )
                self.install_lib(lib)

    def create_test_environment(self):
        """
        Creates a test environment on the Pepper

        To use test environment, you must pass in a repo to the device initialization. For example:
        - Pepper(ip, dev_test=True, test_repo=PATH_TO_REPO) OR
        - Pepper(ip, dev_test=True)

        If you do not pass in a repo, it will assume the repo to test is already installed in a test environment on the Pepper.

        Instead of creating a virtual environment, we will just copy the repo over to the test directory
        and install from there.

        :return: True once the test environment is in place.
        :raises RuntimeError: when ``pip show social-interaction-cloud`` fails
            after installing the test repo.
        """

        def uninstall_old_repo():
            """
            Uninstall the old version of social-interaction-cloud on Pepper
            """
            self.ssh_command(
                """
                pip uninstall social-interaction-cloud -y
                """
            )

        def progress4_callback(filename, size, sent, peername):
            """
            Print the SCP transfer progress on a single, rewritten line.
            """
            # guard against a zero-byte file, which would divide by zero
            percent = (float(sent) / float(size)) * 100 if size else 100.0
            print(
                "\r({}:{}) {} progress: {:.2f}%".format(
                    peername[0],
                    peername[1],
                    filename.decode("utf-8"),
                    percent,
                ),
                end="",
            )

        def install_new_repo():
            """
            Install the new repo on Pepper
            """
            # zip up dev repo and scp over
            self.logger.info("Zipping up dev repo")
            # NOTE(review): the helper's return value was never used; the
            # archive is assumed to be written next to the repo as
            # "<repo>.zip" -- confirm against utils.zip_directory.
            utils.zip_directory(self.test_repo)
            zipped_path = self.test_repo + ".zip"
            self.logger.info("Zipped path: {}".format(zipped_path))

            # get the basename of the repo
            repo_name = os.path.basename(self.test_repo)

            self.logger.info("Removing old sic_in_test folder on Pepper")

            # (re)create the sic_in_test folder on Pepper
            self.ssh_command(
                """
                cd ~;
                rm -rf sic_in_test;
                mkdir sic_in_test;
                """
            )

            self.logger.info("Transferring zip file over to Pepper")

            # scp transfer file over
            with self.SCPClient(
                self.ssh.get_transport(), progress4=progress4_callback
            ) as scp:
                try:
                    scp.put(zipped_path, "/home/nao/sic_in_test/")
                except Exception as e:
                    self.logger.error(
                        "Error transferring zip file over to Pepper: {}".format(e)
                    )
                    # bare raise keeps the original traceback intact
                    raise

            self.logger.info("Unzipping repo and installing on Pepper")
            self.ssh_command(
                """
                cd ~/sic_in_test;
                unzip {repo_name};
                cd {repo_name};
                pip install --user -e . --no-deps;
                """.format(
                    repo_name=repo_name
                )
            )

            # check to see if the repo was installed successfully
            _, _, _, exit_status = self.ssh_command(
                """
                pip show social-interaction-cloud;
                """
            )
            if exit_status != 0:
                raise RuntimeError("Failed to install social-interaction-cloud")

        if self.test_repo:
            self.logger.info("Installing test repo on Pepper")
            self.logger.warning(
                "This process may take a minute or two... Please hold tight!"
            )
            uninstall_old_repo()
            install_new_repo()
        else:
            self.logger.info(
                "No test repo provided, assuming test repo is already installed"
            )

        return True

    @property
    def stereo_camera(self):
        """Connector obtained for StereoPepperCamera."""
        return self._get_connector(StereoPepperCamera)

    @property
    def depth_camera(self):
        """Connector obtained for DepthPepperCamera."""
        return self._get_connector(DepthPepperCamera)

    @property
    def tablet_display_url(self):
        """Connector obtained for NaoqiTablet."""
        return self._get_connector(NaoqiTablet)

    def motion_streaming(self, input_source=None):
        """
        Connector obtained for PepperMotionStreamer.

        :param input_source: forwarded as ``input_source`` to the connector lookup.
        """
        return self._get_connector(PepperMotionStreamer, input_source=input_source)

    @property
    def tactile_sensor(self):
        """Connector obtained for PepperTopTactile."""
        return self._get_connector(PepperTopTactile)
if __name__ == "__main__":
    # Both command-line options are mandatory strings, so they are registered
    # from a small (flag, help text) table.
    cli = argparse.ArgumentParser()
    for flag, description in (
        ("--redis_ip", "IP address where Redis is running"),
        ("--client_id", "Client that is using this device"),
    ):
        cli.add_argument(flag, type=str, required=True, help=description)
    args = cli.parse_args()

    # The Redis address is published through the DB_IP environment variable.
    os.environ["DB_IP"] = args.redis_ip

    # Pepper-only components, appended to the components shared by NAOqi devices.
    pepper_only_components = [
        # NaoqiLookAtComponent,
        NaoqiTabletComponent,
        DepthPepperCameraSensor,
        StereoPepperCameraSensor,
        PepperMotionStreamerService,
        PepperTopTactileSensor,
    ]
    pepper_components = shared_naoqi_components + pepper_only_components

    SICComponentManager(pepper_components, client_id=args.client_id, name="Pepper")