#!/usr/bin/python3

import json
import select
import socket
import threading
import time
import wave
from queue import Queue

import speech_recognition
from pyogg.opus_decoder import OpusDecoder


class AudioSource:
    """Base class for audio producers.

    Subclasses push audio chunks (as ``bytearray``) onto ``data_queue`` from a
    background thread; consumers drain the queue.
    """

    def __init__(self):
        # Thread safe Queue for passing data from the threaded recording
        # callback.
        self.data_queue = Queue()

    def is_done(self):
        """Return True once this source will produce no more data.

        The base class has no producer, so it is always done.
        """
        return True


# -----------------------------------------------------------------------------
# Microphone

# Maximum length, in seconds, of a single recorded phrase (passed as
# phrase_time_limit). Smaller values make the transcription more real time.
record_timeout = 2


class MicrophoneAudioSource(AudioSource):
    """Capture microphone audio (sample_rate=16000) in a background thread.

    Each finished phrase is pushed onto ``data_queue`` as a ``bytearray`` of
    the raw bytes returned by ``AudioData.get_raw_data()``.
    """

    def __init__(self):
        super().__init__()

        self._recorder = speech_recognition.Recognizer()
        # NOTE(review): adjust_for_ambient_noise() below recalibrates
        # energy_threshold, so 1200 acts as a starting value -- confirm intent.
        self._recorder.energy_threshold = 1200
        # Definitely do this, dynamic energy compensation lowers the energy
        # threshold dramatically to a point where the SpeechRecognizer
        # never stops recording.
        self._recorder.dynamic_energy_threshold = False

        self._source = speech_recognition.Microphone(sample_rate=16000)
        with self._source:
            self._recorder.adjust_for_ambient_noise(self._source)

        def record_callback(_, audio: speech_recognition.AudioData) -> None:
            """Threaded callback receiving audio data when a recording finishes.

            audio: An AudioData containing the recorded bytes.
            """
            # Grab the raw bytes and push them into the thread safe queue.
            data = audio.get_raw_data()
            self.data_queue.put(bytearray(data))

        # Create a background thread that will pass us raw audio bytes.
        # We could do this manually but SpeechRecognizer provides a nice helper.
        self._stopper = self._recorder.listen_in_background(
            self._source, record_callback, phrase_time_limit=record_timeout)

    def stop(self):
        """Stop background recording and drop the recorder and microphone.

        Safe to call more than once; calls after the first are no-ops.
        """
        if self._stopper is None:
            return
        self._stopper()
        self._recorder = None
        self._stopper = None
        self._source = None

    def is_done(self):
        """Return True once stop() has been called."""
        return self._recorder is None


# -----------------------------------------------------------------------------
# Opus stream

# For debugging
# wave_out = wave.open("wave2.wav", "wb")
# wave_out.setnchannels(1)
# wave_out.setframerate(16000)
# wave_out.setsampwidth(2)


class OpusStreamAudioSource(AudioSource):
    """Receive a length-prefixed Opus stream from a socket and decode it.

    Every packet on the wire is a 4-byte little-endian length followed by that
    many payload bytes. The first packet is UTF-8 JSON user info; each later
    packet is Opus-decoded (1 channel, 16000 Hz) and the result is pushed onto
    ``data_queue`` as a ``bytearray``.

    Raises:
        ConnectionError: if the initial user-info packet cannot be read.
    """

    def __init__(self, sock):
        super().__init__()

        self._socket = sock

        self._opus_decoder = OpusDecoder()
        self._opus_decoder.set_channels(1)
        self._opus_decoder.set_sampling_frequency(16000)

        # Fetch user info.
        user_info_tmp = self._read_packet(self._socket)
        if user_info_tmp is None:
            # Previously this surfaced as an opaque AttributeError on None.
            raise ConnectionError("Failed to read user info packet.")
        self._user_info = json.loads(user_info_tmp.decode("utf-8"))

        print("User connection...")
        print(json.dumps(self._user_info, indent=4))

        self._is_done = False

        # Start input thread.
        self._input_thread = threading.Thread(
            target=self._input_thread_func, daemon=True)
        self._input_thread.start()

    @staticmethod
    def _recv_exactly(sock, count):
        """Receive exactly ``count`` bytes from ``sock`` and return them.

        Raises:
            ConnectionError: if the peer closes the connection first.
            OSError: propagated from ``sock.recv``.
        """
        buffer = bytearray()
        while len(buffer) < count:
            # recv() returns b'' once the peer has closed the connection; the
            # chunk (not the accumulated buffer) must be checked, otherwise a
            # mid-packet disconnect spins forever.
            chunk = sock.recv(count - len(buffer))
            if not chunk:
                raise ConnectionError("Connection closed mid-packet.")
            buffer += chunk
        return bytes(buffer)

    def _read_packet(self, sock):
        """Read one length-prefixed packet from ``sock``.

        Returns:
            The payload as ``bytes`` (``b''`` for a zero-length packet), or
            None if the socket errors or closes before the packet is complete.
        """
        try:
            header = self._recv_exactly(sock, 4)
            packet_size = int.from_bytes(header, "little")
            return self._recv_exactly(sock, packet_size)
        except OSError:
            # Socket errors, timeouts and the ConnectionError raised on EOF
            # are all OSError subclasses.
            return None

    def _input_thread_func(self):
        """Read, decode and enqueue packets until stopped or disconnected."""
        print("input thread start")
        try:
            while not self._is_done:
                next_packet = self._read_packet(self._socket)
                if not next_packet:
                    # None (connection failed/closed) or an empty packet.
                    break

                # If we don't use bytearray here to copy, we run into a weird
                # exception about the memory not being writeable.
                decoded_data = self._opus_decoder.decode(bytearray(next_packet))

                # For debugging.
                #wave_out.writeframes(decoded_data)

                # We need to copy decoded_data here or we end up with
                # recycled buffers in our queue, which leads to broken
                # audio.
                self.data_queue.put(bytearray(decoded_data))
        except Exception as e:
            # Socket failures are already handled by _read_packet, so anything
            # reaching here (e.g. a decoder error) is reported rather than
            # silently dropped. Ending the stream is still best-effort.
            print("input thread error:", e)
        finally:
            print("input thread done")
            self._is_done = True

    def stop(self):
        """Ask the input thread to finish after its current packet."""
        self._is_done = True
        # We won't join() the input thread because we don't want to sit around
        # and wait for a packet. It'll die on its own, so whatever.

    def is_done(self):
        """Return True once stopped or once the input thread has exited."""
        return self._is_done