Source code for sic_framework.devices.device

from __future__ import print_function

import os.path
import tarfile
import tempfile
import threading
import time

from sic_framework.core import sic_logging, utils
from sic_framework.core.connector import SICConnector
from sic_framework.core.sic_redis import SICRedis


class SICLibrary(object):
    """
    Description of a library that should be installed on a remote device.

    The instance is a plain record; it only stores the values it is given.

    :param name: name of the library
    :param lib_path: directory on the remote device that the download/install commands are run from
    :param download_cmd: optional shell command used to fetch the library before installing it
    :param req_version: optional required version string
    :param lib_install_cmd: shell command used to install the library
    """

    def __init__(
        self, name, lib_path="", download_cmd="", req_version=None, lib_install_cmd=""
    ):
        # Assign in a fixed order so the instance dict keeps a stable layout.
        fields = (
            ("name", name),
            ("lib_path", lib_path),
            ("download_cmd", download_cmd),
            ("req_version", req_version),
            ("lib_install_cmd", lib_install_cmd),
        )
        for attribute, value in fields:
            setattr(self, attribute, value)
def exclude_pyc(tarinfo):
    """
    Tar filter that drops compiled Python files.

    :param tarinfo: the archive member about to be added (anything with a ``name`` attribute)
    :return: ``None`` for members whose name ends in ``.pyc`` (which excludes them), otherwise the
        member itself, unchanged
    """
    is_bytecode = tarinfo.name.endswith(".pyc")
    return None if is_bytecode else tarinfo
class SICDevice(object):
    """
    Abstract class to facilitate property initialization for SICConnector properties.

    This way components of a device can easily be used without initializing all device
    components manually. The class also holds the SSH helpers used to copy the framework to
    a remote device and to run commands on it.
    """

    def __new__(cls, *args, **kwargs):
        """
        Choose specific imports dependent on the type of device.

        For classes named ``Nao``, ``Pepper`` or ``Alphamini`` running under Python 3, the modules
        ``pathlib`` and ``paramiko`` and the class ``scp.SCPClient`` are imported and published as
        module globals. They are only needed for remotely installing packages on robots from the
        local machine, so other device types never import them.
        """
        instance = super(SICDevice, cls).__new__(cls)

        if cls.__name__ in ("Nao", "Pepper", "Alphamini"):
            import six

            if six.PY3:
                global pathlib, paramiko, SCPClient
                import pathlib
                import paramiko
                from scp import SCPClient

        return instance

    def __init__(self, ip, sic_version=None, username=None, passwords=None, port=22):
        """
        Reserve the device and, when a username is given, open an SSH connection to it.

        :param ip: the ip address of the device
        :param sic_version: framework version to use; defaults to the locally installed version
            of ``social-interaction-cloud``
        :param username: the ssh login name; when ``None`` no SSH connection is made
        :param passwords: a password or a list of passwords to try in order
        :param port: the ssh port
        :raises RuntimeError: if the device does not answer the ping
        :raises paramiko.ssh_exception.AuthenticationException: if none of the passwords is accepted
        """
        self._redis = SICRedis()
        self.device_ip = ip
        self._client_id = utils.get_ip_adress()
        self.logger = sic_logging.get_sic_logger(
            name="{}DeviceManager".format(self.__class__.__name__),
            client_id=self._client_id,
        )

        try:
            self.set_reservation()
        except Exception as e:
            self.logger.error("Error setting reservation: {}".format(e))
            # bare raise keeps the original traceback intact
            raise

        self.connectors = dict()
        self.configs = dict()
        self.port = port
        self._PING_TIMEOUT = 3
        self.sic_version = sic_version
        self.stop_event = threading.Event()
        self.SCPClient = None

        # if no sic_version is specified, use the same version of the local sic
        if self.sic_version is None:
            from importlib.metadata import version

            self.sic_version = version("social-interaction-cloud")

        # SCPClient only exists as a module global when __new__ imported it;
        # for every other device type the lookup raises NameError.
        try:
            self.SCPClient = SCPClient
        except NameError:
            pass

        self.logger.info("Initializing device with ip: {ip}".format(ip=ip))

        if username is not None:
            if not isinstance(passwords, list):
                passwords = [passwords]

            if not utils.ping_server(
                self.device_ip, port=self.port, timeout=self._PING_TIMEOUT
            ):
                raise RuntimeError(
                    "Could not connect to device on ip {}. Please check if it is reachable.".format(
                        self.device_ip
                    )
                )

            self.ssh = paramiko.SSHClient()
            self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

            # allow_agent=False, look_for_keys=False to disable asking for keyring (just use the password)
            for p in passwords:
                try:
                    self.ssh.connect(
                        self.device_ip,
                        port=self.port,
                        username=username,
                        password=p,
                        timeout=3,
                        allow_agent=False,
                        look_for_keys=False,
                    )
                    break
                except (
                    paramiko.ssh_exception.AuthenticationException,
                    paramiko.ssh_exception.BadAuthenticationType,
                ):
                    # wrong password, try the next one
                    pass
            else:
                # the loop finished without a successful connect
                raise paramiko.ssh_exception.AuthenticationException(
                    "Could not authenticate to device, please check ip adress and/or credentials. (Username: {} Passwords: {})".format(
                        username, passwords
                    )
                )

    def set_reservation(self):
        """
        Set a reservation for the device to prevent other users from using it.

        Localhost is never reserved. If another client holds the reservation but no longer
        answers a ping, that client is removed and the device is reserved for this client.

        :return: ``True`` when the device is (or already was) reserved for this client, or when
            no reservation is needed
        :raises Exception: if another, still connected client holds the reservation, or if
            storing the reservation fails
        """
        self.logger.info(
            "Attempting to reserve device {} for client {}".format(
                self.device_ip, self._client_id
            )
        )

        # skip reservation for localhost
        if self.device_ip == "localhost" or self.device_ip == "127.0.0.1":
            return True

        existing_reservation = self._redis.get_reservation(self.device_ip)

        if existing_reservation is not None:
            self.logger.warning(
                "Device {} is already reserved by another client".format(self.device_ip)
            )
            self.logger.info("Checking if the other client is still connected...")

            # the reservation value is the ID of the client holding it
            other_client_id = existing_reservation
            self.logger.debug("Other client ID: {}".format(other_client_id))

            if other_client_id == self._client_id:
                # no need to reserve the device again
                self.logger.info(
                    "Device {} is already reserved by this client".format(self.device_ip)
                )
                return True
            elif self._redis.ping_client(other_client_id) is False:
                self.logger.info(
                    "The other client is not connected to SIC, releasing the device"
                )
                self._redis.remove_client(other_client_id)
            else:
                raise Exception(
                    "Device {} is already reserved by another client".format(
                        self.device_ip
                    )
                )

        self.logger.info("Reserving device {}".format(self.device_ip))
        reservation_result = self._redis.set_reservation(
            self.device_ip, self._client_id
        )

        if reservation_result != 1:
            raise Exception(
                "Reservation for device {} failed: {}".format(
                    self.device_ip, reservation_result
                )
            )

        self.logger.info(
            "Device {} has been reserved for this client".format(self.device_ip)
        )
        # every successful path reports True, matching the early returns above
        return True

    def get_last_modified(self, root, paths):
        """
        Return a filename-safe timestamp of the most recent modification under ``paths``.

        Directories are walked recursively and both the directories and the files inside them
        are considered, so editing a file's contents changes the result. Compiled ``.pyc``
        files are ignored. Entries that cannot be stat'ed (e.g. broken symlinks) are skipped.

        :param root: path prefix that is prepended to every entry of ``paths``
        :param paths: iterable of files/folders, each written so that ``root + path`` is valid
        :return: ``time.ctime`` of the newest modification with spaces replaced by ``_`` and
            colons replaced by ``-``
        :raises AssertionError: if none of the paths exists
        """
        newest = 0

        for relative in paths:
            target = root + relative

            if os.path.isdir(target):
                for dirpath, _, filenames in os.walk(target):
                    candidates = [dirpath]
                    candidates.extend(
                        os.path.join(dirpath, filename)
                        for filename in filenames
                        if not filename.endswith(".pyc")
                    )
                    for candidate in candidates:
                        try:
                            newest = max(os.path.getmtime(candidate), newest)
                        except OSError:
                            # vanished file or dangling symlink, nothing to compare
                            continue
            elif os.path.isfile(target):
                newest = max(os.path.getmtime(target), newest)

        # raised explicitly (instead of `assert`) so the check survives `python -O`
        if not newest > 0:
            raise AssertionError("Could not find any files to transfer.")

        return time.ctime(newest).replace(" ", "_").replace(":", "-")

    def auto_install(self):
        """
        Install the SICFramework on the device.

        A signature file whose name contains the local IP and the last-modified time of the
        selected framework files is looked up in ``~/framework`` on the device. If it exists
        nothing is done. Otherwise the selected files are packed into a tar.gz, copied to the
        device over SCP, extracted, and the signature file is (re)created.

        :return: None
        :raises RuntimeError: if extracting the archive on the device writes to stderr
        """
        # Find framework root folder
        root = str(pathlib.Path(__file__).parent.parent.parent.resolve())

        # List of selected files and directories to be zipped and transferred
        selected_files = [
            "/setup.py",
            "/conf",
            "/lib",
            "/sic_framework/core",
            "/sic_framework/devices",
            "/sic_framework/__init__.py",
        ]

        last_modified = self.get_last_modified(root, selected_files)

        # Create a signature for the framework
        framework_signature = "~/framework/sic_version_signature_{}_{}".format(
            utils.get_ip_adress(), last_modified
        )

        # Check if the framework signature file exists
        stdin, stdout, stderr = self.ssh.exec_command(
            "ls {}".format(framework_signature)
        )
        file_exists = len(stdout.readlines()) > 0

        if file_exists:
            self.logger.info("Up to date framework is installed on the remote device.")
            return

        # mutable cell instead of `nonlocal`, which Python 2 does not have
        reported = {"step": -1}

        def progress(filename, size, sent):
            # Log at most once per 10% so the log is not flooded by per-chunk callbacks.
            if isinstance(filename, bytes):
                filename = filename.decode("utf-8")
            percent = round(float(sent) / float(size) * 100, 2) if size else 100.0
            step = int(percent // 10)
            if step > reported["step"]:
                reported["step"] = step
                self.logger.info("{} progress: {}".format(filename, percent))

        self.logger.info("Copying framework to the remote device.")

        # delete=False for windows compatibility, must delete file manually
        with tempfile.NamedTemporaryFile(
            suffix="_sic_files.tar.gz", delete=False
        ) as archive:
            archive_path = archive.name
            with tarfile.open(fileobj=archive, mode="w:gz") as tar:
                for relative in selected_files:
                    tar.add(root + relative, arcname=relative, filter=exclude_pyc)

        try:
            # wait for mkdir to finish so the upload cannot race the directory creation
            _, mkdir_out, _ = self.ssh.exec_command("mkdir -p ~/framework")
            mkdir_out.channel.recv_exit_status()

            # Copy the framework to the remote computer
            with SCPClient(self.ssh.get_transport(), progress=progress) as scp:
                scp.put(archive_path, remote_path="~/framework/sic_files.tar.gz")
        finally:
            # the archive is closed by now, so this also works on windows
            os.unlink(archive_path)

        # Unzip the file on the remote server
        # use --touch to prevent files from having timestamps of 1970 which interfere with python caching
        stdin, stdout, stderr = self.ssh.exec_command(
            "cd framework && tar --touch -xvf sic_files.tar.gz"
        )

        err = stderr.readlines()
        if len(err) > 0:
            self.logger.error("".join(err))
            raise RuntimeError(
                "\n\nError while extracting library on remote device. Please consult manual installation instructions."
            )

        # Remove the zipped file
        self.ssh.exec_command("rm ~/framework/sic_files.tar.gz")

        self.logger.info("Checking if libraries are installed on the remote device.")

        # Remove old signatures and add our own in ONE remote command: exec_command does not
        # wait for completion, so two separate commands could run out of order and the `rm`
        # could delete the freshly created signature.
        _, signature_out, _ = self.ssh.exec_command(
            "rm -f ~/framework/sic_version_signature_* ; touch {}".format(
                framework_signature
            )
        )
        signature_out.channel.recv_exit_status()

    def ssh_command(self, command, create_thread=False, **kwargs):
        """
        Executes the given command and logs any errors from the SSH session.

        Args:
            command (str): command to run on ssh client
            create_thread (bool): when True, do not wait for the command; instead start a
                thread that logs the command's output until it exits
            **kwargs: Additional keyword arguments to pass to ssh.exec_command
                      (e.g., get_pty=False, timeout=30)

        Returns:
            threading.Thread: the started monitor thread, when ``create_thread`` is True
            tuple: ``(stdin, stdout, stderr, status)`` otherwise, where ``status`` is the exit
                status of the command. ``stderr`` has already been read (and logged at debug
                level) by the time it is returned; ``stdout`` has not been read.

        Raises:
            Various SSH exceptions if connection fails
        """
        try:
            self.logger.debug("Executing command: {command}".format(command=command))
            stdin, stdout, stderr = self.ssh.exec_command(command, **kwargs)

            if create_thread:
                self.logger.debug("Creating thread to monitor remote command")

                def monitor_call():
                    channel = stdout.channel

                    # forward output until the command has exited
                    while not channel.exit_status_ready():
                        if channel.recv_ready():
                            # errors="replace": a multibyte character may be split across
                            # two 1024-byte reads, which must not kill the monitor thread
                            chunk = channel.recv(1024).decode("utf-8", errors="replace")
                            self.logger.debug(chunk)
                        else:
                            # nothing to read yet; yield instead of spinning on the CPU
                            time.sleep(0.05)

                    # get exit status of the command
                    status = channel.recv_exit_status()
                    # log exit status and output
                    self.logger.debug(
                        "SSH command exited with status: {status}".format(status=status)
                    )
                    self.logger.debug(
                        "SSH command output: {output}".format(
                            output=stdout.read().decode("utf-8", errors="replace")
                        )
                    )
                    self.logger.debug(
                        "SSH command error: {error}".format(
                            error=stderr.read().decode("utf-8", errors="replace")
                        )
                    )

                    # if remote thread exits before local main thread, report to user.
                    if (
                        threading.main_thread().is_alive()
                        and not self.stop_event.is_set()
                    ):
                        raise RuntimeError(
                            "Remote SIC program has stopped unexpectedly.\nSee sic.log for details"
                        )

                thread = threading.Thread(target=monitor_call)
                thread.name = "remote_SIC_process_monitor"
                thread.start()

                return thread
            else:
                # Wait for the command, then check stderr for any errors
                status = stdout.channel.recv_exit_status()
                error_output = stderr.read().decode("utf-8", errors="replace")
                if error_output:
                    self.logger.debug(
                        "SSH command produced errors: {error_output}".format(
                            error_output=error_output
                        )
                    )

                return stdin, stdout, stderr, status

        except paramiko.AuthenticationException as e:
            self.logger.error(
                "Authentication failed when trying to execute ssh command: {e}".format(
                    e=e
                )
            )
            raise
        except paramiko.SSHException as e:
            self.logger.error(
                "SSH exception occurred when trying to execute command: {e}".format(e=e)
            )
            raise
        except Exception as e:
            self.logger.error(
                "Unexpected error while executing ssh command: {e}".format(e=e)
            )
            raise

    def check_if_lib_installed(self, pip_freeze, lib):
        """
        Check to see if a python library name + version is in the 'pip freeze' output of a
        remote device.

        Lines that are not of the form ``name==version`` (blank lines, editable installs,
        ``name @ url`` entries) are skipped.

        :param pip_freeze: iterable of lines of 'pip freeze' output
        :param lib: object with ``name`` and ``req_version`` attributes
        :return: True if the library is listed and, when ``lib.req_version`` is set, that string
            occurs in the installed version; False otherwise
        """
        for line in pip_freeze:
            cur_lib_name, separator, cur_lib_ver = line.strip().partition("==")
            if not separator or lib.name != cur_lib_name:
                continue

            self.logger.debug(
                "Found package: {} with version {}".format(cur_lib_name, cur_lib_ver)
            )

            # without a version requirement any installed version is fine
            if not lib.req_version:
                return True

            # NOTE: substring match, so a requirement of "1.2" accepts "1.2.3"
            if lib.req_version in cur_lib_ver:
                self.logger.debug(
                    "{} version matches: remote {} == required {}".format(
                        lib.name, cur_lib_ver, lib.req_version
                    )
                )
                return True

            self.logger.debug(
                "{} version mismatch: remote {} != required {}".format(
                    lib.name, cur_lib_ver, lib.req_version
                )
            )
            return False

        return False

    def install_lib(self, lib):
        """
        Download and install Python library on this remote device

        :param lib: object with ``name``, ``lib_path``, ``download_cmd`` and ``lib_install_cmd``
        :raises RuntimeError: if the download command exits non-zero, or if the install output
            does not contain "Successfully installed"
        """
        self.logger.info("Installing {} on remote device ".format(lib.name))

        # download the binary first if necessary, as is the case with Pepper
        if lib.download_cmd:
            stdin, stdout, stderr, exit_status = self.ssh_command(
                """cd {} && {} """.format(lib.lib_path, lib.download_cmd)
            )

            if exit_status != 0:
                err = "".join(stderr.readlines())
                self.logger.error(
                    "Command: cd {} && {} \n Gave error:".format(
                        lib.lib_path, lib.download_cmd
                    )
                )
                self.logger.error(err)
                raise RuntimeError("Error while downloading library on remote device.")

        # install the library
        stdin, stdout, stderr, exit_status = self.ssh_command(
            "cd {} && {}".format(lib.lib_path, lib.lib_install_cmd)
        )

        install_output = stdout.read().decode()

        if "Successfully installed" not in install_output:
            # ssh_command has already read stderr, so this is usually empty;
            # fall back to the captured stdout so the log is not blank
            err = "".join(stderr.readlines()) or install_output
            self.logger.error(
                "Command: cd {} && {} \n Gave error:".format(
                    lib.lib_path, lib.lib_install_cmd
                )
            )
            self.logger.error(err)
            raise RuntimeError(
                "Error while installing library on remote device. Please consult manual installation instructions."
            )
        else:
            self.logger.info("Successfully installed {} package".format(lib.name))

    def _get_connector(self, component_connector, **kwargs):
        """
        Get the active connection the component, or initialize it if it is not yet connected to.

        Connectors are cached per connector class in ``self.connectors``; the configuration for
        a new connector is looked up in ``self.configs`` under the same class.

        :param component_connector: The component connector class to start, e.g. NaoCamera
        :return: SICConnector
        :raises TimeoutError: if creating the connector times out
        """
        assert issubclass(
            component_connector, SICConnector
        ), "Component connector must be a SICConnector"

        if component_connector not in self.connectors:
            conf = self.configs.get(component_connector, None)

            try:
                self.connectors[component_connector] = component_connector(
                    self.device_ip, conf=conf, **kwargs
                )
            except TimeoutError:
                # re-raise with a message that names the component and the device
                raise TimeoutError(
                    "Could not connect to {} on device {}.".format(
                        component_connector.component_class.get_component_name(),
                        self.device_ip,
                    )
                )

        return self.connectors[component_connector]
if __name__ == "__main__":
    # Manual smoke test: connect to a robot over SSH and print the first lines of
    # output of a remote command.

    # paramiko is otherwise only imported lazily inside SICDevice.__new__, so it
    # has to be imported here for the script to run on its own.
    import paramiko

    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

    try:
        # allow_agent=False, look_for_keys=False to disable asking for keyring (just use the password)
        ssh.connect(
            "192.168.0.151",
            port=22,
            username="nao",
            password="nao",
            timeout=5,
            allow_agent=False,
            look_for_keys=False,
        )

        # Run a command on the remote device and echo up to 10 lines of its output
        stdin, stdout, stderr = ssh.exec_command("apt update")

        for i in range(10):
            line = stdout.readline()
            print(line)
            print(stderr.readline())

            # empty line means command is done
            if len(line) == 0:
                break

            time.sleep(1)
    finally:
        # always release the connection, also when connecting or reading fails
        ssh.close()