whisper-live/transcribe_demo.py

271 lines
8.5 KiB
Python

#! python3.7
# Number of most recent completed phrases included in the text buffer ahead
# of the phrase currently being transcribed.
recent_phrase_count = 8
# How real time the recording is in seconds.
# NOTE(review): this value is not referenced anywhere else in the visible
# part of this file -- confirm it is still needed before relying on it.
record_timeout = 2
import argparse
import os
import numpy as np
import speech_recognition
import whisper
import torch
from datetime import datetime, timedelta
from queue import Queue
from time import sleep
from sys import platform
import textwrap
import difflib
import pygame
import wave
#from pyogg.opus import OpusEncoder
import socket
import select
import time
import json
import threading
import diffstuff
import audiosource
from transcriber import Transcriber
# --- Display setup ---------------------------------------------------------
# Font size handed to pygame; the window height is derived from it.
pygame_font_height = 16
pygame.init()
# Fixed 1280-pixel-wide window, two font-heights tall.
pygame_display_surface = pygame.display.set_mode((1280, pygame_font_height * 2))
pygame.display.set_caption("Transcription")
# NOTE(review): absolute path into one user's home directory; pygame raises
# if this file does not exist on the machine running the script.
pygame_font = pygame.font.Font("/home/kiri/.fonts/Sigmar-Regular.ttf", pygame_font_height)

# --- Listening socket ------------------------------------------------------
# TCP server socket on localhost:9967; accepted connections are handed to
# audiosource.OpusStreamAudioSource further down.
opus_server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Let the port be re-bound immediately after a restart. Without this, bind()
# can fail with "Address already in use" while connections from the previous
# run are still in TIME_WAIT.
opus_server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
print("binding")
opus_server_socket.bind(("127.0.0.1", 9967))
# The socket is intentionally left in blocking mode (the call below is
# disabled); readiness is polled with select() before accept() is called.
print("set non blocking")
#opus_server_socket.setblocking(False)
print("listening")
opus_server_socket.listen()

# --- Audio dump file -------------------------------------------------------
# Mono, 16 kHz, 2 bytes per sample. Opening in "wb" mode truncates any
# existing wave.wav.
# NOTE(review): nothing in the visible part of this file writes to or closes
# wave_out -- confirm whether it is still used.
wave_out = wave.open("wave.wav", "wb")
wave_out.setnchannels(1)
wave_out.setframerate(16000)
wave_out.setsampwidth(2)

# --- Transcription objects -------------------------------------------------
transcriber = Transcriber()
# NOTE(review): mic_source is created here but not used in the visible code.
mic_source = audiosource.MicrophoneAudioSource()
# Accept one connection at a time, forever, and run the transcriber on each.
while True:
    print("looping")

    # Wait for a connection.
    while True:
        # A zero timeout makes select() return immediately; the short sleep
        # between polls keeps this wait from spinning the CPU.
        s = select.select([opus_server_socket], [], [], 0)
        time.sleep(0.01)
        if not s[0]:
            continue
        accepted_socket, addr = opus_server_socket.accept()
        print(accepted_socket)
        break

    # Wrap the accepted connection in an audio source and make it the
    # transcriber's input.
    opusSource = audiosource.OpusStreamAudioSource(accepted_socket)
    transcriber.set_source(opusSource)

    # Update the transcriber every 0.1 s until the source reports it is done.
    while True:
        if opusSource.is_done():
            break
        time.sleep(0.1)
        transcriber.update()

# The outer loop above has no exit path, so this call is not reached.
exit(0)
def onestepchange(start, dest):
    """Return ``start`` with the first single-character edit toward ``dest`` applied.

    The two strings are diffed character by character with
    ``difflib.ndiff`` and only the first difference is applied: either one
    character is removed from ``start`` or one character of ``dest`` is
    inserted into it. Everything after the edit point is kept from ``start``.

    Args:
        start: The current text.
        dest: The text to move towards.

    Returns:
        ``start`` with exactly one insertion or one deletion applied, or
        ``start`` unchanged when the diff contains no differences.
    """
    # Index into ``start`` of the character the next diff entry refers to.
    pos = 0
    for entry in difflib.ndiff(start, dest):
        # Each ndiff entry is a two-character code ("- ", "+ ", "  " or "? ")
        # followed by the element. The operation is the FIRST character; the
        # second one is always a space, so it must not be used to detect
        # insertions.
        op = entry[0]
        if op == '-':
            # Drop the character at the current position.
            return start[:pos] + start[pos + 1:]
        if op == '+':
            # Insert the new character in front of the current position.
            return start[:pos] + entry[2:] + start[pos:]
        if op == ' ':
            # Character is common to both strings; move past it.
            pos += 1
        # "? " hint entries carry no edit and do not consume a character.
    return start
def countsteps(start, dest):
    """Count how many ``onestepchange`` calls turn ``start`` into ``dest``.

    Args:
        start: The text to begin from.
        dest: The text to reach.

    Returns:
        The number of single-step edits applied before the text equals
        ``dest``; 0 when the two strings are already equal.
    """
    current = start
    steps_taken = 0
    while True:
        if current == dest:
            return steps_taken
        current = onestepchange(current, dest)
        steps_taken += 1
def main():
    """Run the microphone transcription demo with an animated text display.

    Parses the command-line options, loads the selected Whisper model and
    starts a microphone audio source. It then loops until interrupted:

    * While the on-screen text differs from the latest transcription, move it
      one edit closer (see ``onestepchange``) and redraw the pygame window.
    * Otherwise, if new audio is queued, append it to the working buffer,
      transcribe the whole buffer again and update the target text. A gap
      longer than ``--phrase_timeout`` seconds between recordings closes the
      current phrase and starts a fresh audio buffer.

    The loop ends on Ctrl-C (``KeyboardInterrupt``); closing the pygame window
    exits the process.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--model", default="medium", help="Model to use",
                        choices=["tiny", "base", "small", "medium", "large"])
    parser.add_argument("--non_english", action='store_true',
                        help="Don't use the english model.")
    # NOTE(review): --energy_threshold and --default_microphone are parsed but
    # not read anywhere in this function.
    parser.add_argument("--energy_threshold", default=1000,
                        help="Energy level for mic to detect.", type=int)
    parser.add_argument("--phrase_timeout", default=3,
                        help="How much empty space between recordings before we "
                             "consider it a new line in the transcription.", type=float)
    if 'linux' in platform:
        parser.add_argument("--default_microphone", default='pulse',
                            help="Default microphone name for SpeechRecognition. "
                                 "Run this with 'list' to view available Microphones.", type=str)
    args = parser.parse_args()

    # The last time a recording was retrieved from the queue.
    phrase_time = None

    # Load / Download model. Every size except "large" gets the ".en"
    # suffix unless --non_english was given.
    model = args.model
    if args.model != "large" and not args.non_english:
        model = model + ".en"
    audio_model = whisper.load_model(model)

    phrase_timeout = args.phrase_timeout

    # Completed phrases, oldest first.
    transcription = ['']

    # Cue the user that we're ready to go.
    print("Model loaded.\n")

    # Rolling output text buffer.
    # This is the one that animates. Stored as a single string.
    rolling_output_text = ""

    # This is the one that updates in big chunks at lower frequency.
    # Stored as an array of phrases.
    output_text = [""]

    # Only the ``audiosource`` module is imported, so the class has to be
    # referenced through it (the bare name would raise NameError).
    mic_audio_source = audiosource.MicrophoneAudioSource()
    mic_audio_source.start()
    data_queue = mic_audio_source.data_queue

    # Rolling audio input buffer.
    audio_data = b''

    # Length difference between the shown text and its target after the last
    # redraw; used below to pick the animation speed.
    diffsize = 0

    while True:
        try:
            for event in pygame.event.get():
                if event.type == pygame.QUIT:
                    pygame.quit()
                    exit(0)

            # Only the last 160 characters are displayed.
            rolling_text_target = " ".join(output_text)[-160:]

            if rolling_text_target != rolling_output_text:

                # Chop off the start all at once. It's not needed for the animation to look good.
                new_rolling_output_text = onestepchange(rolling_output_text, rolling_text_target)
                # Stop as soon as the target is reached: onestepchange() returns
                # its input unchanged once both strings match, so without this
                # check the loop would never end when the target is a suffix of
                # the currently displayed text.
                while (new_rolling_output_text != rolling_text_target
                       and rolling_output_text.endswith(new_rolling_output_text)):
                    new_rolling_output_text = onestepchange(new_rolling_output_text, rolling_text_target)
                rolling_output_text = new_rolling_output_text

                # Too far behind to animate: jump straight to the target.
                if countsteps(rolling_output_text, rolling_text_target) > 80:
                    rolling_output_text = rolling_text_target

                print(rolling_output_text)

                # Font.render() takes (text, antialias, color, background=None).
                # The original call put a colour tuple in the antialias slot,
                # which made (255, 255, 255) the text colour with no
                # background; spell that out explicitly.
                pygame_text_surface = pygame_font.render(rolling_output_text, True, (255, 255, 255))
                pygame_text_rect = pygame_text_surface.get_rect()
                pygame_text_rect.center = (640, pygame_font_height)
                # Right-align so the newest text sits at the window's right edge.
                pygame_text_rect.right = 1280
                pygame_display_surface.fill((0, 0, 0))
                pygame_display_surface.blit(pygame_text_surface, pygame_text_rect)
                pygame.display.update()

                diffsize = abs(len(rolling_output_text) - len(rolling_text_target))

            else:

                now = datetime.utcnow()

                # Pull raw recorded audio from the queue.
                if not data_queue.empty():
                    phrase_complete = False

                    # If enough time has passed between recordings, consider the phrase complete.
                    # Clear the current working audio buffer to start over with the new data.
                    #
                    # FIXME: Shouldn't we cut off the phrase here instead of
                    # waiting for later?
                    if phrase_time and now - phrase_time > timedelta(seconds=phrase_timeout):
                        phrase_complete = True

                    # This is the last time we received new audio data from the queue.
                    phrase_time = now

                    # Combine audio data from queue
                    audio_data += b''.join(data_queue.queue)
                    data_queue.queue.clear()

                    # Convert in-ram buffer to something the model can use directly without needing a temp file.
                    # Interpret the bytes as 16 bit integers and scale them by
                    # 1/32768 into 32 bit floats.
                    audio_np = np.frombuffer(audio_data, dtype=np.int16).astype(np.float32) / 32768.0

                    # Run the transcription model, and extract the text.
                    result = audio_model.transcribe(audio_np, fp16=torch.cuda.is_available())
                    text = result['text'].strip()

                    # Update rolling transcription file.

                    # Start with all our recent-but-complete phrases.
                    output_text = transcription[-recent_phrase_count:]

                    # Append the phrase-in-progress. (TODO: Can we make this a
                    # different color or something?)
                    output_text.append(text)

                    # If we're done with the phrase, we can go ahead and stuff
                    # it into the list and clear out the current audio data
                    # buffer.
                    if phrase_complete:

                        # Append to full transcription.
                        if text != "":
                            transcription.append(text)

                        # Clear audio buffer.
                        audio_data = b''

            # Infinite loops are bad for processors, must sleep. Also, limit the anim speed.
            # A large remaining difference animates faster.
            if diffsize > 30:
                sleep(0.01)
            else:
                sleep(0.05)

        except KeyboardInterrupt:
            break
# Run main() only when this file is executed as a script, not when imported.
# NOTE(review): the module-level connection loop earlier in this file appears
# to run forever, in which case this guard is never reached -- confirm.
if __name__ == "__main__":
    main()