Source code for wdoc.utils.loaders.anki

import json
import re
import shutil
import warnings
from pathlib import Path

import bs4
import uuid6
from beartype.typing import Dict, List, Optional, Tuple, Union
from langchain_core.documents import Document
from langchain_text_splitters import TextSplitter
from loguru import logger
from tqdm.asyncio import tqdm

from wdoc.utils.env import env, is_out_piped
from wdoc.utils.loaders.shared import debug_return_empty
from wdoc.utils.misc import file_hasher, html_to_text, optional_strip_unexp_args

import pandas as pd

# Matches the opening marker of an Anki cloze deletion (`{{c<digits>::`) or
# the closing `}}`; used for removing clozes in anki.
clozeregex = re.compile(r"{{c\d+::|}}")  # for removing clozes in anki
# Captures the text found between a `{` and the next `}`; used to list the
# placeholders present in an anki template.
anki_replacements_regex = re.compile(r"\{([^}]*)\}")

# Matches an `<img ...src=...>` tag (optionally self-closed with `/>`).
# DOTALL lets `.` span newlines, so a tag split across lines still matches.
REG_IMG = re.compile(r"<img .*?src=.*?/?>", flags=re.MULTILINE | re.DOTALL)

# Matches an Anki sound tag such as `[sound:name.mp3]`: the file name is made
# of word characters only and the extension is 2 or 3 word characters long.
REG_SOUNDS = re.compile(
    r"\[sound:\w+\.\w{2,3}\]",
)
# Matches `<scheme>://<rest>` style links.
# NOTE(review): inside the character classes `%-_` is parsed as a RANGE from
# '%' to '_' (which covers most ASCII punctuation, digits and uppercase
# letters), not as the three literal characters -- confirm this is intended.
# NOTE(review): in this raw string `\\?` is an optional literal backslash,
# not a literal '?' -- confirm against the intended query-string handling.
REG_LINKS = re.compile(
    r"[A-Za-z0-9]+://[A-Za-z0-9%-_]+(?:/[A-Za-z0-9%-_])*(?:#|\\?)[A-Za-z0-9%-_&=]*",
)

# Template placeholder that is replaced by the title/alt text of the images
# of a note. Its name without the braces is `STR_IMAGE_OCR[1:-1]`.
STR_IMAGE_OCR = "{image_ocr_alt}"


def cloze_stripper(clozed: str) -> str:
    """Return ``clozed`` with every cloze marker replaced by one space.

    The markers are whatever the module-level ``clozeregex`` matches, i.e.
    the opening ``{{c<digits>::`` and the closing ``}}``. The text that was
    wrapped by the cloze is kept.
    """
    return clozeregex.sub(" ", clozed)
@debug_return_empty
@optional_strip_unexp_args
def load_anki(
    verbose: bool,
    text_splitter: TextSplitter,
    loaders_temp_dir: Path,
    anki_profile: Optional[str] = None,
    anki_deck: Optional[str] = None,
    anki_notetype: Optional[str] = None,
    anki_template: Optional[str] = "{allfields}\n" + STR_IMAGE_OCR,
    anki_tag_filter: Optional[str] = None,
    anki_tag_render_filter: Optional[str] = None,
) -> List[Document]:
    """Load the notes of an Anki collection as one ``Document`` per note.

    The collection database is first copied into ``loaders_temp_dir`` and
    only the copy is opened. Cards are filtered (deck, notetype, tags,
    suspended), de-duplicated per note, rendered through ``anki_template``,
    and their media are replaced by placeholders via ``replace_media``.

    Args:
        verbose: when true and the output is not piped, tqdm progress bars
            are shown for the pandas ``progress_apply`` steps.
        text_splitter: accepted for signature compatibility; not used in
            this function body.
        loaders_temp_dir: directory receiving the temporary database copy.
        anki_profile: Anki profile name. When falsy, it is taken from the
            parent directory of the database found by ``ankipandas``.
        anki_deck: keep only cards whose deck name starts with this value.
        anki_notetype: keep only cards whose (lowercased) note type name
            satisfies ``str.contains(anki_notetype, case=False)``.
        anki_template: text containing ``{placeholder}`` items. Valid
            placeholders are ``allfields``, ``tags``, the image OCR
            placeholder and the (lowercased) field names of the notes.
        anki_tag_filter: regex; keep only cards with at least one tag for
            which ``re.match`` succeeds.
        anki_tag_render_filter: regex; only matching tags are rendered in
            place of ``{tags}``. Requires ``{tags}`` in ``anki_template``.

    Returns:
        The list of documents. Each has the metadata keys ``anki_tags``,
        ``anki_nid``, ``anki_deck``, ``anki_modtime``, ``anki_media``,
        ``anki_profile``, ``anki_topdeck``, ``anki_notetype`` and ``path``.

    Raises:
        AssertionError: when a filter leaves no card, when the template has
            no placeholder, or when no document could be produced.
        Exception: when a filter regex does not compile or when a
            placeholder matches no field of a note.
    """
    import ankipandas as akp

    if anki_tag_render_filter:
        assert "{tags}" in anki_template, (
            "Can't use anki_tag_render_filter without using {tags} in anki_template"
        )
        try:
            anki_tag_render_filter = re.compile(anki_tag_render_filter)
        except Exception as err:
            raise Exception(
                f"Failed to compile anki_tag_render_filter: '{err}'"
            ) from err

    if anki_tag_filter:
        try:
            anki_tag_filter = re.compile(anki_tag_filter)
        except Exception as err:
            raise Exception(f"Failed to compile anki_tag_filter: '{err}'") from err

    if not anki_profile:
        original_db = akp.find_db()
        anki_profile = original_db.parent.name
        logger.info(f"Detected anki profile: '{anki_profile}'")

    logger.info(f"Loading anki profile: '{anki_profile}'")
    original_db = akp.find_db(user=anki_profile)
    name = f"{anki_profile}".replace(" ", "_")
    random_val = str(uuid6.uuid6())
    new_db_path = (
        loaders_temp_dir / f"anki_collection_{name.replace('/', '_')}_{random_val}"
    )
    assert not Path(new_db_path).exists(), f"{new_db_path} already existing!"
    # work on a copy so the user's collection is never opened directly
    shutil.copy(original_db, new_db_path)
    col = akp.Collection(path=new_db_path)
    cards = col.cards.merge_notes()

    if verbose and not is_out_piped:
        tqdm.pandas()

        def pbar(*x, **y):
            tqdm.pandas(*x, **y)

    else:
        # no progress bar wanted: alias progress_apply to the plain apply so
        # the rest of the function can call progress_apply unconditionally
        pd.DataFrame.progress_apply = pd.DataFrame.apply
        pd.Series.progress_apply = pd.Series.apply

        def pbar(*x, **y):
            pass

    # cards with an empty "codeck" fall back to their "cdeck" value
    cards.loc[cards["codeck"] == "", "codeck"] = cards["cdeck"][cards["codeck"] == ""]
    cards["codeck"] = cards["codeck"].progress_apply(lambda x: x.replace("\x1f", "::"))
    if anki_deck:
        cards = cards[cards["codeck"].str.startswith(anki_deck)]
    cards["nmodel"] = cards["nmodel"].progress_apply(lambda x: x.lower())
    if anki_notetype:
        cards = cards[cards["nmodel"].str.contains(anki_notetype, case=False)]
        assert not cards.empty, (
            f"No cards found after filtering by notetype {anki_notetype}"
        )
    if anki_tag_filter:
        pbar(desc="Filtering by tags")
        cards = cards[
            cards.progress_apply(
                (lambda x: any(anki_tag_filter.match(t) for t in x["ntags"])), axis=1
            )
        ]
        assert not cards.empty, (
            f"No cards found after filtering by tags: {anki_tag_filter}"
        )

    # remove suspended
    cards = cards[cards["cqueue"] != "suspended"]

    # merge models and fields for easy handling
    cards["mid"] = col.cards.mid.loc[cards.index]
    mid2fields = akp.raw.get_mid2fields(col.db)
    # make the model fields lowercase
    mid2fields = {
        mid: [field.lower() for field in fields] for mid, fields in mid2fields.items()
    }
    # mod2mid = akp.raw.get_model2mid(col.db)
    cards["fields_name"] = cards["mid"].progress_apply(lambda x: mid2fields[x])
    assert not cards.empty, "empty dataframe!"

    # remove duplicate, essentially making cards the same thing as notes
    cards = cards.drop_duplicates(subset="nid", keep="first")
    notes = cards.reset_index().set_index("nid")

    # check placeholders validity
    placeholders = [ph.lower() for ph in anki_replacements_regex.findall(anki_template)]
    assert placeholders, f"No placeholder found in anki_template '{anki_template}'"
    for ph in placeholders:
        for _, c in notes.iterrows():
            if ph not in c["fields_name"] + ["allfields", "tags", STR_IMAGE_OCR[1:-1]]:
                raise Exception(
                    "A placeholder in anki template didn't match fields of "
                    f"a card.\nCulprit placeholder: {ph}\nTemplate: "
                    f"{anki_template}\nExample card: {c}"
                )

    # prepare field values
    if "{allfields}" in anki_template:
        useallfields = True
        pbar(desc="Parsing allfields value")
        notes["allfields"] = notes.progress_apply(
            lambda x: "\n\n".join(
                [
                    f"{k.lower()}: '{html_to_text(cloze_stripper(v)).strip()}'"
                    for k, v in zip(x["fields_name"], x["nflds"])
                    if v.strip()
                ]
            ),
            axis=1,
        )
    else:
        useallfields = False

    useimageocr = STR_IMAGE_OCR in anki_template

    if "{tags}" in anki_template:
        usetags = True
        pbar(desc="Formatting tags")
        notes["tags_formatted"] = notes.progress_apply(
            lambda x: (
                (
                    "\n"
                    + "\n".join(
                        [
                            t
                            for t in x["ntags"]
                            if (
                                anki_tag_render_filter is None
                                or anki_tag_render_filter.match(t)
                            )
                        ]
                    ).strip()
                    + "\n"
                )
                if x["ntags"]
                else ""
            ),
            axis=1,
        )
        if notes["ntags"].notnull().any():
            assert notes["tags_formatted"].notnull().any(), (
                "No tags were extracted because of your filter. Crashing to let you recheck your setup."
            )
    else:
        usetags = False

    def placeholder_replacer(row: pd.Series) -> Tuple[str, dict]:
        """Render ``anki_template`` for one note and strip its media.

        Returns the rendered text and the placeholder -> media mapping
        produced by ``replace_media``.
        """
        text = anki_template

        if useallfields:
            text = text.replace("{allfields}", row["allfields"])
        if usetags:
            text = text.replace("{tags}", row["tags_formatted"])

        for ph in placeholders:
            if ph == "tags" or ph == "allfields" or ph == STR_IMAGE_OCR[1:-1]:
                continue
            field_val = row["nflds"][row["fields_name"].index(ph)]
            text = text.replace(
                "{" + ph + "}",
                html_to_text(
                    cloze_stripper(field_val),
                ),
            )
        # turn escaped sequences typed in the template into real characters
        text = text.replace("\\n", "\n").replace("\\xa0", " ")

        # replace media
        new_text, medias = replace_media(
            content=text,
            media=None,
            mode="remove_media",
            strict=False,
            replace_links=False,
        )
        if medias:
            assert text != new_text
        text = new_text

        if useimageocr:
            image_keys = [key for key in medias.keys() if "IMAGE" in key]
            for img_k in image_keys:
                # The attributes live on the <img> tag, not on the soup root:
                # querying the root always reports them as missing.
                img = bs4.BeautifulSoup(medias[img_k], "html.parser").find("img")
                if img is None:
                    continue
                title = (img.get("title") or "").strip()
                alt = (img.get("alt") or "").strip()
                ocr_alt = ""
                if title:
                    ocr_alt += f"\nTitle: '{title}'"
                if alt:
                    ocr_alt += f"\nAlt: '{alt}'"
                ocr_alt = ocr_alt.strip()
                if ocr_alt:
                    # the placeholder is re-appended so that the next image
                    # can add its own block after this one
                    text = text.replace(
                        STR_IMAGE_OCR,
                        f"\n<OCR of '{img_k}'>\n{ocr_alt}\n</OCR of '{img_k}'>"
                        + STR_IMAGE_OCR,
                    )
        # drop the placeholder that is left once every image was handled
        text = text.replace(STR_IMAGE_OCR, "").strip()

        return text, medias

    pbar(desc="Formatting all cards")
    notes["medias"] = {}
    out = notes.progress_apply(placeholder_replacer, axis=1)
    notes["text"] = [t[0] for t in out]
    notes["medias"] = [t[1] for t in out]

    notes["text"] = notes["text"].progress_apply(lambda x: x.strip())
    notes = notes[notes["text"].ne("")]  # remove empty text

    # remove notes that contain an image, sound or link
    # notes = notes[~notes["text"].str.contains("\[IMAGE_")]
    # notes = notes[~notes["text"].str.contains("\[SOUND_")]
    # notes = notes[~notes["text"].str.contains("\[LINK_")]

    notes["text"] = notes["text"].apply(lambda x: x.strip())
    notes = notes[notes["text"].ne("")]  # remove empty text
    notes.drop_duplicates(subset="text", inplace=True)

    notes = notes.sort_index()

    docs = []

    # load each card as a single document
    for nid, c in notes.iterrows():
        assert c["codeck"], f"empty card_deck for nid {nid}"
        # turn the media into absolute paths
        medias = c["medias"]
        to_add = {}
        for k, v in medias.items():
            assert k in c["text"], (
                f"missing media '{k}' in text '{c['text']}' of card '{c}'"
            )
            try:
                src = bs4.BeautifulSoup(v, "html.parser").find("img")["src"]
                assert src
                v = Path(original_db).parent / "collection.media" / src
                v = v.resolve()
                if v.exists():
                    if k in c["text"]:
                        # name the placeholder after a hash of the file
                        h = file_hasher({"path": str(v.absolute())})[:6]
                        placeholder = f"IMAGE_{h}"
                        medias[k] = None
                        to_add[placeholder] = str(v.absolute())
                        c["text"] = c["text"].replace(k, placeholder)
                    else:
                        medias[k] = str(v.absolute())
            except Exception:
                # it was probably not a file
                continue
        medias = {k: v for k, v in medias.items() if v is not None}
        if to_add:
            medias.update(to_add)
            assert all(k in c["text"] for k in to_add.keys())

        # tags are kept verbatim (no reformatting for the tokenizer)
        ntags = list(c["ntags"])
        docs.append(
            Document(
                page_content=c["text"],
                metadata={
                    "anki_tags": " ".join(ntags),
                    "anki_nid": str(nid),
                    "anki_deck": c["codeck"],
                    "anki_modtime": int(c["cmod"]),
                    "anki_media": json.dumps(medias, ensure_ascii=False),
                },
            )
        )

    assert docs, "List of loaded anki document is empty!"

    path = (
        f"Anki_profile='{anki_profile}',deck='{anki_deck}',notetype='{anki_notetype}'"
    )
    for doc in docs:
        doc.metadata["anki_profile"] = anki_profile
        doc.metadata["anki_topdeck"] = anki_deck
        doc.metadata["anki_notetype"] = anki_notetype
        doc.metadata["path"] = path
        doc.metadata["anki_nid"] = " ".join(sorted(doc.metadata["anki_nid"].split(" ")))

    # delete temporary db file
    new_db_path.unlink()
    Path(str(new_db_path.absolute()) + "-shm").unlink(missing_ok=True)
    Path(str(new_db_path.absolute()) + "-wal").unlink(missing_ok=True)
    return docs
def replace_media(
    content: str,
    media: Union[None, Dict],
    mode: str,
    strict: bool = True,
    replace_image: bool = True,
    replace_links: bool = True,
    replace_sounds: bool = True,
) -> Tuple[str, Dict]:
    """Swap the media of ``content`` for short placeholders, or swap them back.

    Media are images (``<img ...>``), sounds (``[sound:...]``) and links
    (``scheme://...``). They are replaced because:

    1. as LLMs are non deterministic, it avoids the risk of botching the
       media markup;
    2. it costs less tokens.

    The intended use is to call it first with ``mode="remove_media"`` to
    replace each media by a simple string like ``[IMAGE_1]``, check that the
    string is still present in the output of the LLM, then call it with
    ``mode="add_media"`` to put the media back. Images are detected with
    both bs4 and a regex.

    Args:
        content: the text to process; must not be blank.
        media: for ``"add_media"``, the non-empty placeholder -> media
            mapping to restore. For ``"remove_media"`` it must be ``None``
            or empty.
        mode: ``"remove_media"`` or ``"add_media"``.
        strict: when true, failed checks raise ``AssertionError``; when
            false, most of them are logged as warnings and the offending
            media is skipped.
        replace_image: handle images.
        replace_links: handle links, after normalising stray spaces around
            ``://``.
        replace_sounds: handle sounds.

    Returns:
        For ``"remove_media"``: ``(new_content, mapping)`` where ``mapping``
        goes from placeholder (``[IMAGE_n]``, ``[SOUND_n]``, ``[LINK_n]``)
        to the original snippet, or ``(content, {})`` when no media was
        found. For ``"add_media"``: ``(restored_content, {})``.

    Raises:
        AssertionError: on invalid arguments or on a failed consistency
            check (some checks are raised regardless of ``strict``).
    """
    # ignore warnings from beautiful soup that can happen because anki is not exactly html
    warnings.filterwarnings("ignore", category=UserWarning, module="bs4")

    assert mode in ["add_media", "remove_media"]
    assert content.strip()
    if media is None:
        media = {}
    assert isinstance(media, dict)
    assert any(rule for rule in [replace_sounds, replace_links, replace_image])

    if mode == "remove_media":
        assert not media
        images = []
        sounds = []
        links = []

        if replace_links:
            # fix links common issues
            content = content.replace(":// ", "://")
            content = content.replace("http ://", "http://")
            # keep the scheme: a broken https link must stay https
            content = content.replace("https ://", "https://")

        # Images
        if replace_image and "<img" in content:
            soup = bs4.BeautifulSoup(content, "html.parser")
            images_bs4 = [str(img) for img in soup.find_all("img")]
            # fix bs4 parsing as ending with /> instead of >
            images_bs4 = [
                (
                    img[:-2] + ">"
                    if ((img not in content) and img[:-2] + ">" in content)
                    else img
                )
                for img in images_bs4
            ]
            images_reg = re.findall(REG_IMG, content)
            if len(images_bs4) != len(images_reg):
                if env.WDOC_VERBOSE:
                    logger.warning(
                        f"Different images found:\nbs4: {images_bs4}\nregex: {images_reg}\nContent: {content}"
                    )

            # bs4 results are preferred; the regex ones are only used when
            # bs4 found nothing
            if images_reg and not images_bs4:
                images = [str(img) for img in images_reg]
            else:
                images = [str(img) for img in images_bs4]
            try:
                assert images, f"no image found but should have. Text is '{content}'"
            except AssertionError as err:
                if strict:
                    raise
                logger.warning(err)
            for iimg, img in enumerate(images):
                try:
                    assert img in content, (
                        f"missing img from content:\nimg: {img}\ncontent: {content}"
                    )
                    assert re.search(REG_IMG, img), (
                        f"Regex couldn't identify img: {img}"
                    )
                    assert not re.search(REG_SOUNDS, img), (
                        f"Sound regex identifier img: {img}"
                    )
                except AssertionError as err:
                    if strict:
                        raise
                    logger.warning(err)
                    images[iimg] = None
            # de-duplicate while keeping the order of appearance, so that
            # the placeholder numbering is the same on every run
            images = list(dict.fromkeys(i for i in images if i is not None))

        # Sounds
        # the tag is "[sound:", which is also what REG_SOUNDS looks for
        if replace_sounds and "[sound:" in content:
            sounds = re.findall(REG_SOUNDS, content)
            try:
                assert sounds, f"No sounds found but should have. Content: {content}"
            except AssertionError as err:
                if strict:
                    raise
                logger.warning(err)
            for isound, sound in enumerate(sounds):
                try:
                    assert sound in content, f"Sound is not in content: {sound}"
                    assert not re.search(REG_IMG, sound), (
                        f"Image regex identified this sound: {sound}"
                    )
                    assert re.search(REG_SOUNDS, sound), (
                        f"Regex didn't identify this sound: {sound}"
                    )
                except AssertionError as err:
                    if strict:
                        raise
                    logger.warning(err)
                    sounds[isound] = None
            sounds = list(dict.fromkeys(s for s in sounds if s is not None))

        # links
        if replace_links and "://" in content:
            links = re.findall(REG_LINKS, content)
            # drop any link that contains another, different, found link
            links = [
                link
                for link in links
                if not any(other != link and other in link for other in links)
            ]
            if strict:
                assert links, "No links found"
            for ilink, link in enumerate(links):
                try:
                    assert link in content, (
                        f"Link not in content:\nlink: {link}\ncontent: {content}"
                    )
                    assert re.search(REG_LINKS, link), (
                        f"Regex couldn't identify link: {link}"
                    )
                except AssertionError as err:
                    if strict:
                        raise
                    logger.warning(err)
                    links[ilink] = None
            links = list(dict.fromkeys(li for li in links if li is not None))

        if not images + sounds + links:
            return content, {}

        new_content = content

        # do the replacing
        for i, img in enumerate(images):
            assert replace_image, replace_image
            try:
                assert img in content, f"img '{img}' not in content '{content}'"
                assert img in new_content, (
                    f"img '{img}' not in new_content '{new_content}'"
                )
                assert img not in media.keys() and img not in media.values()
                replaced = f"[IMAGE_{i + 1}]"
                assert replaced not in media.keys() and replaced not in media.values()
                assert replaced not in content, (
                    f"Replaced '{replaced}' already in content '{content}'"
                )
                assert replaced not in new_content, (
                    f"Replaced '{replaced}' already in new_content '{new_content}'"
                )
                new_content = new_content.replace(img, replaced)
                media[replaced] = img
                assert img not in new_content
                assert replaced in new_content
            except AssertionError as err:
                if strict:
                    raise
                logger.warning(f"Failed assert when replacing image: '{err}'")
                continue

        for i, sound in enumerate(sounds):
            try:
                assert replace_sounds
                assert sound in content
                assert sound in new_content
                assert sound not in media.keys() and sound not in media.values()
                replaced = f"[SOUND_{i + 1}]"
                assert replaced not in media.keys() and replaced not in media.values()
                assert replaced not in content
                assert replaced not in new_content
                new_content = new_content.replace(sound, replaced)
                media[replaced] = sound
                assert sound not in new_content
                assert replaced in new_content
            except AssertionError as err:
                if strict:
                    raise
                logger.warning(f"Failed assert when replacing sounds: '{err}'")
                continue

        for i, link in enumerate(links):
            try:
                assert replace_links
                assert link in content
                assert link not in media.keys()
                replaced = f"[LINK_{i + 1}]"
                assert replaced not in media.keys() and replaced not in media.values()
                assert replaced not in content
                assert replaced not in new_content
                # the link may be gone already if it was part of a media
                # that has been replaced above
                assert link in new_content or len(
                    [val for val in media.values() if link in val]
                )
                if link not in new_content:
                    continue
                else:
                    new_content = new_content.replace(link, replaced)
                    media[replaced] = link
                    assert link not in new_content
                    assert replaced in new_content
            except AssertionError as err:
                if strict:
                    raise
                logger.warning(f"Failed assert when replacing links: '{err}'")
                continue

        # check no media can be found anymore
        if replace_image:
            if strict:
                assert not re.findall(REG_IMG, new_content), new_content
                assert not bs4.BeautifulSoup(new_content, "html.parser").find_all(
                    "img"
                ), new_content
                assert "<img" not in new_content, new_content
            elif "<img" in new_content:
                logger.warning(f"AnkiMediaReplacer: Found '<img' in '{new_content}'")
        if replace_sounds:
            if strict:
                assert not re.findall(REG_SOUNDS, new_content), new_content
                assert "[sound:" not in new_content, new_content
            elif "[sound:" in new_content:
                logger.warning(f"AnkiMediaReplacer: Found '[sound:' in '{new_content}'")
        if replace_links:
            if strict:
                assert not re.findall(REG_LINKS, new_content), new_content
                assert "://" not in new_content, new_content
            elif "://" in new_content:
                logger.warning(f"AnkiMediaReplacer: Found '://' in '{new_content}'")

        # check non empty
        temp = new_content
        for med in media:
            temp = temp.replace(med, "")
        assert temp.strip()

        # recursive check: putting the media back must give the input again
        assert (
            replace_media(
                content=new_content,
                media=media,
                mode="add_media",
                strict=strict,
                replace_image=replace_image,
                replace_links=replace_links,
                replace_sounds=replace_sounds,
            )[0]
            == content
        )

        return new_content, media

    elif mode == "add_media":
        assert media

        # TODO check that all media are found
        new_content = content
        for med, val in media.items():
            assert med in content
            assert val not in content
            assert val not in new_content
            new_content = new_content.replace(med, val)
            assert med not in new_content
            assert val in new_content

        return new_content, {}
    else:
        raise ValueError(mode)