Source code for wdoc.utils.loaders.shared_audio

import os
import re
import time
from pathlib import Path

import joblib
from beartype.typing import List, Optional, Union
from loguru import logger

from wdoc.utils.env import env
from wdoc.utils.misc import doc_loaders_cache, file_hasher


def seconds_to_timecode(inp: Union[str, float, int]) -> str:
    """Format a duration in seconds as an ``HH:MM:SS`` timecode.

    Used for the vtt subtitle conversion. The input is coerced with
    ``float()`` and any fractional second is truncated, not rounded.
    """
    total_minutes, remaining_seconds = divmod(float(inp), 60)
    hours, remaining_minutes = divmod(total_minutes, 60)
    return (
        f"{int(hours):02d}:{int(remaining_minutes):02d}:{int(remaining_seconds):02d}"
    )
def timecode_to_second(inp: str) -> int:
    """Convert an ``HH:MM:SS`` vtt timecode into a number of seconds.

    Raises ``ValueError`` when the string does not consist of exactly three
    integer fields separated by colons (fractional seconds are not accepted).
    """
    fields = [int(field) for field in inp.split(":")]
    hour, minute, second = fields
    return 3600 * hour + 60 * minute + second
def is_timecode(inp: Union[float, str]) -> bool:
    """Tell whether `inp` can be parsed by `timecode_to_second`.

    Any exception raised while parsing (wrong type, wrong number of fields,
    non-integer field, ...) is treated as "not a timecode".
    """
    try:
        timecode_to_second(inp)
    except Exception:
        return False
    else:
        return True
def process_vtt_content_for_llm(
    vtt_content: str, remove_hour_prefix: bool = True
) -> str:
    """
    Process VTT content to make it more suitable for LLMs by reducing timecodes
    and removing unnecessary formatting.

    Each output line starts with a kept ``HH:MM:SS`` timecode followed by the
    text of every cue up to the next kept timecode. A timecode is kept only
    if it is at least 15 seconds after the previously kept one; the very
    first cue's timecode is always kept. Lines located before the first cue
    (e.g. the ``WEBVTT`` header) are dropped.

    Args:
        vtt_content: The VTT content to process
        remove_hour_prefix: Whether to strip the leading "00:" hour prefix
            from the output lines. It is only stripped when the last output
            line starts with "00:", i.e. when the last kept timecode is
            below one hour.

    Returns:
        Processed text content optimized for LLM consumption

    Raises:
        ValueError: if a cue line (one containing " --> ") does not start
            with an ``HH:MM:SS.fraction`` timecode.
    """
    # Minimum number of seconds between two timecodes kept in the output.
    min_gap_seconds = 15

    # Inline cue timestamps and <c> tags that only cost tokens.
    inline_markup_pattern = re.compile(
        r"(?:\d{2}:\d{2}:\d{2}\.\d{3})|(?:<\d{2}:\d{2}:\d{2}\.\d{3}>)|(?:</?c>)"
    )
    # Everything after the seconds of the cue start: ".mmm --> end [settings]".
    cue_tail_pattern = re.compile(r"\.\d+ -->.*")

    # None means "no timecode kept yet". A numeric sentinel such as -1 would
    # make every cue starting before 14s look like a duplicate, and because
    # text preceding the first kept timecode is skipped, the beginning of the
    # transcript would be silently lost.
    latest_tc = None
    newlines = []
    for li in vtt_content.splitlines():
        if " --> " in li:
            li = cue_tail_pattern.sub("", li).strip()

            # Only keep one timecode every `min_gap_seconds`.
            tc = timecode_to_second(li)
            if latest_tc is not None and tc - latest_tc < min_gap_seconds:
                li = ""
            else:
                latest_tc = tc
        else:
            li = inline_markup_pattern.sub("", li).strip()

        is_tc = is_timecode(li)

        # We need at least one line, but skip the lines before the first timecode
        if not newlines:
            if is_tc:
                newlines.append(li)
            continue

        # Check no consecutive timecodes (for cached_yt_loader compatibility)
        if len(newlines) >= 2 and is_tc and is_timecode(newlines[-1]):
            # Skip consecutive timecodes to avoid VTT format issues
            continue

        if is_tc:
            newlines.append(li + " ")
        elif is_timecode(newlines[-1]):
            # Last line is still a bare timecode: start its text.
            newlines[-1] += " " + li.strip()
        elif li not in newlines[-1]:
            # Substring test: skips text already present in the current line
            # (and empty lines, since "" is contained in any string).
            newlines[-1] += " " + li.strip() if newlines[-1].strip() else li.strip()

    newlines = [nl.strip() for nl in newlines]

    # If even the last kept timecode is below one hour, the hour mark carries
    # no information and can be removed.
    if remove_hour_prefix and newlines and newlines[-1].startswith("00:"):
        newlines = [nl[3:] if nl.startswith("00:") else nl for nl in newlines]

    content = "\n".join(newlines)
    return content
def convert_verbose_json_to_timestamped_text(transcript: dict) -> str:
    """Render a verbose-json transcript as compact timestamped text.

    Every entry of ``transcript["segments"]`` (read through its "start",
    "end" and "text" keys) is written as a vtt-like cue, then the result is
    passed to `process_vtt_content_for_llm` with the hour prefix preserved,
    i.e. the same code path used for youtube chapters.
    """
    cues = [
        f"\n\n{seconds_to_timecode(seg['start'])}.0 --> "
        f"{seconds_to_timecode(seg['end'])}\n{seg['text']}"
        for seg in transcript["segments"]
    ]
    vtt_like = "".join(cues).strip()
    return process_vtt_content_for_llm(vtt_like, remove_hour_prefix=False)
@doc_loaders_cache.cache(ignore=["audio_path"])
def transcribe_audio_deepgram(
    audio_path: Union[str, Path],
    audio_hash: str,
    deepgram_kwargs: Optional[dict] = None,
) -> dict:
    """Use deepgram to transcribe an audio file.

    Args:
        audio_path: path of the audio file to upload. It is listed in the
            cache decorator's ``ignore`` argument.
        audio_hash: hash of the audio content. It is not used in the body;
            presumably it only serves as the cache key in place of the path
            (TODO confirm against `doc_loaders_cache`).
        deepgram_kwargs: extra options merged over the defaults below. A
            truthy "language" entry disables automatic language detection.

    Returns:
        The deepgram response converted with ``to_dict()``.

    Raises:
        AssertionError: in private mode, or if DEEPGRAM_API_KEY is missing
            or redacted.
        Exception: if the deepgram client cannot be created.
    """
    import httpx
    import deepgram

    logger.info(f"Calling deepgram to transcribe {audio_path}")
    assert not env.WDOC_PRIVATE_MODE, (
        "Private mode detected, aborting before trying to use deepgram's API"
    )
    assert (
        "DEEPGRAM_API_KEY" in os.environ
        and not os.environ["DEEPGRAM_API_KEY"]
        == "REDACTED_BECAUSE_WDOC_IN_PRIVATE_MODE"
    ), "No environment variable DEEPGRAM_API_KEY found"

    # client
    try:
        client = deepgram.DeepgramClient()
    except Exception as err:
        # Chain the original error so its traceback is not lost.
        raise Exception(f"Error when creating deepgram client: '{err}'") from err

    # set options
    options = dict(
        # docs: https://playground.deepgram.com/?endpoint=listen&smart_format=true&language=en&model=nova-3
        model="nova-3",
        detect_language=True,
        # not all features below are available for all languages
        # intelligence
        summarize=False,
        topics=False,
        intents=False,
        sentiment=False,
        # transcription
        smart_format=True,
        punctuate=True,
        paragraphs=True,
        utterances=True,
        diarize=True,
        # redact=None,
        # replace=None,
        # search=None,
        # keywords=None,
        # filler_words=False,
    )
    if deepgram_kwargs is None:
        deepgram_kwargs = {}
    if deepgram_kwargs.get("language"):
        # An explicit language replaces automatic detection.
        del options["detect_language"]
    options.update(deepgram_kwargs)
    options = deepgram.PrerecordedOptions(**options)

    # load file
    with open(audio_path, "rb") as f:
        payload = {"buffer": f.read()}

    # get content
    t = time.time()
    content = client.listen.prerecorded.v("1").transcribe_file(
        payload,
        options,
        timeout=httpx.Timeout(300.0, connect=10.0),  # timeout for large files
    )
    logger.info(f"Done deepgram transcribing {audio_path} in {int(time.time() - t)}s")
    d = content.to_dict()
    return d


@doc_loaders_cache.cache(ignore=["audio_path"])
def transcribe_audio_whisper(
    audio_path: Union[Path, str],
    audio_hash: str,
    language: Optional[str],
    prompt: Optional[str],
) -> dict:
    """Use whisper to transcribe an audio file.

    The transcription is first attempted through ``litellm.transcription``;
    if that fails and WDOC_WHISPER_ENDPOINT is set, a direct multipart POST
    to ``<endpoint>/v1/audio/transcriptions`` is attempted. If the error
    message contains "Maximum content size limit", the audio is split with
    `split_too_large_audio`, each split is transcribed by a recursive call,
    and the verbose-json outputs are merged into a single dict.

    Args:
        audio_path: path of the audio file. It is listed in the cache
            decorator's ``ignore`` argument.
        audio_hash: hash of the audio content. It is not used in the body;
            presumably it only serves as the cache key in place of the path
            (TODO confirm against `doc_loaders_cache`).
        language: language passed to whisper, or None.
        prompt: prompt passed to whisper, or None.

    Returns:
        The verbose-json transcript as a dict. When splits were merged, any
        "words" output is set to None, segment timestamps are shifted by the
        cumulated duration of the previous splits, and the texts are joined
        with " [note: audio was split here] ".

    Raises:
        AssertionError: if the required endpoint / API key settings are
            missing, or if the splits do not share the same "task".
        Exception: any transcription error not caused by the size limit.
    """
    import requests
    import litellm

    # Substring of the error message signalling that the file is too large.
    size_limit_marker = "Maximum content size limit"

    logger.info(f"Calling openai's whisper to transcribe {audio_path}")
    if env.WDOC_PRIVATE_MODE:
        assert env.WDOC_WHISPER_ENDPOINT, (
            "WDOC_PRIVATE_MODE is set but no WDOC_WHISPER_ENDPOINT is set. Crashing as it seems like your private request would call a remote API"
        )
        # NOTE(review): this requires the key to EQUAL the redaction
        # placeholder while the message talks about a missing variable;
        # confirm against how private mode redacts keys before changing it.
        assert env.WDOC_WHISPER_API_KEY == "REDACTED_BECAUSE_WDOC_IN_PRIVATE_MODE", (
            "No environment variable WDOC_WHISPER_API_KEY found"
        )
    else:
        assert (
            "OPENAI_API_KEY" in os.environ
            and not os.environ["OPENAI_API_KEY"]
            == "REDACTED_BECAUSE_WDOC_IN_PRIVATE_MODE"
        ) or (
            env.WDOC_WHISPER_API_KEY
            and not env.WDOC_WHISPER_API_KEY == "REDACTED_BECAUSE_WDOC_IN_PRIVATE_MODE"
        ), "No environment variable OPENAI_API_KEY nor WDOC_WHISPER_API_KEY found"

    try:
        t1 = time.time()
        with open(audio_path, "rb") as audio_file:
            # Prepare transcription arguments
            transcription_kwargs = {
                "model": env.WDOC_WHISPER_MODEL,
                "file": audio_file,
                "prompt": prompt,
                "language": language,
                "temperature": 0,
                "response_format": "verbose_json",
            }

            # Add custom endpoint and API key if provided
            if env.WDOC_WHISPER_ENDPOINT:
                transcription_kwargs["api_base"] = env.WDOC_WHISPER_ENDPOINT
                logger.debug(
                    f"Using custom whisper endpoint: {env.WDOC_WHISPER_ENDPOINT}"
                )
            if env.WDOC_WHISPER_API_KEY:
                transcription_kwargs["api_key"] = env.WDOC_WHISPER_API_KEY
                logger.debug("Using custom whisper API key")

            try:
                transcript = litellm.transcription(**transcription_kwargs).json()
            except Exception as litellm_err:
                if size_limit_marker in str(litellm_err):
                    # Let the outer handler split the audio: re-sending the
                    # same oversized file cannot succeed, and wrapping the
                    # error below would hide the marker from that handler.
                    raise
                logger.warning(
                    f"litellm.transcription failed with error: {litellm_err}. "
                    f"Falling back to direct requests call to whisper endpoint."
                )

                # Fallback to direct requests call
                if not env.WDOC_WHISPER_ENDPOINT:
                    raise Exception(
                        "litellm failed and no WDOC_WHISPER_ENDPOINT set for fallback"
                    ) from litellm_err

                # litellm may already have consumed the handle; rewind so the
                # fallback uploads the whole file instead of an empty body.
                audio_file.seek(0)

                # Prepare the multipart form data
                files = {"file": audio_file}
                data = {
                    "model": env.WDOC_WHISPER_MODEL,
                    "response_format": "verbose_json",
                    "temperature": 0,
                }
                if prompt:
                    data["prompt"] = prompt
                if language:
                    data["language"] = language

                headers = {}
                if env.WDOC_WHISPER_API_KEY:
                    headers["Authorization"] = f"Bearer {env.WDOC_WHISPER_API_KEY}"

                # Make the request
                endpoint_url = (
                    env.WDOC_WHISPER_ENDPOINT.rstrip("/") + "/v1/audio/transcriptions"
                )
                response = requests.post(
                    endpoint_url, files=files, data=data, headers=headers
                )
                response.raise_for_status()
                transcript = response.json()

        t2 = time.time()
        logger.info(f"Done transcribing {audio_path} in {int(t2 - t1)}s")
    except Exception as e:
        if size_limit_marker not in str(e):
            raise

        audio_splits = split_too_large_audio(audio_path)

        # reconstitute appropriate durations
        transcripts = []

        if env.WDOC_WHISPER_PARALLEL_SPLITS:
            logger.info(f"Processing {len(audio_splits)} audio splits in parallel")

            def process_audio_split(f: Path) -> dict:
                """Process a single audio split file."""
                h = file_hasher({"path": f})
                return transcribe_audio_whisper(
                    audio_path=f,
                    audio_hash=h,
                    language=language,
                    prompt=prompt,
                )

            # Process splits in parallel using joblib
            transcripts = joblib.Parallel(
                n_jobs=-1,
                backend="threading",
            )(joblib.delayed(process_audio_split)(f) for f in audio_splits)
        else:
            logger.warning(
                "Using sequential processing for whisper over audio splits"
            )
            for f in audio_splits:
                h = file_hasher({"path": f})
                temp = transcribe_audio_whisper(
                    audio_path=f,
                    audio_hash=h,
                    language=language,
                    prompt=prompt,
                )
                transcripts.append(temp)

        if len(transcripts) == 1:
            return transcripts[0]

        logger.info(f"Combining {len(transcripts)} audio splits into a single json")
        # The first split is the reference that accumulates all the others.
        ref = transcripts.pop(0)
        # .get(): a missing "words" key is treated like an explicit None.
        if ref.get("words") is not None:
            logger.warning(
                "Warning: the transcript contains a 'words' output, which will be discarded as the combination of word timestamps is not yet supported."
            )
            ref["words"] = None
        for trans in transcripts:
            assert trans["task"] == ref["task"]
            if trans["language"] != ref["language"]:
                logger.warning(
                    f"Warning: the language of the reference split audio ({ref['language']}) is not the same as the language of the current split ({trans['language']})"
                )
            if trans.get("words") is not None:
                logger.warning(
                    "Warning: the transcript contains a 'words' output, which will be discarded as the combination of word timestamps is not yet supported."
                )
                trans["words"] = None

            # Shift this split's timestamps by the duration accumulated so far.
            offset = ref["duration"]
            segments = trans["segments"]
            for seg in segments:
                seg["end"] += offset
                seg["start"] += offset
            ref["segments"].extend(segments)

            ref["duration"] += trans["duration"]
            ref["text"] += " [note: audio was split here] " + trans["text"]

        return ref

    return transcript
def split_too_large_audio(
    audio_path: Union[Path, str],
) -> List[Path]:
    """Split an audio file into consecutive segments using ffmpeg.

    Whisper has a file size limit of about 25mb. If we hit that limit, we
    split the audio file into multiple files of roughly half an hour, then
    the caller combines the outputs.

    The segments are written, without re-encoding (``c="copy"``), to a new
    ``<stem>_splits`` folder next to the input file.

    Args:
        audio_path: path of the audio file to split.

    Returns:
        The paths of the split files, sorted by name so that they are in
        chronological order.

    Raises:
        FileExistsError: if the ``<stem>_splits`` folder already exists.
        AssertionError: if ffmpeg produced no file.
    """
    import ffmpeg

    audio_path = Path(audio_path)
    logger.info(
        f"Splitting large audio file '{audio_path}' into 30minute segment because it's too long for whisper"
    )
    split_folder = audio_path.parent / (audio_path.stem + "_splits")
    # exist_ok=False: refuse to mix new segments with leftovers of a previous run.
    split_folder.mkdir(exist_ok=False)

    # Path.suffix already contains the leading dot, so it must not be added
    # again in the output pattern (that would produce "split__000..mp3").
    ext = audio_path.suffix

    ffmpeg.input(str(audio_path.absolute())).output(
        str((split_folder / f"split__%03d{ext}").absolute()),
        c="copy",
        f="segment",
        # NOTE(review): 1600s is ~26.7 minutes, slightly less than the
        # "30 minute" announced in the log message above.
        segment_time=1600,
    ).run()

    # iterdir() yields entries in arbitrary order, while the caller merges
    # the transcripts positionally: sort on the zero-padded segment index.
    split_files = sorted(split_folder.iterdir())
    assert split_files
    return split_files