#! python3.7
"""Live speech transcription: microphone -> Whisper -> animated pygame caption.

Audio is recorded in the background by SpeechRecognition, accumulated into a
rolling buffer, transcribed with Whisper, and the resulting text is animated
one character edit at a time in a small pygame window.
"""

import argparse
import difflib
import os
import sys
import textwrap
from datetime import datetime, timedelta
from queue import Empty, Queue
from sys import platform
from time import sleep

import numpy as np
import pygame
import speech_recognition
import torch
import whisper

# Recent phrases to include in the text buffer before the current transcription.
recent_phrase_count = 8

# How real time the recording is in seconds.
record_timeout = 2

# Width of the caption window in pixels.
_WINDOW_WIDTH = 1280

# Only this many trailing characters of the transcript are displayed.
_MAX_DISPLAY_CHARS = 160

# If the animation is more than this many edits behind, jump to the target.
_MAX_ANIMATION_STEPS = 80

pygame_font_height = 16
pygame.init()
pygame_display_surface = pygame.display.set_mode((_WINDOW_WIDTH, pygame_font_height * 2))
pygame.display.set_caption("Transcription")
pygame_font = pygame.font.Font("/home/kiri/.fonts/Sigmar-Regular.ttf", pygame_font_height)


class AudioSource:
    """Base class for audio producers that hand raw bytes over via a queue."""

    def __init__(self):
        # Thread safe Queue for passing data from the threaded recording callback.
        self.data_queue = Queue()


class MicrophoneAudioSource(AudioSource):
    """Audio source that records from the system default microphone.

    We use SpeechRecognition to record our audio because it has a nice
    feature where it can detect when speech ends.
    """

    def __init__(self, energy_threshold=1000):
        """Open the microphone and start recording in a background thread.

        energy_threshold: Initial energy level for the recognizer to treat
            audio as speech. Defaults to the previously hard-coded 1000.
        """
        super().__init__()

        self.recorder = speech_recognition.Recognizer()
        self.recorder.energy_threshold = energy_threshold
        # Definitely do this, dynamic energy compensation lowers the energy
        # threshold dramatically to a point where the SpeechRecognizer
        # never stops recording.
        self.recorder.dynamic_energy_threshold = False

        self.source = speech_recognition.Microphone(sample_rate=16000)

        with self.source:
            self.recorder.adjust_for_ambient_noise(self.source)

        def record_callback(_, audio: speech_recognition.AudioData) -> None:
            """
            Threaded callback function to receive audio data when recordings finish.
            audio: An AudioData containing the recorded bytes.
            """
            # Grab the raw bytes and push it into the thread safe queue.
            print("GOT SOME DATA!!!")
            self.data_queue.put(audio.get_raw_data())

        # Create a background thread that will pass us raw audio bytes.
        # We could do this manually but SpeechRecognizer provides a nice helper.
        # Keep the returned stop function so the listener can be shut down.
        self.stop_listening = self.recorder.listen_in_background(
            self.source, record_callback, phrase_time_limit=record_timeout)

        print("--------------------------------------------------------------")
        print("Done setting up mic!")
        print("--------------------------------------------------------------")


def onestepchange(start, dest):
    """Return `start` moved one single-character edit closer to `dest`.

    The first insertion or deletion reported by difflib.ndiff is applied.
    If the strings are already equal, `start` is returned unchanged.
    """
    for index, entry in enumerate(difflib.ndiff(start, dest)):
        # Each entry is "<opcode> <char>": opcode at index 0, a space at
        # index 1, and the character itself last.
        opcode, char = entry[0], entry[-1]
        # Everything before the first edit is unchanged, so the diff index
        # is also the position in `start`.
        if opcode == '-':
            return start[:index] + start[index + 1:]
        if opcode == '+':
            return start[:index] + char + start[index:]
    return start


def countsteps(start, dest):
    """Return how many onestepchange() calls it takes to turn `start` into `dest`."""
    step_count = 0
    while start != dest:
        start = onestepchange(start, dest)
        step_count += 1
    return step_count


def _build_arg_parser():
    """Create the command line parser for the script."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--model", default="medium", help="Model to use",
                        choices=["tiny", "base", "small", "medium", "large"])
    parser.add_argument("--non_english", action='store_true',
                        help="Don't use the english model.")
    parser.add_argument("--energy_threshold", default=1000,
                        help="Energy level for mic to detect.", type=int)
    parser.add_argument("--phrase_timeout", default=3,
                        help="How much empty space between recordings before we "
                             "consider it a new line in the transcription.", type=float)
    if 'linux' in platform:
        parser.add_argument("--default_microphone", default='pulse',
                            help="Default microphone name for SpeechRecognition. "
                                 "Run this with 'list' to view available Microphones.", type=str)
    return parser


def _drain_queue(data_queue):
    """Remove everything currently queued and return it as one bytes object.

    Items are taken through the Queue API so a chunk put by the recorder
    thread while we are draining is either returned now or left for the
    next call, never dropped.
    """
    chunks = []
    while True:
        try:
            chunks.append(data_queue.get_nowait())
        except Empty:
            return b''.join(chunks)


def _next_frame_text(current, target):
    """Return the text to display for the next animation frame."""
    next_text = onestepchange(current, target)
    # Chop off the start all at once. It's not needed for the animation to
    # look good. Stop once the target is reached: onestepchange() makes no
    # further progress then, and the suffix test alone would spin forever.
    while next_text != target and current.endswith(next_text):
        next_text = onestepchange(next_text, target)

    # Too far behind to animate; just jump to the target.
    if countsteps(next_text, target) > _MAX_ANIMATION_STEPS:
        next_text = target
    return next_text


def _draw_text(text):
    """Render `text` right-aligned in the caption window."""
    # Font.render takes (text, antialias, color); white text on the black fill.
    text_surface = pygame_font.render(text, True, (255, 255, 255))
    text_rect = text_surface.get_rect()
    text_rect.center = (_WINDOW_WIDTH // 2, pygame_font_height)
    text_rect.right = _WINDOW_WIDTH
    pygame_display_surface.fill((0, 0, 0))
    pygame_display_surface.blit(text_surface, text_rect)
    pygame.display.update()


def main():
    """Parse arguments, load the model, and run the transcribe/animate loop."""
    args = _build_arg_parser().parse_args()

    # --default_microphone only exists on Linux.
    # NOTE: selecting a device by name is not wired up; apart from 'list',
    # the system default microphone is always used.
    if getattr(args, "default_microphone", None) == 'list':
        print("Available microphone devices are: ")
        for name in speech_recognition.Microphone.list_microphone_names():
            print(f"Microphone with name \"{name}\" found")
        return

    # The last time a recording was retrieved from the queue.
    phrase_time = None

    # Load / Download model
    model = args.model
    if args.model != "large" and not args.non_english:
        model = model + ".en"
    audio_model = whisper.load_model(model)

    phrase_timeout = args.phrase_timeout

    transcription = ['']

    # Cue the user that we're ready to go.
    print("Model loaded.\n")

    # Rolling output text buffer.
    # This is the one that animates. Stored as a single string.
    rolling_output_text = ""

    # This is the one that updates in big chunks at lower frequency.
    # Stored as an array of phrases.
    output_text = [""]

    mic_audio_source = MicrophoneAudioSource(energy_threshold=args.energy_threshold)
    data_queue = mic_audio_source.data_queue

    # Rolling audio input buffer.
    audio_data = b''

    diffsize = 0

    try:
        while True:
            for event in pygame.event.get():
                if event.type == pygame.QUIT:
                    sys.exit(0)

            rolling_text_target = " ".join(output_text)[-_MAX_DISPLAY_CHARS:]

            if rolling_text_target != rolling_output_text:
                rolling_output_text = _next_frame_text(rolling_output_text, rolling_text_target)
                print(rolling_output_text)
                _draw_text(rolling_output_text)
                diffsize = abs(len(rolling_output_text) - len(rolling_text_target))

            elif not data_queue.empty():
                now = datetime.utcnow()

                # If enough time has passed between recordings, consider the
                # phrase complete.
                #
                # FIXME: Shouldn't we cut off the phrase here instead of
                # waiting for later?
                phrase_complete = (
                    phrase_time is not None
                    and now - phrase_time > timedelta(seconds=phrase_timeout)
                )
                # This is the last time we received new audio data from the queue.
                phrase_time = now

                # Pull raw recorded audio from the queue into the rolling buffer.
                audio_data += _drain_queue(data_queue)

                # Convert in-ram buffer to something the model can use
                # directly without needing a temp file: 16 bit integers to
                # 32 bit floats, divided by 32768.
                audio_np = np.frombuffer(audio_data, dtype=np.int16).astype(np.float32) / 32768.0

                # Run the transcription model, and extract the text.
                result = audio_model.transcribe(audio_np, fp16=torch.cuda.is_available())
                text = result['text'].strip()

                # Start with all our recent-but-complete phrases, then append
                # the phrase-in-progress. (TODO: Can we make this a
                # different color or something?)
                output_text = transcription[-recent_phrase_count:]
                output_text.append(text)

                # If we're done with the phrase, we can go ahead and stuff
                # it into the list and clear out the current audio data
                # buffer.
                if phrase_complete:
                    if text != "":
                        transcription.append(text)
                    audio_data = b''

            # Infinite loops are bad for processors, must sleep. Also, limit the anim speed.
            sleep(0.01 if diffsize > 30 else 0.05)
    except KeyboardInterrupt:
        pass
    finally:
        # Ask the recorder thread to stop without blocking on it, then
        # release the window.
        mic_audio_source.stop_listening(wait_for_stop=False)
        pygame.quit()


if __name__ == "__main__":
    main()