203 lines
5.4 KiB
Python
203 lines
5.4 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import argparse
|
|
import datetime
|
|
import os
|
|
import os.path
|
|
import re
|
|
import sys
|
|
import whisper
|
|
|
|
# Default location of the Thought Capture Device (TCD) recordings; main()
# uses it unless --path is given.
TCD_PATH = "/media/elf/USB DISK/RECORD/"

# Recording names look like R<YYYYMMDD>-<HHMMSS>.WAV (case-insensitive).
# Group 1 captures the timestamp. A raw string keeps the \d escapes valid,
# and the dot is escaped so only a literal ".WAV" extension is accepted.
tcd_record = re.compile(r"R(\d{8}-\d{6})\.WAV$", re.IGNORECASE)

# Directory and marker file that remember the last recording processed.
CACHE_PATH = os.path.join(os.path.expanduser("~"), ".cache", "tcd")
CACHE_FILE = os.path.join(CACHE_PATH, "lastfile")
|
|
|
|
# https://stackoverflow.com/questions/5574702/how-do-i-print-to-stderr-in-python
def maybe_eprint(*args, **kwargs):
    """Print to standard error, forwarding all arguments to ``print``.

    Positional and keyword arguments are passed through unchanged; only the
    destination stream is fixed to ``sys.stderr``.
    """
    err_stream = sys.stderr
    print(*args, file=err_stream, **kwargs)
|
|
|
|
|
|
def maybe_not_eprint(*args, **kwargs):
    """Accept any arguments and do nothing.

    Silent stand-in with the same call signature as ``maybe_eprint``.
    """
    return None
|
|
|
|
|
|
# Diagnostic printer used throughout the module. It is silent by default;
# main() rebinds it to maybe_eprint when --verbose is given.
eprint = maybe_not_eprint
|
|
|
|
|
|
def converttcdfilename(filename):
    """Convert a TCD recording file name into a human-readable timestamp.

    Args:
        filename: File name or path ending in ``R<YYYYMMDD>-<HHMMSS>.WAV``.

    Returns:
        The recording time formatted with ``"%A, %B %d %Y, %I:%M %p"``,
        e.g. ``"Monday, January 02 2023, 03:04 AM"`` in an English locale.

    Raises:
        ValueError: If the captured digits do not form a valid date/time
            (raised by ``datetime.strptime``).

    Exits the program with status 1 when the name does not match the
    recording pattern.
    """
    g = tcd_record.search(filename)
    if not g:
        # Report on stderr so the diagnostic never mixes with transcription
        # text that callers write to stdout.
        print(
            "Filename {} did not match TCD file pattern. How did we get here?".format(
                filename
            ),
            file=sys.stderr,
        )
        sys.exit(1)
    tcd_time = datetime.datetime.strptime(g[1], "%Y%m%d-%H%M%S")
    return tcd_time.strftime("%A, %B %d %Y, %I:%M %p")
|
|
|
|
|
|
# Whisper model shared by every transcribe() call; populated on first use so
# that processing many recordings does not reload the weights each time.
_WHISPER_MODEL = None


def _get_whisper_model():
    """Return the shared Whisper "small" model, loading it on first use."""
    global _WHISPER_MODEL
    if _WHISPER_MODEL is None:
        # Load the weights once, then try to move that same model to the GPU
        # instead of loading a second copy just to call .cuda() on it.
        model = whisper.load_model("small")
        try:
            model = model.cuda()
        except Exception:
            # Keep using the already-loaded model on its current device.
            eprint("Initializing CUDA failed. Falling back to CPU mode.")
        _WHISPER_MODEL = model
    return _WHISPER_MODEL


def transcribe(filename: str):
    """Transcribe an audio file to English text with Whisper.

    Args:
        filename: Path of the audio file to transcribe.

    Returns:
        The transcribed text with surrounding whitespace stripped.

    Raises:
        RuntimeError: If Whisper reports a language other than English.
    """
    model = _get_whisper_model()
    result = model.transcribe(filename, language="en", temperature=0.0)
    # Explicit check rather than ``assert`` so it still runs under ``-O``.
    if result["language"] != "en":
        raise RuntimeError(
            "Unexpected transcription language: {}".format(result["language"])
        )
    return result["text"].strip()
|
|
|
|
|
|
def transcribe_with_timestamp(filename: str):
    """Print a recording's timestamp heading followed by its transcription.

    The heading is the human-readable time derived from the file name,
    underlined with dashes of the same length; the transcribed text follows
    on the next line.

    Args:
        filename: Path of a TCD recording.
    """
    # Resolve the heading first: a name that does not match the recording
    # pattern aborts before the (slow) transcription is attempted.
    heading = converttcdfilename(filename)
    body = transcribe(filename)
    underline = "-" * len(heading)
    print("{}\n{}\n{}".format(heading, underline, body))
|
|
|
|
|
|
def getlastfile():
    """Return the recording name stored in the cache marker file.

    Returns:
        The first line of ``CACHE_FILE`` with surrounding whitespace
        stripped, or ``None`` when the cache directory or file is missing
        or the file is empty.
    """
    marker_present = os.path.isdir(CACHE_PATH) and os.path.isfile(CACHE_FILE)
    if not marker_present:
        return None
    with open(CACHE_FILE, "r") as marker:
        first_line = marker.readline()
    # readline() yields "" only for an empty file.
    return first_line.strip() if first_line else None
|
|
|
|
|
|
def writelast(filename: str):
    """Record ``filename`` as the last recording processed.

    Creates the cache directory when it does not exist, then overwrites
    ``CACHE_FILE`` with the name followed by a newline.

    Args:
        filename: Name of the most recently transcribed recording.
    """
    cache_dir_exists = os.path.isdir(CACHE_PATH)
    if not cache_dir_exists:
        os.makedirs(CACHE_PATH)
    with open(CACHE_FILE, "w") as marker:
        marker.write(filename + "\n")
|
|
|
|
|
|
def _build_parser():
    """Create the argument parser describing every command-line option."""
    parser = argparse.ArgumentParser(
        description="Process Elf's Thought Capture Device"
    )
    parser.add_argument(
        "-l",
        "--list",
        action="store_true",
        help="List all available files on TCD",
    )
    parser.add_argument("-f", "--file", type=str, nargs=1, help="File to parse")
    parser.add_argument(
        "-i",
        "--item",
        nargs=1,
        type=int,
        help="File to parse, by index",
    )
    parser.add_argument(
        "-a",
        "--all",
        action="store_true",
        help="Ignore cached last-read value, process entire TCD.",
    )
    parser.add_argument(
        "-p", "--path", nargs=1, type=str, help="Path to TCD recordings"
    )
    parser.add_argument(
        "-v", "--verbose", action="store_true", help="Show plan and progress."
    )
    return parser


def _transcribe_batch(tcd_path, names):
    """Print a timestamped transcription per recording, separated by blank lines."""
    final_index = len(names) - 1
    for index, name in enumerate(names):
        transcribe_with_timestamp(os.path.join(tcd_path, name))
        # Separator only between entries, never after the last one.
        if index != final_index:
            print("\n\n")


def main():
    """Command-line entry point for the TCD transcription tool.

    Modes, checked in this order:

    * ``--file FILE``: transcribe one arbitrary audio file and print the text.
    * ``--list``: print the numbered, human-readable list of recordings.
    * ``--item N``: transcribe the N-th recording from that list (1-based).
    * ``--all``: transcribe every recording in the repository.
    * default: transcribe only recordings newer than the cached last file,
      then update the cache.

    Always terminates via ``sys.exit``: status 1 for a missing repository or
    an out-of-range item index, 0 otherwise.
    """
    global eprint

    args = _build_parser().parse_args()

    # Rebind the module-level printer so helper functions honour --verbose.
    eprint = maybe_eprint if args.verbose else maybe_not_eprint

    tcd_path = TCD_PATH
    if args.path:
        tcd_path = args.path[0]
        eprint("Setting repository path to: {}".format(tcd_path))

    # An explicit file does not depend on the repository, so handle it
    # before checking whether the device is mounted.
    if args.file:
        target = args.file[0]
        eprint("Transcribing {}".format(target))
        print(transcribe(target))
        sys.exit(0)

    if not os.path.isdir(tcd_path):
        print(
            "Could not identify path to TCD repository. Is it mounted?",
            file=sys.stderr,
        )
        sys.exit(1)

    # Sorted so that --list indices, --item lookups and processing order are
    # deterministic; names embed the timestamp, so this is chronological.
    files = sorted(p for p in os.listdir(tcd_path) if tcd_record.match(p))
    if not files:
        print("No files in TCD repository to process.")
        sys.exit(0)

    if args.list:
        for count, f in enumerate(files, start=1):
            print("{}: {}".format(count, converttcdfilename(f)))
        sys.exit(0)

    if args.item:
        pos = args.item[0]
        if pos < 1 or pos > len(files):
            print(
                "There is no item at the index you provided. Legal range is 1 to {}".format(
                    len(files)
                ),
                file=sys.stderr,
            )
            sys.exit(1)
        # --list numbers entries from 1, so convert to a 0-based index.
        chosen = files[pos - 1]
        eprint("Transcribing {}".format(chosen))
        transcribe_with_timestamp(os.path.join(tcd_path, chosen))
        sys.exit(0)

    if args.all:
        eprint("Transcribing all files in repository {}".format(tcd_path))
        _transcribe_batch(tcd_path, files)
        sys.exit(0)

    files_to_transcribe = files
    lastfile = getlastfile()
    if lastfile:
        try:
            lastindex = files.index(lastfile)
        except ValueError:
            # The cached recording is no longer in the repository; fall back
            # to processing everything that is there.
            pass
        else:
            files_to_transcribe = files[lastindex + 1 :]

    if not files_to_transcribe:
        print("NOTICE: No new entries found to transcribe.")
        sys.exit(0)

    eprint("Transcribing: {}".format(", ".join(files_to_transcribe)))
    _transcribe_batch(tcd_path, files_to_transcribe)

    writelast(files_to_transcribe[-1])
    sys.exit(0)
|
|
|
|
|
|
# Run the command-line interface only when executed as a script, not when
# the module is imported.
if __name__ == "__main__":
    main()
|