import threading
from sic_framework import SICComponentManager, utils
from sic_framework.core.connector import SICConnector
from sic_framework.core.message_python2 import AudioMessage, SICConfMessage
from sic_framework.core.sensor_python2 import SICSensor
if utils.PYTHON_VERSION_IS_2:
import qi
from naoqi import ALProxy
class NaoqiMicrophoneConf(SICConfMessage):
    """
    Configuration message for :class:`NaoqiMicrophoneSensor`.

    Calling the constructor without arguments yields the same values that were
    previously hard-coded, so existing callers are unaffected.
    """

    def __init__(self, channel_index=3, no_channels=1, sample_rate=16000, index=-1):
        """
        :param int channel_index: Channel selection handed to
            ``ALAudioDevice.setClientPreferences`` by the sensor. The default
            ``3`` selects the front microphone.
        :param int no_channels: Number of audio channels. Stored on the
            configuration; the sensor code in this module does not read it.
        :param int sample_rate: Sample rate requested from the audio device and
            attached to every outgoing ``AudioMessage``.
        :param int index: Stored as-is; not read by the sensor code in this
            module.
        """
        self.channel_index = channel_index  # 3 == front microphone
        self.no_channels = no_channels
        self.sample_rate = sample_rate
        self.index = index
class NaoqiMicrophoneSensor(SICSensor):
    """
    Sensor that forwards microphone audio buffers from NAOqi as ``AudioMessage`` objects.

    On construction the instance registers itself as a NAOqi service and
    subscribes that service to ``ALAudioDevice``. Audio arrives through
    :meth:`processRemote`; :meth:`execute` waits for the next buffer and wraps
    it in an ``AudioMessage``.
    """

    COMPONENT_STARTUP_TIMEOUT = 4

    def __init__(self, *args, **kwargs):
        """
        Connect to the local NAOqi session and subscribe to the audio device.

        :param Any args: Positional arguments passed to the base class.
        :param Any kwargs: Keyword arguments passed to the base class.
        :raises RuntimeError: If the callback service cannot be registered.
        """
        super(NaoqiMicrophoneSensor, self).__init__(*args, **kwargs)

        # The callback state must exist before subscribing: processRemote()
        # touches these attributes and may run as soon as the subscription is
        # active, i.e. before __init__ has returned.
        self.new_sound_data_available = threading.Event()
        self.audio_buffer = None
        self.naoqi_timestamp = None

        self.session = qi.Session()
        self.session.connect("tcp://127.0.0.1:9559")

        self.audio_service = self.session.service("ALAudioDevice")
        self.module_name = "SICMicrophoneService"

        try:
            self.session_id = self.session.registerService(self.module_name, self)
        except RuntimeError as err:
            # possible solution: do not catch runtime error, the registering is already done so
            # the self.audio_service.subscribe(self.module_name) should work
            raise RuntimeError(
                "Naoqi error, restart robot. Cannot re-register ALAudioDevice service, this service is a singleton. "
                "Original error: {}".format(err)
            )

        self.audio_service.setClientPreferences(
            self.module_name, self.params.sample_rate, self.params.channel_index, 0
        )
        self.audio_service.subscribe(self.module_name)

    @staticmethod
    def get_conf():
        """
        Return the default configuration for this sensor.

        :returns: Microphone configuration instance.
        :rtype: NaoqiMicrophoneConf
        """
        return NaoqiMicrophoneConf()

    @staticmethod
    def get_output():
        """
        Return the message class this sensor produces.

        :returns: The ``AudioMessage`` class (not an instance).
        """
        return AudioMessage

    def execute(self):
        """
        Block until a new audio buffer has been delivered, then return it.

        :returns: Message wrapping the most recent buffer and the configured sample rate.
        :rtype: AudioMessage
        """
        # NOTE(review): wait() has no timeout, so this call only returns once
        # processRemote() delivers another buffer -- confirm that the base
        # class copes with a caller still blocked here when stop() runs.
        self.new_sound_data_available.wait()
        self.new_sound_data_available.clear()
        return AudioMessage(self.audio_buffer, sample_rate=self.params.sample_rate)

    def stop(self, *args):
        """
        Stop the microphone sensor.

        Unsubscribes from the audio device, unregisters the callback service and
        closes the session. Every step is attempted even if an earlier one
        raises, and the component is always marked as stopped; the error is
        still propagated to the caller afterwards.

        :param Any args: Positional arguments passed to the base class ``stop``.
        """
        try:
            try:
                self.audio_service.unsubscribe(self.module_name)
            finally:
                try:
                    self.session.unregisterService(self.session_id)
                finally:
                    self.session.close()
        finally:
            self._stopped.set()
            super(NaoqiMicrophoneSensor, self).stop(*args)

    def processRemote(self, nbOfChannels, nbOfSamplesByChannel, timeStamp, inputBuffer):
        """
        Store an incoming audio buffer and signal :meth:`execute`.

        Presumably invoked by ``ALAudioDevice`` for the service subscribed in
        ``__init__``; the name and signature follow NAOqi's audio callback
        convention.

        :param nbOfChannels: Accepted for the callback signature; not used.
        :param nbOfSamplesByChannel: Accepted for the callback signature; not used.
        :param timeStamp: Timestamp supplied with the buffer; kept in ``naoqi_timestamp``.
        :param inputBuffer: Audio data; kept in ``audio_buffer`` for :meth:`execute`.
        """
        self.audio_buffer = inputBuffer
        self.naoqi_timestamp = timeStamp
        self.new_sound_data_available.set()
class NaoqiMicrophone(SICConnector):
    """
    Connector whose component class is :class:`NaoqiMicrophoneSensor`.
    """

    component_class = NaoqiMicrophoneSensor
if __name__ == "__main__":
    # Run as a script: hand the microphone sensor class to the component manager.
    SICComponentManager([NaoqiMicrophoneSensor])