from sic_framework import SICComponentManager, SICConfMessage, SICMessage, utils
from sic_framework.core.component_python2 import SICComponent
from sic_framework.core.connector import SICConnector
# The NAOqi ``qi`` bindings are imported only when the framework reports that
# the interpreter is Python 2.
if utils.PYTHON_VERSION_IS_2:
    import qi
class PepperRightBumperMessage(SICMessage):
    """
    Message reporting the state of Pepper's right bumper.

    :ivar int value: Bumper state - 1 when pressed, 0 when released.
    """

    def __init__(self, value):
        """
        Create a right bumper message.

        :param int value: Bumper state (1 = pressed, 0 = released).
        """
        # Explicit two-argument super() form, as used throughout this module.
        super(PepperRightBumperMessage, self).__init__()
        self.value = value
class PepperRightBumperSensor(SICComponent):
    """
    Sensor component for Pepper's right bumper.

    Monitors the right bumper sensor on Pepper robot and emits events when the
    bumper is pressed or released. The bumper is located on the right side of
    Pepper's base platform.

    This sensor subscribes to NAOqi's ALMemory ``RightBumperPressed`` event and
    publishes :class:`PepperRightBumperMessage` instances through the SIC
    framework.

    Example usage::

        def on_right_bumper(msg):
            if msg.value == 1:
                print("Right bumper pressed!")
            else:
                print("Right bumper released!")

        pepper.right_bumper.register_callback(on_right_bumper)
    """

    def __init__(self, *args, **kwargs):
        """
        Initialize the right bumper sensor.

        Establishes connection to NAOqi session and ALMemory service.

        :param args: Variable length argument list passed to parent.
        :param kwargs: Arbitrary keyword arguments passed to parent.
        """
        super(PepperRightBumperSensor, self).__init__(*args, **kwargs)

        # Set up the cleanup bookkeeping before touching NAOqi so that
        # _cleanup() can run safely even if connecting below fails or
        # start() is never called.
        self.ids = []  # signal link ids returned by signal.connect()
        self.bumper = None  # ALMemory subscriber, created in start()

        self.session = qi.Session()
        self.session.connect("tcp://127.0.0.1:9559")

        # Connect to ALMemory service for sensor events
        self.memory_service = self.session.service("ALMemory")

    @staticmethod
    def get_conf():
        """
        Get default configuration for this sensor.

        :returns: Default (empty) configuration message.
        :rtype: SICConfMessage
        """
        return SICConfMessage()

    @staticmethod
    def get_output():
        """
        Get the output message type this sensor produces.

        :returns: PepperRightBumperMessage class.
        :rtype: type
        """
        return PepperRightBumperMessage

    def onBumperChanged(self, value):
        """
        Callback invoked when bumper state changes.

        Wraps the reported value in a :class:`PepperRightBumperMessage` and
        emits it via ``output_message``.

        :param int value: New bumper state (1 = pressed, 0 = released).
        """
        self.output_message(PepperRightBumperMessage(value))

    def start(self):
        """
        Start the sensor and subscribe to bumper events.

        Connects to the NAOqi ``RightBumperPressed`` event and begins emitting
        messages when the bumper state changes. The link id returned by the
        signal connection is stored in ``self.ids`` so it can be disconnected
        during cleanup.
        """
        super(PepperRightBumperSensor, self).start()

        self.bumper = self.memory_service.subscriber("RightBumperPressed")
        link_id = self.bumper.signal.connect(self.onBumperChanged)
        self.ids.append(link_id)

    def stop(self, *args):
        """
        Stop the sensor.

        Sets the ``_stopped`` flag and then delegates to the parent ``stop``.

        :param args: Positional arguments forwarded unchanged to the parent.
        """
        # No long-running worker loop; mark stopped immediately so cleanup runs.
        # NOTE(review): ``_stopped`` is not defined in this class; presumably
        # an event object provided by SICComponent - confirm.
        self._stopped.set()
        super(PepperRightBumperSensor, self).stop(*args)

    def _cleanup(self):
        """
        Release NAOqi resources held by this sensor (best effort).

        Disconnects every stored signal link individually, so that one failing
        disconnect does not prevent the remaining ones, forgets the link ids
        so a repeated cleanup does not retry stale ids, and finally closes the
        NAOqi session. Errors are deliberately suppressed: cleanup must never
        raise.
        """
        # getattr guards keep cleanup safe even if __init__ did not complete.
        subscriber = getattr(self, "bumper", None)
        link_ids = getattr(self, "ids", [])

        if subscriber is not None:
            for link_id in link_ids:
                try:
                    subscriber.signal.disconnect(link_id)
                except Exception:
                    # Best effort: keep disconnecting the remaining links.
                    pass

        # In-place clear (Python 2 lists have no .clear()).
        del link_ids[:]

        try:
            self.session.close()
        except Exception:
            # Best effort: the session may be missing or already closed.
            pass
class PepperRightBumper(SICConnector):
    """
    Connector exposing Pepper's right bumper sensor.

    Thin high-level wrapper around the :class:`PepperRightBumperSensor`
    component. Access this through the Pepper device's ``right_bumper``
    property.
    """

    # Component class this connector is bound to.
    component_class = PepperRightBumperSensor
if __name__ == "__main__":
    # When executed as a script, hand the sensor class to the component manager.
    SICComponentManager([PepperRightBumperSensor])