Source code for sic_framework.devices.common_naoqi.pepper_motion_streamer

import threading
import time

from sic_framework import (
    SICComponentManager,
    SICConfMessage,
    SICMessage,
    SICRequest,
    utils,
)
from sic_framework.core.component_python2 import SICComponent
from sic_framework.core.connector import SICConnector
from sic_framework.devices.common_naoqi.common_naoqi_motion import NaoqiMotionTools

if utils.PYTHON_VERSION_IS_2:
    import qi
    from naoqi import ALProxy


[docs] class StartStreaming(SICRequest): """ Request to start streaming joint positions for specified joints. """
[docs] def __init__(self, joints): """ Initialize a request to start streaming positions for selected joints. For more information on joint chains, see robot documentation: - Nao: http://doc.aldebaran.com/2-8/family/nao_technical/bodyparts_naov6.html#nao-chains - Pepper: http://doc.aldebaran.com/2-8/family/pepper_technical/bodyparts_pep.html :param list[str] joints: One or more joint chains or individual joints (e.g., ["Body"] or ["LArm", "HeadYaw"]). """ super(StartStreaming, self).__init__() self.joints = joints
[docs] class StopStreaming(SICRequest): pass
[docs] class PepperMotionStream(SICMessage):
[docs] def __init__(self, joints, angles, velocity): """ :param list[str] joints: Joint names in the order of the angles. :param list[float] angles: Joint angles in radians. :param velocity """ self.joints = joints self.angles = angles self.velocity = velocity
[docs] class PepperMotionStreamerConf(SICConfMessage): """ Configuration for the Pepper motion streaming service. """
[docs] def __init__( self, stiffness=0.6, speed=0.75, stream_stiffness=0, use_sensors=False, samples_per_second=20, ): """ Note: Use stiffness, not stream_stiffness, to control the stiffness of the robot when consuming a stream of joint postions. :param float stiffness: Control how much power the robot should use to reach the given joint angles :param float speed: Set the fraction of the maximum speed used to reach the target position. :param stream_stiffness: Control the stiffness of the robot when streaming its joint positions. :param bool use_sensors: If true, sensor angles will be returned, otherwise command angles are used. :param int samples_per_second: How many times per second the joint positions are sampled. """ SICConfMessage.__init__(self) self.stiffness = stiffness self.speed = speed self.stream_stiffness = stream_stiffness self.use_sensors = use_sensors self.samples_per_second = samples_per_second
[docs] class PepperMotionStreamerService(SICComponent, NaoqiMotionTools):
[docs] def __init__(self, *args, **kwargs): SICComponent.__init__(self, *args, **kwargs) self.session = qi.Session() self.session.connect("tcp://127.0.0.1:9559") NaoqiMotionTools.__init__(self, qi_session=self.session) self.motion = self.session.service("ALMotion") self.stiffness = 0 self.samples_per_second = self.params.samples_per_second self.do_streaming = threading.Event() # A list of joint names (not chains) self.joints = self.generate_joint_list(["Body"]) self.stream_thread = threading.Thread(target=self.stream_motion) self.stream_thread.name = self.get_component_name() self.stream_thread.start()
[docs] @staticmethod def get_conf(): """ Return the default configuration for this component. :returns: Default configuration instance. :rtype: PepperMotionStreamerConf """ return PepperMotionStreamerConf()
[docs] @staticmethod def get_inputs(): return [PepperMotionStream, StartStreaming, StopStreaming]
[docs] def on_request(self, request): if request == StartStreaming: self.joints = self.generate_joint_list(request.joints) self.do_streaming.set() return SICMessage() if request == StopStreaming: self.do_streaming.clear() return SICMessage()
[docs] def on_message(self, message): """ Move the joints and base of the robot according to PepperMotionStream message """ if self.stiffness != self.params.stiffness: self.motion.setStiffnesses(self.joints, self.params.stiffness) self.stiffness = self.params.stiffness # ? Will this happen one at a time or in parallel? # move the joints self.motion.setAngles(message.joints, message.angles, self.params.speed) # also move the base of the robot x, y, theta = message.velocity self.motion.move(x, y, theta)
[docs] @staticmethod def get_output(): return PepperMotionStream
[docs] def stream_motion(self): # Set the stiffness value of a list of joint chain. # For Nao joint chains are: Head, RArm, LArm, RLeg, LLeg try: while not self._signal_to_stop.is_set(): # check both do_streaming and _signal_to_stop periodically self.do_streaming.wait(1) if not self.do_streaming.is_set(): continue if self.stiffness != 0: self.motion.setStiffnesses(self.joints, 0.0) self.stiffness = 0 angles = self.motion.getAngles( self.joints, self.params.use_sensors ) # use_sensors=False velocity = self.motion.getRobotVelocity() self.output_message(PepperMotionStream(self.joints, angles, velocity)) time.sleep(1 / float(self.samples_per_second)) except Exception as e: self.logger.exception(e) self.stop()
[docs] def stop(self, *args): """ Stop the Pepper motion streamer. """ self.session.close() self._stopped.set() super(PepperMotionStreamerService, self).stop()
[docs] class PepperMotionStreamer(SICConnector): component_class = PepperMotionStreamerService
if __name__ == "__main__": SICComponentManager([PepperMotionStreamerService])