from __future__ import print_function
import argparse
import os
from sic_framework.core.component_manager_python2 import SICComponentManager
from sic_framework.devices.common_naoqi.nao_motion_streamer import (
NaoqiMotionStreamer,
NaoqiMotionStreamerService,
)
from sic_framework.devices.naoqi_shared import *
[docs]
class Nao(Naoqi):
"""
Wrapper for NAO device to easily access its components (connectors)
"""
[docs]
def __init__(self, ip, sic_version=None, dev_test=False, test_repo=None, **kwargs):
"""
Initialize the Nao device wrapper.
:param ip: The IP address of the NAO robot.
:type ip: str
:param sic_version: The version of the SIC framework to use. Default is None which uses the same version as your local environment.
:type sic_version: str or None, optional
:param dev_test: Whether to use the device in development/testing mode. Default is False.
:type dev_test: bool, optional
:param test_repo: Path to the test repository that you want zipped and installed on the Nao. Default is None.
:type test_repo: str or None, optional
"""
super(Nao, self).__init__(
ip,
robot_type="nao",
venv=True,
device_path="/data/home/nao/.venv_sic/lib/python2.7/site-packages/sic_framework/devices",
sic_version=sic_version,
dev_test=dev_test,
test_device_path="/home/nao/sic_in_test/social-interaction-cloud/sic_framework/devices",
test_repo=test_repo,
username="nao",
passwords="nao",
**kwargs
)
[docs]
def check_sic_install(self):
"""
Runs a script on Nao to see if SIC is installed there
"""
if self.dev_test:
# if we are testing a development version, assume it is already installed properly
return True
_, stdout, _, exit_status = self.ssh_command(
"""
# if there is a virtual environment, activate it
if [ -f ~/.venv_sic/bin/activate ]; then
source ~/.venv_sic/bin/activate;
else
# SIC not already installed if venv does not exist, so exit
exit 1;
fi;
if pip list | grep -w 'social-interaction-cloud' > /dev/null 2>&1 ; then
echo "SIC already installed";
# upgrade the social-interaction-cloud package
pip install --upgrade social-interaction-cloud=={version} --no-deps
else
echo "SIC is not installed";
fi;
""".format(
version=self.sic_version
)
)
output = stdout.read().decode()
if "SIC already installed" in output:
return True
else:
return False
[docs]
def sic_install(self):
"""
Runs the install script specific to the Nao
"""
_, stdout, stderr, exit_status = self.ssh_command(
"""
if [ ! -f ~/.local/bin/virtualenv ]; then
pip install --user virtualenv;
fi;
# create virtual environment
/home/nao/.local/bin/virtualenv ~/.venv_sic;
source ~/.venv_sic/bin/activate;
# link OpenCV to the virtualenv
ln -s /usr/lib/python2.7/site-packages/cv2.so ~/.venv_sic/lib/python2.7/site-packages/cv2.so;
# install required packages
pip install social-interaction-cloud=={version} --no-deps;
pip install Pillow PyTurboJPEG numpy redis six;
if pip list | grep -w 'social-interaction-cloud' > /dev/null 2>&1; then
echo "SIC successfully installed";
fi;
""".format(
version=self.sic_version
)
)
output = stdout.read().decode()
error = stderr.read().decode()
if not "SIC successfully installed" in output:
raise Exception(
"Failed to install sic. Standard error stream from install command: {}".format(
error
)
)
[docs]
def create_test_environment(self):
"""
Creates a test environment on the Nao
To use test environment, you must pass in a repo to the device initialization. For example:
- Nao(ip, dev_test=True, test_repo=PATH_TO_REPO) OR
- Nao(ip, dev_test=True)
If you do not pass in a repo, it will assume the repo to test is already installed in a test environment on the Nao.
This function:
- checks to see if test environment exists
- if test_venv exists and no repo is passed in (self.test_repo), return True (no need to do anything)
- if test_venv exists but a new repo has been passed in:
1. uninstall old version of social-interaction-cloud on Nao
2. zip the provided repo
3. scp zip file over to nao, to 'sic_to_test' folder
4. unzip repo and install
- if test_venv does not exist:
1. check to make sure a test repo has been passed in to device initialization. If not, raise RuntimeError
2. if repo has been passed in, create a new .test_venv and install repo
"""
def init_test_venv():
"""
Initialize a new test virtual environment on Nao
"""
# start with a clean slate just to be sure
_, stdout, _, exit_status = self.ssh_command(
"""
rm -rf ~/.test_venv;
# create virtual environment
/home/nao/.local/bin/virtualenv ~/.test_venv;
source ~/.test_venv/bin/activate;
# link OpenCV to the virtualenv
ln -s /usr/lib/python2.7/site-packages/cv2.so ~/.test_venv/lib/python2.7/site-packages/cv2.so;
# install required packages and perform a clean sic installation
pip install Pillow PyTurboJPEG numpy redis six;
"""
)
# test to make sure the virtual environment was created
_, stdout, _, exit_status = self.ssh_command(
"""
source ~/.test_venv/bin/activate;
"""
)
if exit_status != 0:
raise RuntimeError("Failed to create test virtual environment")
def uninstall_old_repo():
"""
Uninstall the old version of social-interaction-cloud on Alphamini
"""
_, stdout, _, exit_status = self.ssh_command(
"""
source ~/.test_venv/bin/activate;
pip uninstall social-interaction-cloud -y
"""
)
def install_new_repo():
"""
Install the new repo on Nao
"""
self.logger.info("Zipping up dev repo")
zipped_path = utils.zip_directory(self.test_repo)
# get the basename of the repo
repo_name = os.path.basename(self.test_repo)
# create the sic_in_test folder on Nao
_, stdout, _, exit_status = self.ssh_command(
"""
cd ~;
rm -rf sic_in_test;
mkdir sic_in_test;
"""
)
self.logger.info("Transferring zip file over to Nao")
def progress4_callback(filename, size, sent, peername):
print(
"\r({}:{}) {} progress: {:.2f}%".format(
peername[0],
peername[1],
filename.decode("utf-8"),
(float(sent) / float(size)) * 100,
),
end="",
)
# scp transfer file over
with self.SCPClient(
self.ssh.get_transport(), progress4=progress4_callback
) as scp:
try:
scp.put(zipped_path, "/home/nao/sic_in_test/")
except Exception as e:
self.logger.error(
"Error transferring zip file over to Pepper: {}".format(e)
)
raise e
self.logger.info("Unzipping repo and installing on Nao")
_, stdout, _, exit_status = self.ssh_command(
"""
source ~/.test_venv/bin/activate;
cd ~/sic_in_test;
unzip {repo_name};
cd {repo_name};
pip install -e . --no-deps;
""".format(
repo_name=repo_name
)
)
# check to see if the repo was installed successfully
if exit_status != 0:
raise RuntimeError("Failed to install social-interaction-cloud")
# check to see if test environment already exists
_, stdout, _, exit_status = self.ssh_command(
"""
source ~/.test_venv/bin/activate;
"""
)
if exit_status == 0 and not self.test_repo:
self.logger.info(
"Test environment already created on Nao and no new dev repo provided... skipping test_venv setup"
)
return True
elif exit_status == 0 and self.test_repo:
self.logger.info(
"Test environment already created on Nao and new dev repo provided... uninstalling old repo and installing new one"
)
self.logger.warning(
"This process may take a minute or two... Please hold tight!"
)
uninstall_old_repo()
install_new_repo()
elif exit_status == 1 and self.test_repo:
# test environment not created, so create one
self.logger.info(
"Test environment not created on Nao and new dev repo provided... creating test environment and installing new repo"
)
self.logger.warning(
"This process may take a minute or two... Please hold tight!"
)
init_test_venv()
install_new_repo()
elif exit_status == 1 and not self.test_repo:
self.logger.error(
"No test environment present on Nao and no new dev repo provided... raising RuntimeError"
)
raise RuntimeError("Need to provide repo to create test environment")
else:
self.logger.error(
"Activating test environment on Nao resulted in unknown exit status: {}".format(
exit_status
)
)
raise RuntimeError(
"Unknown error occurred while creating test environment on Nao"
)
[docs]
def motion_streaming(self, input_source=None):
return self._get_connector(NaoqiMotionStreamer, input_source=input_source)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"--redis_ip", type=str, required=True, help="IP address where Redis is running"
)
parser.add_argument(
"--client_id", type=str, required=True, help="Client that is using this device"
)
parser.add_argument("--redis_pass", type=str, help="The redis password")
args = parser.parse_args()
os.environ["DB_IP"] = args.redis_ip
if args.redis_pass:
os.environ["DB_PASS"] = args.redis_pass
nao_components = shared_naoqi_components + [NaoqiMotionStreamerService]
SICComponentManager(nao_components, client_id=args.client_id, name="Nao")