from sic_framework import (
SICActuator,
SICComponentManager,
SICConfMessage,
SICMessage,
SICRequest,
utils,
)
from sic_framework.core.connector import SICConnector
if utils.PYTHON_VERSION_IS_2:
import qi
[docs]
class NaoBlinkingRequest(SICRequest):
"""
Request to enable or disable autonomous blinking.
:param bool value: True to enable, False to disable autonomous blinking.
"""
[docs]
def __init__(self, value):
super(NaoBlinkingRequest, self).__init__()
self.value = value
[docs]
class NaoBackgroundMovingRequest(SICRequest):
"""
Request to enable or disable autonomous background movement.
:param bool value: True to enable, False to disable background movement.
"""
[docs]
def __init__(self, value):
super(NaoBackgroundMovingRequest, self).__init__()
self.value = value
[docs]
class NaoListeningMovementRequest(SICRequest):
"""
Request to enable or disable listening movements (small motions indicating attention).
:param bool value: True to enable, False to disable listening movements.
"""
[docs]
def __init__(self, value):
super(NaoListeningMovementRequest, self).__init__()
self.value = value
[docs]
class NaoSpeakingMovementRequest(SICRequest):
"""
Request to enable or disable autonomous speaking movements.
:param bool value: True to enable, False to disable.
:param str mode: Speaking movement mode ("random" or "contextual").
:raises ValueError: If mode is not one of the supported options. see http://doc.aldebaran.com/2-5/naoqi/interaction/autonomousabilities/alspeakingmovement.html#speaking-movement-mode
"""
[docs]
def __init__(self, value, mode=None):
super(NaoSpeakingMovementRequest, self).__init__()
self.value = value
self.mode = mode
[docs]
class NaoRestRequest(SICRequest):
pass
[docs]
class NaoWakeUpRequest(SICRequest):
pass
[docs]
class NaoSetAutonomousLifeRequest(SICRequest):
"""
Request to set the state of the Autonomous Life module.
For further details, see: http://doc.aldebaran.com/2-5/ref/life/state_machine_management.html#autonomouslife-states
:param str state: Target state ("solitary", "interactive", "safeguard", or "disabled").
"""
[docs]
def __init__(self, state="solitary"):
super(NaoSetAutonomousLifeRequest, self).__init__()
self.state = state
[docs]
class NaoBasicAwarenessRequest(SICRequest):
"""
Request to enable or disable basic awareness and configure its parameters.
:param bool value: True to enable, False to disable basic awareness.
:param list[tuple[str, bool]] stimulus_detection: Optional list of (stimulus_name, enable) tuples.
:param str engagement_mode: Engagement mode setting.
:param str tracking_mode: Tracking mode setting.
"""
[docs]
def __init__(
self, value, stimulus_detection=None, engagement_mode=None, tracking_mode=None
):
super(NaoBasicAwarenessRequest, self).__init__()
self.value = value
self.stimulus_detection = stimulus_detection if stimulus_detection else []
self.engagement_mode = engagement_mode
self.tracking_mode = tracking_mode
[docs]
class NaoqiAutonomousActuator(SICActuator):
"""
Actuator managing NAOqi autonomous abilities.
Provides an interface for enabling or disabling autonomous features like blinking, awareness, and background movements, as well as wakeUp and rest actions.
:ivar qi.Session session: Connection to the NAOqi framework.
"""
[docs]
def __init__(self, *args, **kwargs):
super(NaoqiAutonomousActuator, self).__init__(*args, **kwargs)
self.session = qi.Session()
self.session.connect("tcp://127.0.0.1:9559")
# Connect to AL proxies
self.blinking = self.session.service("ALAutonomousBlinking")
self.background_movement = self.session.service("ALBackgroundMovement")
self.basic_awareness = self.session.service("ALBasicAwareness")
self.listening_movement = self.session.service("ALListeningMovement")
self.speaking_movement = self.session.service("ALSpeakingMovement")
self.autonomous_life = self.session.service("ALAutonomousLife")
self.motion = self.session.service("ALMotion")
[docs]
@staticmethod
def get_conf():
return SICConfMessage()
[docs]
@staticmethod
def get_output():
return SICMessage
[docs]
def execute(self, message):
"""
Execute the given autonomous ability request.
:param SICRequest message: Request specifying which autonomous ability to adjust.
:returns: Acknowledgement message after execution.
:rtype: SICMessage
:raises Exception: If a requested service or mode is invalid.
"""
if message == NaoRestRequest:
self.motion.rest()
elif message == NaoWakeUpRequest:
self.motion.wakeUp()
elif message == NaoBlinkingRequest:
self.blinking.setEnabled(message.value)
elif message == NaoBackgroundMovingRequest:
self.background_movement.setEnabled(message.value)
elif message == NaoListeningMovementRequest:
self.listening_movement.setEnabled(message.value)
elif message == NaoSpeakingMovementRequest:
self.speaking_movement.setEnabled(message.value)
if message.mode:
self.speaking_movement.setMode(message.mode)
elif message == NaoSetAutonomousLifeRequest:
self.autonomous_life.setState(message.state)
elif message == NaoBasicAwarenessRequest:
self.basic_awareness.setEnabled(message.value)
for name, val in message.stimulus_detection:
self.basic_awareness.setStimulusDetectionEnabled(name, val)
if message.engagement_mode:
self.basic_awareness.setEngagementMode(message.engagement_mode)
if message.tracking_mode:
self.basic_awareness.setTrackingMode(message.tracking_mode)
return SICMessage()
[docs]
def stop(self, *args):
"""
Stop the actuator and close the NAOqi session.
"""
self.session.close()
self._stopped.set()
super(NaoqiAutonomousActuator, self).stop()
[docs]
class NaoqiAutonomous(SICConnector):
component_class = NaoqiAutonomousActuator
if __name__ == "__main__":
SICComponentManager([NaoqiAutonomousActuator])