#!/usr/bin/python3

import json
import wave
from datetime import datetime, timedelta

import numpy as np
import speech_recognition
import torch
import whisper

# Recent phrases to include in the text buffer before the current transcription.
recent_phrase_count = 8

# Seconds of silence before we start a new phrase.
phrase_timeout = 3

# Higher is more restrictive on what it lets pass through.
no_speech_prob_threshold = 0.2

_audio_model = whisper.load_model("medium.en")  # "large"

# For debugging...
# wave_out = wave.open("wave.wav", "wb")
# wave_out.setnchannels(1)
# wave_out.setframerate(16000)
# wave_out.setsampwidth(2)


class Transcriber:
    """Accumulates raw 16-bit PCM audio from an audio source and keeps a
    running list of transcribed phrases, re-running Whisper on the current
    phrase whenever new data arrives."""

    def __init__(self):
        self._audio_source = None

        # Audio data for the current phrase.
        self._current_data = b''

        self.phrases = [""]

        # Time when data last came in for the current phrase.
        self._phrase_time = datetime.utcnow()

    def set_source(self, source):
        self._audio_source = source

    def phrase_probably_silent(self):
        """Whisper hallucinates a LOT on silence, so just ignore audio that's
        mostly silence. First line of defense here."""
        if not self._current_data:
            return True

        # Treat the buffer as 16-bit samples and count how many exceed a small
        # amplitude threshold. If fewer than 10% do, call the phrase silent.
        threshold = 100
        samples = np.frombuffer(self._current_data, dtype=np.int16)
        threshold_pct = np.count_nonzero(np.abs(samples) > threshold) / len(samples)
        return threshold_pct < 0.1

    def update(self):
        now = datetime.utcnow()
        if self._audio_source:
            if not self._audio_source.data_queue.empty():
                # We got some new data. Let's process it!

                # If enough time has passed between recordings, consider the
                # last phrase complete and start a new one. Clear the current
                # working audio buffer to start over with the new data.
                if self._phrase_time and now - self._phrase_time > timedelta(seconds=phrase_timeout):
                    # Only add a new phrase if we actually have data in the
                    # last one.
                    if self.phrases[-1] != "":
                        self.phrases.append("")
                    self._current_data = b''
                self._phrase_time = now

                # Get all the new data since the last tick.
                new_data = []
                while not self._audio_source.data_queue.empty():
                    new_packet = self._audio_source.data_queue.get()
                    new_data.append(new_packet)
                new_data_joined = b''.join(new_data)

                # For debugging...
                # wave_out.writeframes(new_data_joined)

                # Append it to the current buffer.
                self._current_data = self._current_data + new_data_joined

                if self.phrase_probably_silent():
                    self.phrases[-1] = ""
                else:
                    # Convert the in-RAM buffer to something the model can use
                    # directly without needing a temp file: reinterpret the raw
                    # bytes as 16-bit integers, convert to 32-bit floats, and
                    # divide by 32768 to scale into the [-1.0, 1.0) range
                    # Whisper expects.
                    audio_np = np.frombuffer(
                        self._current_data, dtype=np.int16).astype(np.float32) / 32768.0

                    # Run the transcription model and extract the text.
                    result = _audio_model.transcribe(
                        audio_np, fp16=torch.cuda.is_available())

                    # Filter out text segments with a high no_speech_prob.
                    combined_text = ""
                    for seg in result["segments"]:
                        if seg["no_speech_prob"] <= no_speech_prob_threshold:
                            combined_text += seg["text"]
                    text = combined_text.strip()

                    self.phrases[-1] = text

                    print("phrases: ", json.dumps(self.phrases, indent=4))

            # Automatically drop audio sources when we're finished with them.
            if self._audio_source.is_done():
                self._audio_source = None
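
# ---------------------------------------------------------------------------
# A minimal usage sketch, not part of the original script: it assumes a
# hypothetical MicrophoneSource wrapper that provides the data_queue /
# is_done() interface Transcriber expects, fed from the default microphone via
# the speech_recognition library. The MicrophoneSource name, the 2-second
# phrase_time_limit, and the 0.25-second poll interval are illustrative
# choices, not anything defined above.
if __name__ == "__main__":
    import queue
    import time

    class MicrophoneSource:
        """Hypothetical audio source: pushes raw 16 kHz, 16-bit mono PCM
        chunks from the default microphone onto data_queue."""

        def __init__(self):
            self.data_queue = queue.Queue()
            recognizer = speech_recognition.Recognizer()
            recognizer.dynamic_energy_threshold = False
            microphone = speech_recognition.Microphone(sample_rate=16000)
            with microphone:
                recognizer.adjust_for_ambient_noise(microphone)

            def callback(_, audio):
                # AudioData.get_raw_data() returns the raw PCM bytes that
                # Transcriber.update() expects to find on the queue.
                self.data_queue.put(audio.get_raw_data())

            recognizer.listen_in_background(
                microphone, callback, phrase_time_limit=2)

        def is_done(self):
            # Keep the source alive for as long as the program runs.
            return False

    transcriber = Transcriber()
    transcriber.set_source(MicrophoneSource())
    while True:
        transcriber.update()
        time.sleep(0.25)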