Source code for sic_framework.devices.common_mini.mini_speaker

import pyaudio

from sic_framework import SICComponentManager
from sic_framework.core.component_python2 import SICComponent
from sic_framework.core.connector import SICConnector
from sic_framework.core.message_python2 import (
    AudioMessage, SICConfMessage, SICMessage
)


[docs] class MiniSpeakersConf(SICConfMessage):
[docs] def __init__(self, sample_rate=44100, channels=1): self.sample_rate = sample_rate self.channels = channels
[docs] class MiniSpeakerComponent(SICComponent): COMPONENT_STARTUP_TIMEOUT = 5
[docs] def __init__(self, *args, **kwargs): super(MiniSpeakerComponent, self).__init__(*args, **kwargs) self.device = pyaudio.PyAudio() # open an audio stream self.stream = self.device.open( format=pyaudio.paInt16, channels=self.params.channels, rate=self.params.sample_rate, input=False, output=True, )
[docs] @staticmethod def get_conf(): return MiniSpeakersConf()
[docs] @staticmethod def get_inputs(): return [AudioMessage]
[docs] @staticmethod def get_output(): return SICMessage
[docs] def on_message(self, message): self.stream.write(message.waveform)
[docs] def on_request(self, request): self.stream.write(request.waveform) return SICMessage()
[docs] def stop(self, *args): # No long-running worker loop; mark as stopped immediately so cleanup runs. self._stopped.set() super(MiniSpeakerComponent, self).stop(*args)
def _cleanup(self): self.logger.info("Stopped speakers") try: self.stream.close() except Exception: pass try: self.device.terminate() except Exception: pass
[docs] class MiniSpeaker(SICConnector): component_class = MiniSpeakerComponent
if __name__ == "__main__": SICComponentManager([MiniSpeakerComponent])