import signal
import sys
import tempfile
import traceback
from functools import partial
from pathlib import Path
import ftfy
import openparse
import requests
from beartype.typing import List, Optional, Union
from langchain_core.documents import Document
from langchain_text_splitters import TextSplitter
from langchain_community.document_loaders import (
OnlinePDFLoader,
PDFMinerLoader,
PDFPlumberLoader,
PyMuPDFLoader,
PyPDFium2Loader,
PyPDFLoader,
UnstructuredPDFLoader,
)
from loguru import logger
from tqdm.asyncio import tqdm
from unstructured.cleaners.core import clean_extra_whitespace
from wdoc.utils.env import env, is_linux, is_out_piped
from wdoc.utils.errors import TimeoutPdfLoaderError
from wdoc.utils.loaders.shared import debug_return_empty, signal_timeout
from wdoc.utils.misc import (
check_docs_tkn_length,
doc_loaders_cache,
file_hasher,
max_token,
min_lang_prob,
min_token,
optional_strip_unexp_args,
)
try:
import pdftotext
except Exception as err:
if env.WDOC_VERBOSE:
logger.warning(f"Failed to import optional package 'pdftotext': '{err}'")
if is_linux:
logger.warning(
"On linux, you can try to install pdftotext with :\nsudo "
"apt install build-essential libpoppler-cpp-dev pkg-config "
"python3-dev\nThen:\nuv pip install pdftotext"
)
[docs]
class OpenparseDocumentParser:
def __init__(
self,
path: Union[str, Path],
table_args: Optional[dict] = {
"parsing_algorithm": "pymupdf",
"table_output_format": "markdown",
},
# table_args: Optional[dict] = None,
) -> None:
self.path = path
self.table_args = table_args
[docs]
def load(self) -> List[Document]:
parser = openparse.DocumentParser(table_args=self.table_args)
self.parsed = parser.parse(self.path)
base_metadata = self.parsed.dict()
nodes = base_metadata["nodes"]
assert nodes, "No nodes found"
del base_metadata["nodes"]
docs = []
for node in nodes:
meta = base_metadata.copy()
meta.update(node)
assert meta["bbox"], "No bbox found"
meta["page"] = meta["bbox"][0]["page"]
text = meta["text"]
del meta["text"], meta["bbox"], meta["node_id"], meta["tokens"]
if meta["embedding"] is None:
del meta["embedding"]
doc = Document(
page_content=text,
metadata=meta,
)
if not docs:
docs.append(doc)
elif docs[-1].metadata["page"] != meta["page"]:
docs.append(doc)
else:
docs[-1].page_content += "\n" + doc.page_content
for k, v in doc.metadata.items():
if k not in docs[-1].metadata:
docs[-1].metadata[k] = v
else:
val = docs[-1].metadata[k]
if v == val:
continue
elif isinstance(val, list):
if v not in val:
if isinstance(v, list):
docs[-1].metadata[k].extend(v)
else:
docs[-1].metadata[k].append(v)
else:
docs[-1].metadata[k] = [val, v]
self.docs = docs
return docs
pdf_loaders = {
"pymupdf": PyMuPDFLoader, # good for metadata
"pdfplumber": PDFPlumberLoader, # good for metadata
"pdfminer": PDFMinerLoader, # little metadata
"pypdfloader": PyPDFLoader, # little metadata
"pypdfium2": PyPDFium2Loader, # little metadata
# "pdftotext": None, # optional support, see below
"openparse": OpenparseDocumentParser, # gets page number too, finds individual elements, kinda slow but good, optional table support
"unstructured_fast": partial(
UnstructuredPDFLoader,
strategy="fast",
),
"unstructured_elements_fast": partial(
UnstructuredPDFLoader,
mode="elements",
strategy="fast",
),
"unstructured_hires": partial(
UnstructuredPDFLoader,
strategy="hi_res",
),
"unstructured_elements_hires": partial(
UnstructuredPDFLoader,
mode="elements",
strategy="hi_res",
),
"unstructured_fast_clean_table": partial(
UnstructuredPDFLoader,
strategy="fast",
post_processors=[clean_extra_whitespace],
infer_table_structure=True,
# languages=["en"],
),
"unstructured_elements_fast_clean_table": partial(
UnstructuredPDFLoader,
mode="elements",
strategy="fast",
post_processors=[clean_extra_whitespace],
infer_table_structure=True,
# languages=["en"],
),
"unstructured_hires_clean_table": partial(
UnstructuredPDFLoader,
strategy="hi_res",
post_processors=[clean_extra_whitespace],
infer_table_structure=True,
# languages=["en"],
),
"unstructured_elements_hires_clean_table": partial(
UnstructuredPDFLoader,
mode="elements",
strategy="hi_res",
post_processors=[clean_extra_whitespace],
infer_table_structure=True,
# languages=["en"],
),
}
# pdftotext is kinda weird to install on windows so support it
# only if it's correctly imported
if "pdftotext" in sys.modules:
class pdftotext_loader_class:
"simple wrapper for pdftotext to make it load by pdf_loader"
def __init__(self, path: Union[str, Path]) -> None:
self.path = path
def load(self) -> List[Document]:
with open(self.path, "rb") as f:
docs = [
Document(page_content=d, metadata={"page": idoc})
for idoc, d in enumerate(pdftotext.PDF(f))
]
return docs
pdf_loaders["pdftotext"] = pdftotext_loader_class
pdf_loader_max_timeout = env.WDOC_MAX_PDF_LOADER_TIMEOUT
if env.WDOC_VERBOSE:
if pdf_loader_max_timeout > 0:
logger.warning(f"Will use a PDF loader timeout of {pdf_loader_max_timeout}s")
else:
logger.warning("Not using a pdf loader timeout")
@debug_return_empty
@optional_strip_unexp_args
@doc_loaders_cache.cache
def load_online_pdf(
path: Union[str, Path],
text_splitter: TextSplitter,
file_hash: str,
pdf_parsers: Union[str, List[str]] = "pymupdf", # used only if online loading fails
doccheck_min_lang_prob: float = min_lang_prob,
doccheck_min_token: int = min_token,
doccheck_max_token: int = max_token,
) -> List[Document]:
logger.info(f"Loading online pdf: '{path}'")
try:
response = requests.get(path)
with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as temp_file:
temp_file.write(response.content)
temp_file.flush()
docs = load_pdf(
path=temp_file.name,
text_splitter=text_splitter,
file_hash=file_hasher({"path": temp_file.name}),
pdf_parsers=pdf_parsers,
doccheck_min_lang_prob=doccheck_min_lang_prob,
doccheck_min_token=doccheck_min_token,
doccheck_max_token=doccheck_max_token,
)
return docs
except Exception as err:
logger.warning(
f"Failed parsing online PDF {path} by downloading it and trying to parse because of error '{err}'. Retrying one last time with OnlinePDFLoader."
)
loader = OnlinePDFLoader(path)
if pdf_loader_max_timeout > 0:
with signal_timeout(
timeout=pdf_loader_max_timeout,
exception=TimeoutPdfLoaderError,
):
docs = loader.load()
try:
signal.alarm(0) # disable alarm again just in case
except Exception:
pass
else:
docs = loader.load()
return docs
@doc_loaders_cache.cache(ignore=["path"])
def _pdf_loader(
loader_name: str, path: Union[str, Path], file_hash: str
) -> List[Document]:
loader = pdf_loaders[loader_name](path)
docs = loader.load()
assert isinstance(docs, list), f"Output of {loader_name} is of type {type(docs)}"
assert all(isinstance(d, Document) for d in docs), (
f"Output of {loader_name} contains elements that are not Documents: {[type(c) for c in docs]}"
)
# remove empty documents
docs = [d for d in docs if d.page_content.strip()]
return docs
[docs]
@debug_return_empty
@optional_strip_unexp_args
def load_pdf(
path: Union[str, Path],
text_splitter: TextSplitter,
file_hash: str,
pdf_parsers: Union[str, List[str]] = "pymupdf",
doccheck_min_lang_prob: float = min_lang_prob,
doccheck_min_token: int = min_token,
doccheck_max_token: int = max_token,
) -> List[Document]:
path = Path(path)
logger.info(f"Loading pdf: '{path}'")
assert path.exists(), f"file not found: '{path}'"
name = path.name
if len(name) > 30:
name = name[:15] + "..." + name[-15:]
if isinstance(pdf_parsers, str):
pdf_parsers = pdf_parsers.strip().split(",")
assert pdf_parsers, "No pdf_parsers found"
assert len(pdf_parsers) == len(set(pdf_parsers)), (
f"You pdf_parsers list contains non unique elements. List: {pdf_parsers}"
)
for pdfp in pdf_parsers:
assert pdfp in pdf_loaders, (
f"The PDF loader '{pdfp}' was not present in the pdf_loaders keys. Your 'pdf_parsers' argument seems wrong."
)
loaded_docs = {}
# using language detection to keep the parsing with the highest lang
# probability
probs = {}
passed_errs = []
warned_errs = []
info = "magic not run"
try:
import magic
info = str(magic.from_file(path))
except Exception as err:
logger.warning(f"Failed to run python-magic: '{err}'")
if "pdf" not in info.lower():
logger.debug(
f"WARNING: magic says that your PDF is not a PDF:\npath={path}\nMagic info='{info}'"
)
pbar = tqdm(
total=len(pdf_parsers),
desc=f"Parsing PDF {name}",
unit="loader",
disable=is_out_piped,
)
for loader_name in pdf_parsers:
pbar.desc = f"Parsing PDF {name} with {loader_name}"
try:
if env.WDOC_DEBUG:
logger.warning(f"Trying to parse {path} using {loader_name}")
if pdf_loader_max_timeout > 0:
with signal_timeout(
timeout=pdf_loader_max_timeout,
exception=TimeoutPdfLoaderError,
):
docs = _pdf_loader(loader_name, str(path), file_hash)
try:
signal.alarm(0) # disable alarm again just in case
except Exception:
pass
else:
docs = _pdf_loader(loader_name, path, file_hash)
pbar.update(1)
for i, d in enumerate(docs):
try:
pc = ftfy.fix_text(d.page_content)
docs[i].page_content = pc
# stupid pydantic error
except Exception as err:
if "'dict' object has no attribute 'add'" in str(err):
pass
else:
raise
if "pdf_loader_name" not in docs[i].metadata:
docs[i].metadata["pdf_loader_name"] = loader_name
prob = check_docs_tkn_length(
docs=docs,
identifier=path,
check_language=True,
min_lang_prob=doccheck_min_lang_prob,
min_token=doccheck_min_token,
max_token=doccheck_max_token,
)
if prob >= 0.5:
# only consider it okay if decent quality
probs[loader_name] = prob
loaded_docs[loader_name] = docs
if prob > 0.95:
# select this one as its bound to be okay
logger.info(
f"Early stopping of PDF parsing because {loader_name} has prob {prob} for {path}"
)
break
else:
logger.info(
f"Ignore parsing by {loader_name} of '{path}' as it seems of poor quality: prob={prob}"
)
continue
if len(probs.keys()) >= 3:
# if more than 3 worked, take the best among them to save
# time on running all the others
break
except Exception as err:
if pdf_loader_max_timeout > 0:
try:
signal.alarm(0) # disable alarm again just in case
except Exception:
pass
if "content" not in locals():
pbar.update(1)
logger.debug(
f"Error when parsing '{path}' with {loader_name}: {err}\nMagic info='{info}'"
)
if (
str(err) in passed_errs
and str(err) not in warned_errs
and "token" not in str(err)
):
exc_type, exc_obj, exc_tb = sys.exc_info()
formatted_tb = "\n".join(
[str(li).strip() for li in traceback.format_tb(exc_tb)]
)
logger.warning(
f"The same error happens to multiple pdf loader, something is fishy.\nFull traceback:\n{formatted_tb}"
)
warned_errs.append(str(err))
passed_errs.append(str(err))
pbar.close()
assert probs.keys(), f"No pdf parser succeeded to parse {path}"
# no loader worked, exiting
if not loaded_docs:
raise Exception(f"No pdf parser worked for {path}")
max_prob = max([v for v in probs.values()])
if env.WDOC_DEBUG:
logger.debug(f"Language probability after parsing {path}: {probs}")
return loaded_docs[[name for name in probs if probs[name] == max_prob][0]]