Parser
parser
| MODULE | DESCRIPTION |
|---|---|
| main | Main module of the application. |
| src | Source code of the parser containing core components and utilities. |
main
Main module of the application.
This module serves as the entry point for the program. It imports necessary modules, sets up any initial configuration or data structures, and possibly defines main functions or classes that are used throughout the application.
src
Source code of the parser containing core components and utilities.
| MODULE | DESCRIPTION |
|---|---|
| app | Initializes the app. |
| chunker | Performs chunking for parsed files (pdf, docx, txt) as well as cleaned text. |
| chunker_init | Computes chunking parameters to initialize the chunker with. |
| docling_model_init | Initializes the docling pdf parser by downloading necessary models. |
| endpoints | Defines all endpoints of the FastAPI app. |
| models | Models for loading and checking API and configuration parameters. |
| parser | Defines the parsing class. |
| settings | Loads all settings from a central place, not hidden in utils. |
| utils | Utility functions for logging and configuration processing. |
app
Initializes the app.
chunker
Performs chunking for parsed files (pdf, docx, txt) as well as cleaned text.
| CLASS | DESCRIPTION |
|---|---|
| Chunker | Chunks cleaned text and parsing outputs from pdf, docx and txt files. |
Chunker
Chunks cleaned text and parsing outputs from pdf, docx and txt files.
This chunker is optimized to handle the parsing output of the F13 parsing micro-service. The resulting chunks are a list of Document objects; each document encompasses the text and metadata of one chunk.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| max_chunk_size | Maximal number of characters per chunk. TYPE: conint(ge=2) |
| min_chunk_size | Minimal number of characters per chunk. TYPE: PositiveInt |
| include_headings | Whether headlines should be included in or excluded from each chunk's content. TYPE: IncludeHeadings |
| text | Parsed and cleaned text or file that needs to be chunked. TYPE: str \| DoclingDocument |
| text_length | Length of parsed text as number of characters. TYPE: int |
| filetype | Type of parsing output (markdown, text or docling). TYPE: str |
| filename | Name of parsed file. TYPE: str |
| METHOD | DESCRIPTION |
|---|---|
| chunk_parsed_file | Performs chunking on the input file of type pdf, docx or txt. |
| chunk_parsed_text | Performs chunking of text input. |
Source code in docs/microservices/parser/src/chunker.py
class Chunker:
"""Chunks cleaned text and parsing outputs from pdf, docx and txt files.
This chunker is optimized to handle the parsing output of the F13 parsing micro-service.
The resulting chunks are a list of Document.
Each document encompasses the text and metadata for each chunk.
Attributes:
max_chunk_size (conint(ge=2)): Maximal number of characters per chunk.
min_chunk_size (PositiveInt): Minimal number of characters per chunk.
include_headings (IncludeHeadings): Whether headlines should be included or excluded from each chunks content.
text (str | DoclingDocument): Parsed and cleaned text or file that needs to be chunked.
text_length (int): Length of parsed text as number of characters.
filetype (str): Type of parsing output (markdown, text or docling).
filename (str): Name of parsed file.
"""
def __init__(self, chunking_parameters: ChunkingParameters) -> None:
"""Initializes attributes of chunker instance.
The chunker demands the following ratio of minimal and maximal chunksize to ensure that all chunks are within
the desired chunk size range: min_chunk_size * 2 <= max_chunk_size.
Args:
chunking_parameters (ChunkingParameters): Pydantic model containing the attributes for this class.
"""
self.max_chunk_size = chunking_parameters.max_chunk_size
self.min_chunk_size = chunking_parameters.min_chunk_size
self.text = chunking_parameters.text
self.text_length = chunking_parameters.text_length
self.filetype = chunking_parameters.filetype
self.filename = chunking_parameters.filename
self.include_headings = chunking_parameters.include_headings
logger.debug("Chunker initialized.")
def chunk_parsed_text(self) -> list[Chunk]:
"""Performs chunking of text input.
Returns:
List of chunks with their content and metadata.
"""
chunks = self._chunk_text()
chunks = self._check_chunk_size(chunks=chunks)
logger.debug(
f"Text split into {len(chunks)} chunks. Chunking of text input finished."
)
chunks = self._transform_chunks(chunks=chunks)
return chunks
def _chunk_text(self) -> list[Document]:
"""Divides the text into chunks, while respecting the specified maximum chunk size.
This function is used to chunk text input as well as the contents of txt files.
The text will be split according to the defined separators.
Returns:
List of chunks with their content and metadata.
"""
separators = ["\n\n"]
splitter = RecursiveCharacterTextSplitter(
separators=separators, chunk_size=self.max_chunk_size, chunk_overlap=0
)
chunk_content = splitter.split_text(self.text)
chunks = splitter.create_documents(texts=chunk_content)
chunks = self._set_metadata(chunks=chunks)
return chunks
def _transform_chunks(self, chunks: list[Document]) -> list[Chunk]:
"""Transform a list of Chunks (as LangChain Documents) into a list of Chunks instances.
Args:
chunks (list[Document]): Chunks with their content and metadata.
Returns:
list[Chunk]: Chunks with their content and metadata.
"""
transformed_chunks = []
for chunk in chunks:
chunk_content = chunk.page_content
chunk_metadata = chunk.metadata
transformed_chunks.append(
Chunk(chunk_content=chunk_content, chunk_metadata=chunk_metadata)
)
return transformed_chunks
def chunk_parsed_file(self) -> list[Chunk]:
"""Performs chunking on the input file of type pdf, docx or txt.
Returns:
List of chunks with their content and metadata.
"""
if self.filetype == "markdown":
logger.debug("Started markdown chunking for docx parsing output.")
chunks = self._docx_chunking()
chunks = self._transform_chunks(chunks=chunks)
elif self.filetype == "text":
logger.debug("Started text chunking for txt parsing output.")
chunks = self._txt_chunking()
chunks = self._transform_chunks(chunks=chunks)
elif self.filetype == "docling":
logger.debug("Started docling chunking for pdf parsing output.")
chunks = self._pdf_chunking()
chunks = self._transform_chunks(chunks=chunks)
logger.debug("Chunking finished.")
return chunks
def _txt_chunking(self) -> list[Document]:
"""Chunking of txt parsing output.
Chunks output of txt parsing, adds metadata to each chunk and ensures
that these chunks are within the range of minimal and maximal chunk size.
Returns:
List of chunks with their content and metadata.
"""
chunks = self._chunk_text()
chunks = self._check_chunk_size(chunks=chunks)
return chunks
def _docx_chunking(self) -> list[Document]:
"""Chunking of docx parsing output.
Chunks output of docx parsing, adds metadata to each chunk and ensures
that these chunks are within the range of minimal and maximal chunk size.
Returns:
List of chunks with their content and metadata.
"""
chunks = self._chunk_by_markdown_header()
chunks = self._set_metadata(chunks=chunks)
if any("headings" in chunk.metadata for chunk in chunks):
# merge all chunks with the same heading before merging and splitting them according to chunk size
chunks = self._merge_chunks_by_headlines(chunks=chunks)
chunks = self._check_chunk_size(chunks=chunks)
return chunks
def _pdf_chunking(self) -> list[Document]:
"""Chunking of pdf parsing output.
Chunks output of docling pdf parsing, then merges chunks with the same heading,
then converts these chunks in Document and ensures
that these chunks are within the range of minimal and maximal chunk size.
Returns:
List of chunks with their content and metadata.
"""
chunks_content, chunks_metadata = self._docling_chunking()
chunks = self._chunks_to_langchain_docs(chunks_content=chunks_content)
chunks = self._set_metadata(chunks=chunks, input_metadata=chunks_metadata)
if any("headings" in chunk.metadata for chunk in chunks):
# merge all chunks with the same heading before merging and splitting them according to chunk size
chunks = self._merge_chunks_by_headlines(chunks=chunks)
chunks = self._check_chunk_size(chunks=chunks)
return chunks
def _docling_chunking(self) -> tuple[list, list]:
"""This function chunks text, which was parsed by docling and adds a series of metadata for each chunk.
Returns:
Chunks as tuple containing the chunks contents and the chunks metadata, both as lists:
- chunks_content (list(str)): List of text chunks.
- chunks_metadata (list[ChunkMetadata]): List of ChunkMetadata objects for each chunk.
"""
chunker = HierarchicalChunker()
doc_chunks = list(chunker.chunk(self.text))
chunks_content = []
chunks_metadata = []
for chunk_num, chunk in enumerate(doc_chunks):
# get content
chunk_content = chunk.text
chunks_content.append(chunk_content)
# get metadata
if chunk.meta.headings:
headings = chunk.meta.headings
else:
headings = []
logger.debug("Chunks metadata do not encompass headings.")
try:
pages = [chunk.meta.doc_items[0].prov[0].page_no]
locations = [
{
"boundingbox": chunk.meta.doc_items[0].prov[0].bbox,
"charspan": chunk.meta.doc_items[0].prov[0].charspan,
}
]
except Exception:
pages = None
locations = None
logger.debug("Chunks metadata do not encompass pages or locations.")
chunk_metadata = ChunkMetadata(
chunk_number=chunk_num,
chunk_length=len(chunk.text),
filename=self.filename,
filetype=self.filetype,
headings=headings,
pages=pages,
locations=locations,
)
chunks_metadata.append(chunk_metadata)
return chunks_content, chunks_metadata
def _chunks_to_langchain_docs(
self, chunks_content: list[str], chunks_metadata: list[dict[str, Any]] = None
) -> list[Document]:
"""This function takes the content and metadata of chunks to create langchain documents for each of them.
Each resulting langchain document corresponds to one chunk.
Args:
chunks_content (list(str)): List of text as content of each chunk.
chunks_metadata (list(dict), optional): List of dictionaries as metadata for each chunk.
Returns:
List of chunks with their content and metadata.
"""
chunks = []
for chunk_num, chunk_content in enumerate(chunks_content):
if chunks_metadata:
document = Document(
page_content=chunk_content,
metadata=chunks_metadata[chunk_num],
)
else:
document = Document(page_content=chunk_content)
chunks.append(document)
return chunks
def _chunk_by_markdown_header(self) -> list[Document]:
"""Splits text formatted as markdown into chunks according to the texts headings.
Each chunk corresponds to one heading.
Returns:
List of chunks with their content and metadata.
"""
headers_to_split_on = [
("#", "Header 1"),
("##", "Header 2"),
("###", "Header 3"),
("####", "Header 4"),
]
splitter = MarkdownHeaderTextSplitter(headers_to_split_on=headers_to_split_on)
chunks = splitter.split_text(text=self.text)
for chunk in chunks:
if "headings" not in chunk.metadata:
existing_metadata = dict(chunk.metadata)
chunk.metadata["headings"] = []
if "Header 1" in existing_metadata:
chunk.metadata["headings"] = chunk.metadata["headings"] + [
existing_metadata["Header 1"]
]
if "Header 2" in existing_metadata:
chunk.metadata["headings"] = chunk.metadata["headings"] + [
existing_metadata["Header 2"]
]
if "Header 3" in existing_metadata:
chunk.metadata["headings"] = chunk.metadata["headings"] + [
existing_metadata["Header 3"]
]
if "Header 4" in existing_metadata:
chunk.metadata["headings"] = chunk.metadata["headings"] + [
existing_metadata["Header 4"]
]
return chunks
def _set_metadata(
self, chunks: list[Document], input_metadata: list[ChunkMetadata] = None
) -> list[Document]:
"""Set metadata to chunks.
This always covers chunk number and length as well as file name and file type.
Headings, page numbers and locations on the page can be set using the optional parameter metadata.
Hierarchical Headings from Markdown files will be inserted as list into headings.
Args:
chunks (list[Document]): List of chunks with their content and metadata.
input_metadata (list[dict], optional): List of ChunkMetadata for each chunk.
Returns:
List of chunks with updated metadata.
"""
for chunk_num, chunk in enumerate(chunks):
if not input_metadata:
chunk_metadata = ChunkMetadata(
chunk_number=chunk_num,
chunk_length=len(chunk.page_content),
filename=self.filename,
filetype=self.filetype,
headings=[],
pages=None,
locations=None,
)
else:
chunk_metadata = input_metadata[chunk_num]
chunk.metadata["chunk_number"] = chunk_metadata.chunk_number
chunk.metadata["chunk_length"] = chunk_metadata.chunk_length
chunk.metadata["filename"] = chunk_metadata.filename
chunk.metadata["filetype"] = chunk_metadata.filetype
if "headings" not in chunk.metadata:
chunk.metadata["headings"] = chunk_metadata.headings
chunk.metadata["pages"] = chunk_metadata.pages
chunk.metadata["locations"] = chunk_metadata.locations
return chunks
def _check_chunk_size(self, chunks: list[Document]) -> list[Document]:
"""Checks if chunks respect the minimal and maximal chunk size.
If the maximal chunk size is not respected, big chunks get split into smaller ones.
If the minimal chunk size is not respected, small chunks are merged until they reach the minimal chunk size.
Args:
chunks (List[Document]): List of chunks with their content and metadata.
Returns:
List of chunks with their content and metadata.
"""
if self.min_chunk_size > 1 or self.max_chunk_size != sys.maxsize:
# check if we need to respect a maximum chunk size
if self._check_max_chunk_size(chunks=chunks):
# ensure maximal chunk size
chunks = self._split_chunks(
chunks=chunks, maximal_character_count=self.max_chunk_size
)
# check if we need to respect a minimum chunk size
if self._check_min_chunk_size(chunks=chunks):
# ensure minimal chunk size
chunks = self._merge_small_chunks(chunks=chunks)
# if min and max chunk size are defined, check if we need to repeat the process:
if self.min_chunk_size > 1 and self.max_chunk_size != sys.maxsize:
recursion_count = 0
max_recursions = 5
while self._check_max_chunk_size(
chunks=chunks
) or self._check_min_chunk_size(chunks=chunks):
# check if we need to cut our losses
if recursion_count == max_recursions:
logger.warning(
f"Chunksize range could not be respected after {max_recursions} resizing recursions."
" Please increase the difference between min and max chunk size. "
"The maximal chunk size has to be at least twice as large as minimal chunksize!"
)
break
# fix chunk sizes by splitting or merging chunks
if self._check_max_chunk_size(chunks=chunks):
chunks = self._split_chunks(
chunks=chunks, maximal_character_count=self.min_chunk_size
)
if self._check_min_chunk_size(chunks=chunks):
chunks = self._merge_small_chunks(chunks=chunks)
recursion_count += 1
logger.debug(
f"{recursion_count}. refinement of chunk size finished."
)
if any("headings" in chunk.metadata for chunk in chunks):
new_chunks = [
self._include_headlines_in_chunk_contents(chunk=chunk)
for chunk in chunks
]
chunks = self._update_metadata(chunks=new_chunks)
return chunks
def _check_min_chunk_size(self, chunks: list[Document]) -> bool:
"""Checks if chunks respect the minimal chunk size and if not merges chunks into bigger ones.
Args:
chunks (list[Document]): List of chunks with their content and metadata.
Returns:
True if we need to merge chunks in order to guarantee min chunk size.
False if all chunks respect the minimal chunk size.
"""
merge_chunks = False
# check if chunks are too small
if self.min_chunk_size > 1:
for chunk in chunks:
if len(chunk.page_content) < self.min_chunk_size:
merge_chunks = True
break
logger.debug(f"Is min chunk size respected? {not merge_chunks}")
return merge_chunks
def _check_max_chunk_size(self, chunks: list[Document]) -> bool:
"""Checks if chunks respect the maximal chunk size and if not split big chunks into smaller ones.
Args:
chunks (list[Document]): List of chunks with their content and metadata.
Returns:
True if we need to split chunks in order to guarantee max chunk size.
False if all chunks respect the maximal chunk size.
"""
split_again = False
if self.max_chunk_size != sys.maxsize:
# check if chunks are too big
for chunk in chunks:
if len(chunk.page_content) > self.max_chunk_size:
if (
(self.min_chunk_size > 1)
and (chunk.metadata["chunk_number"] == len(chunks) - 1)
and (len(chunk.page_content) <= self.min_chunk_size * 2)
):
# last chunk should only be split again if it has twice the size of min chunk size
# this ensures that the minimal chunk size is guaranteed and the max chunk size will
# be violated only once if there is no other way.
split_again = False
else:
split_again = True
break
logger.debug(f"Is max chunk size respected? {not split_again}")
return split_again
def _merge_small_chunks(self, chunks: list[Document]) -> list[Document]:
"""Combines chunks that are to small by merging their content and metadata.
The new chunks have merged metadata and updated chunk numbers and chunk length.
Args:
chunks (list[Document]): Chunks with their content and metadata.
Returns:
New chunks with their content and metadata.
"""
resized_chunks = []
processed_chunks = []
logger.debug(f"Amount of chunks before merging: {len(chunks)}")
for chunk_number, chunk in enumerate(chunks):
# skip chunks which were already merged
if chunk_number in processed_chunks:
continue
# if chunk has the minimal size add it to new list of chunks
elif len(chunk.page_content) >= self.min_chunk_size:
resized_chunks.append(chunk)
processed_chunks.append(chunk_number)
logger.debug(
f"Chunk {chunk_number} was big enough: {len(chunk.page_content)}: no more merging of chunks needed."
)
# if its not the last chunk merge with subsequent chunks
elif chunk_number < len(chunks) - 1:
added_chunks_counter = 0
merged_chunk = chunk
processed_chunks.append(chunk_number)
logger.debug(
f"Chunk {chunk_number} with length {len(chunk.page_content)}: merge with subsequent chunks"
)
# add subsequent chunks
while (
len(merged_chunk.page_content) < self.min_chunk_size
and chunk_number + added_chunks_counter < len(chunks) - 1
):
added_chunks_counter += 1
chunk_number_of_added_chunk = chunk_number + added_chunks_counter
add_chunk = chunks[chunk_number_of_added_chunk]
if chunk_number_of_added_chunk < len(chunks):
merged_chunk = self._merge_contents(
base_chunk=merged_chunk, add_chunk=add_chunk
)
merged_chunk = self._merge_metadata(
base_chunk=merged_chunk, add_chunk=add_chunk
)
processed_chunks.append(chunk_number_of_added_chunk)
resized_chunks.append(merged_chunk)
# if its the last chunk, merge it with previous chunk
elif chunk_number == len(chunks) - 1:
logger.debug(
f"Chunk {chunk_number} (= last chunk) with length {len(chunk.page_content)}: "
"merge with previous chunks"
)
merged_chunk = self._merge_contents(
base_chunk=resized_chunks[-1], add_chunk=chunk
)
merged_chunk = self._merge_metadata(
base_chunk=resized_chunks[-1], add_chunk=chunk
)
processed_chunks.append(chunk_number)
resized_chunks[-1] = merged_chunk
# if the new last chunk is still too short, merge again with the previous chunk
while (
len(resized_chunks[-1].page_content) < self.min_chunk_size
and len(resized_chunks) > 1
):
logger.debug(
f"New last chunk is still to short {len(resized_chunks[-1].page_content)}: "
f"merge again with previous chunk"
)
merged_chunk = self._merge_contents(
base_chunk=resized_chunks[-2], add_chunk=resized_chunks[-1]
)
merged_chunk = self._merge_metadata(
base_chunk=resized_chunks[-2], add_chunk=resized_chunks[-1]
)
resized_chunks = resized_chunks[:-2]
resized_chunks.append(merged_chunk)
resized_chunks = self._update_metadata(chunks=resized_chunks)
logger.debug(f"Amount of Chunks after merging: {len(resized_chunks)}")
return resized_chunks
def _split_chunks(
self, chunks: list[Document], maximal_character_count: int
) -> list[Document]:
"""Splits large chunks in order to respect the size maximum.
The maximal character count can either be max chunk size or something smaller if chunks need to be combined
in order to fit within a defined chunk size range. In this case the min chunk size is used.
The new chunks have updated chunk numbers and chunk length, while keeping the remaining metadata.
Args:
chunks (list[Document]): Chunks with their content and metadata.
maximal_character_count (int): Maximal chunk length, which defines where to split large chunks.
Returns:
List of updated chunks.
"""
logger.debug(f"Amount of chunks before splitting: {len(chunks)}")
separators = ["\n\n", "\n", " "]
splitter = RecursiveCharacterTextSplitter(
separators=separators, chunk_size=maximal_character_count, chunk_overlap=0
)
chunks = splitter.split_documents(chunks)
chunks = self._update_metadata(chunks=chunks)
logger.debug(f"Amount of chunks after splitting: {len(chunks)}")
return chunks
def _merge_chunks_by_headlines(self, chunks: list[Document]) -> list[Document]:
"""This function merges chunks if they have the same heading.
In case a maximal chunk size is given, this function will only merge two chunks,
if the new chunks size is smaller than that. If no maximal chunk size is given,
all chunks with the same heading will be merged.
Args:
chunks (list[Document]): Chunks with their content and metadata.
Returns:
List of updated chunks.
"""
logger.debug(f"Amount of chunks before merging by headlines: {len(chunks)}")
chunks_by_headlines = []
for chunk_num, chunk in enumerate(chunks):
if chunk_num == 0:
chunks_by_headlines.append(chunk)
else:
same_headings = (
chunk.metadata["headings"]
== chunks_by_headlines[-1].metadata["headings"]
)
if self.max_chunk_size == sys.maxsize:
too_long = False
else:
too_long = (
len(chunks_by_headlines[-1].page_content)
+ len(chunk.page_content)
) > self.max_chunk_size
if not too_long and same_headings:
merged_chunk = chunks_by_headlines[-1]
merged_chunk.page_content = (
merged_chunk.page_content + "\n\n" + chunk.page_content
)
merged_chunk = self._merge_metadata(
base_chunk=merged_chunk, add_chunk=chunk
)
chunks_by_headlines[-1] = merged_chunk
else:
chunks_by_headlines.append(chunk)
chunks_by_headlines = self._update_metadata(chunks=chunks_by_headlines)
logger.debug(
f"Amount of chunks after merging by headlines: {len(chunks_by_headlines)}"
)
return chunks_by_headlines
def _include_headlines_in_chunk_contents(self, chunk: Document) -> Document:
"""Include each chunk headline into its contents.
Hierarchical headings from Markdown files are handled by inserting only the headline with the lowest level.
Args:
chunk (Document): Chunks with its content and metadata.
Returns:
Chunk with updated contents.
"""
if (
"headings" in chunk.metadata
and self.include_headings == IncludeHeadings.INCLUDE
and isinstance(chunk.metadata["headings"], list)
and len(chunk.metadata["headings"]) > 0
):
heading = chunk.metadata["headings"][-1]
chunk.page_content = "\n".join([heading, chunk.page_content])
return chunk
def _merge_contents(self, base_chunk: Document, add_chunk: Document) -> Document:
"""Merges contents of two chunks, while adding the headline of the second chunk into the new chunks contents.
Args:
base_chunk (Document): Chunk to add metadata to.
add_chunk (Document): Chunk from which the heading and content will be added to the base_chunks contents.
Returns:
Chunk with merged contents including the headline of the add_chunk.
"""
# include headline of the second chunk into the chunks content
add_chunk = self._include_headlines_in_chunk_contents(chunk=add_chunk)
merged_chunk = base_chunk
merged_chunk.page_content = "\n\n".join(
[base_chunk.page_content, add_chunk.page_content]
)
return merged_chunk
def _merge_metadata(self, base_chunk: Document, add_chunk: Document) -> Document:
"""Merges metadata (headings, pages and locations) of two chunks.
Args:
base_chunk (Document): Chunk to add metadata to.
add_chunk (Document): Chunk containing the metadata, which will be added to the base_chunk.
Returns:
Chunk with merged metadata.
"""
metadata_entries = [
{"entry": "headings", "none-value": [""]},
{"entry": "pages", "none-value": [0]},
{
"entry": "locations",
"none-value": [{"boundingbox": None, "charspan": None}],
},
]
merged_chunk = base_chunk
for metadata in metadata_entries:
base_entry = base_chunk.metadata[metadata["entry"]]
add_entry = add_chunk.metadata[metadata["entry"]]
if (not base_entry) and (not add_entry):
merged_chunk.metadata[metadata["entry"]] = None
else:
if not add_entry:
add_entry = metadata["none-value"]
if not base_entry:
base_entry = metadata["none-value"]
if isinstance(add_entry, list) and isinstance(base_entry, list):
merged_entry = base_entry
# avoid duplicates due to different orderings in the lists
for add_item in add_entry:
if add_item not in base_entry:
merged_entry.append(add_item)
merged_chunk.metadata[metadata["entry"]] = merged_entry
elif add_entry not in base_entry:
# add entry if its not equal to the current entry
merged_chunk.metadata[metadata["entry"]] = base_entry + add_entry
else:
merged_chunk.metadata[metadata["entry"]] = base_entry
return merged_chunk
def _update_metadata(self, chunks: list[Document]) -> list[Document]:
"""Update chunk number and length of each chunks metadata.
Args:
chunks (list[Document]): List of chunks with their content and metadata.
Returns:
List of chunks with updated metadata.
"""
updated_chunks = []
for chunk_num, chunk in enumerate(chunks):
updated_chunk = chunk
updated_chunk.metadata["chunk_number"] = chunk_num
updated_chunk.metadata["chunk_length"] = len(chunk.page_content)
updated_chunks.append(updated_chunk)
return updated_chunks
chunk_parsed_file
Performs chunking on the input file of type pdf, docx or txt.
| RETURNS | DESCRIPTION |
|---|---|
| list[Chunk] | List of chunks with their content and metadata. |
Source code in docs/microservices/parser/src/chunker.py
def chunk_parsed_file(self) -> list[Chunk]:
"""Performs chunking on the input file of type pdf, docx or txt.
Returns:
List of chunks with their content and metadata.
"""
if self.filetype == "markdown":
logger.debug("Started markdown chunking for docx parsing output.")
chunks = self._docx_chunking()
chunks = self._transform_chunks(chunks=chunks)
elif self.filetype == "text":
logger.debug("Started text chunking for txt parsing output.")
chunks = self._txt_chunking()
chunks = self._transform_chunks(chunks=chunks)
elif self.filetype == "docling":
logger.debug("Started docling chunking for pdf parsing output.")
chunks = self._pdf_chunking()
chunks = self._transform_chunks(chunks=chunks)
logger.debug("Chunking finished.")
return chunks
chunk_parsed_text
Performs chunking of text input.
| RETURNS | DESCRIPTION |
|---|---|
| list[Chunk] | List of chunks with their content and metadata. |
Source code in docs/microservices/parser/src/chunker.py
def chunk_parsed_text(self) -> list[Chunk]:
"""Performs chunking of text input.
Returns:
List of chunks with their content and metadata.
"""
chunks = self._chunk_text()
chunks = self._check_chunk_size(chunks=chunks)
logger.debug(
f"Text split into {len(chunks)} chunks. Chunking of text input finished."
)
chunks = self._transform_chunks(chunks=chunks)
return chunks
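A minimal usage sketch for the class above. The import paths and the string value passed for include_headings are assumptions (this page documents module names, not the package layout); the field names of ChunkingParameters follow the constructor shown above.

```python
# Hypothetical usage sketch; import paths are assumptions based on this page's module names.
from src.chunker import Chunker
from src.models.chunking_models import ChunkingParameters

text = "First paragraph.\n\nSecond paragraph.\n\nThird paragraph."

parameters = ChunkingParameters(
    max_chunk_size=60,           # maximal characters per chunk
    min_chunk_size=20,           # minimal characters per chunk (max must be >= 2 * min)
    text=text,                   # parsed and cleaned text
    text_length=len(text),
    filetype="text",             # parsing output type: "markdown", "text" or "docling"
    filename="example.txt",
    include_headings="include",  # assumed string value of the IncludeHeadings enum
)

chunks = Chunker(chunking_parameters=parameters).chunk_parsed_text()
for chunk in chunks:
    print(chunk.chunk_metadata, chunk.chunk_content)
```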
chunker_init
Computes chunking parameters to initialize the chunker with.
| FUNCTION | DESCRIPTION |
|---|---|
| compute_chunking_parameters | Composes all parameters needed by the Chunker, respecting the chunking settings. |
compute_chunking_parameters
Composes all parameters needed by the Chunker, respecting the chunking settings.
This includes the computation of the appropriate chunk size range according to the chunking settings.
| PARAMETER | DESCRIPTION |
|---|---|
| chunking_input | Settings for text chunking. TYPE: ChunkingInput |
| parsed_text | Cleaned and parsed text. TYPE: str |
| parsed_text_raw | Text incl. type of text, which will be used for chunking. TYPE: RawText |
| filename | Name of parsed file. TYPE: str |
| RETURNS | DESCRIPTION |
|---|---|
| ChunkingParameters | All parameters required by the chunker. |
Source code in docs/microservices/parser/src/chunker_init.py
def compute_chunking_parameters(
chunking_input: ChunkingInput,
parsed_text: str,
parsed_text_raw: RawText,
filename: str,
) -> ChunkingParameters:
"""Composes all parameters needed by the Chunker repecting the chunking settings.
This includes the computation of the appropriate chunk size range according to the chunking settings.
Args:
chunking_input (ChunkingInput): Settings for text chunking.
parsed_text (str): Cleaned and parsed text.
parsed_text_raw (RawText): Text incl. type of text, which will be used for chunking.
filename (str): Name of parsed file.
Returns:
ChunkingParameters: All parameters required by the chunker.
"""
text = parsed_text_raw.text
text_length = len(parsed_text)
filetype = parsed_text_raw.type
chunking_mode = chunking_input.mode
max_chunk_size = chunking_input.max_chunk_size
min_chunk_size = chunking_input.min_chunk_size
include_headings = chunking_input.include_headings
if chunking_mode == ChunkingMode.RANGE_CHUNKING:
pass
elif chunking_mode == ChunkingMode.SUMMARY_CHUNKING:
min_chunk_size, max_chunk_size = _get_summary_chunksizerange(
chunking_input=chunking_input, text_length=text_length
)
include_headings = IncludeHeadings.INCLUDE
elif chunking_mode == ChunkingMode.COUNT_CHUNKING:
min_chunk_size, max_chunk_size = _get_chunkcount_chunksizerange(
chunking_input=chunking_input, text_length=text_length
)
elif chunking_mode == ChunkingMode.HEADLINE_CHUNKING:
# use defaults because headline chunking requires maximum flexibility
max_chunk_size = sys.maxsize
min_chunk_size = 1
return ChunkingParameters(
max_chunk_size=max_chunk_size,
min_chunk_size=min_chunk_size,
text=text,
text_length=text_length,
filetype=filetype,
filename=filename,
include_headings=include_headings,
)
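A hedged call sketch showing how the chunking settings, parsed text and raw text feed into this function and then into the Chunker; the import paths are assumptions.

```python
# Hypothetical sketch; import paths are assumptions based on this page's module names.
from src.chunker import Chunker
from src.chunker_init import compute_chunking_parameters
from src.models.api_input import ChunkingInput, ChunkingMode
from src.models.api_output import RawText

parsed_text = "Heading\nSome parsed and cleaned text.\n\nAnother paragraph."

chunking_input = ChunkingInput(
    mode=ChunkingMode.RANGE_CHUNKING,
    min_chunk_size=10,
    max_chunk_size=40,
)

parameters = compute_chunking_parameters(
    chunking_input=chunking_input,
    parsed_text=parsed_text,
    parsed_text_raw=RawText(text=parsed_text, type="text"),
    filename="example.txt",
)
chunks = Chunker(chunking_parameters=parameters).chunk_parsed_text()
```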
docling_model_init
Initializes the docling pdf parser by downloading necessary models.
| FUNCTION | DESCRIPTION |
|---|---|
| check_docling_models | Checks that all directories for the parsing models are not empty. |
| download_pdf_parsing_models | Initializes the docling pdf parser. |
check_docling_models
Checks that all directories for the parsing models are not empty.
| PARAMETER | DESCRIPTION |
|---|---|
| model_path | Path to the directory where the models for parsing with docling are stored. TYPE: Path |
| RETURNS | DESCRIPTION |
|---|---|
| bool | True if all models have been downloaded, False if the downloads are incomplete. |
Source code in docs/microservices/parser/src/docling_model_init.py
def check_docling_models(model_path: Path) -> bool:
"""Checks that all directories for the parsing models are not empty.
Args:
model_path (Path): Path to the directory where the models for parsing with docling are stored.
Returns:
bool: True if all models have been downloaded, False if the downloads are incomplete.
"""
# state directories (and a small selection of files) needed by docling
docling_model_dirs = [
model_path,
model_path / "ds4sd--CodeFormulaV2",
model_path / "ds4sd--docling-models",
model_path / "ds4sd--docling-models" / "model_artifacts" / "layout",
model_path
/ "ds4sd--docling-models"
/ "model_artifacts"
/ "tableformer"
/ "accurate",
model_path
/ "ds4sd--docling-models"
/ "model_artifacts"
/ "tableformer"
/ "fast",
model_path / "ds4sd--DocumentFigureClassifier",
model_path / "EasyOcr",
model_path / "hub",
model_path / "hub" / "models--ds4sd--CodeFormulaV2",
model_path / "hub" / "models--ds4sd--CodeFormulaV2" / "refs",
model_path / "hub" / "models--ds4sd--docling-models",
model_path / "hub" / "models--ds4sd--docling-models" / "refs",
model_path / "hub" / "models--ds4sd--DocumentFigureClassifier",
model_path / "hub" / "models--ds4sd--DocumentFigureClassifier" / "refs",
model_path / "xet",
]
docling_model_files = [
model_path / "EasyOcr" / "craft_mlt_25k.pth",
model_path / "EasyOcr" / "english_g2.pth",
model_path / "EasyOcr" / "latin_g2.pth",
]
# check for missing directories
missing_dirs = [
directory for directory in docling_model_dirs if not directory.is_dir()
]
missing_files = [file for file in docling_model_files if not file.is_file()]
if missing_dirs or missing_files:
logger.info(
"Previous download of PDF parsing models was uncomplete! "
f"Missing directories: {missing_dirs}; Missing filies: {missing_files}"
)
return False
else:
# check for empty directories
empty_dirs = [
directory
for directory in docling_model_dirs
if not any(directory.iterdir())
]
if empty_dirs:
logger.info(
f"Previous download of PDF parsing models was uncomplete! Empty directories: {empty_dirs}"
)
return False
else:
# all checks successfully passed!
logger.info("All PDF parsing models are present.")
return True
download_pdf_parsing_models
Initializes the docling pdf parser.
Checks that all models needed to parse pdf files are present. If at least one model is missing the download is started.
Source code in docs/microservices/parser/src/docling_model_init.py
def download_pdf_parsing_models() -> None:
"""Initializes the docling pdf parser.
Checks that all models needed to parse pdf files are present.
If at least one model is missing the download is started.
"""
logger.debug("Starting PDF parsing initialization.")
start_time = time.time()
status = ""
model_path = settings.docling_models_path
if check_docling_models(model_path=model_path):
logger.debug("Skipping download of PDF parsing models.")
else:
logger.info(
"Download of nesseccary models for parsing started. "
"Please wait till all downloads have been completed!"
)
download_models(output_dir=model_path, with_easyocr=True)
if check_docling_models(model_path=model_path):
logger.debug("Download of parsing models finished successfully.")
status = "successfully"
else:
logger.error(
"Download of docling models (which are neccessary to parse pdf files) has failed! "
"Docx and txt files can still be parsed. For pdf files: "
"Please check your network connenction or try it again later."
)
status = "with warnings"
end_time = time.time()
logger.info(
f"PDF parser initialization finished {status} after {end_time - start_time:.2f} seconds"
)
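A small startup sketch; where the real app module calls this function is not shown on this page, so the placement below is an assumption.

```python
# Hypothetical startup hook; the import path is an assumption.
from src.docling_model_init import download_pdf_parsing_models

if __name__ == "__main__":
    # Make sure the docling PDF models are present (or downloaded) before serving requests.
    download_pdf_parsing_models()
```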
endpoints
Defines all endpoints of the FastAPI app.
| FUNCTION | DESCRIPTION |
|---|---|
| health | Returns a health check message. |
| parse_file | Defines the file-parsing endpoint. |
| parse_string | Defines the string-parsing endpoint. |
health
async
Returns a health check message.
| RETURNS | DESCRIPTION |
|---|---|
| dict[str, str] | The health check message as a dictionary. |
Source code in docs/microservices/parser/src/endpoints.py
@router.get(
"/",
summary="Health check endpoint",
description=(
"Returns a simple message indicating that the parser service is running.\n\n"
"Use this endpoint to verify that the service is alive and responsive."
),
responses={
200: {
"description": "Health check successful",
"content": {
"application/json": {"example": {"status": "Parser is running"}}
},
},
500: {"description": "Internal server error"},
},
)
@router.get(
"/health",
summary="Health check endpoint",
description=(
"Returns a simple message indicating that the parser service is running.\n\n"
"Use this endpoint to verify that the service is alive and responsive."
),
responses={
200: {
"description": "Health check successful",
"content": {
"application/json": {"example": {"status": "Parser is running"}}
},
},
500: {"description": "Internal server error"},
},
)
async def health() -> dict[str, str]:
"""Returns a health check message.
Returns:
The health check message as a dictionary.
"""
return {"message": f"{settings.service_name} is running"}
parse_file
async
parse_file(file=File(..., description='Upload a PDF, DOCX, or TXT file'), table_handling=Form(TableHandling.EXCLUDE), empty_lines_handling=Form(EmptyLinesHandling.REMOVE), chunking_input=Depends(ChunkingInput.as_form))
Defines the file-parsing endpoint.
| PARAMETER | DESCRIPTION |
|---|---|
| file | File that needs to be parsed and cleaned. TYPE: UploadFile |
| table_handling | Indicating if tables should be included. Default excludes tables during parsing. TYPE: TableHandling |
| empty_lines_handling | Indicating if consecutive empty lines should be removed or kept. Default is removal. TYPE: EmptyLinesHandling |
| chunking_input | Switch chunking on or off and set additional options for chunk size. Default setting skips chunking. TYPE: ChunkingInput |
| RETURNS | DESCRIPTION |
|---|---|
| ParsingOutput | Parsing output used to hold parsed text and input for chunking. Note: the parsing output attribute 'text_raw' is deprecated and will be removed in future releases. |
Source code in docs/microservices/parser/src/endpoints.py
@router.post(
"/parse/file",
response_model=ParsingOutput,
summary="Parse a file and clean text",
description=(
"Parses a string from a file upload.\n\n"
"This endpoint performs basic text cleaning. "
"You can choose whether consecutive empty lines should be removed or kept, "
"and whether tables should be included in the parsed output."
"You can choose whether the parsed text should be chunked and specify the desired chunk size."
),
responses={
200: {
"description": "Successfully parsed file.",
"content": {
"application/json": {
"examples": ParsingOutput.model_config["json_schema_extra"][
"openapi_examples"
],
}
},
},
400: {"description": "Error parsing the body."},
422: {"description": "Unsupported file type."},
424: {"description": "Error during PDF-processing."},
500: {"description": "Internal server error."},
},
)
async def parse_file(
file: UploadFile = File(..., description="Upload a PDF, DOCX, or TXT file"),
table_handling: TableHandling = Form(TableHandling.EXCLUDE),
empty_lines_handling: EmptyLinesHandling = Form(EmptyLinesHandling.REMOVE),
chunking_input: ChunkingInput = Depends(ChunkingInput.as_form),
) -> ParsingOutput:
"""Defines the file-parsing endpoint.
Args:
file (UploadFile): File that needs to be parsed and cleaned.
table_handling (TableHandling, optional): Indicating if tables should be included.
Default excludes tables during parsing.
empty_lines_handling (EmptyLinesHandling, optional): Indicating if consecutive empty lines should be removed or
kept. Default is removal.
chunking_input (ChunkingInput, optional): Switch chunking on or off and set additional options for chunk size.
Default setting skips chunking.
Returns:
Parsing output used to hold parsed text and input for chunking.
Note: The parsing output attribute 'text_raw' is deprecated and will be removed in future releases.
"""
# Parsing & Cleaning
parsing_output = parser.run_file_parsing(
input_file=file,
table_handling=table_handling,
empty_lines_handling=empty_lines_handling,
)
# Chunking
chunks = None
if chunking_input.mode != ChunkingMode.NO_CHUNKING:
chunking_parameters = compute_chunking_parameters(
chunking_input=chunking_input,
parsed_text=parsing_output.text,
parsed_text_raw=parsing_output.text_raw,
filename=file.filename,
)
chunker = Chunker(chunking_parameters=chunking_parameters)
chunks = chunker.chunk_parsed_file()
logger.info(f"Chunking finished with {len(chunks)} chunks")
else:
logger.info(
f"Chunking was skipped due to chunking mode = {chunking_input.mode}"
)
# Deprecation warning
logger.warning(
"The parsing output attribute text_raw is deprecated and will be removed in future releases. "
"You might use text or chunks instead depending on your use case."
)
return ParsingOutput(
text=parsing_output.text,
text_raw={
"text": parsing_output.text_raw.text,
"type": parsing_output.text_raw.type,
},
chunks=chunks,
)
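A possible client call against this endpoint. The host, port and the exact placement of the chunking fields (which depends on how ChunkingInput.as_form and its ChunkingOptions dependency are exposed by FastAPI) are assumptions.

```python
# Hypothetical client call; base URL and the chunking form/query field names are assumptions.
import requests

with open("report.pdf", "rb") as file_handle:
    response = requests.post(
        "http://localhost:8000/parse/file",
        files={"file": ("report.pdf", file_handle, "application/pdf")},
        data={
            "table_handling": "exclude",
            "empty_lines_handling": "remove",
            "mode": "range_chunking",  # chunking mode from ChunkingInput.as_form
        },
        params={"min_chunk_size": 500, "max_chunk_size": 1000},  # assumed ChunkingOptions fields
    )

response.raise_for_status()
parsing_output = response.json()
print(parsing_output["text"][:200])
print(len(parsing_output["chunks"] or []))
```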
parse_string
async
Defines the string-parsing endpoint.
| PARAMETER | DESCRIPTION |
|---|---|
| parsing_input | String that needs to be parsed, cleaned, and chunked. TYPE: StringParsingInput |
| RETURNS | DESCRIPTION |
|---|---|
| ParsingOutput | Parsing output used to hold parsed text and chunks. Note: the parsing output attribute 'text_raw' is deprecated and will be removed in future releases. |
Source code in docs/microservices/parser/src/endpoints.py
@router.post(
"/parse/string",
response_model=ParsingOutput,
summary="Text parsing endpoint.",
description=(
"Parses a string from plain text input.\n\n"
"This endpoint performs basic text cleaning. "
"You can choose whether consecutive empty lines should be removed or kept, "
"and whether tables should be included in the parsed output."
"You can choose whether the parsed text should be chunked and specify the desired chunk size."
),
openapi_extra={
"requestBody": {
"content": {
"application/json": {
"examples": StringParsingInput.model_config["json_schema_extra"][
"openapi_examples"
],
}
}
}
},
responses={
200: {
"description": "Successfully parsed string.",
"content": {
"application/json": {
"examples": ParsingOutput.model_config["json_schema_extra"][
"openapi_examples"
],
}
},
},
400: {"description": "Error parsing the body."},
},
)
async def parse_string(parsing_input: StringParsingInput) -> ParsingOutput:
"""Defines the string-parsing endpoint.
Args:
parsing_input (StringParsingInput): String that needs to be parsed, cleaned, and chunked.
Returns:
Parsing output used to hold parsed text and chunks.
Note: The parsing output attribute 'text_raw' is deprecated and will be removed in future releases.
"""
# Parsing & Cleaning
text = parsing_input.input_text
if parsing_input.empty_lines_handling == EmptyLinesHandling.REMOVE:
text = parser.run_string_parsing(
text=text,
)
text = text.strip()
text_raw = RawText(text=text, type="text")
# Chunking
chunks = None
if parsing_input.chunking_input.mode != ChunkingMode.NO_CHUNKING:
chunking_parameters = compute_chunking_parameters(
chunking_input=parsing_input.chunking_input,
parsed_text=text,
parsed_text_raw=text_raw,
filename="Texteingabe",
)
chunker = Chunker(chunking_parameters=chunking_parameters)
chunks = chunker.chunk_parsed_text()
logger.info(f"Chunking finished with {len(chunks)} chunks")
else:
logger.info(
f"Chunking was skipped due to chunking mode = {parsing_input.chunking_input.mode}"
)
# Deprecation warning
logger.warning(
"The parsing output attribute text_raw is deprecated and will be removed in future releases. "
"You might use text instead or use the chunking capabilities of this microservice."
)
return ParsingOutput(
text=text,
text_raw=text_raw,
chunks=chunks,
)
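A possible client call for this endpoint; the base URL is an assumption, while the JSON body mirrors the "parsing_and_chunking_according_to_range_chunking" OpenAPI example further down this page.

```python
# Hypothetical client call; the base URL is an assumption.
import requests

payload = {
    "input_text": "This is a sample text.\n\n\n\nWith empty lines.",
    "empty_lines_handling": "remove",
    "table_handling": "exclude",
    "chunking_input": {
        "mode": "range_chunking",
        "min_chunk_size": 5,
        "max_chunk_size": 15,
    },
}

response = requests.post("http://localhost:8000/parse/string", json=payload)
response.raise_for_status()
print(response.json()["chunks"])
```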
models
Models for loading and checking API and configuration parameters.
| MODULE | DESCRIPTION |
|---|---|
| api_input | Defines parsing input parameters. |
| api_output | Defines pydantic models for API output parameters. |
| chunking_models | Pydantic models for Chunker input parameters. |
| general | Loads and checks settings from yml. |
api_input
Defines parsing input parameters.
| CLASS | DESCRIPTION |
|---|---|
| ChunkingInput | Input model controlling chunking behavior. |
| ChunkingMode | Defines modes for chunking. |
| ChunkingOptions | Input model controlling chunking options. |
| EmptyLinesHandling | Defines options for handling consecutive empty lines within parsing output. |
| StringParsingInput | Defines a parsing input model that is used to parse, clean and chunk strings. |
| TableHandling | Defines options for handling tables during input parsing. |
ChunkingInput
Bases: BaseModel
Input model controlling chunking behavior.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| mode | Selected chunking mode. TYPE: ChunkingMode |
| include_headings | Whether headlines should be included in or excluded from each chunk's content. TYPE: IncludeHeadings |
| min_chunk_size | Minimal number of characters per chunk (necessary for RANGE_CHUNKING and optional for COUNT_CHUNKING). TYPE: PositiveInt |
| max_chunk_size | Maximum number of characters per chunk (necessary for RANGE_CHUNKING and optional for COUNT_CHUNKING). TYPE: conint(ge=2) |
| max_llm_input_chars | Maximum number of characters the LLM can process (only for SUMMARY_CHUNKING). TYPE: PositiveInt |
| min_number_of_chunks | Minimum number of chunks (only for SUMMARY_CHUNKING). TYPE: PositiveInt |
| number_of_chunks | Number of chunks to split the text into (only for COUNT_CHUNKING). TYPE: PositiveInt |
| METHOD | DESCRIPTION |
|---|---|
| as_form | Creates a ChunkingInput from form data. |
Source code in docs/microservices/parser/src/models/api_input.py
class ChunkingInput(BaseModel):
"""Input model controlling chunking behavior.
Attributes:
mode (ChunkingMode): Selected chunking mode.
include_headings (IncludeHeadings): Whether headlines should be included or excluded from each chunks content.
min_chunk_size (PositiveInt): Minimal number of characters per chunk (necessary for RANGE_CHUNKING and
optional for COUNT_CHUNKING).
max_chunk_size (conint(ge=2)): Maximum number of characters per chunk (necessary for RANGE_CHUNKING and optional
for COUNT_CHUNKING).
max_llm_input_chars (PositiveInt): Maximum number of characters the LLM can process (only for SUMMARY_CHUNKING).
min_number_of_chunks (PositiveInt): Minimum number of chunks (only for SUMMARY_CHUNKING).
number_of_chunks (PositiveInt): Number of chunks to split the text into (only for COUNT_CHUNKING).
"""
mode: ChunkingMode = Form("no_chunking")
include_headings: IncludeHeadings = Form("include")
min_chunk_size: PositiveInt = Field(
1,
description=(
"Minimal number of characters per chunk (necessary for RANGE_CHUNKING and"
"optional for COUNT_CHUNKING)."
),
example=500,
)
max_chunk_size: conint(ge=2) = Field(
sys.maxsize,
description=(
"Maximal number of characters per chunk (necessary for RANGE_CHUNKING and optional"
"for COUNT_CHUNKING)."
),
example=1000,
)
max_llm_input_chars: PositiveInt = Field(
365851,
description="Maximum number of characters the LLM can process in one call (only for SUMMARY_CHUNKING).",
example=100000,
)
min_number_of_chunks: PositiveInt = Field(
5,
description="Minimal number of chunks to generate before summarization (only for SUMMARY_CHUNKING).",
example=3,
)
number_of_chunks: PositiveInt = Field(
1,
description="Number of chunks to split the text into (only for COUNT_CHUNKING).",
example=3,
)
@classmethod
def as_form(
cls,
mode: ChunkingMode = Form(
ChunkingMode.NO_CHUNKING,
description="Selected chunking mode.",
example="no_chunking",
),
options: ChunkingOptions = Depends(),
) -> Self:
"""Creates a ChunkingInput from form data.
Attributes:
mode (ChunkingMode): Defined mode for chunking.
options (ChunkingOptions): Defined options for chunking.
Returns:
ChunkingInput: Validated ChunkingInput
"""
return cls(
mode=mode,
min_chunk_size=options.min_chunk_size,
max_chunk_size=options.max_chunk_size,
max_llm_input_chars=options.max_llm_input_chars,
min_number_of_chunks=options.min_number_of_chunks,
number_of_chunks=options.number_of_chunks,
include_headings=options.include_headings,
)
as_form
classmethod
as_form(mode=Form(ChunkingMode.NO_CHUNKING, description='Selected chunking mode.', example='no_chunking'), options=Depends())
Creates a ChunkingInput from form data.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| mode | Defined mode for chunking. TYPE: ChunkingMode |
| options | Defined options for chunking. TYPE: ChunkingOptions |
| RETURNS | DESCRIPTION |
|---|---|
| ChunkingInput | Validated ChunkingInput. |
Source code in docs/microservices/parser/src/models/api_input.py
@classmethod
def as_form(
cls,
mode: ChunkingMode = Form(
ChunkingMode.NO_CHUNKING,
description="Selected chunking mode.",
example="no_chunking",
),
options: ChunkingOptions = Depends(),
) -> Self:
"""Creates a ChunkingInput from form data.
Attributes:
mode (ChunkingMode): Defined mode for chunking.
options (ChunkingOptions): Defined options for chunking.
Returns:
ChunkingInput: Validated ChunkingInput
"""
return cls(
mode=mode,
min_chunk_size=options.min_chunk_size,
max_chunk_size=options.max_chunk_size,
max_llm_input_chars=options.max_llm_input_chars,
min_number_of_chunks=options.min_number_of_chunks,
number_of_chunks=options.number_of_chunks,
include_headings=options.include_headings,
)
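A short construction sketch for the summary-chunking case; the import path is an assumption, and the remaining fields keep their documented defaults.

```python
# Hypothetical construction sketch; import paths are assumptions.
from src.models.api_input import ChunkingInput, ChunkingMode

# Summary chunking: compute_chunking_parameters later derives the chunk size range
# from max_llm_input_chars and min_number_of_chunks.
chunking_input = ChunkingInput(
    mode=ChunkingMode.SUMMARY_CHUNKING,
    max_llm_input_chars=100000,
    min_number_of_chunks=3,
)
```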
ChunkingMode
Bases: StrEnum
Defines modes for chunking.
Switches Chunking on or off and sets how chunks should be computed.
Values
NO_CHUNKING: "no_chunking" – skip chunking, only do parsing. RANGE_CHUNKING: "range_chunking" – chunks with specific size (minimal and maximal number ob characters). SUMMARY_CHUNKING: "summary_chunking" – chunks with ideal size for summarization. COUNT_CHUNKING: "count_chunking" – chunks into fix number of chunks of same sizes. HEADLINE_CHUNKING: "headline_chunking" – creates chunks for each headlines content.
Source code in docs/microservices/parser/src/models/api_input.py
class ChunkingMode(StrEnum):
"""Defines modes for chunking.
Switches Chunking on or off and sets how chunks should be computed.
Values:
NO_CHUNKING: "no_chunking" – skip chunking, only do parsing.
RANGE_CHUNKING: "range_chunking" – chunks with specific size
(minimal and maximal number of characters).
SUMMARY_CHUNKING: "summary_chunking" – chunks with ideal size for summarization.
COUNT_CHUNKING: "count_chunking" – splits the text into a fixed number of equally sized chunks.
HEADLINE_CHUNKING: "headline_chunking" – creates a chunk for each headline's content.
"""
NO_CHUNKING = "no_chunking"
RANGE_CHUNKING = "range_chunking"
SUMMARY_CHUNKING = "summary_chunking"
COUNT_CHUNKING = "count_chunking"
HEADLINE_CHUNKING = "headline_chunking"
ChunkingOptions
Bases: BaseModel
Input model controlling chunking options.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| min_chunk_size | Minimal number of characters per chunk (only for RANGE_CHUNKING). TYPE: PositiveInt |
| max_chunk_size | Maximal number of characters per chunk (only for RANGE_CHUNKING). TYPE: conint(ge=2) |
| max_llm_input_chars | Maximum number of characters the LLM can process (only for SUMMARY_CHUNKING). TYPE: PositiveInt |
| min_number_of_chunks | Minimal number of chunks (only for SUMMARY_CHUNKING). TYPE: PositiveInt |
| number_of_chunks | Number of chunks to split the text into (only for COUNT_CHUNKING). TYPE: PositiveInt |
| include_headings | Whether headlines should be included in or excluded from each chunk's content. TYPE: IncludeHeadings |
| METHOD | DESCRIPTION |
|---|---|
| check_chunk_size_range | Validates appropriate settings for minimal and maximal chunk size. |
Source code in docs/microservices/parser/src/models/api_input.py
class ChunkingOptions(BaseModel):
"""Input model controlling chunking options.
Attributes:
min_chunk_size (PositiveInt): Minimal number of characters per chunk (only for RANGE_CHUNKING).
max_chunk_size (conint(ge=2)): Maximal number of characters per chunk (only for RANGE_CHUNKING).
max_llm_input_chars (PositiveInt): Maximum number of characters the LLM can process (only for SUMMARY_CHUNKING).
min_number_of_chunks (PositiveInt): Minimal number of chunks (only for SUMMARY_CHUNKING).
number_of_chunks (PositiveInt): Number of chunks to split the text into (only for COUNT_CHUNKING).
include_headings (IncludeHeadings): Whether headlines should be included or excluded from each chunks content.
"""
min_chunk_size: PositiveInt = 1
max_chunk_size: conint(ge=2) = sys.maxsize
max_llm_input_chars: PositiveInt = 365851
min_number_of_chunks: PositiveInt = 5
number_of_chunks: PositiveInt = 1
include_headings: IncludeHeadings = IncludeHeadings.INCLUDE
@model_validator(mode="after")
def check_chunk_size_range(self) -> Self:
"""Validates appropriate settings for minimal and maximal chunksize.
Raises:
HTTPException: HTTP_400_BAD_REQUEST if input is not valid because the minimal chunk size is bigger than the
maximal chunk size.
Returns:
ChunkingOptions: validated parameters for chunk sizes.
"""
if self.min_chunk_size > self.max_chunk_size:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=("min_chunk_size has to be smaller than max_chunk_size."),
)
return self
check_chunk_size_range
Validates appropriate settings for minimal and maximal chunk size.
| RAISES | DESCRIPTION |
|---|---|
| HTTPException | HTTP_400_BAD_REQUEST if the input is not valid because the minimal chunk size is bigger than the maximal chunk size. |
| RETURNS | DESCRIPTION |
|---|---|
| ChunkingOptions | Validated parameters for chunk sizes. |
Source code in docs/microservices/parser/src/models/api_input.py
@model_validator(mode="after")
def check_chunk_size_range(self) -> Self:
"""Validates appropriate settings for minimal and maximal chunksize.
Raises:
HTTPException: HTTP_400_BAD_REQUEST if input is not valid because the minimal chunk size is bigger than the
maximal chunk size.
Returns:
ChunkingOptions: validated parameters for chunk sizes.
"""
if self.min_chunk_size > self.max_chunk_size:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=("min_chunk_size has to be smaller than max_chunk_size."),
)
return self
EmptyLinesHandling
Bases: StrEnum
Defines options for handling consecutive empty lines within parsing output.
Consecutive empty lines can be removed or kept. Removing is recommended for most use cases.
Values
REMOVE: "remove" – remove consecutive empty lines. KEEP: "keep" – keep consecutive empty lines.
Source code in docs/microservices/parser/src/models/api_input.py
class EmptyLinesHandling(StrEnum):
"""Defines options for handling consecutive empty lines within parsing output.
Consecutive empty lines can be removed or kept.
Removing is recommended for most use cases.
Values:
REMOVE: "remove" – remove consecutive empty lines.
KEEP: "keep" – keep consecutive empty lines.
"""
REMOVE = "remove"
KEEP = "keep"
StringParsingInput
Bases: BaseModel
Defines a parsing input model that is used to parse, clean and chunk strings.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| input_text | Text input that needs to be parsed and cleaned. TYPE: str |
| empty_lines_handling | Indicating if consecutive empty lines should be removed or kept. Default is removal. TYPE: EmptyLinesHandling |
| table_handling | Indicating if tables should be included. Default excludes tables during parsing. TYPE: TableHandling |
| chunking_input | Switch chunking on or off and set additional options for chunk size. Default setting skips chunking. TYPE: ChunkingInput |
Source code in docs/microservices/parser/src/models/api_input.py
class StringParsingInput(BaseModel):
"""Defines a parsing input model that is used to parse, clean and chunk strings.
Attributes:
input_text (str): Text input that needs to be parsed and cleaned.
empty_lines_handling (EmptyLinesHandling): Indicating if consecutive empty lines should be removed or
kept. Default is removal.
table_handling (TableHandling): Indicating if tables should be included.
Default excludes tables during parsing.
chunking_input (ChunkingInput): Switch chunking on or off and set additional options for chunk size.
Default setting skips chunking.
"""
input_text: str
empty_lines_handling: EmptyLinesHandling = EmptyLinesHandling.REMOVE
table_handling: TableHandling = TableHandling.EXCLUDE
chunking_input: ChunkingInput = ChunkingInput()
model_config = ConfigDict(
json_schema_extra={
"openapi_examples": {
"default_parsing": {
"summary": "Text parsing with default settings.",
"description": (
"Example input for parsing a short text with default settings. "
"Removing consecutive empty lines form the parsing output."
"Without parsing of Tables and without Chunking."
),
"value": {
"input_text": "This is a sample text.\n\n\n\nWith empty lines.",
"empty_lines_handling": "remove",
"table_handling": "exclude",
"chunking_input": {"mode": "no_chunking"},
},
},
"only_parsing": {
"summary": "Parse text without post-processing and chunking it.",
"description": "Example input for parsing text and keeping all empty lines.",
"value": {
"input_text": "This is a sample text.\n\n\n\nWith empty lines.",
"empty_lines_handling": "keep",
"table_handling": "exclude",
"chunking_input": {"mode": "no_chunking"},
},
},
"parsing_and_chunking_according_to_range_chunking": {
"summary": "Parse text and chunk according to a chunk size range.",
"description": "Example input for parsing text and chunking it according to a desired chunk size "
"range.",
"value": {
"input_text": "This is a sample text.\n\n\n\nWith empty lines.",
"empty_lines_handling": "remove",
"table_handling": "exclude",
"chunking_input": {
"mode": "range_chunking",
"min_chunk_size": 5,
"max_chunk_size": 15,
},
},
},
"parsing_and_chunking_according_to_count_chunking": {
"summary": "Parse text and chunk according to a chunk count.",
"description": "Example input for parsing text and chunking it according to a desired chunk count.",
"value": {
"input_text": "This is a sample text.\n\n\n\nWith empty lines.",
"empty_lines_handling": "remove",
"table_handling": "exclude",
"chunking_input": {
"mode": "count_chunking",
"number_of_chunks": 3,
},
},
},
"parsing_and_chunking_for_summary_microservice": {
"summary": "Parse text and generate chunks for the summary microservice.",
"description": "Example input for parsing text and chunking it the way it needs to be for further "
"processing with the summary microservice. The chunk size is computed according to a minimal number"
" of chunks and the maximal number of characters which can be processed by one LLM call.",
"value": {
"input_text": "This is a repeating sample text.\n\n\n\nWith empty lines. Lorem ipsum dolor sit "
"amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore "
"magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea "
"rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. "
"Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor "
"invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et "
"accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata "
"sanctus est Lorem ipsum dolor sit amet.",
"empty_lines_handling": "remove",
"table_handling": "exclude",
"chunking_input": {
"mode": "summary_chunking",
"max_llm_input_chars": 100000,
"min_number_of_chunks": 2,
},
},
},
}
}
)
TableHandling
Bases: StrEnum
Defines options for handling tables during input parsing.
Tables can either be excluded or included in the parsing process. Exclusion is recommended, as complex tables are particularly prone to parsing errors.
Values
EXCLUDE: "exclude" – Exclude tables from the parsing output.
INCLUDE: "include" – Include tables in the parsing output.
Source code in docs/microservices/parser/src/models/api_input.py
class TableHandling(StrEnum):
"""Defines options for handling tables during input parsing.
Tables can either be excluded or included in the parsing process.
Exclusion is recommended, as complex tables are particularly prone to parsing errors.
Values:
EXCLUDE: "exclude" – Exclude tables from the parsing output.
INCLUDE: "include" – Include tables in the parsing output.
"""
INCLUDE = "include"
EXCLUDE = "exclude"
api_output
Defines pydantic Models for API output parameters.
| CLASS | DESCRIPTION |
|---|---|
ParsingOutput |
Defines a parsing output model used to hold parsed text and input for chunking. |
RawText |
Defines the text incl. type of text, which will be used for chunking. |
ParsingOutput
Bases: BaseModel
Defines a parsing output model used to hold parsed text and input for chunking.
| ATTRIBUTE | DESCRIPTION |
|---|---|
text |
Cleaned and parsed text.
TYPE:
|
text_raw |
Contains 'parsed_text' and 'text_type' for further chunking. Note: Parsing output attribute 'text_raw' is deprecated and will be removed in future releases.
TYPE:
|
chunks |
Chunks of the parsed text. None if no chunking was performed.
TYPE:
|
warning_msg |
Message to the user containing information about the resulting text.
TYPE:
|
Source code in docs/microservices/parser/src/models/api_output.py
class ParsingOutput(BaseModel):
"""Defines a parsing output model used to hold parsed text and input for chunking.
Attributes:
text (str): Cleaned and parsed text.
text_raw (RawText): Contains 'parsed_text' and 'text_type' for further chunking.
            Note: Parsing output attribute 'text_raw' is deprecated and will be removed in future releases.
        chunks (list[Chunk], optional): Chunks of the parsed text. None if no chunking was performed.
warning_msg (str, optional): Message to the user containing information about the resulting text.
"""
text: str
text_raw: RawText = Field(
...,
description="Raw text content of the request.",
deprecated=True,
)
chunks: list[Chunk] | None = None
warning_msg: str = ""
model_config = ConfigDict(
json_schema_extra={
"openapi_examples": {
"text_parsing_output": {
"summary": "Text parsing output without chunking",
"description": "Parsed text from plain text without chunking or any warnings.",
"value": {
"text": "This is the cleaned and parsed text.",
"text_raw": {
"text": "This is the cleaned and parsed text.",
"type": "text",
},
"chunks": "null",
"warning_msg": "",
},
},
"docx_parsing_output": {
"summary": "DOCX parsing output without chunking",
"description": "Parsed text from DOCX file with chunking but without any warnings.",
"value": {
"text": "Title\nThis is the cleaned and parsed text.",
"text_raw": {
"text": "# Title\nThis is the cleaned and parsed text.",
"type": "markdown",
},
"chunks": "null",
"warning_msg": "",
},
},
"with_warning": {
"summary": "TXT parsing output with warning and without chunking",
"description": "Parsed text from TXT file where a warning message is included.",
"value": {
"text": "This is the cleaned and parsed text.",
"text_raw": {
"text": "This is the cleaned and parsed text.",
"type": "text",
},
"chunks": "null",
"warning_msg": "This is a warning message for the user.",
},
},
"with_chunks": {
"summary": "TXT parsing output without warning and with chunking",
"description": "Parsed text from TXT file incl. 2 chunks.",
"value": {
"text": "This is a sample text.\n\nWith empty lines.",
"text_raw": {
"text": "This is a sample text.\n\nWith empty lines.",
"type": "text",
},
"chunks": [
{
"chunk_content": "This is a\n\nsample",
"chunk_metadata": {
"chunk_number": 0,
"chunk_length": 17,
"filename": "Texteinagbe",
"filetype": "text",
"headings": "null",
"pages": "null",
"locations": "null",
},
},
{
"chunk_content": "text.\n\nWith empty\n\nlines.",
"chunk_metadata": {
"chunk_number": 1,
"chunk_length": 25,
"filename": "Texteinagbe",
"filetype": "text",
"headings": "null",
"pages": "null",
"locations": "null",
},
},
],
"warning_msg": "",
},
},
}
}
)
RawText
Bases: BaseModel
Defines the text incl. type of text, which will be used for chunking.
| ATTRIBUTE | DESCRIPTION |
|---|---|
text |
Cleaned and parsed text for chunking library.
TYPE:
|
type |
Text type information for chunking library. Either "docling" for pdf files, "markdown" for docx files or "text" for txt files and plain text input.
TYPE:
|
Source code in docs/microservices/parser/src/models/api_output.py
class RawText(BaseModel):
"""Defines the text incl. type of text, which will be used for chunking.
Attributes:
text (str | DoclingDocument): Cleaned and parsed text for chunking library.
type (str): Text type information for chunking library. Either "docling" for pdf files, "markdown" for docx
files or "text" for txt files and plain text input.
"""
text: str | DoclingDocument
type: str
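For orientation, a minimal sketch of how ParsingOutput and RawText relate (the import path src.models.api_output is an assumption):
from src.models.api_output import ParsingOutput, RawText  # import path is an assumption

raw = RawText(text="# Title\nThis is the cleaned and parsed text.", type="markdown")
output = ParsingOutput(
    text="Title\nThis is the cleaned and parsed text.",
    text_raw=raw,  # deprecated field, still required until it is removed
)
print(output.model_dump_json(indent=2))  # chunks defaults to None, warning_msg to ""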
chunking_models
Pydantic Models for Chunker input parameters.
| CLASS | DESCRIPTION |
|---|---|
Chunk |
Chunk of the parsed text incl. text and metadata. |
ChunkMetadata |
Defines the metadata of each chunk. |
ChunkingParameters |
Defines the input used to chunk parsed text. |
IncludeHeadings |
Defines whether headlines should be included or excluded from each chunks content. |
Chunk
Bases: BaseModel
Chunk of the parsed text incl. text and metadata.
| ATTRIBUTE | DESCRIPTION |
|---|---|
chunk_content |
Text content of this chunk.
TYPE:
|
chunk_metadata |
Metadata of this chunk (as defined by ChunkMetadata).
TYPE:
|
Source code in docs/microservices/parser/src/models/chunking_models.py
ChunkMetadata
Bases: BaseModel
Defines the metadata of each chunk.
| ATTRIBUTE | DESCRIPTION |
|---|---|
chunk_number |
Id of the chunk.
TYPE:
|
chunk_length |
Length of the chunks content as number of chars.
TYPE:
|
filename |
The name of the original file or in case of text input "Texteingabe" or "Default Filename".
TYPE:
|
filetype |
The type of the original file or in case of text input "string".
TYPE:
|
headings |
List of headings. Empty if there is no heading for this chunk.
TYPE:
|
pages |
List of pages within the original pdf document. None in case of chunks from text, txt or docx.
TYPE:
|
locations |
Covering bounding box and char span of docling metadata for pdf files only. None in case of chunks from text, txt or docx.
TYPE:
|
Source code in docs/microservices/parser/src/models/chunking_models.py
class ChunkMetadata(BaseModel):
"""Defines the metadata of each chunk.
Attributes:
chunk_number (int): Id of the chunk.
chunk_length (int): Length of the chunks content as number of chars.
filename (str): The name of the original file or in case of text input "Texteingabe" or "Default Filename".
filetype (str): The type of the original file or in case of text input "string".
        headings (list[str] | None): List of headings. Empty if there is no heading for this chunk.
pages (list[int] | None): List of pages within the original pdf document.
None in case of chunks from text, txt or docx.
        locations (list[dict[str, Any]] | None): Covering bounding box and char span of docling metadata for pdf
            files only. None in case of chunks from text, txt or docx.
"""
chunk_number: int
chunk_length: int
filename: str
filetype: str
headings: list[str] | None
pages: list[int] | None
locations: list[dict[str, Any]] | None
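A hedged illustration of the metadata produced for a chunk from plain text input (all values illustrative; the import path is an assumption):
from src.models.chunking_models import ChunkMetadata  # import path is an assumption

metadata = ChunkMetadata(
    chunk_number=0,
    chunk_length=17,
    filename="Texteingabe",  # default name for plain text input
    filetype="text",
    headings=None,           # no headings for plain text chunks
    pages=None,              # page numbers exist only for pdf (docling) chunks
    locations=None,          # bounding boxes / char spans exist only for pdf chunks
)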
ChunkingParameters
Bases: BaseModel
Defines the input used to chunk parsed text.
| ATTRIBUTE | DESCRIPTION |
|---|---|
max_chunk_size |
Maximal number of characters per chunk.
TYPE:
|
min_chunk_size |
Minimal number of characters per chunk.
TYPE:
|
text |
Parsed text or file that needs to be chunked.
TYPE:
|
text_length |
Length of parsed text as number of characters.
TYPE:
|
filetype |
Type of parsed text.
TYPE:
|
filename |
The name of the parsed file or in case of text input "Texteingabe".
TYPE:
|
include_headings |
Whether headlines should be included or excluded from each chunks content.
TYPE:
|
| METHOD | DESCRIPTION |
|---|---|
check_minimal_text_length_for_chunking |
Validates the required minimal text length to enable chunking with the minimal chunk size settings. |
Source code in docs/microservices/parser/src/models/chunking_models.py
class ChunkingParameters(BaseModel):
"""Defines the input used to chunk parsed text.
Attributes:
max_chunk_size (conint(ge=2)): Maximal number of characters per chunk.
min_chunk_size (PositiveInt): Minimal number of characters per chunk.
text (str | DoclingDocument): Parsed text or file that needs to be chunked.
text_length (int): Length of parsed text as number of characters.
filetype (str): Type of parsed text.
filename (str): The name of the parsed file or in case of text input "Texteingabe".
include_headings (IncludeHeadings): Whether headlines should be included or excluded from each chunks content.
"""
max_chunk_size: conint(ge=2) = sys.maxsize
min_chunk_size: PositiveInt = 1
text: str | DoclingDocument
text_length: int
filetype: Literal["docling", "text", "markdown"]
filename: str = "Default Filename"
include_headings: IncludeHeadings = IncludeHeadings.INCLUDE
@model_validator(mode="after")
def check_minimal_text_length_for_chunking(self) -> Self:
"""Validates the required minimal text length to enable chunking with the minimal chunk size settings.
Raises:
            HTTPException: HTTP_400_BAD_REQUEST raised if the text could not be chunked since the text is shorter than
                the minimal chunk size or because the parser returned an unknown filetype.
Returns:
ChunkingParameters: validated parameters for chunk sizes.
"""
if (self.text_length < self.min_chunk_size) and (self.min_chunk_size > 1):
logger.error(
f"Chunking failed: text input < minimal chunk size of {self.min_chunk_size} characters."
)
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=(
"Der Text konnte nicht zu Chunks verarbeitet werden, da er zu kurz ist. "
"Bitte versuchen Sie es mit einer anderen Datei."
),
)
return self
check_minimal_text_length_for_chunking
Validates the required minimal text length to enable chunking with the minimal chunk size settings.
| RAISES | DESCRIPTION |
|---|---|
HTTPException
|
HTTP_400_BAD_REQUEST raised if the text could not be chunked since the text is shorter than the minimal chunk size or because the parser returned an unknown filetype. |
| RETURNS | DESCRIPTION |
|---|---|
ChunkingParameters
|
validated parameters for chunk sizes.
TYPE:
|
Source code in docs/microservices/parser/src/models/chunking_models.py
@model_validator(mode="after")
def check_minimal_text_length_for_chunking(self) -> Self:
"""Validates the required minimal text length to enable chunking with the minimal chunk size settings.
Raises:
        HTTPException: HTTP_400_BAD_REQUEST raised if the text could not be chunked since the text is shorter than
            the minimal chunk size or because the parser returned an unknown filetype.
Returns:
ChunkingParameters: validated parameters for chunk sizes.
"""
if (self.text_length < self.min_chunk_size) and (self.min_chunk_size > 1):
logger.error(
f"Chunking failed: text input < minimal chunk size of {self.min_chunk_size} characters."
)
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=(
"Der Text konnte nicht zu Chunks verarbeitet werden, da er zu kurz ist. "
"Bitte versuchen Sie es mit einer anderen Datei."
),
)
return self
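To illustrate the validator, a short sketch (import path assumed; the HTTPException raised inside the validator propagates to the caller):
from fastapi import HTTPException
from src.models.chunking_models import ChunkingParameters  # import path is an assumption

try:
    ChunkingParameters(
        text="short",
        text_length=5,
        filetype="text",
        min_chunk_size=50,   # larger than the text itself
        max_chunk_size=100,
    )
except HTTPException as exc:
    print(exc.status_code)  # 400: the text is shorter than the minimal chunk size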
IncludeHeadings
Bases: StrEnum
Defines whether headlines should be included or excluded from each chunks content.
Values
INCLUDE: Including headings into each chunks content in addition to chunks metadata.
EXCLUDE: Excluding headings from each chunks content because headings are part of the chunks metadata.
Source code in docs/microservices/parser/src/models/chunking_models.py
class IncludeHeadings(StrEnum):
"""Defines whether headlines should be included or excluded from each chunks content.
Values:
INCLUDE: Including headings into each chunks content in addition to chunks metadata.
        EXCLUDE: Excluding headings from each chunks content because headings are part of the chunks metadata.
"""
INCLUDE = "include"
EXCLUDE = "exclude"
general
Loads and checks Settings from yml.
| CLASS | DESCRIPTION |
|---|---|
LogLevel |
Specifies possible log levels using an enum class. |
Settings |
General settings for the service. |
LogLevel
Bases: StrEnum
Specifies possible log levels using an enum class.
Source code in docs/microservices/parser/src/models/general.py
class LogLevel(StrEnum):
"""Specifies possible log levels using a enum class."""
CRITICAL = "CRITICAL"
ERROR = "ERROR"
WARNING = "WARNING"
INFO = "INFO"
DEBUG = "DEBUG"
@classmethod
    def _missing_(cls, value: object) -> "LogLevel | None":
        """Convert strings to uppercase and recheck for existence."""
if isinstance(value, str):
value = value.upper()
for level in cls:
if level == value:
return level
return None
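The _missing_ hook makes level lookup case-insensitive, so lowercase configuration values resolve to the canonical members (a minimal sketch, assuming the import path src.models.general):
from src.models.general import LogLevel  # import path is an assumption

assert LogLevel("info") is LogLevel.INFO        # lowercase input is upper-cased and re-checked
assert LogLevel("WARNING") is LogLevel.WARNING  # exact values still match directly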
Settings
Bases: BaseModel
General settings for the service.
| ATTRIBUTE | DESCRIPTION |
|---|---|
model_config |
Used to ignore other defined settings, which are not used by this service.
TYPE:
|
service_name |
Name of the current service.
TYPE:
|
log_level |
Log level that should be used by the logger.
TYPE:
|
log_file |
Path to logs.
TYPE:
|
log_file_max_bytes |
Maximal size of logfile in bytes.
TYPE:
|
log_file_backup_count |
Number of log-files to loop over.
TYPE:
|
n_uvicorn_workers |
Number of parallel uvicorn instances.
TYPE:
|
docling_models_path |
Path to directory in which the models for parsing with docling are stored.
TYPE:
|
| METHOD | DESCRIPTION |
|---|---|
ensure_log_dir |
Create the log directory after validation. |
Source code in docs/microservices/parser/src/models/general.py
class Settings(BaseModel):
"""General settings for the service.
Attributes:
model_config (ConfigDict): Used to ignore other defined settings, which are not used by this service.
service_name (str): Name of the current service.
log_level (LogLevel): Log level that should be used by the logger.
log_file (FilePath): Path to logs.
log_file_max_bytes (PositiveInt): Maximal size of logfile in bytes.
log_file_backup_count (PositiveInt): Number of log-files to loop over.
n_uvicorn_workers (PositiveInt): Number of parallel uvicorn instances.
        docling_models_path (Path): Path to directory in which the models for parsing with docling are stored.
"""
model_config = ConfigDict(extra="ignore")
service_name: str = "Parser"
service_descripton: str = "Parsing of files and text"
log_level: LogLevel = LogLevel.INFO
log_file_max_bytes: PositiveInt = 1 * 1024 * 1024
log_file_backup_count: PositiveInt = 3
log_file: FilePath = Path("/parser/logs/log")
n_uvicorn_workers: PositiveInt = 1
# default as used via dockerfile
docling_models_path: Path = Path("/parser/src/docling-models/")
@model_validator(mode="after")
def ensure_log_dir(self) -> "Settings":
"""Create the log directory after validation."""
self.log_file.parent.mkdir(parents=True, exist_ok=True)
return self
ensure_log_dir
Create the log directory after validation.
Source code in docs/microservices/parser/src/models/general.py
    @model_validator(mode="after")
    def ensure_log_dir(self) -> "Settings":
        """Create the log directory after validation."""
        self.log_file.parent.mkdir(parents=True, exist_ok=True)
        return self
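As a rough illustration (import path assumed), Settings behaves like any other pydantic model; note that log_file is a FilePath, so the target must already exist, and ensure_log_dir creates the log directory as a side effect of validation:
from pathlib import Path
from src.models.general import Settings  # import path is an assumption

# FilePath requires an existing file, so create a placeholder log file first (illustrative path).
log_path = Path("/tmp/parser-logs/log")
log_path.parent.mkdir(parents=True, exist_ok=True)
log_path.touch()

settings = Settings(log_level="DEBUG", log_file=log_path)
print(settings.service_name, settings.log_level)  # "Parser" and LogLevel.DEBUG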
parser
Defines the parsing class.
Attributes and methods used for parsing files of type pdf, docx and txt.
The parsing also encompasses text cleaning for parsing output as well as text input. These methods are used by preprocess.py.
| CLASS | DESCRIPTION |
|---|---|
Parser |
Parses files (pdf, docx and txt) as well as cleaning of input text and parsing outputs. |
Parser
Parses files (pdf, docx and txt) as well as cleaning of input text and parsing outputs.
| METHOD | DESCRIPTION |
|---|---|
run_file_parsing |
Performs parsing on an input file of type pdf, docx, or txt. |
run_string_parsing |
Takes a text and cleans it according to the parsers attributes. |
Source code in docs/microservices/parser/src/parser.py
class Parser:
"""Parses files (pdf, docx and txt) as well as cleaning of input text and parsing outputs."""
def run_file_parsing(
self,
input_file: UploadFile,
table_handling: TableHandling = TableHandling.EXCLUDE,
empty_lines_handling: EmptyLinesHandling = EmptyLinesHandling.REMOVE,
) -> ParsingOutput:
"""Performs parsing on an input file of type pdf, docx, or txt.
Args:
table_handling (TableHandling, optional): Indicating if tables should be included.
Default excludes tables during parsing.
empty_lines_handling (EmptyLinesHandling, optional): Indicating if consecutive empty lines should be removed
or kept. Default is removal.
input_file (UploadFile): The file that needs to be parsed and cleaned.
Returns:
Parsed text, a warning message and additional information for chunking in text_raw.
"""
warning_msg = []
# get filetype using the filename suffix
filetype = input_file.filename.split(".")[-1]
if filetype not in ("pdf", "docx", "txt"):
logger.error(
f"File type {filetype.lower()} is not supported."
"Please enter plain text, txt, docx or pdf files."
)
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=(
f"Das Dokument konnte nicht verarbeitet werden, da das eingegebene Dateiformat {filetype.lower()} "
"nicht unterstützt wird. Bitte prüfen Sie die Dateiendung Ihres Dokuments. "
"Wir akzeptieren txt-, docx- und pdf-Dateien sowie Texteingaben."
),
)
else:
try:
# load file to temporary folder
random_dir_name = uuid.uuid4()
with tempfile.TemporaryDirectory(
prefix=str(random_dir_name)
) as tempdir:
temp_file_path = Path(tempdir) / input_file.filename
logger.debug(f"path to tempfile: {temp_file_path}")
with open(temp_file_path, "wb") as temp_file:
shutil.copyfileobj(input_file.file, temp_file)
# get corresponding parser and chunker for the current filetype
if filetype.lower() == "pdf":
parsed_text_raw = self._parse_docling(
path_to_document=temp_file_path,
table_handling=table_handling,
)
parsed_text_type = "docling"
parsed_text = parsed_text_raw.export_to_markdown()
logger.debug("finished pdf parsing and text cleaning")
elif filetype.lower() == "docx":
parse_result = self._parse_docx(
path_to_docx_document=temp_file_path,
table_handling=table_handling,
warning_msg=warning_msg,
)
parsed_text_raw = parse_result["text_raw"]
warning_msg = parse_result["warning_msg"]
if empty_lines_handling == EmptyLinesHandling.REMOVE:
parsed_text_raw = self.run_string_parsing(
text=parsed_text_raw
)
parsed_text_raw = parsed_text_raw.strip()
parsed_text = parsed_text_raw
parsed_text_type = "markdown"
logger.debug("finished docx parsing and text cleaning")
elif filetype.lower() == "txt":
with open(temp_file_path, encoding="utf-8") as f:
parsed_text_raw = f.read()
if empty_lines_handling == EmptyLinesHandling.REMOVE:
parsed_text_raw = self.run_string_parsing(
text=parsed_text_raw
)
parsed_text_raw = parsed_text_raw.strip()
parsed_text = parsed_text_raw
parsed_text_type = "text"
logger.debug("finished txt text cleaning")
except Exception as e:
logger.error(f"During parsing following unexpected error occurred: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=(
"Das Dokument konnte wegen eines unerwarteten Fehlers nicht verarbeitet werden."
),
)
return ParsingOutput(
text=parsed_text,
text_raw={
"text": parsed_text_raw,
"type": parsed_text_type,
},
warning_msg=" ".join(warning_msg),
)
def run_string_parsing(self, text: str) -> str:
"""Takes a text and cleans it according to the parsers attributes.
        This function can remove unnecessary empty lines from input text as well as from the parsing output.
In addition, all leading and trailing spaces and line breaks in the text are removed.
Args:
text (str): Text which needs to be cleaned.
Returns:
Cleaned text.
Raises:
HTTPException: HTTP_400_BAD_REQUEST raised if the parsing input is empty.
"""
text = re.sub(r"\n\s*\n+", "\n\n", text)
if len(text) == 0:
logger.warning("Parsing failed because the parsing input was empty.")
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=(
"Der Text konnte nicht verarbeitet werden, da die Eingabe leer war."
),
)
return text
def _parse_docling(
self,
path_to_document: Path,
table_handling: TableHandling,
) -> DoclingDocument:
"""Parses a PDF file using Doclings Document Converter.
Args:
path_to_document (Path): Path to the pdf file, which should be parsed.
table_handling (TableHandling, optional): Indicating if tables should be included.
Default excludes tables during parsing.
Returns:
Parsed text and meta data of the input pdf file.
"""
try:
# configure PDF Pipeline of Docling Document Converter
artifacts_path = settings.docling_models_path
# load prefeched models
pipeline_options = PdfPipelineOptions(artifacts_path=artifacts_path)
# use german ocr model
pipeline_options.ocr_options = EasyOcrOptions(lang=["de"])
# accelerate parsing pipeline by using gpu
accelerator_options = AcceleratorOptions(
num_threads=8, device="cuda", cuda_use_flash_attention2=True
)
pipeline_options.accelerator_options = accelerator_options
# exclude tables
if table_handling == TableHandling.EXCLUDE:
pipeline_options.do_table_structure = False
pipeline_options.table_structure_options.do_cell_matching = False
converter = DocumentConverter(
format_options={
InputFormat.PDF: PdfFormatOption(
pipeline_options=pipeline_options,
backend=PyPdfiumDocumentBackend,
)
}
)
result = converter.convert(path_to_document)
except Exception as e:
logger.error(
f"An error has occurred during the pdf parsing using a docling converter: {e}"
)
raise HTTPException(
status_code=status.HTTP_424_FAILED_DEPENDENCY,
detail=(
"Das Dokument konnte nicht verarbeitet werden. Bitte versuchen Sie es mit einem anderen Dokument. "
),
)
return result.document
def _parse_docx(
self,
path_to_docx_document: Path,
table_handling: TableHandling,
warning_msg: list[str],
    ) -> dict[str, str | list[str]]:
"""Parses a docx file into Markdown text.
Args:
path_to_docx_document (Path): Path to docx file.
table_handling (TableHandling, optional): Indicating if tables should be included.
Default excludes tables during parsing.
warning_msg (list[str]): List of messages to the user containing information about the output.
Returns:
Dictionary with key "markdown" containing the Parsed text of the input file formatted as markdown
and key "warning_msg" containing a updated list of messages to the user.
"""
markdown = ""
docx_document = Document(path_to_docx_document)
for paragraph in docx_document.paragraphs:
# use style information of paragraphs to create a markdown
paragraph_style = paragraph.style.name
if paragraph_style.startswith("Heading"):
# get level of heading
heading_level = int(paragraph_style.split()[-1])
markdown += "#" * heading_level + " " + paragraph.text + "\n\n"
else:
markdown += paragraph.text + "\n\n"
try:
if table_handling == TableHandling.INCLUDE:
table_markdown = ""
for table in docx_document.tables:
for idx, row in enumerate(table.rows):
row_cells = [cell.text.strip() for cell in row.cells]
if idx == 0:
# use first row as table header
table_markdown += "| " + " | ".join(row_cells) + " |\n"
table_markdown += (
"| " + " | ".join(["---"] * len(row_cells)) + " |\n"
)
else:
table_markdown += "| " + " | ".join(row_cells) + " |\n"
markdown += table_markdown + "\n"
except Exception as e:
logger.warning(f"An error has occurred during docx table parsing: {e}")
warning_msg.append(
"Die Tabellen des Dokuments konnten nicht verarbeitet werden."
)
return {"text_raw": markdown, "warning_msg": warning_msg}
run_file_parsing
run_file_parsing(input_file, table_handling=TableHandling.EXCLUDE, empty_lines_handling=EmptyLinesHandling.REMOVE)
Performs parsing on an input file of type pdf, docx, or txt.
| PARAMETER | DESCRIPTION |
|---|---|
table_handling
|
Indicating if tables should be included. Default excludes tables during parsing.
TYPE:
|
empty_lines_handling
|
Indicating if consecutive empty lines should be removed or kept. Default is removal.
TYPE:
|
input_file
|
The file that needs to be parsed and cleaned.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
ParsingOutput
|
Parsed text, a warning message and additional information for chunking in text_raw. |
Source code in docs/microservices/parser/src/parser.py
def run_file_parsing(
self,
input_file: UploadFile,
table_handling: TableHandling = TableHandling.EXCLUDE,
empty_lines_handling: EmptyLinesHandling = EmptyLinesHandling.REMOVE,
) -> ParsingOutput:
"""Performs parsing on an input file of type pdf, docx, or txt.
Args:
table_handling (TableHandling, optional): Indicating if tables should be included.
Default excludes tables during parsing.
empty_lines_handling (EmptyLinesHandling, optional): Indicating if consecutive empty lines should be removed
or kept. Default is removal.
input_file (UploadFile): The file that needs to be parsed and cleaned.
Returns:
Parsed text, a warning message and additional information for chunking in text_raw.
"""
warning_msg = []
# get filetype using the filename suffix
filetype = input_file.filename.split(".")[-1]
if filetype not in ("pdf", "docx", "txt"):
logger.error(
f"File type {filetype.lower()} is not supported."
"Please enter plain text, txt, docx or pdf files."
)
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=(
f"Das Dokument konnte nicht verarbeitet werden, da das eingegebene Dateiformat {filetype.lower()} "
"nicht unterstützt wird. Bitte prüfen Sie die Dateiendung Ihres Dokuments. "
"Wir akzeptieren txt-, docx- und pdf-Dateien sowie Texteingaben."
),
)
else:
try:
# load file to temporary folder
random_dir_name = uuid.uuid4()
with tempfile.TemporaryDirectory(
prefix=str(random_dir_name)
) as tempdir:
temp_file_path = Path(tempdir) / input_file.filename
logger.debug(f"path to tempfile: {temp_file_path}")
with open(temp_file_path, "wb") as temp_file:
shutil.copyfileobj(input_file.file, temp_file)
# get corresponding parser and chunker for the current filetype
if filetype.lower() == "pdf":
parsed_text_raw = self._parse_docling(
path_to_document=temp_file_path,
table_handling=table_handling,
)
parsed_text_type = "docling"
parsed_text = parsed_text_raw.export_to_markdown()
logger.debug("finished pdf parsing and text cleaning")
elif filetype.lower() == "docx":
parse_result = self._parse_docx(
path_to_docx_document=temp_file_path,
table_handling=table_handling,
warning_msg=warning_msg,
)
parsed_text_raw = parse_result["text_raw"]
warning_msg = parse_result["warning_msg"]
if empty_lines_handling == EmptyLinesHandling.REMOVE:
parsed_text_raw = self.run_string_parsing(
text=parsed_text_raw
)
parsed_text_raw = parsed_text_raw.strip()
parsed_text = parsed_text_raw
parsed_text_type = "markdown"
logger.debug("finished docx parsing and text cleaning")
elif filetype.lower() == "txt":
with open(temp_file_path, encoding="utf-8") as f:
parsed_text_raw = f.read()
if empty_lines_handling == EmptyLinesHandling.REMOVE:
parsed_text_raw = self.run_string_parsing(
text=parsed_text_raw
)
parsed_text_raw = parsed_text_raw.strip()
parsed_text = parsed_text_raw
parsed_text_type = "text"
logger.debug("finished txt text cleaning")
except Exception as e:
logger.error(f"During parsing following unexpected error occurred: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=(
"Das Dokument konnte wegen eines unerwarteten Fehlers nicht verarbeitet werden."
),
)
return ParsingOutput(
text=parsed_text,
text_raw={
"text": parsed_text_raw,
"type": parsed_text_type,
},
warning_msg=" ".join(warning_msg),
)
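A hedged usage sketch (assuming a recent Starlette UploadFile constructor and the import path src.parser; in the running service FastAPI supplies the UploadFile from the multipart request):
import io
from fastapi import UploadFile
from src.parser import Parser  # import path is an assumption

# Build an in-memory txt upload for illustration only.
upload = UploadFile(
    io.BytesIO(b"This is a sample text.\n\n\n\nWith empty lines."),
    filename="sample.txt",
)
result = Parser().run_file_parsing(input_file=upload)
print(result.text)           # cleaned text with collapsed empty lines
print(result.text_raw.type)  # "text" for txt inputs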
run_string_parsing
Takes a text and cleans it according to the parsers attributes.
This function can remove unnecessary empty lines from input text as well as from the parsing output. In addition, all leading and trailing spaces and line breaks in the text are removed.
| PARAMETER | DESCRIPTION |
|---|---|
text
|
Text which needs to be cleaned.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
str
|
Cleaned text. |
| RAISES | DESCRIPTION |
|---|---|
HTTPException
|
HTTP_400_BAD_REQUEST raised if the parsing input is empty. |
Source code in docs/microservices/parser/src/parser.py
def run_string_parsing(self, text: str) -> str:
"""Takes a text and cleans it according to the parsers attributes.
    This function can remove unnecessary empty lines from input text as well as from the parsing output.
In addition, all leading and trailing spaces and line breaks in the text are removed.
Args:
text (str): Text which needs to be cleaned.
Returns:
Cleaned text.
Raises:
HTTPException: HTTP_400_BAD_REQUEST raised if the parsing input is empty.
"""
text = re.sub(r"\n\s*\n+", "\n\n", text)
if len(text) == 0:
logger.warning("Parsing failed because the parsing input was empty.")
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=(
"Der Text konnte nicht verarbeitet werden, da die Eingabe leer war."
),
)
return text
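Behaviourally, the cleaning boils down to collapsing runs of blank lines; a quick sketch (import path assumed):
from src.parser import Parser  # import path is an assumption

cleaned = Parser().run_string_parsing(text="Line one.\n\n   \n\nLine two.\n")
print(repr(cleaned))  # prints 'Line one.\n\nLine two.\n'; consecutive empty lines collapse to one blank line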
settings
Load all settings from a central place, not hidden in utils.
utils
Utils functions for logging and configuration processing.
| MODULE | DESCRIPTION |
|---|---|
base_logger |
Set up the root logger for the entire application. This logger will log messages to the console and a file. |
process_configs |
Methods to load configs and start checks of config integrity. |
base_logger
Set up the root logger for the entire application. This logger will log messages to the console and a file.
| FUNCTION | DESCRIPTION |
|---|---|
setup_logger |
Initialize the logger with the desired log level and add handlers. |
setup_logger
Initialize the logger with the desired log level and add handlers.
Sets up the root logger, which all other loggers inherit from. Adds file and console handlers to the logger and sets the format.
Source code in docs/microservices/parser/src/utils/base_logger.py
def setup_logger() -> None:
"""Initialize the logger with the desired log level and add handlers.
Sets up the root logger, which all other loggers inherit from.
    Adds file and console handlers to the logger and sets the format.
"""
logger = logging.getLogger()
# create different handlers for log file and console
file_handler = logging.handlers.RotatingFileHandler(
filename=settings.log_file,
maxBytes=settings.log_file_max_bytes,
backupCount=settings.log_file_backup_count,
)
console_handler = logging.StreamHandler()
# define log format and set for each handler
formatter = logging.Formatter(
fmt="%(asctime)s - %(levelname)8s - %(module)s - %(funcName)s: %(message)s",
datefmt="%Y-%m-%d %H:%M:%S%z",
)
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)
# add handlers to the logger
logger.addHandler(file_handler)
logger.addHandler(console_handler)
logger.setLevel(settings.log_level)
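Typical usage (a sketch; setup_logger is expected to be called once at application start):
import logging
from src.utils.base_logger import setup_logger  # import path is an assumption

setup_logger()
logging.getLogger(__name__).info("Parser service started.")  # written to console and the rotating log file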
process_configs
Methods to load configs and start checks of config integrity.
| FUNCTION | DESCRIPTION |
|---|---|
load_all_configs |
Load config settings from respective paths. |
load_from_yml_in_pydantic_model |
Load config from 'yaml_path' into the given pydantic Model. |
load_yaml |
Load yaml. |
load_all_configs
Load config settings from respective paths.
| PARAMETER | DESCRIPTION |
|---|---|
general_config_paths
|
Path to config, matching 'Settings'.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
Settings
|
Config loaded into their Pydantic Model. |
Source code in docs/microservices/parser/src/utils/process_configs.py
def load_all_configs(general_config_paths: Path) -> Settings:
"""Load config settings from respective paths.
Args:
general_config_paths (Path): Path to config, matching 'Settings'.
Returns:
Config loaded into their Pydantic Model.
"""
settings = load_from_yml_in_pydantic_model(general_config_paths, Settings)
return settings
load_from_yml_in_pydantic_model
Load config from 'yaml_path' into the given pydantic Model.
| PARAMETER | DESCRIPTION |
|---|---|
yaml_path
|
Yaml to load.
TYPE:
|
pydantic_reference_model
|
Pydantic model to load yaml into.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
BaseModel
|
BaseModel derived pydantic data class. |
Source code in docs/microservices/parser/src/utils/process_configs.py
def load_from_yml_in_pydantic_model(
yaml_path: Path, pydantic_reference_model: BaseModel
) -> BaseModel:
"""Load config from 'list_of_yaml_paths' into given pydantic-Model.
Args:
yaml_path (Path): Yaml to load.
pydantic_reference_model (BaseModel): Pydantic model to load yaml into.
Returns:
BaseModel derived pydantic data class.
"""
data = load_yaml(yaml_path)
try:
pydantic_class = pydantic_reference_model(**data)
logger.info(f"Config loaded from: '{yaml_path}'")
return pydantic_class
except ValidationError as e:
logger.critical(f"Error loading config: '{e}'")
raise e
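For example, loading the general settings into the Settings model (the config path is hypothetical and the import paths are assumptions):
from pathlib import Path
from src.models.general import Settings
from src.utils.process_configs import load_from_yml_in_pydantic_model  # import paths are assumptions

settings = load_from_yml_in_pydantic_model(Path("config/general.yml"), Settings)  # hypothetical path
print(settings.service_name)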
load_yaml
Load yaml.
| PARAMETER | DESCRIPTION |
|---|---|
yaml_path
|
Path to yaml.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
dict[str, Any]
|
Content of loaded yaml. |
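A minimal sketch of such a loader, assuming PyYAML's safe_load and the signature from the table above (not the project's actual source):
from pathlib import Path
from typing import Any

import yaml


def load_yaml(yaml_path: Path) -> dict[str, Any]:
    """Illustrative sketch of a yaml loader, not the project source."""
    with open(yaml_path, encoding="utf-8") as f:
        return yaml.safe_load(f)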