Make real time transcription demo with OpenAI Whisper.
commit 4c0d6821ff
README.md (new file)
@@ -0,0 +1,34 @@
# Real Time Whisper Transcription

This is a demo of real time speech to text with OpenAI's Whisper model. It works by constantly recording audio in a thread and concatenating the raw bytes over multiple recordings.

To install dependencies, simply run

```
pip install -r requirements.txt
```

in an environment of your choosing.
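For example, using Python's built-in `venv` (just one option; any environment manager works):

```
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
```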
Whisper also requires the command-line tool [`ffmpeg`](https://ffmpeg.org/) to be installed on your system, which is available from most package managers:

```
# on Ubuntu or Debian
sudo apt update && sudo apt install ffmpeg

# on Arch Linux
sudo pacman -S ffmpeg

# on macOS using Homebrew (https://brew.sh/)
brew install ffmpeg

# on Windows using Chocolatey (https://chocolatey.org/)
choco install ffmpeg

# on Windows using Scoop (https://scoop.sh/)
scoop install ffmpeg
```
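Once everything is installed, the demo can be launched from the repository root. A typical invocation (the flags are optional; the values shown are the script's defaults):

```
python transcribe_demo.py --model medium --record_timeout 2 --phrase_timeout 3
```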
For more information on Whisper, please see https://github.com/openai/whisper

The code in this repository is public domain.
requirements.txt (new file)
@@ -0,0 +1,5 @@
pyaudio
SpeechRecognition
--extra-index-url https://download.pytorch.org/whl/cu116
torch
git+https://github.com/openai/whisper.git
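Note: the `--extra-index-url` line points pip at PyTorch's CUDA 11.6 wheel index so that `torch` is installed with GPU support. On a CPU-only machine it should be safe to omit that line and let pip fetch the standard `torch` wheel from PyPI.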
transcribe_demo.py (new file)
@@ -0,0 +1,130 @@
#! python3.7

import argparse
import io
import os
import speech_recognition as sr
import whisper

from datetime import datetime, timedelta
from queue import Queue
from tempfile import NamedTemporaryFile
from time import sleep


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--model", default="medium", help="Model to use",
                        choices=["tiny", "base", "small", "medium", "large"])
    parser.add_argument("--non_english", action='store_true',
                        help="Don't use the English model.")
    parser.add_argument("--energy_threshold", default=1000,
                        help="Energy level for mic to detect.", type=int)
    parser.add_argument("--record_timeout", default=2,
                        help="How real time the recording is in seconds.", type=float)
    parser.add_argument("--phrase_timeout", default=3,
                        help="How much empty space between recordings before we "
                             "consider it a new line in the transcription.", type=float)
    args = parser.parse_args()

    model = args.model
    if args.model != "large" and not args.non_english:
        model = model + ".en"
    audio_model = whisper.load_model(model)

    record_timeout = args.record_timeout
    phrase_timeout = args.phrase_timeout

    temp_file = NamedTemporaryFile().name
    transcription = ['']

    # The last time a recording was retrieved from the queue.
    phrase_time = None
    # Current raw audio bytes.
    last_sample = bytes()
    # Thread-safe Queue for passing data from the threaded recording callback.
    data_queue = Queue()

    # We use SpeechRecognizer to record our audio because it has a nice feature
    # where it can detect when speech ends.
    recorder = sr.Recognizer()
    recorder.energy_threshold = args.energy_threshold
    # Definitely do this. Dynamic energy compensation lowers the energy threshold
    # dramatically to a point where the SpeechRecognizer never stops recording.
    recorder.dynamic_energy_threshold = False

    source = sr.Microphone(sample_rate=16000)
    with source:
        recorder.adjust_for_ambient_noise(source)

    def record_callback(_, audio: sr.AudioData) -> None:
        """
        Threaded callback function to receive audio data when recordings finish.
        audio: An AudioData containing the recorded bytes.
        """
        # Grab the raw bytes and push them into the thread-safe queue.
        data = audio.get_raw_data()
        data_queue.put(data)

    # Create a background thread that will pass us raw audio bytes.
    # We could do this manually, but SpeechRecognizer provides a nice helper.
    recorder.listen_in_background(source, record_callback, phrase_time_limit=record_timeout)

    # Cue the user that we're ready to go.
    print("Model loaded.\n")

    while True:
        try:
            now = datetime.utcnow()
            # Pull raw recorded audio from the queue.
            if not data_queue.empty():
                phrase_complete = False
                # If enough time has passed between recordings, consider the phrase complete.
                # Clear the current working audio buffer to start over with the new data.
                if phrase_time and now - phrase_time > timedelta(seconds=phrase_timeout):
                    last_sample = bytes()
                    phrase_complete = True
                # This is the last time we received new audio data from the queue.
                phrase_time = now

                # Concatenate our current audio data with the latest audio data.
                while not data_queue.empty():
                    data = data_queue.get()
                    last_sample += data

                # Use AudioData to convert the raw data to wav data.
                audio_data = sr.AudioData(last_sample, source.SAMPLE_RATE, source.SAMPLE_WIDTH)
                wav_data = io.BytesIO(audio_data.get_wav_data())

                # Write wav data to the temporary file as bytes.
                with open(temp_file, 'w+b') as f:
                    f.write(wav_data.read())

                # Read the transcription.
                result = audio_model.transcribe(temp_file)
                text = result['text'].strip()

                # If we detected a pause between recordings, add a new item to our transcription.
                # Otherwise edit the existing one.
                if phrase_complete:
                    transcription.append(text)
                else:
                    transcription[-1] = text

                # Clear the console to reprint the updated transcription.
                os.system('cls' if os.name == 'nt' else 'clear')
                for line in transcription:
                    print(line)
                # Flush stdout.
                print('', end='', flush=True)

            # Infinite loops are bad for processors, must sleep.
            sleep(0.25)
        except KeyboardInterrupt:
            break

    print("\n\nTranscription:")
    for line in transcription:
        print(line)


if __name__ == "__main__":
    main()