Summary
summary
| MODULE | DESCRIPTION |
|---|---|
| main | Main module of the application. |
| src | Source code of the summary containing core components and utilities. |
main
Main module of the application.
This module serves as the entry point for the program. It imports necessary modules, sets up any initial configuration or data structures, and possibly defines main functions or classes that are used throughout the application.
src
Source code of the summary containing core components and utilities.
| MODULE | DESCRIPTION |
|---|---|
| app | Initializes the app. |
| endpoints | Defines all endpoints of the FastAPI app. |
| input_handling | Handles logic of file and text summary endpoints. |
| models | Models loading and checking API and configuration parameters. |
| settings | Loads all settings from a central place, not hidden in utils. |
| summarizing | Implementation of the core logic of the summary. |
| utils | Utils functions for logging, LLM availability check, LLM authentication and configuration processing. |
app
Initializes the app.
| FUNCTION | DESCRIPTION |
|---|---|
| lifespan | Sets up a scheduler and updates available llms. |
lifespan
async
Sets up a scheduler and updates available llms.
This lifespan function is started on startup of FastAPI. The first part, up to the `yield`, is executed on startup and initializes a scheduler that regularly checks the LLM API. The second part is executed on shutdown and is used to clean up the scheduler.
The available LLMs, i.e. the LLMs for which the API checks passed, are cached in the FastAPI state object as `app.state.available_llms`.
Source code in docs/microservices/summary/src/app.py
@asynccontextmanager
async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
    """Sets up a scheduler and updates available llms.

    This lifespan function is started on startup of FastAPI. The first part
    - till `yield` is executed on startup and initializes a scheduler to regulary
    check the LLM-API. The second part is executed on shutdown and is used to
    clean up the scheduler.

    The available LLMs - i.e. the LLMs where API-checks passed - are cached in
    FastAPI state object as `app.state.available_llms`.
    """

    async def update_llm_state() -> None:
        _app.state.available_llms = await get_available_llms()

    # store available LLMs in FastAPI app state
    _app.state.available_llms = await get_available_llms()

    # setup a scheduler
    scheduler = AsyncIOScheduler()
    scheduler.add_job(
        update_llm_state,
        "interval",
        seconds=settings.check_llm_api_interval_in_s,
    )
    scheduler.start()

    yield

    # cleanup
    scheduler.shutdown()
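For orientation, a minimal sketch of how this lifespan handler is typically wired into the application (the import path is an assumption based on the documented layout):

```python
# Hypothetical wiring sketch, not taken from the repository.
from fastapi import FastAPI

from src.app import lifespan  # import path assumed

app = FastAPI(lifespan=lifespan)
# On startup the available LLMs are cached in app.state.available_llms and
# refreshed every settings.check_llm_api_interval_in_s seconds; on shutdown
# the scheduler is stopped.
```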
endpoints
Defines all endpoints of the FastAPI app.
| FUNCTION | DESCRIPTION |
|---|---|
| get_llms | Returns model information of available LLMs. |
| health | Performs a health check of the summary service. |
| summarize_file | Summarizes a pdf, docx or txt file including its parsing, cleaning and chunking. |
| summarize_text | Summarizes text input including basic text cleaning and chunking of text input. |
get_llms
async
Returns model information of available LLMs.
| PARAMETER | DESCRIPTION |
|---|---|
| request | Request-Data. TYPE: Request |

| RETURNS | DESCRIPTION |
|---|---|
| list[dict[str, Any]] | List with information for each LLM. |
Source code in docs/microservices/summary/src/endpoints.py
@router.get(
    "/llms",
    summary="List available language models.",
    description=("Returns a list of available language models (LLMs).\n\n"),
    responses={
        200: {
            "description": "List of available LLMs.",
            "content": {
                "application/json": {
                    "example": [
                        {
                            "label": "test_model:mock",
                            "is_remote": False,
                            "name": "test_model_mock",
                        },
                    ]
                }
            },
        },
        500: {"description": "Internal server error accessing microservice"},
    },
)
async def get_llms(request: Request) -> list[dict[str, Any]]:
    """Returns model information of available LLMs.

    Args:
        request (Request): Request-Data.

    Returns:
        List with information for each LLM.
    """
    app = request.app  # indirectly access the FastAPI app object
    return app.state.available_llms
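For illustration, a client call against a locally running instance might look like this (the base URL is an assumption):

```python
# Hypothetical client call; adjust the base URL to your deployment.
import httpx

response = httpx.get("http://localhost:8000/llms")
response.raise_for_status()
for llm in response.json():
    print(llm["label"], llm["is_remote"])
```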
health
async
Performs a health check of the summary service.
| RETURNS | DESCRIPTION |
|---|---|
| dict[str, str] | Health check message as a dictionary. |
Source code in docs/microservices/summary/src/endpoints.py
@router.get(
    "/",
    summary="Health check endpoint",
    description=(
        "Returns a simple message indicating that the summary service is running.\n\n"
        "Use this endpoint to verify that the service is alive and responsive."
    ),
    responses={
        200: {
            "description": "Health check successful",
            "content": {
                "application/json": {"example": {"status": "Summary is running"}}
            },
        },
        500: {"description": "Internal server error"},
    },
)
@router.get(
    "/health",
    summary="Health check endpoint",
    description=(
        "Returns a simple message indicating that the summary service is running.\n\n"
        "Use this endpoint to verify that the service is alive and responsive."
    ),
    responses={
        200: {
            "description": "Health check successful",
            "content": {
                "application/json": {"example": {"status": "Summary is running"}}
            },
        },
        500: {"description": "Internal server error"},
    },
)
async def health() -> dict[str, str]:
    """Performs a health check of the summary service.

    Returns:
        Health check message as a dictionary.
    """
    return {"message": f"{settings.service_name} is running"}
summarize_file
async
summarize_file(file=File(..., description='Upload a PDF, DOCX, or TXT file.'), api_input=Depends(SummaryFileAPIInputParameters.as_form))
Summarizes a pdf, docx or txt file including its parsing, cleaning and chunking.
| PARAMETER | DESCRIPTION |
|---|---|
| file | Input file either pdf, docx or txt. TYPE: UploadFile |
| api_input | Containing the name of the language model, desired length of summary output as number of DIN-A4 pages, topics on which the summary should be focussed. TYPE: SummaryFileAPIInputParameters |

| RETURNS | DESCRIPTION |
|---|---|
| SummaryAPIOutput | The summary, a message to the user and the parsing output. |
Source code in docs/microservices/summary/src/endpoints.py
@router.post(
"/summary/file",
response_model=SummaryAPIOutput,
summary="File summary endpoint.",
description=(
"Generates a summary for a PDF, DOCX, or TXT file.\n\n"
"The endpoint parses, cleans, chunks the input file and then summarizes the text "
"according to the requested output length, focusing the summary on the user-defined topics "
"and using the specified language model."
),
openapi_extra={
"requestBody": {
"content": {
"application/json": {
"examples": SummaryFileAPIInputParameters.model_config[
"json_schema_extra"
]["openapi_examples"],
}
}
}
},
responses={
200: {
"description": "Successful summary generation.",
"content": {
"application/json": {
"examples": SummaryAPIOutput.model_config["json_schema_extra"][
"openapi_examples"
],
}
},
},
400: {"description": "Invalid request."},
424: {"description": "Failed dependency."},
408: {"description": "Request timeout of a dependency."},
},
)
async def summarize_file(
file: UploadFile = File(..., description="Upload a PDF, DOCX, or TXT file."),
api_input: SummaryFileAPIInputParameters = Depends(
SummaryFileAPIInputParameters.as_form
),
) -> SummaryAPIOutput:
"""Summarizes a pdf, docx or txt file including its parsing, cleaning and chunking.
Args:
file (UploadFile): Input file either pdf, docx or txt.
api_input (SummaryFileAPIInputParameters): Containing the name of the language model, desired length of
summary output as number of DIN-A4 pages, topics on which the summary should be focussed.
Returns:
The summary, a message to the user and the parsing output.
"""
summary_output = await request_handling(
endpoint="file",
parsing_input=file,
language_model=api_input.language_model,
output_length=api_input.output_length,
topics=api_input.topics,
)
return summary_output
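A hypothetical client request to this endpoint, using the multipart form fields defined by `as_form` (base URL and file path are assumptions):

```python
# Hypothetical multipart upload; base URL and file path are placeholders.
import httpx

with open("tests/data/pdf-testfile.pdf", "rb") as f:
    response = httpx.post(
        "http://localhost:8000/summary/file",
        files={"file": ("pdf-testfile.pdf", f, "application/pdf")},
        data={
            "language_model": "test_model_mock",
            "output_length": 2.5,
            "topics": "Open Source, Community-Gedanke",
        },
        timeout=300,
    )
response.raise_for_status()
print(response.json()["summary"])
```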
summarize_text
async
Summarizes text input including basic text cleaning and chunking of text input.
| PARAMETER | DESCRIPTION |
|---|---|
| api_input | Containing the input text, name of the language model, desired length of summary output as number of DIN-A4 pages, topics on which the summary should be focussed. TYPE: SummaryTextAPIInput |

| RETURNS | DESCRIPTION |
|---|---|
| SummaryAPIOutput | The summary, a message to the user and the parsing output. |
Source code in docs/microservices/summary/src/endpoints.py
@router.post(
"/summary/text",
response_model=SummaryAPIOutput,
summary="Text summary endpoint.",
description=(
"Generates a summary from a plain text input.\n\n"
"The endpoint performs basic text cleaning, chunking, and then summarizes the text "
"according to the requested output length, focusing the summary on the user-defined topics "
"and using the specified language model."
),
openapi_extra={
"requestBody": {
"content": {
"application/json": {
"examples": SummaryTextAPIInput.model_config["json_schema_extra"][
"openapi_examples"
],
}
}
}
},
responses={
200: {
"description": "Successful summary generation.",
"content": {
"application/json": {
"examples": SummaryAPIOutput.model_config["json_schema_extra"][
"openapi_examples"
],
}
},
},
400: {"description": "Invalid request."},
424: {"description": "Failed dependency."},
408: {"description": "Request timeout of a dependency."},
},
)
async def summarize_text(
api_input: SummaryTextAPIInput,
) -> SummaryAPIOutput:
"""Summarizes text input including basic text cleaning and chunking of text input.
Args:
api_input (SummaryTextAPIInput): Containing the input text, name of the language model, desired length of
summary output as number of DIN-A4 pages, topics on which the summary should be focussed.
Returns:
The summary, a message to the user and the parsing output.
"""
summary_output = await request_handling(
endpoint="text",
parsing_input=api_input.text,
language_model=api_input.language_model,
output_length=api_input.output_length,
topics=api_input.topics,
)
return summary_output
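A hypothetical JSON request to this endpoint (the base URL and the shortened text are placeholders):

```python
# Hypothetical client call; base URL and text are placeholders.
import httpx

payload = {
    "text": "F13 ist eine vollständig souveräne und Modell-agnostische KI-Assistenz ...",
    "language_model": "test_model_mock",
    "output_length": 0,
}
response = httpx.post("http://localhost:8000/summary/text", json=payload, timeout=300)
response.raise_for_status()
print(response.json()["summary"])
```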
input_handling
Handles logic of file and text summary endpoints.
| FUNCTION | DESCRIPTION |
|---|---|
| parsing_request | Performs error handling for the request to the parsing micro-service. |
| request_handling | This function calls the necessary functions to parse, chunk and summarize a file or text. |
parsing_request
async
Performs error handling for the request to the parsing micro-service.
This includes 3 retries in case the parsing micro-service is not available.
| PARAMETER | DESCRIPTION |
|---|---|
| endpoint | Either "file" or "text" for endpoint selection. TYPE: str |
| parsing_input | Text or file that should be parsed. TYPE: UploadFile \| str |
| chunking_input | Settings for chunking, which includes chunking mode and two input parameters max_llm_input_chars and min_number_of_chunks. The chunksize will be computed from these input parameters. TYPE: dict |

| RETURNS | DESCRIPTION |
|---|---|
| ParsingOutput | Containing the cleaned and parsed text, chunks and a message to the user. |

| RAISES | DESCRIPTION |
|---|---|
| HTTPException | HTTP_424_FAILED_DEPENDENCY if the parsing does not finish within an acceptable time. |
| HTTPException | HTTP_400_BAD_REQUEST if the parser gets some input that can not be parsed. |
Source code in docs/microservices/summary/src/input_handling.py
async def parsing_request(
endpoint: str, parsing_input: UploadFile | str, chunking_input: dict
) -> ParsingOutput:
"""Performs error handling for the request to the parsing micro-service.
This includes 3 retries in case the parsing micro-service is not available.
Args:
endpoint (str): Either "file" or "text" for endpoint selection.
parsing_input (UploadFile | str): Text or file that should be parsed.
chunking_input (dict): Settings for chunking, which includes chunking mode and two input parameters
max_llm_input_chars and min_number_of_chunks. The chunksize will be computed from these input parameters.
Returns:
Containing the cleaned and parsed text, chunks and a message to the user.
Raises:
HTTPException: HTTP_424_FAILED_DEPENDENCY if the parsing does not finish within an acceptable time.
HTTPException: HTTP_400_BAD_REQUEST if the parser gets some input that can not be parsed.
"""
parsing_output = None
config = settings.inter_service_communication.parser
logger.debug(f"Communication with Parser is configured using {config}")
for attempt_counter in range(config.max_attempts):
if attempt_counter > 0:
logger.warning(
f"Retrying request to parser "
f"({attempt_counter} / {config.max_attempts - 1})"
)
try:
response = await _request_parsing_output(
timeout_in_s=config.timeout_in_s,
endpoint=endpoint,
parsing_input=parsing_input,
chunking_input=chunking_input,
)
if response.status_code == httpx.codes.OK:
logger.debug(f"Response from Parser: {response.json()}.")
parsing_output = ParsingOutput(**response.json())
parsed_text_length = len(parsing_output.text)
if parsed_text_length == 0 or parsing_output.chunks is None:
logger.critical(
"Parsing failed because of incomplete parsing output (with length parsed text = "
f"{parsed_text_length} and list of chunks of type = {type(parsing_output.chunks)}). "
"Please check the compatibilty of the summary und parsing microservice versions!"
)
raise HTTPException(
status_code=status.HTTP_424_FAILED_DEPENDENCY,
detail=(
"Der Text konnte nicht verarbeitet werden."
"Bitte wenden Sie sich an Ihren technischen Support."
),
)
else:
logger.critical(
f"Parsing failed with status code {response.status_code}."
)
status_code = status.HTTP_424_FAILED_DEPENDENCY
error_msg = (
"Der Text konnte aufgrund eines unvorhergesehenen Fehlers nicht verarbeitet werden. "
"Bitte versuchen Sie es mit einer anderen Datei."
)
response_payload = response.json()
if "detail" in response_payload:
error_msg = response_payload["detail"]
if response.status_code in (
httpx.codes.BAD_REQUEST,
httpx.codes.UNPROCESSABLE_ENTITY,
):
status_code = response.status_code
raise HTTPException(
status_code=status_code,
detail=error_msg,
)
except httpx.TimeoutException:
logger.error(
"Parsing microservice could not return a parsing result within the accepted time of "
f"{config.timeout_in_s}. Therefore the text could not be summarized!"
)
raise HTTPException(
status_code=status.HTTP_424_FAILED_DEPENDENCY,
detail=(
"Der Text konnte nicht in angemessener Zeit verarbeitet werden. "
"Bitte versuchen Sie es mit einer kleineren Datei."
),
)
except httpx.RequestError as e:
if attempt_counter < config.max_attempts - 1:
logger.warning("Could not connect to parser endpoint.")
await asyncio.sleep(3)
else:
logger.critical(
f"Could not connect to parser endpoint. Giving up after maximal number of retries. Error: {e}."
)
raise HTTPException(
status_code=status.HTTP_424_FAILED_DEPENDENCY,
detail="Aus technischen Gründen kann der Text vorübergehend nicht verarbeitet werden. "
"Bitte versuchen Sie es später erneut.",
)
return parsing_output
request_handling
async
This function calls the necessary functions to parse, chunk and summarize a file or text.
This also includes the computation of parameters, which are required to chunk and summarize the parsed text, using the parsing output and user input.
| PARAMETER | DESCRIPTION |
|---|---|
| endpoint | Either "file" or "text" for endpoint selection. TYPE: str |
| parsing_input | Text or file that should be parsed. TYPE: UploadFile \| str |
| language_model | Name of the language model. TYPE: str |
| output_length | Desired length of summary output as number of DIN-A4 pages. TYPE: float |
| topics | User input text stating the topics the summary should focus on. TYPE: str \| None |

| RETURNS | DESCRIPTION |
|---|---|
| SummaryAPIOutput | Contains the summary, a message to the user and the parsing output. |
Source code in docs/microservices/summary/src/input_handling.py
async def request_handling(
endpoint: str,
parsing_input: UploadFile | str,
language_model: str,
output_length: float,
topics: str | None,
) -> SummaryAPIOutput:
"""This function calls the necessary functions to parse, chunk and summarize a file or text.
This also includes the computation of parameters, which are required to chunk and summarize the parsed text,
using the parsing output and user input.
Args:
endpoint (str): Either "file" or "text" for endpoint selection.
parsing_input (UploadFile | str): Text or file that should be parsed.
language_model (str): Name of the language model.
output_length (float): Desired length of summary output as number of DIN-A4 pages.
topics (str | None): User input text stating the topics the summary should focus on.
Returns:
Contains the summary, a message to the user and the parsing output.
"""
start_request_timer = time.time()
warning_msg = []
# Parsing
max_input_chars_estimate = summary_registry.estimate_max_input_chars(
language_model=language_model
)
chunking_input = {
"mode": "summary_chunking",
"min_number_of_chunks": 5,
"max_llm_input_chars": max_input_chars_estimate["max_input_chars_estimate"],
}
start_parsing_timer = time.time()
parsing_output = await parsing_request(
endpoint=endpoint,
parsing_input=parsing_input,
chunking_input=chunking_input,
)
logger.debug(
f"Whole text parsing, cleaning and chunking took {int(time.time() - start_parsing_timer)} seconds."
)
warning_msg.append(parsing_output.warning_msg)
text = parsing_output.text
chunks = _transform_chunks_to_langchaindocs(chunks=parsing_output.chunks)
summary_parameters = await summary_registry.configure_summary_parameters(
warning_msg=warning_msg,
language_model=language_model,
remaining_context_length=max_input_chars_estimate["remaining_context_length"],
desired_summary_length={
"input_length": len(text),
"output_length": output_length,
},
topics=topics,
)
start_summary_timer = time.time()
summary_output = await summary_registry.request_summary(
summarize_input=SummarizeInput(
language_model=language_model,
chunks=chunks,
summary_parameters=summary_parameters,
),
text=text,
)
end_summary_timer = time.time()
logger.debug(
f"The summarization took {int(end_summary_timer - start_summary_timer)} seconds."
)
logger.debug(
f"Whole summary request incl. parsing and chunking took {int(end_summary_timer - start_request_timer)}"
" seconds."
)
return summary_output
models
Models loading and checking API and configuration parameters.
| MODULE | DESCRIPTION |
|---|---|
| api_input | Pydantic Models for API input parameters. |
| api_output | Pydantic Models for API output parameters. |
| general | Defines settings and thereby assigns default values. |
| graph_state | Defines states within the LangGraph performing the summary. |
| llms | Pydantic Models describing an LLM used for LLM config. |
| parser_output | Defines the output of the parsing microservice. |
| summary_parameters | Pydantic Models for parameters necessary for the summarization. |
api_input
Pydantic Models for API input parameters.
| CLASS | DESCRIPTION |
|---|---|
| SummaryFileAPIInputParameters | Model defining the input parameters as part of a valid Summary request for file endpoint. |
| SummaryTextAPIInput | Model defining the input of a valid Summary request for text endpoint. |
SummaryFileAPIInputParameters
Bases: BaseModel
Model defining the input parameters as part of a valid Summary request for file endpoint.
For technical reasons, "file (UploadFile)" cannot be part of this pydantic model
even though it is part of the file endpoint's input.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| language_model | The name or identifier of the language model to use. TYPE: str |
| output_length | Desired length of summary output as number of DIN-A4 pages. Default is 0, which will lead to no summary length restrictions. This is the fastest option since the LLM will decide which length is the most suitable one. TYPE: NonNegativeFloat |
| topics | User input text stating the topics the summary should focus on. Default is None, which will lead to a general summary without any focus topics. TYPE: str \| None |

| METHOD | DESCRIPTION |
|---|---|
| as_form | Creates an instance of `SummaryFileAPIInputParameters` from multipart form-data. |
Source code in docs/microservices/summary/src/models/api_input.py
class SummaryFileAPIInputParameters(BaseModel):
"""Model defining the input parameters as part of a valid Summary request for file endpoint.
Note: Due to technical reasons "file (Uploadfile)" can not be part of this pydantic model
even though it is part of the file endpoints input.
Attributes:
language_model (str): The name or identifier of the language model to use.
output_length (NonNegativeFloat, optional): Desired length of summary output as number of DIN-A4 pages.
Default is 0, which will lead to no summary length restrictions. This is the fastest option since
the LLM will decide, which length is the most suitable one.
topics (str | None): User input text stating the topics the summary should focus on.
Default is None, which will lead to a general summary without any focus topics.
"""
language_model: str
output_length: NonNegativeFloat = 0
topics: str | None = None
model_config = ConfigDict(
json_schema_extra={
"openapi_examples": {
"simple_summary": {
"summary": "Simple summary request",
"description": (
"Example input for a summary without predefined output length or focus topic settings."
),
"value": {
"file": "tests/data/txt-testfile.txt",
"language_model": "test_model_mock",
},
},
"fast_summary": {
"summary": "Fast summary request",
"description": (
"Example input for a fast summary. Using a txt file without predefined output length but "
"with focus topic setting."
),
"value": {
"file": "tests/data/txt-testfile.txt",
"language_model": "test_model_mock",
"topics": "Städte",
},
},
"output_length_summary": {
"summary": "Summary request with output length",
"description": "Example input for a summary with predefined output length.",
"value": {
"file": "tests/data/txt-testfile.txt",
"language_model": "test_model_mock",
"output_length": 2.5,
},
},
"focus_topic_summary": {
"summary": "Summary request with focus topic",
"description": "Example input for a summary with focus on a specific topic.",
"value": {
"file": "tests/data/pdf-testfile.pdf",
"language_model": "test_model_mock",
"topics": "Open Source, Community-Gedanke",
},
},
}
}
)
@classmethod
def as_form(
cls,
language_model: str = Form(
...,
description="The name or identifier of the language model to use.",
example="test_model_mock",
),
output_length: float = Form(
0,
description=(
"Desired summary length in DIN-A4 pages. "
"Default 0 = no restriction and high performance."
),
example=3.0,
ge=0,
),
topics: str | None = Form(
None,
description=(
"Comma-separated topics the summary should focus on. "
"Default None = general summary without any focus."
),
example="public administration, artificial intelligence, digitization",
),
) -> Self:
"""Creates an instance of `SummaryFileAPIInputParameters` from multipart form-data.
FastAPI does not automatically map Pydantic models from form-data in
multipart requests. This helper enables the model to be used together
with file uploads by defining how form parameters should be parsed.
Args:
language_model (str): Selected language model.
output_length (float): Desired summary length in pages.
topics (str | None): User-defined focus topics.
Returns:
A validated input parameter set.
"""
return cls(
language_model=language_model,
output_length=output_length,
topics=topics,
)
as_form
classmethod
as_form(language_model=Form(..., description='The name or identifier of the language model to use.', example='test_model_mock'), output_length=Form(0, description='Desired summary length in DIN-A4 pages. Default 0 = no restriction and high performance.', example=3.0, ge=0), topics=Form(None, description='Comma-separated topics the summary should focus on. Default None = general summary without any focus.', example='public administration, artificial intelligence, digitization'))
Creates an instance of SummaryFileAPIInputParameters from multipart form-data.
FastAPI does not automatically map Pydantic models from form-data in multipart requests. This helper enables the model to be used together with file uploads by defining how form parameters should be parsed.
| PARAMETER | DESCRIPTION |
|---|---|
| language_model | Selected language model. TYPE: str |
| output_length | Desired summary length in pages. TYPE: float |
| topics | User-defined focus topics. TYPE: str \| None |

| RETURNS | DESCRIPTION |
|---|---|
| Self | A validated input parameter set. |
Source code in docs/microservices/summary/src/models/api_input.py
@classmethod
def as_form(
cls,
language_model: str = Form(
...,
description="The name or identifier of the language model to use.",
example="test_model_mock",
),
output_length: float = Form(
0,
description=(
"Desired summary length in DIN-A4 pages. "
"Default 0 = no restriction and high performance."
),
example=3.0,
ge=0,
),
topics: str | None = Form(
None,
description=(
"Comma-separated topics the summary should focus on. "
"Default None = general summary without any focus."
),
example="public administration, artificial intelligence, digitization",
),
) -> Self:
"""Creates an instance of `SummaryFileAPIInputParameters` from multipart form-data.
FastAPI does not automatically map Pydantic models from form-data in
multipart requests. This helper enables the model to be used together
with file uploads by defining how form parameters should be parsed.
Args:
language_model (str): Selected language model.
output_length (float): Desired summary length in pages.
topics (str | None): User-defined focus topics.
Returns:
A validated input parameter set.
"""
return cls(
language_model=language_model,
output_length=output_length,
topics=topics,
)
SummaryTextAPIInput
Bases: BaseModel
Model defining the input of a valid Summary request for text endpoint.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| text | The text content to be summarized. TYPE: str |
| language_model | The name or identifier of the language model to use. TYPE: str |
| output_length | Desired length of summary output as number of DIN-A4 pages. Default is 0, which will lead to no summary length restrictions. This is the fastest option since the LLM will decide which length is the most suitable one. TYPE: float |
| topics | User input text stating the topics the summary should focus on. Default is None, which will lead to a general summary without any focus topics. TYPE: str \| None |
Source code in docs/microservices/summary/src/models/api_input.py
class SummaryTextAPIInput(BaseModel):
"""Model defining the input of a valid Summary request for text endpoint.
Attributes:
text (str): The text content to be summarized.
language_model (str): The name or identifier of the language model to use.
output_length (float, optional): Desired length of summary output as number of DIN-A4 pages.
Default is 0, which will lead to no summary length restrictions. This is the fastest option since
the LLM will decide, which length is the most suitable one.
topics (str | None): User input text stating the topics the summary should focus on.
Default is None, which will lead to a general summary without any focus topics.
"""
text: str
language_model: str
output_length: float = Field(0, ge=0, strict=True)
topics: str | None = None
model_config = ConfigDict(
json_schema_extra={
"openapi_examples": {
"simple_summary": {
"summary": "Simple summary request",
"description": (
"Example input for a summary without predefined output length or focus topic settings."
),
"value": {
"text": (
"F13 ist eine vollständig souveräne und Modell-agnostische KI-Assistenz, die auf eigener "
"Infrastruktur betrieben werden kann und von der Verwaltung für die Verwaltung entwickelt "
"wurde. F13 stellt eine Vielzahl modularer, flexibel einsetzbarer KI-Funktionen bereit – "
"zum Verarbeiten von Informationen, Erstellen von Texten, oder zum Recherchieren. Nutzende "
"können mit F13 gezielt, wirksam und sicher Generative Künstliche Intelligenz bei "
"täglichen Büroarbeiten einsetzen. Mit dem Übergang in eine Open-Source-Weiterentwicklung "
"wird F13 seit Juli 2025 als gemeinschaftliches Vorhaben geöffnet: Der Quellcode steht "
"frei zur Verfügung, die Nutzung ist offen. Interessierte Verwaltungen, Organisationen "
"sowie Entwicklerinnen und Entwickler sind eingela-den, das System für eigene Bedarfe "
"anzupassen, weiterzuentwickeln und diese Entwicklungen wiederum Open Source "
"bereitzustellen."
),
"language_model": "test_model_mock",
},
},
"fast_summary": {
"summary": "Fast summary request",
"description": (
"Example input for a fast summary without predefined output length but with focus topic "
"setting."
),
"value": {
"text": (
"F13 ist eine vollständig souveräne und Modell-agnostische KI-Assistenz, die auf eigener "
"Infrastruktur betrieben werden kann und von der Verwaltung für die Verwaltung entwickelt "
"wurde. F13 stellt eine Vielzahl modularer, flexibel einsetzbarer KI-Funktionen bereit – "
"zum Verarbeiten von Informationen, Erstellen von Texten, oder zum Recherchieren. Nutzende "
"können mit F13 gezielt, wirksam und sicher Generative Künstliche Intelligenz bei "
"täglichen Büroarbeiten einsetzen. Mit dem Übergang in eine Open-Source-Weiterentwicklung "
"wird F13 seit Juli 2025 als gemeinschaftliches Vorhaben geöffnet: Der Quellcode steht "
"frei zur Verfügung, die Nutzung ist offen. Interessierte Verwaltungen, Organisationen "
"sowie Entwicklerinnen und Entwickler sind eingela-den, das System für eigene Bedarfe "
"anzupassen, weiterzuentwickeln und diese Entwicklungen wiederum Open Source "
"bereitzustellen."
),
"language_model": "test_model_mock",
"topics": "KI, Papier",
},
},
"output_length_summary": {
"summary": "Summary request with output length",
"description": "Example input for a summary with predefined output length",
"value": {
"text": (
"F13 ist eine vollständig souveräne und Modell-agnostische KI-Assistenz, die auf eigener "
"Infrastruktur betrieben werden kann und von der Verwaltung für die Verwaltung entwickelt "
"wurde. F13 stellt eine Vielzahl modularer, flexibel einsetzbarer KI-Funktionen bereit – "
"zum Verarbeiten von Informationen, Erstellen von Texten, oder zum Recherchieren. Nutzende "
"können mit F13 gezielt, wirksam und sicher Generative Künstliche Intelligenz bei "
"täglichen Büroarbeiten einsetzen. Mit dem Übergang in eine Open-Source-Weiterentwicklung "
"wird F13 seit Juli 2025 als gemeinschaftliches Vorhaben geöffnet: Der Quellcode steht "
"frei zur Verfügung, die Nutzung ist offen. Interessierte Verwaltungen, Organisationen "
"sowie Entwicklerinnen und Entwickler sind eingela-den, das System für eigene Bedarfe "
"anzupassen, weiterzuentwickeln und diese Entwicklungen wiederum Open Source "
"bereitzustellen."
),
"output_length": 2.5,
"language_model": "test_model_mock",
},
},
"focus_topic_summary": {
"summary": "Summary request with focus topic",
"description": "Example input for a summary with focus on a specific topic.",
"value": {
"text": (
"F13 ist eine vollständig souveräne und Modell-agnostische KI-Assistenz, die auf eigener "
"Infrastruktur betrieben werden kann und von der Verwaltung für die Verwaltung entwickelt "
"wurde. F13 stellt eine Vielzahl modularer, flexibel einsetzbarer KI-Funktionen bereit – "
"zum Verarbeiten von Informationen, Erstellen von Texten, oder zum Recherchieren. Nutzende "
"können mit F13 gezielt, wirksam und sicher Generative Künstliche Intelligenz bei "
"täglichen Büroarbeiten einsetzen. Mit dem Übergang in eine Open-Source-Weiterentwicklung "
"wird F13 seit Juli 2025 als gemeinschaftliches Vorhaben geöffnet: Der Quellcode steht "
"frei zur Verfügung, die Nutzung ist offen. Interessierte Verwaltungen, Organisationen "
"sowie Entwicklerinnen und Entwickler sind eingeladen, das System für eigene Bedarfe "
"anzupassen, weiterzuentwickeln und diese Entwicklungen wiederum Open Source "
"bereitzustellen."
),
"topics": "Open Source, Community-Gedanke",
"language_model": "test_model_mock",
},
},
}
}
)
api_output
Pydantic Models for API output parameters.
| CLASS | DESCRIPTION |
|---|---|
| SummaryAPIOutput | Summary response output of summary generation. |
SummaryAPIOutput
Bases: BaseModel
Summary response output of summary generation.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| summary | The generated summary text. TYPE: str |
| parsed_text | The parsed and preprocessed source text used for summarization. TYPE: str |
| warning_msg | Optional warning message (e.g. about text length of the generated summary). TYPE: str |
Source code in docs/microservices/summary/src/models/api_output.py
class SummaryAPIOutput(BaseModel):
    """Summary response output of summary generation.

    Attributes:
        summary (str): The generated summary text.
        parsed_text (str): The parsed and preprocessed source text used for summarization.
        warning_msg (str): Optional warning message (e.g. about text length of the generated summary).
    """

    summary: str
    parsed_text: str
    warning_msg: str

    model_config = ConfigDict(
        json_schema_extra={
            "openapi_examples": {
                "summary_output": {
                    "summary": "Summary output with warning",
                    "description": "Example showing a summary with a warning message.",
                    "value": {
                        "summary": "This is the generated summary of the document.",
                        "parsed_text": "Original source text preprocessed for summarization.",
                        "warning_msg": "This is a message to the user encompassing hints or warnings.",
                    },
                },
            }
        }
    )
general
Defines settings and thereby assigns default values.
| CLASS | DESCRIPTION |
|---|---|
| ActiveLLMs | Selects the available models for the respective use cases. |
| InterServiceCommunication | Configuration of all microservice communications. |
| LogLevel | Specifies possible log levels using an enum class. |
| PostConfig | Configuration for async_post requests to other microservices (e.g. parser). |
| Settings | Specifies general settings for the service. |
ActiveLLMs
Bases: BaseModel
Selects the available models for the respective use cases.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| model_config | Used to ignore other services, which are defined in the config. TYPE: ConfigDict |
| summary | List the names of available LLMs for the summary service. TYPE: list[str] |
Source code in docs/microservices/summary/src/models/general.py
class ActiveLLMs(BaseModel):
    """Selects the available models for the respective use cases.

    Attributes:
        model_config (ConfigDict): Used to ignore other services, which are defined in the config.
        summary (List(str)): List the names of available LLMs for the summary service.
    """

    model_config = ConfigDict(extra="ignore")
    summary: list[str]
InterServiceCommunication
Bases: BaseModel
Configuration of all microservice communications.
| PARAMETER | DESCRIPTION |
|---|---|
| parser | Default configuration for parsing microservice. TYPE: PostConfig |
Source code in docs/microservices/summary/src/models/general.py
LogLevel
Bases: StrEnum
Specifies possible log levels using an enum class.
Source code in docs/microservices/summary/src/models/general.py
class LogLevel(StrEnum):
    """Specifies possible log levels using a enum class."""

    CRITICAL = "CRITICAL"
    ERROR = "ERROR"
    WARNING = "WARNING"
    INFO = "INFO"
    DEBUG = "DEBUG"

    @classmethod
    def _missing_(cls, value: object) -> None:
        """Converts strings to uppercase and recheck for existence."""
        if isinstance(value, str):
            value = value.upper()
        for level in cls:
            if level == value:
                return level
        return None
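Because `_missing_` uppercases unknown string values, log-level lookups are effectively case-insensitive. A small sketch (the import path is an assumption):

```python
from src.models.general import LogLevel  # import path assumed

assert LogLevel("debug") is LogLevel.DEBUG
assert LogLevel("Info") is LogLevel.INFO
```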
PostConfig
Bases: BaseModel
Configuration for async_post request to other microservices (e.g. parser).
The default values in this class can be overwritten by those values stated in configs/general.yml.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| model_config | Used to ignore other services, which are defined in the config. TYPE: ConfigDict |
| max_attempts | Maximum number of request attempts before returning status code 424. TYPE: PositiveInt |
| timeout_in_s | Maximum waiting duration before timeout (in seconds). TYPE: PositiveInt |
These values can be overwritten by using the following snippet in general.yml:

inter_service_communication:
  parser:
    max_attempts: 3
    connection_timeout: 200
Source code in docs/microservices/summary/src/models/general.py
class PostConfig(BaseModel):
    """Configuration for async_post request to other microservices (e.g. parser).

    The default values in this class can be overwritten by those values stated in configs/general.yml.

    Attributes:
        model_config (ConfigDict): Used to ignore other services, which are defined in the config.
        max_attempts (PositiveInt): Maximum number of request attempts before returning status code 424.
        timeout_in_s (PositiveInt): Maximum waiting duration before timeout (in seconds).

    Note: These values can be overwritten by using the following code-sniped in general.yml:
        inter_service_communication:
            parser:
                max_attempts: 3
                connection_timeout: 200
    """

    model_config = ConfigDict(extra="ignore")
    max_attempts: PositiveInt = 3
    timeout_in_s: PositiveInt = 200
Settings
Bases: BaseModel
Specifies general settings for the service.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| model_config | Used to ignore other services, which are defined in the config. TYPE: ConfigDict |
| service_name | Name of service, i.e. 'summary'. TYPE: str |
| service_endpoints | URLs of required services (e.g. parser). TYPE: dict[str, AnyHttpUrl] |
| active_llms | Selection of available models for respective use cases. TYPE: ActiveLLMs |
| log_level | Minimal level of logging output given. TYPE: LogLevel |
| log_file_max_bytes | Max file size for logfile. TYPE: PositiveInt |
| log_file_backup_count | Number of log-files to loop over. TYPE: PositiveInt |
| log_file | Path the logfile is written to. TYPE: FilePath |
| check_llm_api_interval_in_s | Interval for checking all LLM APIs (seconds). TYPE: PositiveInt |
| n_uvicorn_workers | Number of parallel uvicorn instances. TYPE: PositiveInt |
| inter_service_communication | Configuration of communication with other services. TYPE: InterServiceCommunication |

| METHOD | DESCRIPTION |
|---|---|
| ensure_log_dir | Creates the log directory after validation. |
Source code in docs/microservices/summary/src/models/general.py
class Settings(BaseModel):
    """Specifies general settings for the service.

    Attributes:
        model_config (ConfigDict): Used to ignore other services, which are defined in the config.
        service_name (str): Name of service, i.e. 'summary'.
        service_endpoints (dict[str, AnyHttpUrl]): URLs of required services (e.g. parser).
        active_llms (ActiveLLMs): Selection of available models for respective use cases.
        log_level (LogLevel): Minimal level of logging output given.
        log_file_max_bytes: (PositiveInt): Max file size for logfile.
        log_file_backup_count (PositiveInt): Number of log-files to loop over.
        log_file (FilePath): Write logfile there.
        check_llm_api_interval_in_s (PositiveInt): Interval for checking all LLM APIs (seconds).
        n_uvicorn_workers (PositiveInt): Number of parallel uvicorn instances.
        inter_service_communictaion (InterServiceCommunication): Configuration of communication with other services.
    """

    model_config = ConfigDict(extra="ignore")
    service_name: str = "Summary"
    service_description: str = "Generation of summaries of files and text using LLMs."
    # number of parallel uvicorn instances
    n_uvicorn_workers: PositiveInt = 1
    active_llms: ActiveLLMs
    # interval for checking all LLM APIs (seconds)
    check_llm_api_interval_in_s: PositiveInt = 60
    service_endpoints: dict[str, AnyHttpUrl]
    log_level: LogLevel = LogLevel.INFO
    log_file_max_bytes: PositiveInt = 1 * 1024 * 1024
    log_file_backup_count: PositiveInt = 3
    log_file: FilePath = Path("/summary/logs/log")
    inter_service_communication: InterServiceCommunication = InterServiceCommunication()

    @model_validator(mode="after")
    def ensure_log_dir(self) -> "Settings":
        """Creates the log directory after validation."""
        self.log_file.parent.mkdir(parents=True, exist_ok=True)
        return self
ensure_log_dir
graph_state
Defines states within the LangGraph performing the summary.
| CLASS | DESCRIPTION |
|---|---|
| OverallState | Defines the overall state of the LangGraph performing the summary. |
| SummaryState | Defines the status for the generation of individual summaries. |
OverallState
Bases: TypedDict
Defines the overall state of the LangGraph performing the summary.
Contains contents and summaries of all chunks.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| contents | List containing the content for each chunk. TYPE: list[str] |
| summaries | List containing the summaries for each chunk's content. TYPE: list[str] |
| collapsed_summaries | List containing the summaries of summaries. TYPE: list[Document] |
| final_summary | Final consolidated summary (part of summary output). TYPE: str |
| messages | List of messages to the user regarding the summary (part of summary output). TYPE: list[str] |
| quit_reducing | Tracks whether the REDUCE loop was successful. TYPE: bool |
| num_reduce_call | Counter of the REDUCE calls for testing purposes. TYPE: int |
| desired_summary_chars | Desired number of chars for the final summary output (reduce-loop criterion). TYPE: int |
| max_input_chars | Maximal number of chars to generate the final summary in one LLM call (reduce criterion). TYPE: int |
| focus_instructions_map | MAP-prompt part instructing to summarize with focus on specified topics. TYPE: str |
| focus_instructions_reduce | REDUCE-prompt part structuring the summary according to topics. TYPE: str |
| focus_instructions_final | FINAL-prompt part structuring the summary according to topics and including a note for all topics that are not covered by the text. TYPE: str |
Source code in docs/microservices/summary/src/models/graph_state.py
class OverallState(TypedDict):
    """Defines the overall state of the LangGraph performing the summary.

    Contains contents and summaries of all chunks.

    Attributes:
        contents (list[str]): List containing the content for each chunk.
        summaries (list[str]): List containing the summaries for each chunks content.
        collapsed_summaries (list[Document]): List containing the summaries of summaries.
        final_summary (str): Final consolidated summary (part of summary output).
        messages (list[str]): List of messages to the user regarding the summary (part of summary output).
        quit_reducing (bool): Tracks whether the REDUCE-Loop was successful.
        num_reduce_call (int): Counter of the REDUCE-calls for testing purposes.
        desired_summary_chars (int): Desired number of chars for the final summary output (reduce-loop criterion).
        max_input_chars (int): Maximal number of chars to generate final summary in one LLM-call (reduce criterion).
        focus_instructions_map (str): MAP-prompt part instructing to summarize with focus on specified topics.
        focus_instructions_reduce (str): REDUCE-Prompt part structuring the summary according to topics.
        focus_instructions_final (str): FINAL-Prompt part structuring the summary according to topics and including a
            note for all topics, that are not covered by the text.
    """

    contents: list[str]
    summaries: Annotated[list, operator.add]
    collapsed_summaries: list[Document]
    final_summary: str
    quit_reducing: bool
    num_reduce_call: int
    messages: list[str]
    desired_summary_chars: int
    max_input_chars: int
    focus_instructions_map: str
    focus_instructions_reduce: str
    focus_instructions_final: str
SummaryState
Bases: TypedDict
Defines the status for the generation of individual summaries.
Used during the mapping process as input for generate_summary().
| ATTRIBUTE | DESCRIPTION |
|---|---|
| content | Text section to be summarized. TYPE: str |
| focus_instructions_map | MAP-prompt part instructing to summarize with focus on specified topics. TYPE: str |
Source code in docs/microservices/summary/src/models/graph_state.py
class SummaryState(TypedDict):
    """Defines the status for the generation of individual summaries.

    Used during the mapping process as input for generate_summary().

    Attributes:
        content (str): Text section to be summarized.
        focus_instructions_map (str): MAP-prompt part instructing to summarize with focus on specified topics.
    """

    content: str
    focus_instructions_map: str
llms
Pydantic Models describing an LLM used for LLM config.
| CLASS | DESCRIPTION |
|---|---|
| APIAuth | Defines authentication settings for an LLM. |
| LLM | Defines the basic structure of a LLM config. |
| LLMAPI | Defines API-Connection to LLM. |
| LLMConfig | Defines the LLMs used for summarization. |
| LLMInference | Defines the inference parameters. |
| LLMPromptComponents | Defines the components of each prompt. |
| LLMPromptConfig | Defines the structure of a LLM prompt configuration. |
| LLMPromptMaps | Defines complete LLM prompt config. |
| LLMPrompts | Defines the prompts. |
APIAuth
Bases: BaseModel
Defines authentication settings for an LLM.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| type | Either 'token' or 'basic_auth'. TYPE: Literal['token', 'basic_auth'] |
| secret_path | File path where the api token or credentials are stored. TYPE: FilePath |

| METHOD | DESCRIPTION |
|---|---|
| get_auth_header | Generate auth part of header for http request. |
Source code in docs/microservices/summary/src/models/llms.py
class APIAuth(BaseModel):
    """Defines Authentification settings for LLM.

    Attributes:
        type (Literal): Either 'token' or 'basic_auth'.
        secret_path (FilePath): File path where the api token or credentials are stored.
    """

    type: Literal["token", "basic_auth"]
    secret_path: FilePath

    @property
    def secret(self) -> SecretStr:
        """Load secret variable as 'secret'."""
        with open(self.secret_path) as file:
            return SecretStr(file.read().strip())

    def get_auth_header(self) -> str:
        """Generate auth part of header for http request.

        Returns:
            str: Auth header.
        """
        auth_header = ""
        if self.type == "basic_auth":
            auth_header = f"Basic {base64.b64encode(self.secret.get_secret_value().encode()).decode()}"
        elif self.type == "token":
            auth_header = f"Bearer {self.secret.get_secret_value()}"
        return auth_header
get_auth_header
Generate auth part of header for http request.
| RETURNS | DESCRIPTION |
|---|---|
| str | Auth header. |
Source code in docs/microservices/summary/src/models/llms.py
def get_auth_header(self) -> str:
    """Generate auth part of header for http request.

    Returns:
        str: Auth header.
    """
    auth_header = ""
    if self.type == "basic_auth":
        auth_header = f"Basic {base64.b64encode(self.secret.get_secret_value().encode()).decode()}"
    elif self.type == "token":
        auth_header = f"Bearer {self.secret.get_secret_value()}"
    return auth_header
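A usage sketch for building an HTTP Authorization header from a stored token (the token file path is a placeholder):

```python
# Hypothetical usage; the token file path is a placeholder.
from pathlib import Path

from src.models.llms import APIAuth  # import path assumed

Path("/tmp/llm_token").write_text("my-api-token")
auth = APIAuth(type="token", secret_path="/tmp/llm_token")
headers = {"Authorization": auth.get_auth_header()}
# headers == {"Authorization": "Bearer my-api-token"}
```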
LLM
Bases: BaseModel
Defines the basic structure of a LLM config.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| label | Human-readable model name that can be presented to users. TYPE: str |
| model | Model name which is used in API call, e.g. ollama tag. TYPE: str |
| prompt_map | Prompt map name to load LLMPromptMaps from. TYPE: str |
| is_remote | Is this LLM hosted at an external API? TYPE: bool \| None |
| context_length | Model's context length. TYPE: PositiveInt |
| api | API information. TYPE: LLMAPI |
| inference | Inference parameters. TYPE: LLMInference \| None |
| prompt_config | Prompts. TYPE: LLMPromptConfig \| None |
Source code in docs/microservices/summary/src/models/llms.py
class LLM(BaseModel):
    """Defines the basic structure of a LLM config.

    Attributes:
        label (str): Human-readable model name that can be presented to users.
        model (str): Model name which is used in API call, e.g. ollama tag.
        prompt_map (str): Prompt map name to load LLMPromptMaps from.
        is_remote (bool | None): Is this LLM hosted at an external API?
        context_length (PositiveInt): Model's context length.
        api (LLMAPI): API information.
        inference (LLMInference | None): Inference parameters.
        prompt_config (LLMPromptConfig | None): Prompts.
    """

    label: str
    model: str
    prompt_map: str
    is_remote: bool | None = True
    context_length: PositiveInt
    api: LLMAPI
    inference: LLMInference | None = LLMInference()
    prompt_config: LLMPromptConfig | None = None
LLMAPI
Bases: BaseModel
Defines API-Connection to LLM.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| url | Url of the LLM. TYPE: AnyHttpUrl |
| health_check | Relative path to health check, i.e. '/models'. TYPE: str \| None |
| auth | Pydantic Model defining the authentication of the LLM. TYPE: APIAuth \| None |

| METHOD | DESCRIPTION |
|---|---|
| get_health_check_url | Get the URL to check if API is available. |
Source code in docs/microservices/summary/src/models/llms.py
class LLMAPI(BaseModel):
    """Defines API-Connection to LLM.

    Attributes:
        url (AnyHttpUrl): Url of the LLM.
        health_check (str | None): Relative path to health check, i.e. '/models'.
        auth (APIAuth | None): Pydantic Model defining the authentication of the LLM.
    """

    url: AnyHttpUrl
    health_check: str | None = None
    auth: APIAuth | None = None

    def get_health_check_url(self) -> str:
        """Get the URL to check if API is available."""
        if self.health_check:
            # make sure to remove trailing and leading slashes to not override path
            return urljoin(
                str(self.url).rstrip("/") + "/",
                self.health_check.lstrip("/"),
            )
        return str(self.url)
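A sketch of how the health-check URL is derived from the configured base URL (values and import path are assumptions):

```python
from src.models.llms import LLMAPI  # import path assumed

api = LLMAPI(url="http://llm-host:8000/v1", health_check="/models")
print(api.get_health_check_url())  # http://llm-host:8000/v1/models

api_without_check = LLMAPI(url="http://llm-host:8000/v1")
print(api_without_check.get_health_check_url())  # the base URL unchanged
```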
get_health_check_url
Get the URL to check if API is available.
Source code in docs/microservices/summary/src/models/llms.py
LLMConfig
Bases: BaseModel
Defines the LLMs used for summarization.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| model_config | Used to ignore other services, which are defined in the config. TYPE: ConfigDict |
| summary | Dictionary containing a name and definition of the LLMs available for summarization. TYPE: dict[str, LLM] \| None |
Source code in docs/microservices/summary/src/models/llms.py
class LLMConfig(BaseModel):
    """Defines the LLMs used for summarization.

    Attributes:
        model_config (ConfigDict): Used to ignore other services, which are defined in the config.
        summary (dict[str, LLM] | None): Dictionary containing a name and definition of LLMs's
            available for summarization.
    """

    model_config = ConfigDict(extra="ignore")
    summary: dict[str, LLM] | None = []

    def __iter__(self) -> Iterator[str]:
        """Get 'keys' for automatic merge with i.e. LLMPromptConfig.

        Returns:
            Iterator[str]: keys
        """
        return iter(self.__dict__.keys())

    def __getitem__(self, service: str) -> dict[str, LLM]:
        """Get all LLMs for a given service (e.g. "summary", "rag").

        Args:
            service (str): The service name (e.g., "summary", "rag").

        Returns:
            dict[str, LLM]: All configered LLMs for the given service.
        """
        return self.__getattribute__(service)
LLMInference
Bases: BaseModel
Defines the inference parameters.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| temperature | Randomness / variation of the output. High values indicate more creativity. Default is 0.1. TYPE: PositiveFloat \| None |
| max_tokens | Maximum number of tokens of the generated response. Default is 2048. TYPE: PositiveInt \| None |
| top_p | Threshold for sampling only from the most likely tokens. Default is 0.1. TYPE: float \| None |
| timeout | Maximal waiting time before the request is canceled due to an absent response. Default is 600. TYPE: PositiveInt |
| max_retries | Number of request retries in case of failure. Default is 3. TYPE: PositiveInt \| None |
| frequency_penalty | Likelihood of the model repeating the same phrases. Default is 0.1. TYPE: PositiveFloat \| None |
| presence_penalty | Penalizing tokens that have already appeared. Default is 0.1. TYPE: PositiveFloat \| None |
Source code in docs/microservices/summary/src/models/llms.py
class LLMInference(BaseModel):
    """Defines the inference parameters.

    Attributes:
        temperature (PositiveFloat | None): Randomness / variation of the output High values indicate more creativity.
            Default is 0.1.
        max_tokens (PositiveInt | None): Maximum number of tokens of the generated response. Default is 2048.
        top_p (PositiveFloat | None): Threshold for sampling only from the most likely tokens. Default is 0.1.
        timeout (int): Maximal waiting time before request is canceled due to absent response. Default is 600.
        max_retries (int | None): Number of request retries in case of failure. Default is 5.
        frequency_penalty (float | None): Likelihood of the model repeating the same phrases. Default is 0.1.
        presence_penalty (float | None): Penalizing tokens that have already appeared. Default is 0.1.
    """

    temperature: PositiveFloat | None = 0.1
    max_tokens: PositiveInt | None = 2048
    top_p: float | None = 0.1
    timeout: PositiveInt = 600
    max_retries: PositiveInt | None = 3
    frequency_penalty: PositiveFloat | None = 0.1
    presence_penalty: PositiveFloat | None = 0.1
LLMPromptComponents
Bases: BaseModel
Defines the components of each prompt.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| model_config | Used to ignore other services, which are defined in the config. TYPE: ConfigDict |
| main | Prompts for MAP, REDUCE or FINAL step of summarization. It can be expanded by further instructions (e.g., focus_instructions) to activate additional summary features. TYPE: str |
| focus_instructions | Parts of MAP, REDUCE or FINAL prompt containing the instructions to focus the summary on topics specified by the user. TYPE: str |
Source code in docs/microservices/summary/src/models/llms.py
class LLMPromptComponents(BaseModel):
    """Defines the components of each prompt.

    Attributes:
        model_config (ConfigDict): Used to ignore other services, which are defined in the config.
        main (str): Prompts for MAP, REDUCE or FINAL step of summarization. It can be expanded by further instructions
            (e.g., focus_instructions) to activate additional summary features.
        focus_instructions (str): Parts of MAP, REDUCE or FINAL prompt containing the instructions to focus the summary
            on topics specified by the user.
    """

    model_config = ConfigDict(extra="ignore")
    main: str
    focus_instructions: str
LLMPromptConfig
Bases: BaseModel
Defines the structure of a LLM prompt configuration.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| model_config | Used to ignore other services, which are defined in the config. TYPE: ConfigDict |
| system | Prompt that defines the role and the rules for the LLM's behaviour. TYPE: LLMPrompts |
| user | Prompt that gives specific instructions and user input to the LLM. TYPE: LLMPrompts \| None |
| assistant | Prompt that supports the user. TYPE: LLMPrompts \| None |
Source code in docs/microservices/summary/src/models/llms.py
class LLMPromptConfig(BaseModel):
    """Defines the structure of a LLM prompt configuration.

    Attributes:
        model_config (ConfigDict): Used to ignore other services, which are defined in the config.
        system (str | None): Prompt that defines the role and the rules for the LLMs behaviour.
        user (str | None): Prompt that gives specific instructions and user input to the LLM.
        assistant (str | None): Prompt that supports the user.
    """

    model_config = ConfigDict(extra="ignore")
    system: LLMPrompts
    user: LLMPrompts | None = None
    assistant: LLMPrompts | None = None
LLMPromptMaps
Bases: BaseModel
Defines complete LLM prompt config.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| model_config | Used to ignore other services, which are defined in the config. TYPE: ConfigDict |
| summary | Dictionary containing a name and prompts of the LLMs available for summarization. TYPE: dict[str, LLMPromptConfig] |
Source code in docs/microservices/summary/src/models/llms.py
class LLMPromptMaps(BaseModel):
    """Defines complete LLM prompt config.

    Attributes:
        model_config (ConfigDict): Used to ignore other services, which are defined in the config.
        summary (dict[str, LLMPromptConfig]): Dictionary containing a name and prompts of LLMs's available for
            summarization.
    """

    model_config = ConfigDict(extra="ignore")
    summary: dict[str, LLMPromptConfig]

    def __iter__(self) -> Iterator[str]:
        """Get 'keys' for automatic merge with i.e. LLMConfig.

        Returns:
            Iterator[str]: Keys
        """
        return iter(self.__dict__.keys())
LLMPrompts
Bases: BaseModel
Defines the prompts.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| model_config | Used to ignore other services, which are defined in the config. TYPE: ConfigDict |
| map | Prompt parameters of the MAP prompt, which is used to generate a summary for a text chunk. TYPE: LLMPromptComponents |
| reduce | Prompt parameters of the REDUCE prompt, which is used to reduce the size of the intermediate summaries. TYPE: LLMPromptComponents |
| final | Prompt parameters of the FINAL prompt, which is used to consolidate the intermediate summaries into a final summary. TYPE: LLMPromptComponents |
| prepare_focus_topics | Prompt used to extract the topics provided by users and transform them into the proper format (a string of topics separated by commas). TYPE: str |
Source code in docs/microservices/summary/src/models/llms.py
class LLMPrompts(BaseModel):
    """Defines the prompts.

    Attributes:
        model_config (ConfigDict): Used to ignore other services, which are defined in the config.
        map (LLMPromptComponents): Prompt parameters of MAP prompt, which is used to generate a summary for a
            text-chunk.
        reduce (LLMPromptComponents): Prompt parameters of REDUCE prompt, which is used to reduce the size of the
            intermediate summaries.
        final (LLMPromptComponents): Prompt parameters of FINAL prompt, which is used to consolidate the intermediate
            summaries into a final summary.
        prepare_focus_topics (str): Prompt used to extract the topics provided by users and transform them in to the
            proper format (string of topics separated by comma).
    """

    model_config = ConfigDict(extra="ignore")
    map: LLMPromptComponents
    reduce: LLMPromptComponents
    final: LLMPromptComponents
    prepare_focus_topics: str
parser_output
Defines the output of the parsing microservice.
| CLASS | DESCRIPTION |
|---|---|
| Chunk | Chunk of the parsed text incl. text and metadata. |
| ChunkMetadata | Defines the metadata of each chunk. |
| ParsingOutput | Defines a parsing output model used to hold parsed text and input for chunking. |
Chunk
Bases: BaseModel
Chunk of the parsed text incl. text and metadata.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| chunk_content | Text content of this chunk. TYPE: str |
| chunk_metadata | Metadata of this chunk (as defined by ChunkMetadata). TYPE: ChunkMetadata |
Source code in docs/microservices/summary/src/models/parser_output.py
ChunkMetadata
Bases: BaseModel
Defines the metadata of each chunk.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| chunk_number | Id of the chunk. TYPE: int |
| chunk_length | Length of the chunk's content as number of chars. TYPE: int |
| filename | The name of the original file or, in case of text input, "Texteingabe" or "Default Filename". TYPE: str |
| filetype | The type of the original file or, in case of text input, "string". TYPE: str |
| headings | List of headings. Empty if there is no heading for this chunk. TYPE: list[str] \| None |
| pages | List of pages within the original pdf document. None in case of chunks from text, txt or docx. TYPE: list[int] \| None |
| locations | Covering boundingbox and charspan of docling metadata for pdf files only. None in case of chunks from text, txt or docx. TYPE: list[dict[str, Any]] \| None |
Source code in docs/microservices/summary/src/models/parser_output.py
class ChunkMetadata(BaseModel):
"""Defines the metadata of each chunk.
Attributes:
chunk_number (int): Id of the chunk.
chunk_length (int): Length of the chunk's content as number of chars.
filename (str): The name of the original file or in case of text input "Texteingabe" or "Default Filename".
filetype (str): The type of the original file or in case of text input "string".
headings (list[str] | None): List of headings. Empty if there is no heading to this chunk.
pages (list[int] | None): List of pages within the original pdf document.
None in case of chunks from text, txt or docx.
locations (list[dict[str, Any]] | None): Bounding box and charspan from the docling metadata, for pdf files only.
None in case of chunks from text, txt or docx.
"""
chunk_number: int
chunk_length: int
filename: str
filetype: str
headings: list[str] | None
pages: list[int] | None
locations: list[dict[str, Any]] | None
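For illustration, the two sketches below show what ChunkMetadata could look like for a chunk from a parsed PDF and for plain text input; all concrete values, the import path and the keys inside locations are made up.

```python
# Illustrative only: example metadata for a chunk from a parsed PDF.
from src.models.parser_output import ChunkMetadata  # import path assumed

pdf_chunk_meta = ChunkMetadata(
    chunk_number=3,
    chunk_length=1842,
    filename="report_2024.pdf",
    filetype="pdf",
    headings=["2 Results", "2.1 Evaluation"],
    pages=[4, 5],
    # keys inside the location dicts are assumptions, not the exact docling schema
    locations=[{"bounding_box": [72.0, 120.5, 510.3, 680.0], "charspan": [0, 1842]}],
)

# For text input, pages and locations are None and filename/filetype use the fixed values.
text_chunk_meta = ChunkMetadata(
    chunk_number=0,
    chunk_length=950,
    filename="Texteingabe",
    filetype="string",
    headings=[],
    pages=None,
    locations=None,
)
```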
ParsingOutput
Bases: BaseModel
Defines a parsing output model used to hold parsed text and input for chunking.
| ATTRIBUTE | DESCRIPTION |
|---|---|
text |
Cleaned and parsed text.
TYPE:
|
chunks |
List of chunks with their content and metadata.
TYPE:
|
warning_msg |
Message to the user containing information about the resulting text.
TYPE:
|
model_config |
Used to ignore deprecated parts of the ParsingOutput, which the summary does not use.
TYPE:
|
Source code in docs/microservices/summary/src/models/parser_output.py
class ParsingOutput(BaseModel):
"""Defines a parsing output model used to hold parsed text and input for chunking.
Attributes:
text (str): Cleaned and parsed text.
chunks (list[Chunk]): List of chunks with their content and metadata.
warning_msg (str, optional): Message to the user containing information about the resulting text.
model_config (ConfigDict): Used to ignore deprecated parts of the ParsingOutput, which the summary does not use.
"""
model_config = ConfigDict(extra="ignore")
text: str
chunks: list[Chunk] | None = None
warning_msg: str = ""
summary_parameters
Pydantic models for parameters necessary for the summarization.
| CLASS | DESCRIPTION |
|---|---|
PromptParameters |
Prompt parameters for the MAP, REDUCE and FINAL prompt according to the topics stated by the user. |
SummarizeInput |
Defines input that is used for summarizing files. |
SummaryParameters |
Parameters needed for summarization, which are computed according to the user's input. |
PromptParameters
Bases: BaseModel
Prompt parameters for the MAP, REDUCE and FINAL prompt according to the topics stated by the user.
These prompts cover instructions to focus and structure the summary on specific topics and to state if any topic is not covered by the input text.
These prompt parts are set by configure_prompt_parameters() in the summary registry. If there are no topics to focus on, the prompt parts will be set to empty strings by default.
| ATTRIBUTE | DESCRIPTION |
|---|---|
focus_instructions_map |
Part of MAP-Prompt setting a focus on specific topics.
TYPE:
|
focus_instructions_reduce |
Part of REDUCE-Prompt structuring the summary with respect to specific topics.
TYPE:
|
focus_instructions_final |
Part of FINAL-Prompt checking if topics are not covered by the summary.
TYPE:
|
Source code in docs/microservices/summary/src/models/summary_parameters.py
class PromptParameters(BaseModel):
"""Prompt parameters for the MAP, REDUCE and FINAL prompt according to the topics stated by the user.
These prompts cover instructions to focus and structure the summary on specific topics and to state if any topic is
not covered by the input text.
These prompt parts are set by configure_prompt_parameters() in the summary registry.
If there are no topics to focus on, the prompt parts will be set to empty strings by default.
Attributes:
focus_instructions_map (str): Part of MAP-Prompt setting a focus on specific topics.
focus_instructions_reduce (str): Part of REDUCE-Prompt structuring the summary with respect to specific topics.
focus_instructions_final (str): Part of FINAL-Prompt checking if topics are not covered by the summary.
"""
focus_instructions_map: str = ""
focus_instructions_reduce: str = ""
focus_instructions_final: str = ""
SummarizeInput
Bases: BaseModel
Defines input that is used for summarizing files.
| ATTRIBUTE | DESCRIPTION |
|---|---|
language_model |
Name of the language model.
TYPE:
|
chunks |
List of chunks with their content and metadata.
TYPE:
|
summary_parameters |
Parameters needed for summarization (e.g. settings for length and focus topics), which are computed according to the user's input.
TYPE:
|
Source code in docs/microservices/summary/src/models/summary_parameters.py
class SummarizeInput(BaseModel):
"""Defines input that is used for summarizing files.
Attributes:
language_model (str): Name of the language model.
chunks (list[Document]): List of chunks with their content and metadata.
summary_parameters (SummaryParameters): Parameters needed for summarization (e.g. settings for length and focus
topics), which are computed according to the user's input.
"""
language_model: str
chunks: list[Document]
summary_parameters: SummaryParameters
SummaryParameters
Bases: BaseModel
Parameters needed for summarization, which are computed according to the user's input.
| ATTRIBUTE | DESCRIPTION |
|---|---|
messages |
List of messages to the user regarding the summary output.
TYPE:
|
desired_summary_chars |
Desired number of characters for the final summary output.
TYPE:
|
max_input_chars |
Maximal number of input characters for the current summary set up.
TYPE:
|
prompt_parameters |
Additional instructions for MAP, REDUCE and FINAL prompt according to the topics set by the user.
TYPE:
|
Source code in docs/microservices/summary/src/models/summary_parameters.py
class SummaryParameters(BaseModel):
"""Parameters needed for summarization, which are computed according the users input.
Attributes:
messages (list[str]): List of messages to the user, which regard the summary output.
desired_summary_chars (NonNegativeInt) : Desired number of characters for the final summary output.
max_input_chars (NonNegativeInt): Maximal number of input characters for the current summary set up.
prompt_parameters (PromptParameters): Additional instructions for MAP, REDUCE and FINAL prompt according to the
topics set by the user.
"""
messages: list[str]
# Output-Length Feature
desired_summary_chars: NonNegativeInt
max_input_chars: NonNegativeInt
# Focus-Topic Feature
prompt_parameters: PromptParameters = PromptParameters()
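To show how these three models fit together, the sketch below assembles a SummarizeInput by hand from langchain Documents; the model name, the character limits and the import path are placeholders.

```python
# Illustrative only: building a SummarizeInput with placeholder values.
from langchain_core.documents import Document

from src.models.summary_parameters import (  # import path assumed
    PromptParameters,
    SummarizeInput,
    SummaryParameters,
)

chunks = [
    Document(page_content="First chunk of the parsed text ..."),
    Document(page_content="Second chunk of the parsed text ..."),
]

summarize_input = SummarizeInput(
    language_model="example_llm",              # placeholder model name
    chunks=chunks,
    summary_parameters=SummaryParameters(
        messages=[],
        desired_summary_chars=4000,            # roughly one DIN A4 page
        max_input_chars=24000,                 # placeholder limit
        prompt_parameters=PromptParameters(),  # no focus topics -> empty instructions
    ),
)
```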
settings
Loads all settings from a central place, not hidden in utils.
summarizing
Implementation of the core logic of the summary.
| MODULE | DESCRIPTION |
|---|---|
summary |
Summary class. |
summary_registry |
Summary Registry contains all summary pipelines. |
summary
Summary class.
This class contains all important attributes and methods in order to generate a summary from chunks. The input chunks need to be langchain_core.documents.Document objects.
The code is based on a guide from langchain (last access 2025-04-02): https://python.langchain.com/docs/versions/migrating_chains/map_reduce_chain/
| CLASS | DESCRIPTION |
|---|---|
Summary |
Manages the summarization of documents with a LangGraph workflow. |
Summary
Manages the summarization of documents with a LangGraph workflow.
It summarizes chunks, recursively reduces these summaries if necessary (depending on max_input_chars), and generates a final consolidated summary.
| ATTRIBUTE | DESCRIPTION |
|---|---|
graph |
The compiled LangGraph used to perform the map and reduce summarize approach.
TYPE:
|
llm |
LLM that should be used to generate the summary.
TYPE:
|
auth_client |
Authentication to connect with LLM API.
TYPE:
|
model_provider |
LLM with configured parameters.
TYPE:
|
map_chain |
Chain using the map prompt.
TYPE:
|
reduce_chain |
Chain using the reduce prompt.
TYPE:
|
final_chain |
Chain using the final prompt.
TYPE:
|
| METHOD | DESCRIPTION |
|---|---|
calculate_max_input_chars |
Calculating the maximal number of characters of text input which should be summarized in one LLM call. |
configure_prompt_parameters |
Set parameters for the MAP, REDUCE and FINAL prompt according to the user's input. |
get_chunk_size |
Computes chunk size minimum and maximum according to the specifications of the LLM and the input text length. |
get_info |
Returns pipeline information (label, name, placeholder, is_remote). |
load_basic_auth |
Load env-variable and check if it is missing. Split into username and password. |
load_secret |
Load env-variable and check if it is missing. |
process_desired_summary_length |
Processes the user input for the desired summary length. |
process_topic_input |
Processes the focus topics provided by the user by extracting the topics in the proper format. |
summarize |
Generates a summary for a list of chunks. |
Source code in docs/microservices/summary/src/summarizing/summary.py
class Summary:
"""Manages the summarization of documents with a LangGraph workflow.
It summarizes chunks, recursively reduces these summaries if necessary (depending on max_input_chars),
and generates a final consolidated summary.
Attributes:
graph (StateGraph): The compiled LangGraph used to perform the map and reduce summarize approach.
llm (LLM): LLM that should be used to generate the summary.
auth_client (CustomAuthClient): Authentication to connect with LLM API.
model_provider (langchain_openai.llms.base.ChatOpenAI): LLM with configured parameters.
map_chain (langchain_classic.chains.base.Chain): Chain using the map prompt.
reduce_chain (langchain_classic.chains.base.Chain): Chain using the reduce prompt.
final_chain (langchain_classic.chains.base.Chain): Chain using the final prompt.
"""
def __init__(self, llm: LLM, llm_name: str) -> None:
"""Initializes the summarization pipeline.
The initialization sets up the class attributes (LLM name, LLM, auth client, model provider,
the chains) and constructs the graph.
"""
self.llm_name: str = llm_name
self.llm: LLM = llm
self._setup_auth_client()
self._setup_model_provider()
self._setup_chains()
self._construct_graph()
async def summarize(self, summarize_input: SummarizeInput) -> dict[str, Any]:
"""Generates a summary for a list of chunks.
The content of each chunk is summarized by the map chain.
These summaries are then collected. If the aggregated length of these summaries
exceeds the maximum (max_input_chars), the summaries are summarized again using the reduce chain.
This process is repeated until the condition is satisfied.
Then the final summary is generated with the final chain.
Args:
summarize_input (SummarizeInput): Containing chunks incl. metadata, messages, prompt parameters and LLM.
Returns:
Contains the summary as str and messages to the user as list.
"""
logger.info(f"Start summarizing {len(summarize_input.chunks)} chunks.")
steps = []
async for step in self.graph.astream(
# initialize OverallState
{
"contents": [doc.page_content for doc in summarize_input.chunks],
"focus_instructions_map": summarize_input.summary_parameters.prompt_parameters.focus_instructions_map,
"focus_instructions_reduce": (
summarize_input.summary_parameters.prompt_parameters.focus_instructions_reduce
),
"focus_instructions_final": (
summarize_input.summary_parameters.prompt_parameters.focus_instructions_final
),
"desired_summary_chars": summarize_input.summary_parameters.desired_summary_chars,
"max_input_chars": summarize_input.summary_parameters.max_input_chars,
"messages": summarize_input.summary_parameters.messages,
"num_reduce_call": 0,
"quit_reducing": False,
},
{"recursion_limit": 40},
):
steps.append(step)
logger.debug("Finished summarizing.")
summary = steps[-1].get("generate_final_summary").get("final_summary")
messages = steps[-1].get("generate_final_summary").get("messages")
return {"summary": summary, "messages": messages}
def get_info(self) -> dict[str, Any]:
"""Returns pipeline information (label, name, placeholder, is_remote).
Returns:
Pipeline information (label, name, placeholder, is_remote).
"""
return {
"label": self.llm.label,
"name": self.llm_name,
"is_remote": self.llm.is_remote,
}
def calculate_max_input_chars(
self, desired_summary_chars: int, remaining_context_length: int
) -> int:
"""Calculating the maximal number of characters of text input which should be summarized in one LLM call.
The calculation is based on the specifications of the currently used LLM, which are the context length and
the maximum length for generated output. These values are stated in the llms.yml.
The context length needs to encompass the length of the system-prompt, the length of the text input and
the length of the summary as output. Therefore the desired summary length set by the user is factored into
the calculation to adapt the output length. Furthermore the context window should not be maxed out in order to
ensure high-quality summaries (therefore we only use 70% of the theoretically remaining max input length).
The desired output length (incl. additional 20% buffer) determines the max input length, if set.
If the user does not set a desired length, the remaining characters are split by 70:30 for input length
and output length.
max_input_chars is used to determine the number of recursions of the reduce part of the summarization process.
Args:
desired_summary_chars (int): Desired summary length as number of chars.
remaining_context_length (int): Remaining context length after subtracting all prompt lengths and a 30%
buffer during max_input_chars estimation.
Returns:
Maximal number of input characters for the current summary set up.
"""
if desired_summary_chars > 0:
output_buffer = 1.2 # 20% extra as buffer for summary output length
max_llm_output_chars = (
self.llm.inference.max_tokens * 4
) # 1 token ~ 4 chars
output_chars = min(
int(desired_summary_chars * output_buffer),
max_llm_output_chars,
)
max_input_chars = remaining_context_length - output_chars
logger.debug(
f"The max_input_chars are set to {max_input_chars} according to:"
f"remaining_context_length {remaining_context_length} - output_chars {output_chars}."
f"with remaining_context_length = 80% of (LLMs maximal number of input chars - the longest"
f" prompt length (map/reduce/final prompt)) and with output_chars = min( desired summary length"
f" in chars * 1.2 (buffer), maximal number of output chars of the LLM."
)
else:
max_input_chars = int(remaining_context_length * 0.7)
logger.debug(
f"The max_input_chars are set to {max_input_chars} = 70% of remaining_context_length "
f"{remaining_context_length} with remaining_context_length = 80% of (LLMs maximal number of "
f"input chars - the longest prompt length (map/reduce/final prompt))."
)
return max_input_chars
def process_desired_summary_length(
self, desired_summary_length: dict[str, Any]
) -> dict[str, Any]:
"""Processes the user input for the desired summary length.
Checks if the value is valid. If not, set to default value to ignore user input.
Calculates the summary length in characters (desired_summary_chars) using an estimate of 4000 chars per page.
Args:
desired_summary_length (dict): Containing the information needed to compute the desired summary
length from the two keys output_length (desired length of the summary output
as number of pages) and input_length (length of the summary input text,
i.e. the parsing output length as number of characters).
Returns:
Dictionary with following keys:
- desired_summary_chars (int) : Desired number of characters for the final summary output.
- messages (list[str]) : List of messages to the user regarding the summary.
"""
messages = []
# get number of characters for summary length parameters
minimal_length = 500 # prevent hallucinations
half_text_length = int(
0.5 * desired_summary_length["input_length"]
) # 4000 chars ~ 1 DIN A4 page
max_length = 60000 # 60000 chars ~ 15 DIN A4 pages
if desired_summary_length["output_length"] <= 0:
logger.warning("Using default summary length.")
desired_summary_chars = 0
messages.append(
"Die Zusammenfassungslänge entspricht der Standardeinstellung."
)
else:
desired_summary_chars = int(desired_summary_length["output_length"] * 4000)
desired_summary_chars = max(minimal_length, desired_summary_chars)
desired_summary_chars = min(
half_text_length, desired_summary_chars, max_length
)
if desired_summary_chars == minimal_length:
messages.append(
"Die erstellte Zusammenfassung weicht von der Ziellänge ab. "
"Mehr Informationen hierzu finden Sie in den FAQ."
)
logger.debug(
f"The desired summary length is set to the minimum of {minimal_length} chars."
)
elif desired_summary_chars == half_text_length:
messages.append(
"Die erstellte Zusammenfassung weicht von der Ziellänge ab. "
"Mehr Informationen hierzu finden Sie in den FAQ."
)
logger.debug(
f"The desired summary length is set to the maximum of {half_text_length} chars, "
"which is half the input text length."
)
logger.info(
f"The desired summary length is set to {desired_summary_chars} chars based on "
f"desired {desired_summary_length['output_length']} pages (and "
f"half input text length = {half_text_length} chars)."
)
return {
"desired_summary_chars": desired_summary_chars,
"messages": messages,
}
def get_chunk_size(self, text_length: int, max_input_chars: int) -> dict[str, int]:
"""Computes chuncksize minimum and maximum according to the specifications of the LLM and the input text length.
Ensures that max_chunk_size does not exceed 20% of text length or 25% of max input length of the LLM (
max_input_chars as number of characters). These 25% could be modified.
Ensure minimal chunk size of 500 characters to avoid hallucinations and ensure proper range of chunk sizes by
limiting min chunk size to 45% of max chunk size. The chunker demands the following ratio:
min_chunk_size * 2 <= max_chunk_size.
Args:
text_length (int): Length of the parsed text.
max_input_chars (int): Maximal number of input chars the LLM can process in one call.
Returns:
Dictionary containing the minimum and maximum chunk size as number of characters.
"""
min_number_of_chunks = 5
max_chunk_size = min(
int(0.25 * max_input_chars),
max(int(text_length / min_number_of_chunks), 500),
)
logger.debug(
f"The maximal chunk size is set to {max_chunk_size} based on: "
f"min(int(0.25 * max_input_chars), max(int(text_length/{min_number_of_chunks}), 500)) = min("
f"{int(0.25 * max_input_chars)}, max({int(text_length / min_number_of_chunks)}, 500))."
)
min_chunk_size = max(int(0.5 * max_chunk_size), 500)
logger.debug(
f"The minimal chunk size is set to {min_chunk_size} based on: max(500, 0.45 * maximal chunk size) "
f"(={int(0.45 * max_chunk_size)}))."
)
return {"min_chunk_size": min_chunk_size, "max_chunk_size": max_chunk_size}
def configure_prompt_parameters(self, topics: str | None) -> PromptParameters:
"""Set parameters for the MAP, REDUCE and FINAL prompt according to users input.
This covers instructions to focus on specific topics and stating if any topic is not covered by the input text.
Args:
topics (str | None): A comma-separated string listing the topics the summary should focus on.
Returns:
PromptParameters: Parts of MAP, REDUCE and FINAL Prompts containing the instructions to focus the summary on
topics, if specified by the user.
"""
if topics:
prompt_parameters = PromptParameters(
focus_instructions_map=self.llm.prompt_config.system.map.focus_instructions.format(
topics=topics
),
focus_instructions_reduce=self.llm.prompt_config.system.reduce.focus_instructions,
focus_instructions_final=self.llm.prompt_config.system.final.focus_instructions.format(
topics=topics
),
)
logger.debug(
f"MAP prompt part for topic focus instructions {prompt_parameters.focus_instructions_map=}.\n"
f"REDUCE prompt part for topic focus instructions {prompt_parameters.focus_instructions_reduce}.\n"
f"FINAL prompt part for topic focus instructions {prompt_parameters.focus_instructions_final}."
)
else:
prompt_parameters = PromptParameters()
logger.info(
"The input field for focus topics is empty, so a general summary will be generated."
)
return prompt_parameters
async def process_topic_input(self, topics: str) -> str:
"""Processes the focus topics provided by the user by extracting the topics in the proper format.
This converts user input that does not meet the required format, such as short sentences, into a comma-separated
string listing the topics the summary should focus on.
Args:
topics (str): User input text stating the topics the summary should focus on.
Returns:
str: A comma-separated string listing the topics the summary should focus on.
"""
if topics.strip() != "":
prep_topics_prompt_input = {"topics": "topics"}
prep_topics_prompt = ChatPromptTemplate.from_template(
template=self.llm.prompt_config.system.prepare_focus_topics,
template_format="f-string",
partial_variables=prep_topics_prompt_input,
)
logger.debug(f"Prompt for topic preparation: '{prep_topics_prompt}'.")
self.prep_topics_chain = (
prep_topics_prompt | self.model_provider | StrOutputParser()
)
topics = await self.prep_topics_chain.ainvoke({"topics": topics})
logger.info(f"Topics after preparation: '{topics}'.")
if topics.strip() == "":
topics = None
return topics
def _setup_auth_client(self) -> None:
"""Set up authentication client for various APIs.
Sets up an authentication client using either a token, credentials or no authentication method.
Note: for Ollama usage no authentication method is needed.
"""
if self.llm.api.auth:
secret = self.load_secret(self.llm.api.auth.secret_path)
auth_client = CustomAuthClient(
secret=secret, auth_type=self.llm.api.auth.type
)
else:
auth_client = CustomAuthClient()
self.auth_client = auth_client
def _setup_model_provider(self) -> None:
"""Set up LLM provider using OpenAI API.
Initializing the LLM with values from llms.yml
"""
self.model_provider = ChatOpenAI(
model_name=self.llm.model,
http_async_client=self.auth_client,
api_key=" ",
base_url=str(self.llm.api.url),
timeout=self.llm.inference.timeout,
max_tokens=self.llm.inference.max_tokens,
max_retries=self.llm.inference.max_retries,
temperature=self.llm.inference.temperature,
top_p=self.llm.inference.top_p,
frequency_penalty=self.llm.inference.frequency_penalty,
presence_penalty=self.llm.inference.presence_penalty,
streaming=False,
)
def _construct_graph(self) -> None:
"""Creates the LangGraph for the summary. Using a Map and Reduce Approach."""
graph = StateGraph(OverallState)
# nodes / components
graph.add_node("generate_summary", self._generate_summary)
graph.add_node("collect_summaries", self._collect_summaries)
graph.add_node("collapse_summaries", self._collapse_summaries)
graph.add_node("generate_final_summary", self._generate_final_summary)
# edges / connections
graph.add_conditional_edges(START, self._map_summaries, ["generate_summary"])
graph.add_edge("generate_summary", "collect_summaries")
graph.add_conditional_edges("collect_summaries", self._should_collapse)
graph.add_conditional_edges("collapse_summaries", self._should_collapse)
graph.add_edge("generate_final_summary", END)
self.graph = graph.compile()
def _setup_chains(self) -> None:
"""Set up chains using map, reduce and final prompts and chains."""
map_prompt_input = {
"content": "content",
"focus_instructions": "focus_instructions",
}
map_prompt = ChatPromptTemplate.from_template(
template=self.llm.prompt_config.system.map.main,
template_format="f-string",
partial_variables=map_prompt_input,
)
logger.debug(f"MAP Prompt: '{map_prompt}'.")
self.map_chain = map_prompt | self.model_provider | StrOutputParser()
reduce_prompt_input = {
"summaries": "summaries",
"focus_instructions": "focus_instructions",
}
reduce_prompt = ChatPromptTemplate.from_template(
template=self.llm.prompt_config.system.reduce.main,
template_format="f-string",
partial_variables=reduce_prompt_input,
)
logger.debug(f"REDUCE Prompt: '{reduce_prompt}'.")
self.reduce_chain = reduce_prompt | self.model_provider | StrOutputParser()
final_prompt_input = {
"summaries": "summaries",
"focus_instructions": "focus_instructions",
}
final_prompt = ChatPromptTemplate.from_template(
template=self.llm.prompt_config.system.final.main,
template_format="f-string",
partial_variables=final_prompt_input,
)
logger.debug(f"FINAL Prompt: '{final_prompt}'.")
self.final_chain = final_prompt | self.model_provider | StrOutputParser()
def _map_summaries(self, state: OverallState) -> list:
"""Prepares the parallel summarization of each chunks contents.
Args:
state (OverallState): Contains the state of the summary process.
Returns:
List of processing steps as 'Send' objects. Each 'Send' object consists of the name of a node in the graph
as well as the state to send to that node.
"""
return [
Send(
"generate_summary",
{
"content": content,
"focus_instructions_map": state["focus_instructions_map"],
},
)
for content in state["contents"]
]
async def _generate_summary(self, summary_state: SummaryState) -> dict[str, str]:
"""Generates a summary of a text section.
Args:
summary_state (SummaryState): State of the node which contains the content which we want to summarize
and prompt parts as additional instructions according to the user input
(e.g. setting a focus on specified topics).
Returns:
A dictionary with key "summaries", which contains a list of summaries resulting from the map prompt.
"""
response = await self.map_chain.ainvoke(
{
"content": summary_state["content"],
"focus_instructions": summary_state["focus_instructions_map"],
}
)
logger.debug("LLM call with map prompt.")
if response is None:
response = await self._retry_ainvoke(
chain="map",
chain_input={
"content": summary_state["content"],
"focus_instructions_map": summary_state["focus_instructions_map"],
},
)
return {"summaries": [response]}
def _collect_summaries(self, state: OverallState) -> dict[str, Any]:
"""Collect summaries from mapping step and store them in OverallState for collapse/reduce step.
Args:
state (OverallState): Contains the state of the summary process with summaries.
Returns:
Update of "collapsed summaries" as part of OverallState of the summary graph.
"collapsed summaries" contains the list of summaries as Documents.
"""
new_state = {
"collapsed_summaries": [Document(summary) for summary in state["summaries"]]
}
return new_state
def _should_collapse(
self, state: OverallState
) -> Literal["collapse_summaries", "generate_final_summary"]:
"""Checks if the collected summaries are short enough to generate a final summary.
If their total length exceeds the maximal length (max_input_chars) or if the desired summary length (with a
tolerance of 25%) is exceeded, the summaries need to be collapsed first to reduce their size.
If the previous reduction failed, no further reduce loops are started and the graph will continue.
Args:
state : Contains the state of the summary process with summaries.
Returns:
Name of the next node.
"""
logger.debug(
f"The current number of reduce calls is '{state['num_reduce_call']}'."
)
length = self._length_function(documents=state["collapsed_summaries"])
logger.debug(
f"The intermediate summaries are currently {length} chars long, "
f"with a desired summary length of {state['desired_summary_chars']} chars "
)
reduce_condition = False
if (not state["quit_reducing"]) and state["desired_summary_chars"] != 0:
tolerance = 1.25
exceeded_desired_length = length > (
tolerance * state["desired_summary_chars"]
)
reduce_condition = (
length > state["max_input_chars"]
) or exceeded_desired_length
logger.debug(
"Does the current summary exceeded desired summary length with a tolerance of "
f"{tolerance}? {exceeded_desired_length}"
)
else:
reduce_condition = length > state["max_input_chars"]
if (not state["quit_reducing"]) and reduce_condition:
logger.debug(
f"Start reduce-loop, because current {len(state['collapsed_summaries'])} summaries "
f"are {length} characters long. And need to be < desired summary length "
f"{state['desired_summary_chars']} and < max_input_chars {state['max_input_chars']}. "
f"These conditions currently are {reduce_condition}."
)
return "collapse_summaries"
else:
if state["quit_reducing"]:
logger.warning(
"Reduce-loop could not reduce summary length. Moving on to generation of final summary."
)
logger.info(
f"Input for final summary generation: {len(state['collapsed_summaries'])} intermediate summaries with "
f"a length of {length} characters"
)
return "generate_final_summary"
async def _collapse_summaries(self, state: OverallState) -> dict[str, Any]:
"""Reduces a long list of summaries by further summarizing them using the reduce chain.
Afterwards the list of summaries does not exceed max_input_chars anymore.
Args:
state (OverallState): Contains the state of the summary process.
Returns:
Update of the of OverallState of the summary graph for the following attributes:
- "collapsed_summaries" containing a list of summaries as Documents,
- "num_reduce_call" tracking the number of calls using the reduce prompt,
- "quit_reducing" tracking wether this reduce loop was successful,
- "message" a message containing relevant information for the user.
"""
doc_lists = split_list_of_docs(
state["collapsed_summaries"],
self._length_function,
state["max_input_chars"],
)
collapse_results = []
num_reduce_calls = state["num_reduce_call"]
for doc_list in doc_lists:
collapse_results.append(
await self._acollapse_docs(
docs=doc_list,
focus_instructions_reduce=state["focus_instructions_reduce"],
)
)
num_reduce_calls += 1
quit_reducing = not self._check_reduce_success(
state=state, collapse_results=collapse_results
)
new_state = {
"collapsed_summaries": collapse_results,
"num_reduce_call": num_reduce_calls,
"quit_reducing": quit_reducing,
"messages": state["messages"],
}
return new_state
async def _acollapse_docs(
self, docs: list[Document], focus_instructions_reduce: str
) -> Document:
"""Execute a collapse function on a set of documents and merge their metadata.
This function is an adapted version of acollapse_docs from langchain_classic.chains.combine_documents.reduce
to avoid entering lists into the reduce prompt.
All metadata values are strings, and in case of overlapping keys across the input documents the values
get joined by ", ".
Args:
docs (list[Document]): A list of Documents to combine.
focus_instructions_reduce (str) : REDUCE-Prompt part structuring the summary according to topics.
Returns:
A single Document with the output of reduce chain for the page content
and the combined metadata of all the input documents.
"""
doc_contents = ""
for doc in docs:
doc_contents = doc_contents + doc.page_content + "\n\n"
combined_content = await self.reduce_chain.ainvoke(
{
"summaries": doc_contents,
"focus_instructions": focus_instructions_reduce,
}
)
logger.debug(
f"LLM call with reduce prompt: reducing {len(docs)} chunks into one chunk"
)
if combined_content is None:
combined_content = await self._retry_ainvoke(
chain="reduce",
chain_input={
"summaries": doc_contents,
"focus_instructions_reduce": focus_instructions_reduce,
},
)
combined_metadata = {k: str(v) for k, v in docs[0].metadata.items()}
for doc in docs[1:]:
for k, v in doc.metadata.items():
if k in combined_metadata:
combined_metadata[k] += f", {v}"
else:
combined_metadata[k] = str(v)
combined_doc = Document(
page_content=combined_content, metadata=combined_metadata
)
return combined_doc
def _length_function(self, documents: list[Document]) -> int:
"""Computes the cumulative length for a list of documents.
To this end each document's content (excluding metadata) is measured and summed up.
Args:
documents (List[Document]): Each document consists of content and metadata.
Returns:
Total length of all documents contents.
"""
return sum(len(doc.page_content) for doc in documents)
def _check_reduce_success(
self, state: OverallState, collapse_results: list[Document]
) -> bool:
"""Checks if _collapse_summaries was successful and returns a bool used for error handling.
Args:
state (OverallState): Contains the state of the summary process.
collapse_results (list[Document]): List of summaries as documents after reduce step.
Returns:
True if the reduce chain successfully reduced the length of the summaries.
False if the reduce chain failed and needs to be quit to prevent infinite loops.
"""
length_collapsed_summaries = self._length_function(documents=collapse_results)
length_summaries_before = self._length_function(
documents=state["collapsed_summaries"]
)
if length_collapsed_summaries >= length_summaries_before:
quit_reducing = True
else:
quit_reducing = state["quit_reducing"]
logger.debug(
f"successfuly reduced length of intermediate summaries (length after collapse {length_collapsed_summaries}"
f" = length before collapse {length_summaries_before}? -> "
f"{length_collapsed_summaries == length_summaries_before}"
)
return not quit_reducing
async def _generate_final_summary(self, state: OverallState) -> dict[str, Any]:
"""Generate the final summary from the collapsed summaries.
Args:
state (OverallState): Contains the state of the summary process with reduced summaries.
Returns:
New state as dictionary containing the final summary and messages to the user.
"""
messages = state["messages"]
response = None
if (
len(state["collapsed_summaries"]) == 1
and state["focus_instructions_final"] == ""
):
# if there is just one Summary after REDUCE step use it as final summary
logger.debug(
"Skipping final prompt because we already have only 1 summary, which has the desired length."
)
response = state["collapsed_summaries"][0].page_content
else:
# gather input for final summary
collapsed_summaries = ""
for summary in state["collapsed_summaries"]:
collapsed_summaries = (
collapsed_summaries + summary.page_content + "\n\n"
)
if (
len("".join(collapsed_summaries.split())) < 1
and state["focus_instructions"] != ""
):
# handle empty summary in case of topic mismatch
logger.warning(
"There is no infomation in the text matching the topics specified by the user."
"Therfore there is no summary to reply with."
)
response = (
"Der Text enthält keine Informationen zu den gewünschten Themen."
)
messages.append(
"Entferne die eingegebenen Themen und wiederhole deine Anfrage, "
"um eine allgemeine Zusammenfassung des Textes zu erhalten."
)
else:
# generate final summary which considers the topics set by the user
logger.debug(
f"Generate final summary from the {len(state['collapsed_summaries'])} intermediate summaries."
)
response = await self.final_chain.ainvoke(
{
"summaries": collapsed_summaries,
"focus_instructions": state["focus_instructions_final"],
}
)
logger.debug("LLM call with final prompt.")
if response is None:
response = await self._retry_ainvoke(
chain="final",
chain_input={
"summaries": collapsed_summaries,
"focus_instructions_final": state[
"focus_instructions_final"
],
},
)
if state["desired_summary_chars"] != 0:
# handle mismatch of the desired summary length and the actual summary length
not_reduced = state["num_reduce_call"] == 0
summary_length = len(response)
tolerance = 0.5
too_short = summary_length < (
state["desired_summary_chars"] * (1 - tolerance)
)
too_long = summary_length > (
state["desired_summary_chars"] * (1 + tolerance)
)
logger.info(
f"Final summary has a length of {summary_length}. Is this {too_short=} ...because chunk summaries "
f"already have been to short? {too_short and not_reduced} or is it {too_long=} ... because the reduce "
f"loop could not reduce any further? {state['quit_reducing']}."
)
if (not_reduced and too_short) or (state["quit_reducing"] and too_long):
messages.append(
"Die erstellte Zusammenfassung weicht von der Ziellänge ab. "
"Mehr Informationen hierzu finden Sie in den FAQ."
)
new_state = {"final_summary": response, "messages": messages}
return new_state
async def _retry_ainvoke(self, chain: str, chain_input: dict) -> str:
"""This function performs a retry mechanism for LLM calls.
It can be used with map chain, reduce chain or final chain.
Args:
chain (str): Selecting the chain that should be run (map, reduce or final).
chain_input (dict): Input for map/reduce/final chain call.
Returns:
Resulting summary from map/reduce/final chain call.
Raises:
HTTPException: HTTP_502_BAD_GATEWAY raised if the selected language model did not respond after the maximal
number of retries.
"""
response = None
max_retries = self.llm.inference.max_retries
for attempt_counter in range(max_retries):
try:
if chain == "map":
response = await self.map_chain.ainvoke(chain_input)
elif chain == "reduce":
response = await self.reduce_chain.ainvoke(chain_input)
elif chain == "final":
response = await self.final_chain.ainvoke(chain_input)
else:
logger.error(
"Retry function did not get a valid chain. "
f"Got '{chain}' instead of either 'map', 'reduce' or 'final'."
)
if response is not None:
break
except Exception as e:
if attempt_counter < max_retries - 1:
logger.warning(
f"LLM API did not respond. Retrying ({attempt_counter + 1} / {max_retries})."
)
await asyncio.sleep(1)
else:
logger.critical(
f"LLM API did not respond after maximal number of retries. Giving up. Error: '{str(e)}'."
)
raise HTTPException(
status_code=status.HTTP_502_BAD_GATEWAY,
detail=(
"Aus technischen Gründen kann vorübergehend keine Zusammenfassung erstellt werden. "
"Bitte versuchen Sie es später erneut. "
),
)
return response
@staticmethod
def load_secret(file_path: str | Path) -> str:
"""Load env-variable and check if it is missing.
Args:
file_path (str | Path): Path of token file to be loaded.
Returns:
Loaded token
Raises:
Exception: Raised in case of a FileNotFoundError because the token cannot be loaded.
"""
try:
with open(file_path) as f:
token = f.read().splitlines()[0]
return token
except FileNotFoundError:
logger.critical(f"Could not find Token - Check your folder: '{file_path}'")
raise Exception
@staticmethod
def load_basic_auth(file_path: str | Path) -> HTTPBasicAuth:
"""Load env-variable and check if it is missing. Split into username and password.
Args:
file_path (str | Path): Path of token file to be loaded.
Returns:
Loaded username and password.
Raises:
ValueError: Raised in case the credentials can not be loaded.
"""
credentials = Summary.load_secret(file_path)
try:
username, password = credentials.split(":")
except ValueError as e:
logger.debug(
"Credentials could not be loaded. Please check the credentials."
"Hint: credentials should be in the format 'username:password'."
f" Error: {str(e)}"
)
raise ValueError(
"Unable to establish connection: Invalid credentials format."
)
return HTTPBasicAuth(username, password)
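A minimal usage sketch of the class, assuming an llm entry loaded from the LLM configuration (llms.yml) and a prepared SummarizeInput as defined in models/summary_parameters.py; in the running service this wiring is done by the SummaryRegistry, and error handling is omitted here.

```python
# Hedged usage sketch: driving a single Summary pipeline directly.
# `llm` is assumed to be one configured LLM entry; "example_llm" is a placeholder name.
from src.summarizing.summary import Summary  # import path assumed


async def run_summary(llm, summarize_input) -> dict:
    pipeline = Summary(llm=llm, llm_name="example_llm")  # sets up auth client, chains and graph
    print(pipeline.get_info())  # {"label": ..., "name": "example_llm", "is_remote": ...}

    # Optionally derive focus instructions from free-text user input.
    topics = await pipeline.process_topic_input("costs, deadlines")
    summarize_input.summary_parameters.prompt_parameters = (
        pipeline.configure_prompt_parameters(topics)
    )

    # Run the map-reduce graph and return {"summary": ..., "messages": [...]}.
    return await pipeline.summarize(summarize_input)
```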
calculate_max_input_chars
Calculating the maximal number of characters of text input which should be summarized in one LLM call.
The calculation is based on the specifications of the currently used LLM, which are the context length and the maximum length for generated output. These values are stated in the llms.yml. The context length needs to encompass the length of the system-prompt, the length of the text input and the length of the summary as output. Therefore the desired summary length set by the user is factored into the calculation to adapt the output length. Furthermore the context window should not be maxed out in order to ensure high-quality summaries (therefore we only use 70% of the theoretically remaining max input length).
The desired output length (incl. additional 20% buffer) determines the max input length, if set. If the user does not set a desired length, the remaining characters are split by 70:30 for input length and output length.
max_input_chars is used to determine the number of recursions of the reduce part of the summarization process.
| PARAMETER | DESCRIPTION |
|---|---|
desired_summary_chars
|
Desired summary length as number of chars.
TYPE:
|
remaining_context_length
|
Remaining context length after subtracting all prompt lengths and a 30% buffer during max_input_chars estimation.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
int
|
Maximal number of input characters for the current summary set up. |
Source code in docs/microservices/summary/src/summarizing/summary.py
def calculate_max_input_chars(
self, desired_summary_chars: int, remaining_context_length: int
) -> int:
"""Calculating the maximal number of characters of text input which should be summarized in one LLM call.
The calculation is based on the specifications of the currently used LLM, which are the context length and
the maximum length for generated output. These values are stated in the llms.yml.
The context length needs to encompass the length of the system-prompt, the length of the text input and
the length of the summary as output. Therefore the desired summary length set by the user is factored into
the calculation to adapt the output length. Furthermore the context window should not be maxed out in order to
ensure high-quality summaries (therefore we only use 70% of the theoretically remaining max input length).
The desired output length (incl. additional 20% buffer) determines the max input length, if set.
If the user does not set a desired length, the remaining characters are split by 70:30 for input length
and output length.
max_input_chars is used to determine the number of recursions of the reduce part of the summarization process.
Args:
desired_summary_chars (int): Desired summary length as number of chars.
remaining_context_length (int): Remaining context length after subtracting all prompt lengths and a 30%
buffer during max_input_chars estimation.
Returns:
Maximal number of input characters for the current summary set up.
"""
if desired_summary_chars > 0:
output_buffer = 1.2 # 20% extra as buffer for summary output length
max_llm_output_chars = (
self.llm.inference.max_tokens * 4
) # 1 token ~ 4 chars
output_chars = min(
int(desired_summary_chars * output_buffer),
max_llm_output_chars,
)
max_input_chars = remaining_context_length - output_chars
logger.debug(
f"The max_input_chars are set to {max_input_chars} according to:"
f"remaining_context_length {remaining_context_length} - output_chars {output_chars}."
f"with remaining_context_length = 80% of (LLMs maximal number of input chars - the longest"
f" prompt length (map/reduce/final prompt)) and with output_chars = min( desired summary length"
f" in chars * 1.2 (buffer), maximal number of output chars of the LLM."
)
else:
max_input_chars = int(remaining_context_length * 0.7)
logger.debug(
f"The max_input_chars are set to {max_input_chars} = 70% of remaining_context_length "
f"{remaining_context_length} with remaining_context_length = 80% of (LLMs maximal number of "
f"input chars - the longest prompt length (map/reduce/final prompt))."
)
return max_input_chars
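A worked example with made-up numbers: for an LLM with max_tokens = 2000 (about 8000 output characters at 4 characters per token) and remaining_context_length = 40000, a desired summary length of 8000 characters gives output_chars = min(int(8000 * 1.2), 8000) = 8000 and therefore max_input_chars = 40000 - 8000 = 32000. Without a desired length, max_input_chars = int(40000 * 0.7) = 28000.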
configure_prompt_parameters
Set parameters for the MAP, REDUCE and FINAL prompt according to the user's input.
This covers instructions to focus on specific topics and stating if any topic is not covered by the input text.
| PARAMETER | DESCRIPTION |
|---|---|
topics
|
A comma-separated string listing the topics the summary should focus on.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
PromptParameters
|
Parts of MAP, REDUCE and FINAL Prompts containing the instructions to focus the summary on topics, if specified by the user.
TYPE:
|
Source code in docs/microservices/summary/src/summarizing/summary.py
def configure_prompt_parameters(self, topics: str | None) -> PromptParameters:
"""Set parameters for the MAP, REDUCE and FINAL prompt according to users input.
This covers instructions to focus on specific topics and stating if any topic is not covered by the input text.
Args:
topics (str | None): A comma-separated string listing the topics the summary should focus on.
Returns:
PromptParameters: Parts of MAP, REDUCE and FINAL Prompts containing the instructions to focus the summary on
topics, if specified by the user.
"""
if topics:
prompt_parameters = PromptParameters(
focus_instructions_map=self.llm.prompt_config.system.map.focus_instructions.format(
topics=topics
),
focus_instructions_reduce=self.llm.prompt_config.system.reduce.focus_instructions,
focus_instructions_final=self.llm.prompt_config.system.final.focus_instructions.format(
topics=topics
),
)
logger.debug(
f"MAP prompt part for topic focus instructions {prompt_parameters.focus_instructions_map=}.\n"
f"REDUCE prompt part for topic focus instructions {prompt_parameters.focus_instructions_reduce}.\n"
f"FINAL prompt part for topic focus instructions {prompt_parameters.focus_instructions_final}."
)
else:
prompt_parameters = PromptParameters()
logger.info(
"The input field for focus topics is empty, so a general summary will be generated."
)
return prompt_parameters
get_chunk_size
Computes chunk size minimum and maximum according to the specifications of the LLM and the input text length.
Ensures that max_chunk_size does not exceed 20% of text length or 25% of max input length of the LLM (max_input_chars as number of characters). These 25% could be modified. Ensures a minimal chunk size of 500 characters to avoid hallucinations and ensures a proper range of chunk sizes by limiting min chunk size to 45% of max chunk size. The chunker demands the following ratio: min_chunk_size * 2 <= max_chunk_size.
| PARAMETER | DESCRIPTION |
|---|---|
text_length
|
Length of the parsed text.
TYPE:
|
max_input_chars
|
Maximal number of input chars the LLM can process in one call.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
dict[str, int]
|
Dictionary containing the minimum and maximum chunk size as number of characters. |
Source code in docs/microservices/summary/src/summarizing/summary.py
def get_chunk_size(self, text_length: int, max_input_chars: int) -> dict[str, int]:
"""Computes chuncksize minimum and maximum according to the specifications of the LLM and the input text length.
Ensures that max_chunk_size does not exceed 20% of text length or 25% of max input length of the LLM (
max_input_chars as number of characters). These 25% could be modified.
Ensure minimal chunk size of 500 characters to avoid hallucinations and ensure proper range of chunk sizes by
limiting min chunk size to 45% of max chunk size. The chunker demands the following ratio:
min_chunk_size * 2 <= max_chunk_size.
Args:
text_length (int): Length of the parsed text.
max_input_chars (int): Maximal number of input chars the LLM can process in one call.
Returns:
Dictionary containing the minimum and maximum chunk size as number of characters.
"""
min_number_of_chunks = 5
max_chunk_size = min(
int(0.25 * max_input_chars),
max(int(text_length / min_number_of_chunks), 500),
)
logger.debug(
f"The maximal chunk size is set to {max_chunk_size} based on: "
f"min(int(0.25 * max_input_chars), max(int(text_length/{min_number_of_chunks}), 500)) = min("
f"{int(0.25 * max_input_chars)}, max({int(text_length / min_number_of_chunks)}, 500))."
)
min_chunk_size = max(int(0.5 * max_chunk_size), 500)
logger.debug(
f"The minimal chunk size is set to {min_chunk_size} based on: max(500, 0.45 * maximal chunk size) "
f"(={int(0.45 * max_chunk_size)}))."
)
return {"min_chunk_size": min_chunk_size, "max_chunk_size": max_chunk_size}
get_info
Returns pipeline information (label, name, placeholder, is_remote).
| RETURNS | DESCRIPTION |
|---|---|
dict[str, Any]
|
Pipeline information (label, name, placeholder, is_remote). |
Source code in docs/microservices/summary/src/summarizing/summary.py
load_basic_auth
staticmethod
Load env-variable and check if it is missing. Split into username and password.
| PARAMETER | DESCRIPTION |
|---|---|
file_path
|
Path of token file to be loaded.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
HTTPBasicAuth
|
Loaded username and password. |
| RAISES | DESCRIPTION |
|---|---|
ValueError
|
Raised in case the credentials can not be loaded. |
Source code in docs/microservices/summary/src/summarizing/summary.py
@staticmethod
def load_basic_auth(file_path: str | Path) -> HTTPBasicAuth:
"""Load env-variable and check if it is missing. Split into username and password.
Args:
file_path (str | Path): Path of token file to be loaded.
Returns:
Loaded username and password.
Raises:
ValueError: Raised in case the credentials can not be loaded.
"""
credentials = Summary.load_secret(file_path)
try:
username, password = credentials.split(":")
except ValueError as e:
logger.debug(
"Credentials could not be loaded. Please check the credentials."
"Hint: credentials should be in the format 'username:password'."
f" Error: {str(e)}"
)
raise ValueError(
"Unable to establish connection: Invalid credentials format."
)
return HTTPBasicAuth(username, password)
load_secret
staticmethod
Load env-variable and check if it is missing.
| PARAMETER | DESCRIPTION |
|---|---|
file_path
|
Path of token file to be loaded.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
str
|
Loaded token |
| RAISES | DESCRIPTION |
|---|---|
Exception
|
Raised in case of a FileNotFoundError because the token cannot be loaded. |
Source code in docs/microservices/summary/src/summarizing/summary.py
@staticmethod
def load_secret(file_path: str | Path) -> str:
"""Load env-variable and check if it is missing.
Args:
file_path (str | Path): Path of token file to be loaded.
Returns:
Loaded token
Raises:
Exception: Raised in case of a FileNotFoundError because the token cannot be loaded.
"""
try:
with open(file_path) as f:
token = f.read().splitlines()[0]
return token
except FileNotFoundError:
logger.critical(f"Could not find Token - Check your folder: '{file_path}'")
raise Exception
process_desired_summary_length
Processes the user input for the desired summary length.
Checks if the value is valid. If not, set to default value to ignore user input. Calculates the summary length in characters (desired_summary_chars) using an estimate of 4000 chars per page.
| PARAMETER | DESCRIPTION |
|---|---|
desired_summary_length
|
Containing the information needed to compute the desired summary length from the two keys output_length (desired length of the summary output as number of pages) and input_length (length of the summary input text, i.e. the parsing output length as number of characters).
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
dict[str, Any]
|
Dictionary with following keys: - desired_summary_chars (int) : Desired number of characters for the final summary output. - messages (list[str]) : List of messages to the user regarding the summary. |
Source code in docs/microservices/summary/src/summarizing/summary.py
def process_desired_summary_length(
self, desired_summary_length: dict[str, Any]
) -> dict[str, Any]:
"""Processes the user input for the desired summary length.
Checks if the value is valid. If not, set to default value to ignore user input.
Calculates the summary length in characters (desired_summary_chars) using an estimate of 4000 chars per page.
Args:
desired_summary_length (dict): Containing the information needed to compute the desired summary
length from the two keys output_length (desired length of the summary output
as number of pages) and input_length (length of the summary input text,
i.e. the parsing output length as number of characters).
Returns:
Dictionary with following keys:
- desired_summary_chars (int) : Desired number of characters for the final summary output.
- messages (list[str]) : List of messages to the user regarding the summary.
"""
messages = []
# get number of characters for summary length parameters
minimal_length = 500 # prevent hallucinations
half_text_length = int(
0.5 * desired_summary_length["input_length"]
) # 4000 chars ~ 1 DIN A4 page
max_length = 60000 # 60000 chars ~ 15 DIN A4 pages
if desired_summary_length["output_length"] <= 0:
logger.warning("Using default summary length.")
desired_summary_chars = 0
messages.append(
"Die Zusammenfassungslänge entspricht der Standardeinstellung."
)
else:
desired_summary_chars = int(desired_summary_length["output_length"] * 4000)
desired_summary_chars = max(minimal_length, desired_summary_chars)
desired_summary_chars = min(
half_text_length, desired_summary_chars, max_length
)
if desired_summary_chars == minimal_length:
messages.append(
"Die erstellte Zusammenfassung weicht von der Ziellänge ab. "
"Mehr Informationen hierzu finden Sie in den FAQ."
)
logger.debug(
f"The desired summary length is set to the minimum of {minimal_length} chars."
)
elif desired_summary_chars == half_text_length:
messages.append(
"Die erstellte Zusammenfassung weicht von der Ziellänge ab. "
"Mehr Informationen hierzu finden Sie in den FAQ."
)
logger.debug(
f"The desired summary length is set to the maximum of {half_text_length} chars, "
"which is half the input text length."
)
logger.info(
f"The desired summary length is set to {desired_summary_chars} chars based on "
f"desired {desired_summary_length['output_length']} pages (and "
f"half input text length = {half_text_length} chars)."
)
return {
"desired_summary_chars": desired_summary_chars,
"messages": messages,
}
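A worked example with made-up numbers: for input_length = 30000 characters and output_length = 2 pages, desired_summary_chars starts at 2 * 4000 = 8000, is raised to at least 500 and then capped by min(15000, 8000, 60000) = 8000, so the user's target is kept. With output_length = 10 pages on the same text, the cap min(15000, 40000, 60000) = 15000 (half the input length) applies and a message about the deviation from the target length is added.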
process_topic_input
async
Processes the focus topics provided by the user by extracting the topics in the proper format.
This converts user input that does not meet the required format, such as short sentences, into a comma-separated string listing the topics the summary should focus on.
| PARAMETER | DESCRIPTION |
|---|---|
topics
|
User input text stating the topics the summary should focus on.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
str
|
A comma-separated string listing the topics the summary should focus on.
TYPE:
|
Source code in docs/microservices/summary/src/summarizing/summary.py
async def process_topic_input(self, topics: str) -> str:
"""Processes the focus topics provided by the user by extracting the topics in the proper format.
This converts user input that does not meet the required format, such as short sentences, into a comma-separated
string listing the topics the summary should focus on.
Args:
topics (str): User input text stating the topics the summary should focus on.
Returns:
str: A comma-separated string listing the topics the summary should focus on.
"""
if topics.strip() != "":
prep_topics_prompt_input = {"topics": "topics"}
prep_topics_prompt = ChatPromptTemplate.from_template(
template=self.llm.prompt_config.system.prepare_focus_topics,
template_format="f-string",
partial_variables=prep_topics_prompt_input,
)
logger.debug(f"Prompt for topic preparation: '{prep_topics_prompt}'.")
self.prep_topics_chain = (
prep_topics_prompt | self.model_provider | StrOutputParser()
)
topics = await self.prep_topics_chain.ainvoke({"topics": topics})
logger.info(f"Topics after preparation: '{topics}'.")
if topics.strip() == "":
topics = None
return topics
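For example, a free-text input such as "Please focus on costs and the project timeline" would typically be turned by the prepare_focus_topics prompt into something like "costs, project timeline"; if the LLM returns an empty string, topics is set to None and a general summary is generated downstream. The exact output of course depends on the configured prompt and model.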
summarize
async
Generates a summary for a list of chunks.
The content of each chunk is summarized by the map chain. These summaries are then collected. If the aggregated length of these summaries exceeds the maximum (max_input_chars), the summaries are summarized again using the reduce chain. This process is repeated until the condition is satisfied. Then the final summary is generated with the final chain.
| PARAMETER | DESCRIPTION |
|---|---|
summarize_input
|
Containing chunks incl. metadata, messages, prompt parameters and LLM.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
dict[str, Any]
|
Contains the summary as str and messages to the user as list. |
Source code in docs/microservices/summary/src/summarizing/summary.py
async def summarize(self, summarize_input: SummarizeInput) -> dict[str, Any]:
"""Generates a summary for a list of chunks.
The content of each chunk is summarized by the map chain.
These summaries are then collected. If the aggregated length of these summaries
exceeds the maximum (max_input_chars), the summaries are summarized again using the reduce chain.
This process is repeated until the condition is satisfied.
Then the final summary is generated with the final chain.
Args:
summarize_input (SummarizeInput): Containing chunks incl. metadata, messages, prompt parameters and LLM.
Returns:
Contains the summary as str and messages to the user as list.
"""
logger.info(f"Start summarizing {len(summarize_input.chunks)} chunks.")
steps = []
async for step in self.graph.astream(
# initialize OverallState
{
"contents": [doc.page_content for doc in summarize_input.chunks],
"focus_instructions_map": summarize_input.summary_parameters.prompt_parameters.focus_instructions_map,
"focus_instructions_reduce": (
summarize_input.summary_parameters.prompt_parameters.focus_instructions_reduce
),
"focus_instructions_final": (
summarize_input.summary_parameters.prompt_parameters.focus_instructions_final
),
"desired_summary_chars": summarize_input.summary_parameters.desired_summary_chars,
"max_input_chars": summarize_input.summary_parameters.max_input_chars,
"messages": summarize_input.summary_parameters.messages,
"num_reduce_call": 0,
"quit_reducing": False,
},
{"recursion_limit": 40},
):
steps.append(step)
logger.debug("Finished summarizing.")
summary = steps[-1].get("generate_final_summary").get("final_summary")
messages = steps[-1].get("generate_final_summary").get("messages")
return {"summary": summary, "messages": messages}
summary_registry
Summary Registry contains all summary pipelines.
| CLASS | DESCRIPTION |
|---|---|
SummaryRegistry |
Manages summary pipelines including parameter configurations. |
SummaryRegistry
Manages summary pipelines including parameter configurations.
| ATTRIBUTE | DESCRIPTION |
|---|---|
summary_pipelines |
A list containing all pipelines (one for each available LLM).
TYPE:
|
| METHOD | DESCRIPTION |
|---|---|
configure_summary_parameters |
Computes parameter based on parsing output and user input to finish initialization of the summary pipeline. |
estimate_max_input_chars |
Calculating the maximal number of characters of text input which should be summarized in one LLM call. |
request_summary |
Start the summarization of chunks according to the given parameters. |
Source code in docs/microservices/summary/src/summarizing/summary_registry.py
class SummaryRegistry:
"""Manages summary pipelines including parameter configurations.
Attributes:
summary_pipelines (list[Summary]): A list containing all pipelines (one for each available LLM).
"""
def __init__(
self,
) -> None:
"""Initializes the list of Summary instances."""
self.summary_pipelines = self._initialize_pipelines()
def _initialize_pipelines(self) -> list[Summary]:
"""Load all available summary pipelines based on custom configuration.
Returns:
A list containing all pipelines (one for each available LLM).
"""
pipelines = []
for llm_name, llm in llm_config.summary.items():
pipelines.append(Summary(llm, llm_name))
logger.info(f"Initialized {len(pipelines)} summary pipelines.")
return pipelines
async def request_summary(
self,
summarize_input: SummarizeInput,
text: str,
) -> SummaryAPIOutput:
"""Start the summarization of chunks according to the given parameters.
Args:
summarize_input (SummarizeInput): Containing chunks incl. metadata, messages, prompt_parameters and LLM.
text (str): Parsed input file or text.
Returns:
Contains the summary, a message to the user and the parsing output.
Raises:
HTTPException: HTTP_424_FAILED_DEPENDENCY raised if the text could not be summarized due to missing chunks.
HTTP_400_BAD_REQUEST raised if the selected language model was invalid.
"""
# errorhandling for missing chunks
if not summarize_input.chunks:
logger.error(
"No summarization possible because there are no chunks to summarize."
)
raise HTTPException(
status_code=status.HTTP_424_FAILED_DEPENDENCY,
detail=(
"Der Text konnte nicht verarbeitet werden. Bitte versuchen Sie es mit einer anderen Datei."
),
)
# select and start pipeline
summary_pipeline = None
summary_result = {}
for pipeline in self.summary_pipelines:
if pipeline.llm_name == summarize_input.language_model:
summary_pipeline = pipeline
summary_result = await summary_pipeline.summarize(
summarize_input=summarize_input,
)
break
# errorhandling for missing pipeline
if not summary_pipeline:
logger.error(
f"Warning: Invalid 'language_model' requested: {summarize_input.language_model}."
)
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=(
f"Mit dem ausgewählten Sprachmodell {summarize_input.language_model} konnte keine Zusammenfassung "
"generiert werden. Bitte versuchen Sie es mit einem anderen Sprachmodell."
),
)
return SummaryAPIOutput(
summary=summary_result["summary"],
warning_msg=" ".join(set(summary_result["messages"])),
parsed_text=text,
)
async def configure_summary_parameters(
self,
warning_msg: list[str],
language_model: str,
remaining_context_length: int,
desired_summary_length: dict,
topics: str | None,
) -> SummaryParameters:
"""Computes parameter based on parsing output and user input to finish initialization of the summary pipeline.
Computes chuncksize minimum and maximum according to the specifications of the LLM and the input text length.
Calculating the maximal input length (max_input_chars) which should be summarized in one LLM call,
otherwise the summary needs to perform another recursion of its reduce part.
The desired summary length in chars is needed as reduce-loop criterion and to influence the length of the LLM
response to the map and reduce prompt.
Args:
warning_msg (list[str]): List of messages to the user, which regard the summary output.
language_model (str): Name of the LLM as stated in llms.yml
desired_summary_length (dict): Containing the information needed to compute the desired summary
length from the two keys output_length (float) and input_length (int).
remaining_context_length (int): Remaining context length after subtracting all prompt lengths and a 30%
buffer during max_input_chars estimation.
topics (str | None): User input text stating the topics the summary should focus on.
Returns:
SummaryParameters: Parameters needed for summarization, which are computed according to the user's input.
Raises:
HTTPException: HTTP_400_BAD_REQUEST raised if the selected language model was invalid.
"""
summary_pipeline = None
messages = warning_msg
# get summary pipeline for calculations
for pipeline in self.summary_pipelines:
if pipeline.llm_name == language_model:
summary_pipeline = pipeline
break
if not summary_pipeline:
logger.error(
"Parameters can not be computed because there is no summary pipeline initialized, "
f"which uses {language_model}."
)
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=(
f"Es wurde ein ungültiges Sprachmodell ausgewählt: {language_model}."
" Bitte versuchen Sie es mit einem anderen Sprachmodell."
),
)
# prepare topics and set them as focus for summarization
if topics:
topics = await summary_pipeline.process_topic_input(topics=topics)
prompt_parameters = summary_pipeline.configure_prompt_parameters(topics=topics)
# compute desired summary length and maximum of input characters
summary_length_parameters = summary_pipeline.process_desired_summary_length(
desired_summary_length=desired_summary_length
)
messages.extend(summary_length_parameters["messages"])
max_input_chars = summary_pipeline.calculate_max_input_chars(
desired_summary_chars=summary_length_parameters["desired_summary_chars"],
remaining_context_length=remaining_context_length,
)
summary_parameters = SummaryParameters(
messages=messages,
desired_summary_chars=summary_length_parameters["desired_summary_chars"],
max_input_chars=max_input_chars,
prompt_parameters=prompt_parameters,
)
logger.debug(
f"All parameters for the summarization have been computed: '{summary_parameters}'"
)
return summary_parameters
def estimate_max_input_chars(
self,
language_model: str,
) -> dict[str, int]:
"""Calculating the maximal number of characters of text input which should be summarized in one LLM call.
The calculation is based on the context length of the currently used LLM (stated in the llms.yml).
The context length needs to encompass the length of the system-prompt, the length of the text input and
the length of the summary as output.
An estimate of 30% for text input and summary output is used.
Furthermore the context window should not be maxed out in order to ensure high quality summaries (therefore we
only use 70% of the theoretically remaining max input length).
Args:
language_model (str): Name of the LLM as stated in llms.yml
Returns:
dict: containing a estimation of the maximal number of input characters for the current summary set up
(which will be used to compute the ideal chunksize) and the remaining context length after subtracting
all prompt lengths and a 30% buffer (which will be used to refine the maximal number of input characters
for further usage in the reduce loop of the summarization)
"""
# get summary pipeline for calculations
summary_pipeline = None
for pipeline in self.summary_pipelines:
if pipeline.llm_name == language_model:
summary_pipeline = pipeline
break
if not summary_pipeline:
logger.error(
"Parameters can not be computed because there is no summary pipeline initialized, "
f"which uses {language_model}."
)
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=(
f"Es wurde ein ungültiges Sprachmodell ausgewählt: {language_model}."
" Bitte versuchen Sie es mit einem anderen Sprachmodell."
),
)
max_prompt_length = max(
(len(prompt.main) + len(prompt.focus_instructions))
for prompt in [
summary_pipeline.llm.prompt_config.system.map,
summary_pipeline.llm.prompt_config.system.reduce,
summary_pipeline.llm.prompt_config.system.final,
]
)
context_length_chars = (
summary_pipeline.llm.context_length * 4
) # 1 token ~ 4 chars
remaining_context_length = int(
(context_length_chars - max_prompt_length)
* 0.7 # cap at 70% context-length
)
max_input_chars = int(remaining_context_length * 0.7)
logger.debug(
f"The max_input_chars are set to {max_input_chars} = 70% of remaining_context_length "
f"{remaining_context_length} with remaining_context_length = 70% of (LLMs maximal number of "
f"input chars - the longest prompt length (map/reduce/final prompt))."
)
return {
"max_input_chars_estimate": max_input_chars,
"remaining_context_length": remaining_context_length,
}
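To make the two 70% factors concrete, here is the arithmetic of estimate_max_input_chars traced with made-up numbers (the context length and prompt length below are placeholders, not values from llms.yml):

```python
# made-up numbers purely to trace the arithmetic of estimate_max_input_chars
context_length_tokens = 8_000   # hypothetical context window from llms.yml
max_prompt_length = 2_000       # longest of the map/reduce/final system prompts, in chars

context_length_chars = context_length_tokens * 4  # 32_000 chars (1 token ~ 4 chars)
remaining_context_length = int((context_length_chars - max_prompt_length) * 0.7)  # 21_000
max_input_chars_estimate = int(remaining_context_length * 0.7)  # 14_700

print(remaining_context_length, max_input_chars_estimate)  # 21000 14700
```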
configure_summary_parameters
async
configure_summary_parameters(warning_msg, language_model, remaining_context_length, desired_summary_length, topics)
Computes parameters based on parsing output and user input to finish initialization of the summary pipeline.
Computes the chunk-size minimum and maximum according to the specifications of the LLM and the input text length. Calculates the maximal input length (max_input_chars) which should be summarized in one LLM call, otherwise the summary needs to perform another recursion of its reduce part. The desired summary length in chars is needed as reduce-loop criterion and to influence the length of the LLM response to the map and reduce prompts.
| PARAMETER | DESCRIPTION |
|---|---|
| warning_msg | List of messages to the user, which regard the summary output. TYPE: list[str] |
| language_model | Name of the LLM as stated in llms.yml. TYPE: str |
| desired_summary_length | Containing the information needed to compute the desired summary length from the two keys output_length (float) and input_length (int). TYPE: dict |
| remaining_context_length | Remaining context length after subtracting all prompt lengths and a 30% buffer during max_input_chars estimation. TYPE: int |
| topics | User input text stating the topics the summary should focus on. TYPE: str \| None |

| RETURNS | DESCRIPTION |
|---|---|
| SummaryParameters | Parameters needed for summarization, which are computed according to the user's input. |

| RAISES | DESCRIPTION |
|---|---|
| HTTPException | HTTP_400_BAD_REQUEST raised if the selected language model was invalid. |
Source code in docs/microservices/summary/src/summarizing/summary_registry.py
async def configure_summary_parameters(
self,
warning_msg: list[str],
language_model: str,
remaining_context_length: int,
desired_summary_length: dict,
topics: str | None,
) -> SummaryParameters:
"""Computes parameter based on parsing output and user input to finish initialization of the summary pipeline.
Computes chuncksize minimum and maximum according to the specifications of the LLM and the input text length.
Calculating the maximal input length (max_input_chars) which should be summarized in one LLM call,
otherwise the summary needs to perform another recursion of its reduce part.
The desired summary length in chars is needed as reduce-loop criterion and to influence the length of the LLM
response to the map and reduce prompt.
Args:
warning_msg (list[str]): List of messages to the user, which regard the summary output.
language_model (str): Name of the LLM as stated in llms.yml
desired_summary_length (dict): Containing the information needed to compute the desired summary
length from the two keys output_length (float) and input_length (int).
remaining_context_length (int): Remaining context length after subtracting all prompt lengths and a 30%
buffer during max_input_chars estimation.
topics (str | None): User input text stating the topics the summary should focus on.
Returns:
SummaryParameters: Parameters needed for summarization, which are computed according to the user's input.
Raises:
HTTPException: HTTP_400_BAD_REQUEST raised if the selected language model was invalid.
"""
summary_pipeline = None
messages = warning_msg
# get summary pipeline for calculations
for pipeline in self.summary_pipelines:
if pipeline.llm_name == language_model:
summary_pipeline = pipeline
break
if not summary_pipeline:
logger.error(
"Parameters can not be computed because there is no summary pipeline initialized, "
f"which uses {language_model}."
)
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=(
f"Es wurde ein ungültiges Sprachmodell ausgewählt: {language_model}."
" Bitte versuchen Sie es mit einem anderen Sprachmodell."
),
)
# prepare topics and set them as focus for summarization
if topics:
topics = await summary_pipeline.process_topic_input(topics=topics)
prompt_parameters = summary_pipeline.configure_prompt_parameters(topics=topics)
# compute desired summary length and maximum of input characters
summary_length_parameters = summary_pipeline.process_desired_summary_length(
desired_summary_length=desired_summary_length
)
messages.extend(summary_length_parameters["messages"])
max_input_chars = summary_pipeline.calculate_max_input_chars(
desired_summary_chars=summary_length_parameters["desired_summary_chars"],
remaining_context_length=remaining_context_length,
)
summary_parameters = SummaryParameters(
messages=messages,
desired_summary_chars=summary_length_parameters["desired_summary_chars"],
max_input_chars=max_input_chars,
prompt_parameters=prompt_parameters,
)
logger.debug(
f"All parameters for the summarization have been computed: '{summary_parameters}'"
)
return summary_parameters
estimate_max_input_chars
Calculating the maximal number of characters of text input which should be summarized in one LLM call.
The calculation is based on the context length of the currently used LLM (stated in the llms.yml). The context length needs to encompass the length of the system-prompt, the length of the text input and the length of the summary as output.
An estimate of 30% for text input and summary output is used.
Furthermore the context window should not be maxed out in order to ensure high quality summaries (therefore we only use 70% of the theoretically remaining max input length).

| PARAMETER | DESCRIPTION |
|---|---|
| language_model | Name of the LLM as stated in llms.yml. TYPE: str |

| RETURNS | DESCRIPTION |
|---|---|
| dict | Containing an estimation of the maximal number of input characters for the current summary setup (which will be used to compute the ideal chunksize) and the remaining context length after subtracting all prompt lengths and a 30% buffer (which will be used to refine the maximal number of input characters for further usage in the reduce loop of the summarization). |
Source code in docs/microservices/summary/src/summarizing/summary_registry.py
def estimate_max_input_chars(
self,
language_model: str,
) -> dict[str, int]:
"""Calculating the maximal number of characters of text input which should be summarized in one LLM call.
The calculation is based on the context length of the currently used LLM (stated in the llms.yml).
The context length needs to encompass the length of the system-prompt, the length of the text input and
the length of the summary as output.
An estimate of 30% for text input and summary output is used.
Furthermore the context window should not be maxed out in order to ensure high quality summaries (therefore we
only use 70% of the theoretically remaining max input length).
Args:
language_model (str): Name of the LLM as stated in llms.yml
Returns:
dict: containing a estimation of the maximal number of input characters for the current summary set up
(which will be used to compute the ideal chunksize) and the remaining context length after subtracting
all prompt lengths and a 30% buffer (which will be used to refine the maximal number of input characters
for further usage in the reduce loop of the summarization)
"""
# get summary pipeline for calculations
summary_pipeline = None
for pipeline in self.summary_pipelines:
if pipeline.llm_name == language_model:
summary_pipeline = pipeline
break
if not summary_pipeline:
logger.error(
"Parameters can not be computed because there is no summary pipeline initialized, "
f"which uses {language_model}."
)
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=(
f"Es wurde ein ungültiges Sprachmodell ausgewählt: {language_model}."
" Bitte versuchen Sie es mit einem anderen Sprachmodell."
),
)
max_prompt_length = max(
(len(prompt.main) + len(prompt.focus_instructions))
for prompt in [
summary_pipeline.llm.prompt_config.system.map,
summary_pipeline.llm.prompt_config.system.reduce,
summary_pipeline.llm.prompt_config.system.final,
]
)
context_length_chars = (
summary_pipeline.llm.context_length * 4
) # 1 token ~ 4 chars
remaining_context_length = int(
(context_length_chars - max_prompt_length)
* 0.7 # cap at 70% context-length
)
max_input_chars = int(remaining_context_length * 0.7)
logger.debug(
f"The max_input_chars are set to {max_input_chars} = 70% of remaining_context_length "
f"{remaining_context_length} with remaining_context_length = 70% of (LLMs maximal number of "
f"input chars - the longest prompt length (map/reduce/final prompt))."
)
return {
"max_input_chars_estimate": max_input_chars,
"remaining_context_length": remaining_context_length,
}
request_summary
async
Start the summarization of chunks according to the given parameters.
| PARAMETER | DESCRIPTION |
|---|---|
| summarize_input | Containing chunks incl. metadata, messages, prompt_parameters and LLM. TYPE: SummarizeInput |
| text | Parsed input file or text. TYPE: str |

| RETURNS | DESCRIPTION |
|---|---|
| SummaryAPIOutput | Contains the summary, a message to the user and the parsing output. |

| RAISES | DESCRIPTION |
|---|---|
| HTTPException | HTTP_424_FAILED_DEPENDENCY raised if the text could not be summarized due to missing chunks. HTTP_400_BAD_REQUEST raised if the selected language model was invalid. |
Source code in docs/microservices/summary/src/summarizing/summary_registry.py
async def request_summary(
self,
summarize_input: SummarizeInput,
text: str,
) -> SummaryAPIOutput:
"""Start the summarization of chunks according to the given parameters.
Args:
summarize_input (SummarizeInput): Containing chunks incl. metadata, messages, prompt_parameters and LLM.
text (str): Parsed input file or text.
Returns:
Contains the summary, a message to the user and the parsing output.
Raises:
HTTPException: HTTP_424_FAILED_DEPENDENCY raised if the text could not be summarized due to missing chunks.
HTTP_400_BAD_REQUEST raised if the selected language model was invalid.
"""
# errorhandling for missing chunks
if not summarize_input.chunks:
logger.error(
"No summarization possible because there are no chunks to summarize."
)
raise HTTPException(
status_code=status.HTTP_424_FAILED_DEPENDENCY,
detail=(
"Der Text konnte nicht verarbeitet werden. Bitte versuchen Sie es mit einer anderen Datei."
),
)
# select and start pipeline
summary_pipeline = None
summary_result = {}
for pipeline in self.summary_pipelines:
if pipeline.llm_name == summarize_input.language_model:
summary_pipeline = pipeline
summary_result = await summary_pipeline.summarize(
summarize_input=summarize_input,
)
break
# errorhandling for missing pipeline
if not summary_pipeline:
logger.error(
f"Warning: Invalid 'language_model' requested: {summarize_input.language_model}."
)
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=(
f"Mit dem ausgewählten Sprachmodell {summarize_input.language_model} konnte keine Zusammenfassung "
"generiert werden. Bitte versuchen Sie es mit einem anderen Sprachmodell."
),
)
return SummaryAPIOutput(
summary=summary_result["summary"],
warning_msg=" ".join(set(summary_result["messages"])),
parsed_text=text,
)
utils
Utils functions for logging, LLM availability check, LLM authentication and configuration processing.
| MODULE | DESCRIPTION |
|---|---|
| base_logger | Set up the root logger for the entire application. This logger will log messages to the console and a file. |
| check_model_api_availability | This module provides functions to check LLM-APIs for availability. |
| openai_custom_auth | Customized Httpx Authentication Client. |
| process_configs | Methods to load configs and start checks of config integrity. |
base_logger
Set up the root logger for the entire application. This logger will log messages to the console and a file.
| FUNCTION | DESCRIPTION |
|---|---|
| setup_logger | Initializes the logger with the desired log level and adds handlers. |
setup_logger
Initializes the logger with the desired log level and adds handlers.
Sets up the root logger, which all other loggers inherit from. Adds file, console and exit handlers to the logger and sets the format.
Source code in docs/microservices/summary/src/utils/base_logger.py
def setup_logger() -> None:
"""Initializes the logger with the desired log level and add handlers.
Sets up the root logger, which all other loggers inherit from.
Adds file, console and exit handlers to the logger and sets the format.
"""
logger = logging.getLogger()
# create different handlers for log file and console
file_handler = logging.handlers.RotatingFileHandler(
filename=settings.log_file,
maxBytes=settings.log_file_max_bytes,
backupCount=settings.log_file_backup_count,
)
console_handler = logging.StreamHandler()
# define log format and set for each handler
formatter = logging.Formatter(
fmt="%(asctime)s - %(levelname)8s - %(module)s - %(funcName)s: %(message)s",
datefmt="%Y-%m-%d %H:%M:%S%z",
)
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)
# add handlers to the logger
logger.addHandler(file_handler)
logger.addHandler(console_handler)
logger.setLevel(settings.log_level)
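Since only the root logger is configured, every module simply calls logging.getLogger(__name__) and inherits the handlers and format. A minimal, settings-free sketch of that pattern (log level and handler choice are illustrative):

```python
import logging

# configure the root logger once at startup (stand-in for setup_logger without the settings object)
root = logging.getLogger()
console_handler = logging.StreamHandler()
console_handler.setFormatter(
    logging.Formatter(
        fmt="%(asctime)s - %(levelname)8s - %(module)s - %(funcName)s: %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S%z",
    )
)
root.addHandler(console_handler)
root.setLevel(logging.INFO)

# any module logger now inherits the root configuration
logger = logging.getLogger(__name__)
logger.info("Summary service logging is set up.")
```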
check_model_api_availability
This module provides functions to check LLM-APIs for availability.
To check a certain LLM use await is_model_api_available(llm.api, llm_name).
To get all LLMs that are activated in configs/general.yml, use await get_available_llms().
| FUNCTION | DESCRIPTION |
|---|---|
| get_available_llms | Returns a list of available LLMs. |
| is_model_api_available | Checks if API is available using credentials. |
get_available_llms
async
Returns a list of available LLMs.
| RETURNS | DESCRIPTION |
|---|---|
| list[dict[str, Any]] | List of available LLMs with selected infos. |
Source code in docs/microservices/summary/src/utils/check_model_api_availability.py
async def get_available_llms() -> list[dict[str, Any]]:
"""Returns a list of available LLMs.
Returns:
List of available LLMs with selected infos
"""
available_llms = []
# iterate over model_groups (services), i.e. chat, RAG, embedding, ...
for model_group_key in llm_config:
logger.debug(f"Checking APIs for {model_group_key}-LLMs.")
model_group = llm_config[model_group_key]
for llm_name, llm in model_group.items():
logger.debug(f"Checking availability of {llm_name}")
if await is_model_api_available(llm.api, llm_name):
llm_dict = llm.model_dump(include=["label", "is_remote"])
llm_dict["name"] = llm_name
available_llms.append(llm_dict)
return available_llms
is_model_api_available
async
Checks if API is available using credentials.
Availability is checked by sending a GET request and, if that does not return 200, a POST request. If a health_check endpoint is provided, the request is sent to that endpoint; otherwise, it is sent to the main API URL.
| PARAMETER | DESCRIPTION |
|---|---|
| llm_api | The LLMAPI instance to check. TYPE: LLMAPI |
| llm_name | ID of the LLM as used in the config file as reference. TYPE: str |
| timeout_in_s | Http timeout in seconds; defaults to 10. TYPE: int |

| RETURNS | DESCRIPTION |
|---|---|
| bool | Whether the model API is available or not. |
Source code in docs/microservices/summary/src/utils/check_model_api_availability.py
async def is_model_api_available(
llm_api: LLMAPI,
llm_name: str,
timeout_in_s: int = 10,
) -> bool:
"""Checks if API is available using credentials.
Availability is checked by sending a GET request and, if that does not return 200, a POST request.
If a health_check endpoint is provided, the request is sent to that endpoint; otherwise, it is sent to the main API URL.
Args:
llm_api (LLMAPI): The LLMAPI instance to check.
llm_name (str): ID of the LLM as used in the config file as reference.
timeout_in_s (int): Http timeout in seconds; defaults to 10.
Returns:
Whether the model API is available or not.
"""
headers = {"Content-type": "application/json"}
# Authorization is not always needed
if llm_api.auth:
headers["Authorization"] = llm_api.auth.get_auth_header()
url = llm_api.get_health_check_url()
# test health check endpoint with GET, fall back to POST
try:
async with httpx.AsyncClient() as client:
response = await client.get(
url,
headers=headers,
timeout=timeout_in_s,
)
logger.debug(
f"{url} health check via GET request: {response.status_code=}, LLM: '{llm_name}"
)
# test with POST
if response.status_code != HTTPStatus.OK:
async with httpx.AsyncClient() as client:
response = await client.post(
url,
headers=headers,
timeout=timeout_in_s,
)
logger.debug(
f"{url} health check via POST request: {response.status_code=}, LLM: '{llm_name}"
)
except Exception as e:
logger.warning(
f"Exception when trying to reach LLM API. Error: {e}, LLM: '{llm_name}"
)
return False
if response.status_code != HTTPStatus.OK:
logger.warning(
f"LLM unavailable: Could not establish connection to LLM-API. LLM: '{llm_name}"
)
return response.status_code == HTTPStatus.OK
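The GET-then-POST fallback can be exercised in isolation with httpx's MockTransport. The sketch below simulates a health endpoint that rejects GET but accepts POST; the URL and handler are illustrative, not a real LLM API:

```python
import asyncio
from http import HTTPStatus

import httpx


def handler(request: httpx.Request) -> httpx.Response:
    # simulated health endpoint: GET is rejected, POST succeeds
    return httpx.Response(200 if request.method == "POST" else 405)


async def check(url: str) -> bool:
    transport = httpx.MockTransport(handler)
    async with httpx.AsyncClient(transport=transport) as client:
        response = await client.get(url, timeout=10)
        if response.status_code != HTTPStatus.OK:
            response = await client.post(url, timeout=10)
    return response.status_code == HTTPStatus.OK


print(asyncio.run(check("https://llm.example/health")))  # -> True
```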
openai_custom_auth
Customized Httpx Authentication Client.
| CLASS | DESCRIPTION |
|---|---|
| CustomAuthClient | Custom HTTP transport for OpenAI client. |
CustomAuthClient
Bases: AsyncClient
Custom HTTP transport for OpenAI client.
This class supports both Bearer Token Authentication and Basic Authentication.
If auth_type is 'token', the secret is expected to be the API key.
If auth_type is 'basic_auth', the secret is expected to be a 'username:password' string, which the client base64-encodes for the Authorization header.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| auth_header | Authentication header for the httpx client. TYPE: str |

| METHOD | DESCRIPTION |
|---|---|
| a_send | Asynchronous method for sending HTTP requests. |
| send | Synchronous method for sending HTTP requests. |
Source code in docs/microservices/summary/src/utils/openai_custom_auth.py
class CustomAuthClient(httpx.AsyncClient):
"""Custom HTTP transport for OpenAI client.
This class supports both Bearer Token Authentication and Basic Authentication.
If `auth_type` is 'token', the `secret` is expected to be the API key.
If `auth_type` is 'basic_auth', the `secret` is expected to be a 'username:password' string, which is base64-encoded by the client.
Attributes:
auth_header (str): Authentication header for the httpx client.
Methods:
a_send(request, *args, **kwargs): Asynchronous method for sending HTTP requests.
send(request, *args, **kwargs): Synchronous method for sending HTTP requests.
"""
def __init__(
self,
secret: str | None = None,
auth_type: Literal["token", "basic_auth"] | None = None,
*args: object,
**kwargs: object,
) -> None:
"""Initializes the custom HTTP transport for OpenAI client.
Initialization expects authentication per 'token' (here the `secret` is expected to be the OpenAI API key)
or 'basic_auth' (here the `secret` is expected to be a 'username:password' string, which is base64-encoded by the client).
Args:
secret (str | None): OpenAI API Key or Basic Auth credentials (username:password).
This is required depending on the `auth_type`. If `auth_type` is 'token', the `secret` should be
the API key. If `auth_type` is 'basic_auth', the `secret` should be a 'username:password' string,
which is base64-encoded by the client.
auth_type (Literal | None): The type of authentication to use. It can be 'token' or 'basic_auth'.
*args: Variable length argument list.
**kwargs: Arbitrary keyword arguments.
Raises:
ValueError: If `auth_type` is provided but `secret` is not provided.
"""
super().__init__(*args, **kwargs)
self.auth_header = ""
if auth_type and not secret:
raise ValueError("API credentials are required but missing.")
if auth_type == "token":
self.auth_header = f"Bearer {secret}"
elif auth_type == "basic_auth":
encoded_credentials = base64.b64encode(secret.encode()).decode()
self.auth_header = f"Basic {encoded_credentials}"
async def a_send(
self,
request: httpx.Request,
*args: object,
**kwargs: object,
) -> httpx.Response:
"""Asynchronous version of the send method to handle requests asynchronously.
Args:
request (httpx.Request): Request to send asynchronously.
*args: Variable length argument list.
**kwargs: Arbitrary keyword arguments.
Returns:
Response to the request.
"""
if "Authorization" in request.headers:
del request.headers["Authorization"]
if self.auth_header:
request.headers["Authorization"] = self.auth_header
return await super().a_send(request, *args, **kwargs)
def send(
self,
request: httpx.Request,
*args: object,
**kwargs: object,
) -> httpx.Response:
"""Version of the send method to handle requests synchronously.
Args:
request (httpx.Request): Request to send synchronously.
*args: Variable length argument list.
**kwargs: Arbitrary keyword arguments.
Returns:
Response to the request.
"""
if "Authorization" in request.headers:
del request.headers["Authorization"]
if self.auth_header:
request.headers["Authorization"] = self.auth_header
return super().send(request, *args, **kwargs)
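The intended use is to hand this client to the OpenAI SDK as its HTTP transport, so the SDK's own Authorization header is replaced by the custom one. A sketch under the assumption that the class is importable as shown and that the openai package (v1+) is installed; base URL and credentials are placeholders:

```python
from openai import AsyncOpenAI

from src.utils.openai_custom_auth import CustomAuthClient  # assumed import path

# token auth: the resulting header is "Bearer <secret>"
http_client = CustomAuthClient(secret="my-llm-api-key", auth_type="token")

openai_client = AsyncOpenAI(
    api_key="unused-placeholder",       # the effective header is set by CustomAuthClient
    base_url="https://llm.example/v1",  # placeholder endpoint
    http_client=http_client,
)
```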
a_send
async
Asynchronous version of the send method to handle requests asynchronously.
| PARAMETER | DESCRIPTION |
|---|---|
| request | Request to send asynchronously. TYPE: httpx.Request |
| `*args` | Variable length argument list. TYPE: object |
| `**kwargs` | Arbitrary keyword arguments. TYPE: object |

| RETURNS | DESCRIPTION |
|---|---|
| Response | Response to the request. |
Source code in docs/microservices/summary/src/utils/openai_custom_auth.py
async def a_send(
self,
request: httpx.Request,
*args: object,
**kwargs: object,
) -> httpx.Response:
"""Asynchronous version of the send method to handle requests asynchronously.
Args:
request (httpx.Request): Request to send asynchronously.
*args: Variable length argument list.
**kwargs: Arbitrary keyword arguments.
Returns:
Response to the request.
"""
if "Authorization" in request.headers:
del request.headers["Authorization"]
if self.auth_header:
request.headers["Authorization"] = self.auth_header
return await super().a_send(request, *args, **kwargs)
send
Version of the send method to handle requests synchronously.
| PARAMETER | DESCRIPTION |
|---|---|
| request | Request to send synchronously. TYPE: httpx.Request |
| `*args` | Variable length argument list. TYPE: object |
| `**kwargs` | Arbitrary keyword arguments. TYPE: object |

| RETURNS | DESCRIPTION |
|---|---|
| Response | Response to the request. |
Source code in docs/microservices/summary/src/utils/openai_custom_auth.py
def send(
self,
request: httpx.Request,
*args: object,
**kwargs: object,
) -> httpx.Response:
"""Version of the send method to handle requests synchronously.
Args:
request (httpx.Request): Request to send synchronously.
*args: Variable length argument list.
**kwargs: Arbitrary keyword arguments.
Returns:
Response to the request.
"""
if "Authorization" in request.headers:
del request.headers["Authorization"]
if self.auth_header:
request.headers["Authorization"] = self.auth_header
return super().send(request, *args, **kwargs)
process_configs
Methods to load configs and start checks of config integrity.
| FUNCTION | DESCRIPTION |
|---|---|
| load_all_configs | Loads config settings from respective paths. |
| load_from_yml_in_pydantic_model | Loads config from 'list_of_yaml_paths' into given pydantic-Model. |
| load_yaml | Loads yaml files. |
| merge_specific_cfgs_in_place | Copies prompt-config to appropriate section in general llm_config. |
| postprocess_configs | Post-Processes loaded configs. |
| remove_unavailable_models | Removes models from all usecases, if they are not in 'active_models'. |
load_all_configs
Loads config settings from respective paths.
| PARAMETER | DESCRIPTION |
|---|---|
| general_config_paths | Path to config, matching 'Settings'. TYPE: Path |
| path_to_llm_prompts | Path to config, matching 'LLMPromptMaps'. TYPE: Path |
| path_to_llm_model_configs | Path to config, matching 'LLMConfig'. TYPE: Path |

| RETURNS | DESCRIPTION |
|---|---|
| tuple[Settings, LLMConfig] | Configs loaded into their Pydantic models. |
Source code in docs/microservices/summary/src/utils/process_configs.py
def load_all_configs(
general_config_paths: Path,
path_to_llm_prompts: Path,
path_to_llm_model_configs: Path,
) -> tuple[Settings, LLMConfig]:
"""Loads config settings from respective paths.
Args:
general_config_paths (Path): Path to config, matching 'Settings'.
path_to_llm_prompts (Path): Path to config, matching 'LLMPromptMaps'.
path_to_llm_model_configs (Path): Path to config, matching 'LLMConfig'.
Returns:
Config loaded into their Pydantic Model.
"""
settings = load_from_yml_in_pydantic_model(general_config_paths, Settings)
llm_prompts = load_from_yml_in_pydantic_model(path_to_llm_prompts, LLMPromptMaps)
llm_config = load_from_yml_in_pydantic_model(path_to_llm_model_configs, LLMConfig)
postprocess_configs(settings, llm_prompts, llm_config)
return settings, llm_config
load_from_yml_in_pydantic_model
Loads config from 'list_of_yaml_paths' into given pydantic-Model.
| PARAMETER | DESCRIPTION |
|---|---|
| yaml_path | Yaml to load. TYPE: Path |
| pydantic_reference_model | Pydantic model to load yaml into. TYPE: BaseModel |

| RETURNS | DESCRIPTION |
|---|---|
| BaseModel | BaseModel derived Pydantic data class. |

| RAISES | DESCRIPTION |
|---|---|
| HTTPException | HTTP_500_INTERNAL_SERVER_ERROR raised in case of an invalid configuration from the yaml file (the underlying ValidationError is caught and logged). |
Source code in docs/microservices/summary/src/utils/process_configs.py
def load_from_yml_in_pydantic_model(
yaml_path: Path, pydantic_reference_model: BaseModel
) -> BaseModel:
"""Loads config from 'list_of_yaml_paths' into given pydantic-Model.
Args:
yaml_path (Path): Yaml to load.
pydantic_reference_model (BaseModel): Pydantic model to load yaml into.
Returns:
BaseModel derived Pydantic data class.
Raises:
HTTPException: HTTP_500_INTERNAL_SERVER_ERROR raised in case of an invalid configuration from the yaml file.
"""
data = load_yaml(yaml_path)
try:
pydantic_class = pydantic_reference_model(**data)
logger.info(f"Config loaded from: '{yaml_path}'")
return pydantic_class
except ValidationError as e:
logger.critical(f"Error loading config: '{e}'")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=(
"Die Zusammenfassungsfunktion steht aufgrund eines technischen Fehlers derzeit nicht zur "
"Verfügung. "
),
)
load_yaml
Loads yaml files.
| PARAMETER | DESCRIPTION |
|---|---|
| yaml_path | Path to yaml. TYPE: Path |

| RETURNS | DESCRIPTION |
|---|---|
| dict[str, Any] | Content of loaded yaml. |

| RAISES | DESCRIPTION |
|---|---|
| FileNotFoundError | Raised in case of an invalid path to the yaml file. |
Source code in docs/microservices/summary/src/utils/process_configs.py
def load_yaml(yaml_path: Path) -> dict[str, Any]:
"""Loads yaml files.
Args:
yaml_path (Path): Path to yaml.
Returns:
Content of loaded yaml.
Raises:
FileNotFoundError: Raised in case of an invalid path to the yaml file.
"""
if not yaml_path.exists():
logger.error(f"Invalid path: '{yaml_path}'")
raise FileNotFoundError
with open(yaml_path) as file:
return yaml.safe_load(file)
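The same load-and-validate pattern, reduced to a self-contained example with a throwaway model and a temporary YAML file (the model and its fields are made up for illustration, not the service's Settings):

```python
import tempfile
from pathlib import Path

import yaml
from pydantic import BaseModel


class DemoSettings(BaseModel):
    log_level: str
    check_llm_api_interval_in_s: int


with tempfile.TemporaryDirectory() as tmp:
    yaml_path = Path(tmp) / "general.yml"
    yaml_path.write_text("log_level: INFO\ncheck_llm_api_interval_in_s: 300\n")

    data = yaml.safe_load(yaml_path.read_text())  # what load_yaml does
    settings = DemoSettings(**data)               # what load_from_yml_in_pydantic_model does
    print(settings)
```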
merge_specific_cfgs_in_place
Copies prompt-config to appropriate section in general llm_config.
A prompt config is only merged if the 'prompt_map' referenced in LLMConfig can be found in LLMPromptMaps, i.e. this generalizes something like:
cfg["phi3:mini"].prompt_config = prompt[cfg["phi3:mini"].prompt_map]
| PARAMETER | DESCRIPTION |
|---|---|
| llm_config | Target for merge of Prompt parameter. TYPE: LLMConfig |
| llm_prompts | Source to merge Prompt parameter from. TYPE: LLMPromptMaps |

| RETURNS | DESCRIPTION |
|---|---|
| bool | True if no problems occurred. |
Source code in docs/microservices/summary/src/utils/process_configs.py
def merge_specific_cfgs_in_place(
llm_config: LLMConfig, llm_prompts: LLMPromptMaps
) -> bool:
"""Copies prompt-config to appropriate section in general llm_config.
Only if 'prompt_map' in LLMConfig can be found in LLMPromptMaps, it will be merged.
i.e. try to generalize sth. like this:
cfg["phi3:mini"].prompt_config = prompt[cfg["phi3:mini"].prompt_map]
Args:
llm_config (LLMConfig): Target for merge of Prompt parameter.
llm_prompts (LLMPromptMaps): Source to merge Prompt parameter from.
Returns:
True if no problems occurred.
"""
no_issues_occurred = True
for usecase in llm_config:
# load identical usecases, i.e. chat, RAG
try:
cfg = getattr(llm_config, usecase)
prompt = getattr(llm_prompts, usecase)
except AttributeError:
logger.warning(
f"Usecase '{usecase}' not matching between prompt- and general llm config. \
Skipping cfg-merge for '{usecase}' .."
)
no_issues_occurred = False
continue
# copy prompt config to its usecase- and model-counterpart
for model in cfg:
prompt_map_to_use = cfg[model].prompt_map
if prompt_map_to_use in prompt:
cfg[model].prompt_config = prompt[prompt_map_to_use]
else:
logger.warning(
f"'prompt_map: {prompt_map_to_use}' from LLM-config not in prompt-config for '{usecase}'. \
Skipping .."
)
no_issues_occurred = False
continue
return no_issues_occurred
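The effect of the merge, reduced to plain dictionaries (model and prompt-map names are illustrative; the real function operates on the LLMConfig and LLMPromptMaps pydantic models):

```python
# plain-dict stand-ins for LLMConfig and LLMPromptMaps
llm_config = {"summary": {"phi3:mini": {"prompt_map": "default_de", "prompt_config": None}}}
llm_prompts = {"summary": {"default_de": {"system": {"map": "...", "reduce": "...", "final": "..."}}}}

for usecase, cfg in llm_config.items():
    prompts = llm_prompts.get(usecase, {})
    for model_cfg in cfg.values():
        prompt_map = model_cfg["prompt_map"]
        if prompt_map in prompts:
            # cfg["phi3:mini"].prompt_config = prompt[cfg["phi3:mini"].prompt_map]
            model_cfg["prompt_config"] = prompts[prompt_map]

print(llm_config["summary"]["phi3:mini"]["prompt_config"])
```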
postprocess_configs
Post-Processes loaded configs.
Remove unused models (from settings.active_models), merge LLMPromptMaps into LLMConfig.
| PARAMETER | DESCRIPTION |
|---|---|
| settings | Config matching pydantic 'Settings'. TYPE: Settings |
| llm_prompts | Config matching pydantic 'LLMPromptMaps'. TYPE: LLMPromptMaps |
| llm_config | Config matching pydantic 'LLMConfig'. TYPE: LLMConfig |

| RETURNS | DESCRIPTION |
|---|---|
| LLMConfig | Merged and filtered LLM configuration. |
Source code in docs/microservices/summary/src/utils/process_configs.py
def postprocess_configs(
settings: Settings, llm_prompts: LLMPromptMaps, llm_config: LLMConfig
) -> LLMConfig:
"""Post-Processes loaded configs.
Remove unused models (from settings.active_models), merge LLMPromptMaps into LLMConfig.
Args:
settings (Settings): Config matching pydantic 'Settings'.
llm_prompts (LLMPromptMaps): Config matching pydantic 'LLMPromptMaps'.
llm_config (LLMConfig): Config matching pydantic 'LLMConfig'.
Returns:
Merged and filtered LLM configuration.
"""
remove_unavailable_models(llm_config, settings.active_llms)
merge_specific_cfgs_in_place(llm_config, llm_prompts)
return llm_config
remove_unavailable_models
Removes models from all usecases, if they are not in 'active_models'.
| PARAMETER | DESCRIPTION |
|---|---|
| input_config | Config to change. TYPE: LLMConfig |
| active_models | Models to keep; all others are removed. TYPE: list[str] |

| RETURNS | DESCRIPTION |
|---|---|
| None | None |
Source code in docs/microservices/summary/src/utils/process_configs.py
def remove_unavailable_models(
input_config: LLMConfig, active_models: list[str]
) -> None:
"""Removes models from all useacases, if they are not in 'active_models'.
Args:
input_config (LLMConfig): Config to change.
active_models (list[str]): Models to keep - remove other.
Returns:
None
"""
for usecase in input_config:
cfg = getattr(input_config, usecase)
available_models_for_usecase = getattr(active_models, usecase)
for model in list(cfg):
if model not in available_models_for_usecase:
cfg.pop(model)