from sic_framework import SICComponentManager, SICConfMessage, SICMessage, utils
from sic_framework.core.component_python2 import SICComponent
from sic_framework.core.connector import SICConnector
if utils.PYTHON_VERSION_IS_2:
import qi
[docs]
class PepperLeftBumperMessage(SICMessage):
    """
    SIC message carrying the state of Pepper's left bumper.

    :ivar int value: Bumper state - 1 when pressed, 0 when released.
    """

    def __init__(self, value):
        """
        Create a left bumper message.

        :param int value: Bumper state (1 = pressed, 0 = released).
        """
        # Explicit two-argument super() keeps the class usable under Python 2.
        super(PepperLeftBumperMessage, self).__init__()
        self.value = value
[docs]
class PepperLeftBumperSensor(SICComponent):
    """
    Sensor component for Pepper's left bumper.

    Monitors the left bumper sensor on Pepper robot and emits events when the
    bumper is pressed or released. The bumper is located on the left side of
    Pepper's base platform.

    This sensor subscribes to NAOqi's ALMemory ``LeftBumperPressed`` event and
    publishes :class:`PepperLeftBumperMessage` instances through the SIC framework.

    Example usage::

        def on_left_bumper(msg):
            if msg.value == 1:
                print("Left bumper pressed!")
            else:
                print("Left bumper released!")

        pepper.left_bumper.register_callback(on_left_bumper)
    """

    def __init__(self, *args, **kwargs):
        """
        Initialize the left bumper sensor.

        Establishes connection to NAOqi session and ALMemory service.

        :param args: Variable length argument list passed to parent.
        :param kwargs: Arbitrary keyword arguments passed to parent.
        """
        super(PepperLeftBumperSensor, self).__init__(*args, **kwargs)

        self.session = qi.Session()
        self.session.connect("tcp://127.0.0.1:9559")

        # Connect to ALMemory service for sensor events
        self.memory_service = self.session.service("ALMemory")

        # Event subscriber; assigned in start(), so it is None until then
        self.bumper = None

        # Track signal connection IDs for cleanup
        self.ids = []

    @staticmethod
    def get_conf():
        """
        Get default configuration for this sensor.

        :returns: Default (empty) configuration message.
        :rtype: SICConfMessage
        """
        return SICConfMessage()

    @staticmethod
    def get_output():
        """
        Get the output message type this sensor produces.

        :returns: PepperLeftBumperMessage class.
        :rtype: type
        """
        return PepperLeftBumperMessage

    def onBumperChanged(self, value):
        """
        Callback invoked when bumper state changes.

        :param int value: New bumper state (1 = pressed, 0 = released).
        """
        self.output_message(PepperLeftBumperMessage(value))

    def start(self):
        """
        Start the sensor and subscribe to bumper events.

        Connects to the NAOqi ``LeftBumperPressed`` event and begins emitting
        messages when the bumper state changes.
        """
        super(PepperLeftBumperSensor, self).start()

        self.bumper = self.memory_service.subscriber("LeftBumperPressed")
        connection_id = self.bumper.signal.connect(self.onBumperChanged)
        # Remember the connection so _cleanup() can disconnect it later
        self.ids.append(connection_id)

    def stop(self, *args):
        """
        Stop the sensor.

        Sets the ``_stopped`` event and then delegates to the parent ``stop``.

        :param args: Positional arguments forwarded to the parent ``stop``.
        """
        # No long-running worker loop; mark stopped immediately so cleanup runs.
        self._stopped.set()
        super(PepperLeftBumperSensor, self).stop(*args)

    def _cleanup(self):
        """
        Disconnect the bumper callbacks and close the NAOqi session.

        Every step is best effort and never raises. Each connection id is
        disconnected independently, and the id list is reset so that a
        repeated cleanup does not retry ids that were already handled.
        """
        # Take a snapshot and reset the list before disconnecting; getattr
        # keeps this safe even if the attribute was never created.
        pending_ids = list(getattr(self, "ids", ()))
        self.ids = []

        for connection_id in pending_ids:
            try:
                self.bumper.signal.disconnect(connection_id)
            except Exception:
                # Best effort: keep going so one failed disconnect does not
                # leave the remaining callbacks attached.
                pass

        try:
            self.session.close()
        except Exception:
            # Best effort: a failure to close must not abort shutdown.
            pass
[docs]
class PepperLeftBumper(SICConnector):
    """
    Connector exposing Pepper's left bumper sensor.

    Offers a high-level interface to the :class:`PepperLeftBumperSensor`
    component. Reach it through the Pepper device's ``left_bumper`` property.
    """

    # Component class this connector is bound to.
    component_class = PepperLeftBumperSensor
if __name__ == "__main__":
    # Executed as a script: hand the sensor class to the SIC component manager.
    SICComponentManager([PepperLeftBumperSensor])