import io
import socket
import threading
import wave
import numpy as np
from sic_framework import SICActuator, SICComponentManager, utils
from sic_framework.core.component_python2 import SICComponent
from sic_framework.core.connector import SICConnector
from sic_framework.core.message_python2 import AudioMessage, SICConfMessage, SICMessage
from sic_framework.core.sensor_python2 import SICSensor
if utils.PYTHON_VERSION_IS_2:
import qi
from naoqi import ALProxy
[docs]
class NaoqiSpeakersConf(SICConfMessage):
    """
    Configuration message holding the default audio settings for the NAOqi speaker component.
    """

    def __init__(self):
        """
        Populate the configuration with its default values.
        """
        # Channel count; 1 presumably means mono output.
        self.no_channels = 1
        # Sample rate, presumably in Hz.
        self.sample_rate = 16000
        # NOTE(review): the meaning of -1 is not visible in this file; it looks like a
        # "no explicit index selected" sentinel -- confirm against the consumer.
        self.index = -1
[docs]
class NaoqiSpeakerComponent(SICComponent):
    """
    Component that plays received audio on the robot through NAOqi.

    Every incoming waveform is written to a WAV file under ``/tmp`` and that
    file is handed to the ``ALAudioPlayer`` service for playback.
    """

    def __init__(self, *args, **kwargs):
        """
        Initialize the NAOqi speaker component and connect to ALAudioDevice and ALAudioPlayer.
        """
        super(NaoqiSpeakerComponent, self).__init__(*args, **kwargs)

        # Connect to the NAOqi broker on the local machine.
        self.session = qi.Session()
        self.session.connect("tcp://127.0.0.1:9559")

        self.audio_service = self.session.service("ALAudioDevice")
        self.audio_player_service = self.session.service("ALAudioPlayer")

        # Counter used to give each temporary WAV file a distinct name.
        self.i = 0

    @staticmethod
    def get_conf():
        """
        Return the default configuration for this component.

        :returns: Default speaker configuration.
        :rtype: NaoqiSpeakersConf
        """
        return NaoqiSpeakersConf()

    @staticmethod
    def get_output():
        """
        Return the output message class of this component.

        :returns: The ``SICMessage`` class.
        """
        return SICMessage

    def on_message(self, message):
        """
        Play the audio carried by an incoming message.

        :param message: Message with ``waveform`` and ``sample_rate`` attributes.
        """
        self.play_sound(message)

    def on_request(self, request):
        """
        Play the audio carried by a request and reply once ``play_sound`` returns.

        :param request: Request with ``waveform`` and ``sample_rate`` attributes.
        :returns: An empty ``SICMessage`` as acknowledgement.
        """
        self.play_sound(request)
        return SICMessage()

    def play_sound(self, message):
        """
        Write the message's audio to a temporary WAV file and play it.

        The audio is stored as mono, 16-bit samples at the message's sample rate.

        :param message: Message with ``waveform`` (raw sample bytes) and
            ``sample_rate`` attributes.
        """
        bytestream = message.waveform
        frame_rate = message.sample_rate

        # WAV parameters: the data is always written as mono, 16-bit audio.
        channels = 1  # 1 for mono audio
        sample_width = 2  # 2 bytes for 16-bit audio
        num_frames = len(bytestream) // (channels * sample_width)

        # The audio player takes a file path, so the audio goes to disk.
        # NOTE: the file is left in /tmp afterwards; the counter restarts at 0
        # for every new component instance, so file names get reused.
        tmp_file = "/tmp/tmp{}.wav".format(self.i)
        wav_file = wave.open(tmp_file, "wb")
        self.i += 1

        # Close the writer before playback so the header and all frames are
        # flushed to disk and the file handle is not leaked. try/finally is
        # used instead of ``with`` because wave objects only became context
        # managers in Python 3.4 and this component also runs on Python 2.
        try:
            wav_file.setparams(
                (channels, sample_width, frame_rate, num_frames, "NONE", "not compressed")
            )
            # Write the bytestream to the WAV file
            wav_file.writeframes(bytestream)
        finally:
            wav_file.close()

        # Ask ALAudioPlayer to play the finished file.
        self.audio_player_service.playFile(tmp_file)

    def stop(self, *args):
        """
        Stop the NAOqi speaker component.

        Closes the NAOqi session, sets the ``_stopped`` flag and then
        delegates to the parent class's ``stop``.
        """
        self.session.close()
        self._stopped.set()
        super(NaoqiSpeakerComponent, self).stop()
[docs]
class NaoqiSpeaker(SICConnector):
    """
    Connector whose component class is :class:`NaoqiSpeakerComponent`.
    """

    component_class = NaoqiSpeakerComponent
if __name__ == "__main__":
    # When run as a script, hand the speaker component to a component manager.
    SICComponentManager([NaoqiSpeakerComponent])