RAG
rag
| MODULE | DESCRIPTION |
|---|---|
| main | Main module of the application. |
| src | Source code of the service containing core components and utilities. |
main
Main module of the application.
This module serves as the entry point for the program. It imports necessary modules, sets up any initial configuration or data structures, and possibly defines main functions or classes that are used throughout the application.
src
Source code of the service containing core components and utilities.
| MODULE | DESCRIPTION |
|---|---|
| app | Initialize the app. |
| endpoints | This module provides API endpoints for RAG. |
| models | Data model classes for loading and validating API and configuration parameters. |
| rag | Implementation of the core logic and interaction flow of the RAG. |
| settings | Load all settings from a central place, not hidden in utils. |
| utils | Utility functions for logging, LLM availability checks and configuration processing. |
app
Initialize the app.
| FUNCTION | DESCRIPTION |
|---|---|
| lifespan | Sets up a scheduler and updates the available LLMs. |
lifespan
async
Sets up a scheduler and updates the available LLMs.
This lifespan function is started on startup of FastAPI. The first part, up to yield, is executed on startup and initializes a scheduler that regularly checks the LLM API. The second part is executed on shutdown and cleans up the scheduler.
The available LLMs, i.e. the LLMs whose API checks passed, are cached in the FastAPI state object as app.state.available_llms.
| PARAMETER | DESCRIPTION |
|---|---|
| _app | fastapi.applications.FastAPI object. TYPE: FastAPI |
Source code in docs/microservices/rag/src/app.py
@asynccontextmanager
async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
"""Sets up a scheduler and updates available llms.
This lifespan function is started on startup of FastAPI. The first part
- till `yield` is executed on startup and initializes a scheduler to regulary
check the LLM-API. The second part is executed on shutdown and is used to
clean up the scheduler.
The available LLMs - i.e. the LLMs where API-checks passed - are cached in
FastAPI state object as `app.state.available_llms`.
Args:
_app (FastAPI): fastapi.applications.FastAPI object
"""
async def update_llm_state() -> None:
_app.state.available_llms = await get_available_llms()
# store available LLMs in FastAPI app state
_app.state.available_llms = await get_available_llms()
# setup a scheduler
scheduler = AsyncIOScheduler()
scheduler.add_job(
update_llm_state,
"interval",
seconds=settings.check_llm_api_interval_in_s,
)
scheduler.add_job(cleanup_elasticsearch_tempindex_task, "interval", seconds=120)
scheduler.start()
yield
# cleanup
scheduler.shutdown()
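The snippet below is a minimal sketch of how this lifespan hook is typically attached to the application; the import paths and router name are assumptions for illustration, not taken from the source above.

```python
# Hypothetical wiring sketch: attach the lifespan handler to the FastAPI app.
from fastapi import FastAPI

from src.app import lifespan       # lifespan context manager shown above (assumed path)
from src.endpoints import router   # RAG API router (assumed path)

app = FastAPI(lifespan=lifespan)   # pre-yield part runs on startup
app.include_router(router)         # post-yield cleanup runs on shutdown
```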
endpoints
This module provides API endpoints for RAG.
| FUNCTION | DESCRIPTION |
|---|---|
| fetch_database_rag | Endpoint for question answering (RAG) on a vector database. |
| fetch_file_rag | Endpoint for question answering (RAG) on a given list of files. |
| get_llms | Returns model information of available LLMs. |
| get_sources_for_rag | This endpoint returns a list of available sources for database RAG. |
| health | Return a health check message. |
| ingest_docs_to_database | Endpoint to ingest a list of documents into the vector database. |
fetch_database_rag
async
Endpoint for question answering (RAG) on a vector database.
| PARAMETER | DESCRIPTION |
|---|---|
| rag_input | A client-defined connection input containing the query data. TYPE: RAGInput |

| RETURNS | DESCRIPTION |
|---|---|
| list[RAGOutput] | Inference results returned by the vector database pipeline. |
Source code in docs/microservices/rag/src/endpoints.py
@router.post(
"/database",
response_model=list[RAGOutput],
summary="RAG knowledge database endpoint.",
description=(
"Performs a RAG (Retrieval-Augmented Generation) query on the knowledge database.\n\n"
"The endpoint retrieves relevant documents based on the input filters and question, "
"then generates answers using the specified language model."
),
openapi_extra={
"requestBody": {
"content": {
"application/json": {
"examples": {} # empty example added to avoid CI issues
}
}
}
},
responses={
200: {
"description": "Successful RAG response with retrieved documents.",
"content": {
"application/json": {
"examples": RAGOutput.model_config["json_schema_extra"][
"openapi_examples"
],
}
},
},
400: {
"description": "Invalid language model.",
"content": {
"application/json": {
"schema": {
"type": "object",
"properties": {"detail": {"type": "string"}},
"example": {
"detail": (
"Es wurde ein ungültiges Sprachmodell ausgewählt. "
"Bitte versuchen Sie es mit einem anderen Modell."
)
},
}
}
},
},
500: {"description": "Internal server error."},
},
)
async def fetch_database_rag(rag_input: RAGInput) -> list[RAGOutput]:
"""Endpoint for question answering (RAG) on a vector database.
Args:
rag_input (RAGInput): A client-defined connection input containing the query data.
Returns:
Inference results returned by the vector database pipeline.
"""
return await rag_registry.run_database_rag(rag_input)
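For illustration, a client-side call to this endpoint might look as follows; the base URL and any router prefix are assumptions, and the payload mirrors the OpenAPI examples above.

```python
# Hypothetical request against the /database endpoint (base URL is an assumption).
import httpx

payload = {
    "question": "What did the political parties decide regarding the pension system?",
    "meta_data_filters": [{"source": "Coalition Agreement"}],
    "language_model": "test_model_mock",
    "max_chunks_to_use": 5,
}

response = httpx.post("http://localhost:8000/database", json=payload, timeout=60)
response.raise_for_status()
for result in response.json():              # list[RAGOutput]
    print(result["answer"])
    for source in result["sources"]:
        print(" -", source["meta"].get("source"))
```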
fetch_file_rag
async
fetch_file_rag(
    request_timestamp=Form(None, description='Unix timestamp of the request.', example=1731252767, deprecated=True),
    question=Form(..., description="The user's input question to be answered.", example='What did the parties decide?'),
    language_model=Form(..., description='Identifier for the language model to use.', example='test_model_mock'),
    max_chunks_to_use=Form(None, description='Optional upper limit on the number of text chunks used for the response.', example=5),
    files=File(..., description='One or more files (pdf, txt, docx).'),
)
Endpoint for question answering (RAG) on a given list of files.
| PARAMETER | DESCRIPTION |
|---|---|
| request_timestamp | The timestamp of the request. TYPE: int or None |
| question | The question to be answered, encoded as FormData. TYPE: str |
| language_model | The selected language model. LLMs are defined in configs/llm_models.yml. TYPE: str |
| max_chunks_to_use | The number of chunks to use for answer generation. TYPE: int or None |
| files | List of binary-encoded uploaded files (e.g., PDF, DOCX/ODT, TXT), sent as FormData. TYPE: list[UploadFile] |

| RETURNS | DESCRIPTION |
|---|---|
| list[RAGOutput] | The generated answer as an inference result. |
Source code in docs/microservices/rag/src/endpoints.py
@router.post(
"/file",
response_model=list[RAGOutput],
summary="RAG file endpoint with multiple uploads.",
description=(
"Performs a RAG (Retrieval-Augmented Generation) query based on uploaded files.\n\n"
"The endpoint parses and chunks the uploaded files, retrieves relevant documents, "
"and generates answers using the specified language model."
),
responses={
200: {
"description": "Successful RAG response with retrieved documents.",
"content": {
"application/json": {
"examples": RAGOutput.model_config["json_schema_extra"][
"openapi_examples"
],
}
},
},
400: {"description": "Invalid language model or request."},
500: {"description": "Internal server error."},
},
)
async def fetch_file_rag(
request_timestamp: int | None = Form(
None,
description="Unix timestamp of the request.",
example=1731252767,
deprecated=True,
),
question: str = Form(
...,
description="The user's input question to be answered.",
example="What did the parties decide?",
),
language_model: str = Form(
...,
description="Identifier for the language model to use.",
example="test_model_mock",
),
max_chunks_to_use: int | None = Form(
None,
description="Optional upper limit on the number of text chunks used for the response.",
example=5,
),
files: list[UploadFile] = File(
...,
description="One or more files (pdf, txt, docx).",
),
) -> list[RAGOutput]:
"""Endpoint for question answering (RAG) on a given list of files.
Args:
request_timestamp (int): The timestamp of the request.
question (str): The question to be answered, encoded as FormData.
language_model (str): The selected language model. LLMs are defined in configs/llm_models.yml.
max_chunks_to_use (int): The number of chunks to use for answer generation.
files (list): List of binary-encoded uploaded files (e.g., PDF, DOCX/ODT, TXT), sent as FormData.
Returns:
The generated answer as an inference result.
"""
rag_input = RAGInput(
request_timestamp=request_timestamp,
question=question,
meta_data_filters=None,
max_chunks_to_use=max_chunks_to_use,
language_model=language_model,
)
return await rag_registry.run_file_rag(rag_input, files)
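A client-side sketch of a multipart request to this endpoint; the base URL and file names are assumptions, while the form field names match the Form(...) parameters above.

```python
# Hypothetical multipart request against the /file endpoint (base URL is an assumption).
import httpx

files = [
    ("files", ("contract.pdf", open("contract.pdf", "rb"), "application/pdf")),
    ("files", ("notes.txt", open("notes.txt", "rb"), "text/plain")),
]
data = {
    "question": "What did the parties decide?",
    "language_model": "test_model_mock",
    "max_chunks_to_use": "5",
}

response = httpx.post("http://localhost:8000/file", data=data, files=files, timeout=120)
response.raise_for_status()
print(response.json()[0]["answer"])
```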
get_llms
async
Returns model information of available LLMs.
| PARAMETER | DESCRIPTION |
|---|---|
| request | Request data. TYPE: Request |

| RETURNS | DESCRIPTION |
|---|---|
| list[dict] | List with information for each LLM. |
Source code in docs/microservices/rag/src/endpoints.py
@router.get(
"/llms",
summary="List available language models.",
description=("Returns a list of available language models (LLMs).\n\n"),
responses={
200: {
"description": "List of available LLMs.",
"content": {
"application/json": {
"example": [
{
"label": "test_model:mock",
"is_remote": False,
"name": "test_model_mock",
},
]
}
},
},
500: {"description": "Internal server error accessing microservice"},
},
)
async def get_llms(request: Request) -> list[dict]:
"""Returns model information of available LLMs.
Args:
request (Request): Request-Data.
Returns:
List with information for each LLM.
"""
app = request.app # indirectly access the FastAPI app object
return app.state.available_llms
get_sources_for_rag
async
This endpoint returns a list of available sources for database RAG.
| RETURNS | DESCRIPTION |
|---|---|
| list[Source] | List of available sources. |
Source code in docs/microservices/rag/src/endpoints.py
@router.get(
"/sources",
response_model=list[Source],
summary="List available RAG database sources.",
description=(
"Returns a list of all available sources in the knowledge database.\n\n"
"Each source contains its name and whether date-based filtering is applied."
),
responses={
200: {
"description": "List of available database sources.",
"content": {
"application/json": {
"examples": Source.model_config["json_schema_extra"][
"openapi_examples"
],
}
},
}
},
)
async def get_sources_for_rag() -> list[Source]:
"""This endpoint returns a list of available sources for database RAG.
Returns:
List of available sources.
"""
return rag_pipeline_config.pipeline.sources
health
async
Return a health check message.
| RETURNS | DESCRIPTION |
|---|---|
| dict[str, str] | The health check message as a dictionary. |
Source code in docs/microservices/rag/src/endpoints.py
@router.get(
"/",
summary="Health check endpoint",
description=(
"Returns a simple message indicating that the RAG service is running.\n\n"
"Use this endpoint to verify that the service is alive and responsive."
),
responses={
200: {
"description": "Health check successful",
"content": {"application/json": {"example": {"status": "RAG is running"}}},
},
500: {"description": "Internal server error"},
},
)
@router.get(
"/health",
summary="Health check endpoint",
description=(
"Returns a simple message indicating that the RAG service is running.\n\n"
"Use this endpoint to verify that the service is alive and responsive."
),
responses={
200: {
"description": "Health check successful",
"content": {"application/json": {"example": {"status": "RAG is running"}}},
},
500: {"description": "Internal server error"},
},
)
async def health() -> dict[str, str]:
"""Return a health check message.
Returns:
The health check message as a dictionary.
"""
return {"message": f"{settings.service_name} is running"}
ingest_docs_to_database
async
Endpoint to ingest a list of documents into the vector database.
Each document must be an instance of IngestionDocument and include the required
content and metadata fields. The endpoint processes the documents and inserts
them into the database asynchronously.
For contents containing page breaks, the metadata field page_number is calculated automatically during ingestion, starting with page_number=1. Page breaks are represented as \f in the text, e.g. "content": "Text on page 1. \f Text on page 2. \f Text on page 3."
Input structure:
{
    "documents": [
        {
            "content": "string",
            "metadata": {
                "source": "string",    # Required; must not be empty after trimming.
                "title": "string",     # Optional.
                "date": "YYYY-MM-DD",  # Optional.
                "url": "string"        # Optional.
            }
        }
    ],
    "batch_size": 1  # Optional; defaults to 1.
}
| PARAMETER | DESCRIPTION |
|---|---|
| db_ingestion | Client-defined ingestion input, including connection and document data. TYPE: DBIngestionInput |

| RETURNS | DESCRIPTION |
|---|---|
| dict[str, Any] | A status and message response, for example: {"status": "success", "message": "Ingested <n> documents and <m> chunks into the database."} |
Notes
Large document batches may cause connection timeouts. In such cases, ingestion may not complete successfully. Small batch sizes or sequential submissions are recommended.
Source code in docs/microservices/rag/src/endpoints.py
@router.post(
"/database_ingestion",
response_model=dict[str, Any],
summary="Ingest a list of documents into the vector database.",
description=(
"Each document must be an instance of `IngestionDocument` and include the required"
"`content` and `metadata` fields. The endpoint processes the documents and inserts"
"them into the database asynchronously."
""
"For contents containing page breaks, the meta data `page_number` is calculated automatically"
"during ingestion starting with `page_number=1`. Page breaks are represented as `\\f` in the text."
"E.g. `content`: `Text on page 1. \\f Text on page 2. \\f Text on page 3.`"
),
openapi_extra={
"requestBody": {
"content": {
"application/json": {
"examples": {} # empty example added to avoid CI issues
}
}
}
},
)
async def ingest_docs_to_database(
db_ingestion: DBIngestionInput,
) -> dict[str, Any]:
r"""Endpoint to ingest a list of documents into the vector database.
Each document must be an instance of `IngestionDocument` and include the required
`content` and `metadata` fields. The endpoint processes the documents and inserts
them into the database asynchronously.
For contents containing page breaks, the meta data `page_number` is calculated automatically
during ingestion starting with `page_number=1`. Page breaks are represented as `\f` in the text.
E.g. `"content": "Text on page 1. \f Text on page 2. \f Text on page 3.`
Input structure:
{
"documents": [
{
"content": "string",
"metadata": {
"source": "string", # Required; must not be empty after trimming.
"title": "string", # Optional.
"date": "YYYY-MM-DD", # Optional.
"url": "string", # Optional.
}
}
],
"batch_size": 1 # Optional; defaults to 1.
}
Args:
db_ingestion (DBIngestionInput): Client-defined ingestion input, including connection
and document data.
Returns:
A status and message response, for example:
{
"status": "success",
"message": "Ingested <n> documents and <m> chunks into the database."
}
Notes:
Large document batches may cause connection timeouts. In such cases, ingestion
may not complete successfully. Small batch sizes or sequential submissions are
recommended.
"""
status = await rag_registry.index_database_rag(db_ingestion=db_ingestion)
return status
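A hedged ingestion request sketch; the base URL is an assumption, the metadata values are illustrative, and the key meta follows the IngestionDocument model documented below.

```python
# Hypothetical request against the /database_ingestion endpoint (base URL is an assumption).
import httpx

payload = {
    "documents": [
        {
            # "\f" marks page breaks, so page_number is derived automatically.
            "content": "Text on page 1. \f Text on page 2. \f Text on page 3.",
            "meta": {
                "source": "Coalition Agreement",   # required, must not be empty
                "title": "Pension chapter",        # optional
                "date": "2025-11-07",              # optional
            },
        }
    ],
    "batch_size": 1,
}

response = httpx.post("http://localhost:8000/database_ingestion", json=payload, timeout=300)
print(response.json())   # e.g. {"status": "success", "message": "Ingested ..."}
```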
models
Data model classes for loading and validating API and configuration parameters.
| MODULE | DESCRIPTION |
|---|---|
| api_input | Pydantic models for input parameters of the RAG API. |
| api_output | Pydantic models for API output parameters of the RAG service. |
| general | Load and check Settings from yml. |
| llms | Pydantic model for LLM config. |
| rag_config | Pydantic models describing a RAG pipeline. |
api_input
Pydantic models for input parameters of the RAG API.
| CLASS | DESCRIPTION |
|---|---|
| DBIngestionInput | Defines the structure of valid input for document ingestion into the database. |
| DocumentMetadata | Defines the metadata associated with a document. |
| DoubleDocsHandling | Defines options for handling already embedded docs during input indexing. |
| IngestionDocument | Defines the structure of a document to be ingested into the database. |
| MetadataFilter | Defines metadata-based filters for RAG document retrieval. |
| RAGInput | Model defining the input of a valid RAG request. |
DBIngestionInput
Bases: BaseModel
Defines the structure of valid input for document ingestion into the database.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| documents | List of documents to be ingested. TYPE: list[IngestionDocument] |
| batch_size | Number of documents per upload batch (optional, defaults to 1). TYPE: PositiveInt |
Source code in docs/microservices/rag/src/models/api_input.py
class DBIngestionInput(BaseModel):
"""Defines the structure of valid input for document ingestion into the database.
Attributes:
documents (list[IngestionDocument]): List of documents to be ingested.
batch_size (PositiveInt): Number of documents per upload batch. (optional, defaults to 1)
"""
documents: list[IngestionDocument]
batch_size: PositiveInt = Field(1, description="Split Upload in Batches.")
double_doc_handling: DoubleDocsHandling = DoubleDocsHandling.IGNORE
DocumentMetadata
Bases: BaseModel
Defines the metadata associated with a document.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| source | Required source identifier (must not be empty). TYPE: str |
| title | Optional document title. TYPE: str or None |
| date | Optional date of the document. TYPE: datetime.date or None |
| url | Optional source URL. TYPE: str or None |

Notes

Extra fields are allowed to support flexible metadata extension.

| METHOD | DESCRIPTION |
|---|---|
| trim_strings | Trim leading and trailing whitespace from string fields. |
| validate_non_empty_source | Ensure that the 'source' field is not empty after trimming. |
Source code in docs/microservices/rag/src/models/api_input.py
class DocumentMetadata(BaseModel):
"""Defines the metadata associated with a document.
Attributes:
source (str): Required source identifier (must not be empty).
title (str | None): Optional document title.
date (datetime.date | None): Optional date of the document.
url (str | None): Optional source URL.
Notes:
Extra fields are allowed to support flexible metadata extension.
"""
model_config = ConfigDict(extra="allow")
source: str
title: str | None = None
date: datetime.date | None = None
url: str | None = None
@field_validator("title", "source")
@classmethod
def trim_strings(cls, value: str) -> str:
"""Trim leading and trailing whitespace from string fields."""
if value is None:
return value
return value.strip()
@field_validator("source")
@classmethod
def validate_non_empty_source(cls, value: str) -> str:
"""Ensure that the 'source' field is not empty after trimming."""
if not value:
raise ValueError("Source cannot be empty")
return value
trim_strings
classmethod
Trim leading and trailing whitespace from string fields.
validate_non_empty_source
classmethod
Ensure that the 'source' field is not empty after trimming.
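A short sketch of the validation behaviour described above; the import path src.models.api_input is an assumption.

```python
# Hypothetical usage of the DocumentMetadata validators (import path is assumed).
from src.models.api_input import DocumentMetadata

meta = DocumentMetadata(source="  Coalition Agreement  ", title=" Pensions ")
print(meta.source)   # "Coalition Agreement" -- leading/trailing whitespace trimmed
print(meta.title)    # "Pensions"

try:
    DocumentMetadata(source="   ")       # empty after trimming
except ValueError as err:
    print(err)                           # validation error: "Source cannot be empty"
```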
DoubleDocsHandling
Bases: StrEnum
Defines options for handling already embedded docs during input indexing.
Documents can either be overwritten or skipped in the embedding process.
Values

FORCE: "force" – Forces new embedding of the file into the database.
IGNORE: "ignore" – Ignores the duplicate document, no embedding.
Source code in docs/microservices/rag/src/models/api_input.py
class DoubleDocsHandling(StrEnum):
"""Defines options for handling already embedded docs during input indexing.
Documents can either be overwritten or skipped in the embedding process.
Values:
FORCE: "force" – Forces new embedding of the file into the database.
IGNORE: "ignore" – Ignores the duplicate document, no embedding.
"""
FORCE = "force"
IGNORE = "ignore"
IngestionDocument
Bases: BaseModel
Defines the structure of a document to be ingested into the database.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| content | The document's text content (must not be empty). TYPE: str |
| meta | Associated metadata, including the required source field. TYPE: DocumentMetadata |

| METHOD | DESCRIPTION |
|---|---|
| trim_strings | Trim leading and trailing whitespace from content. |
| validate_non_empty_source | Ensure that the 'content' field is not empty after trimming. |
Source code in docs/microservices/rag/src/models/api_input.py
class IngestionDocument(BaseModel):
"""Defines the structure of a document to be ingested into the database.
Attributes:
content (str): The document's text content (must not be empty).
meta (DocumentMetadata): Associated metadata, including the required source field.
"""
content: str = Field(..., description="Document content text")
meta: DocumentMetadata = Field(
..., description="Document metadata. Must include source."
)
@field_validator("content")
@classmethod
def trim_strings(cls, value: str) -> str:
"""Trim leading and trailing whitespace from content."""
if value is None:
return value
return value.strip()
@field_validator("content")
@classmethod
def validate_non_empty_source(cls, value: str) -> str:
"""Ensure that the 'content' field is not empty after trimming."""
if not value:
raise ValueError("Source cannot be empty")
return value
trim_strings
classmethod
Trim leading and trailing whitespace from content.
validate_non_empty_source
classmethod
Ensure that the 'content' field is not empty after trimming.
MetadataFilter
Bases: BaseModel
Defines metadata-based filters for RAG document retrieval.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| source | Optional source identifier. If None, all sources are considered. TYPE: str or None |
| start_date | Optional start date for filtering by date. TYPE: datetime.date or None |
| end_date | Optional end date for filtering by date. TYPE: datetime.date or None |
Source code in docs/microservices/rag/src/models/api_input.py
class MetadataFilter(BaseModel):
"""Defines metadata-based filters for RAG document retrieval.
Attributes:
source (str | None): Optional source identifier. If None, all sources are considered.
start_date (datetime.date | None): Optional start date for filtering by date.
end_date (datetime.date | None): Optional end date for filtering by date.
"""
source: str | None = None
start_date: datetime.date | None = None
end_date: datetime.date | None = None
RAGInput
Bases: BaseModel
Model defining the input of a valid RAG request.
A RAG request can include one or more metadata filters. Each filter acts as an OR condition: a document is considered a match if it satisfies any of the given filters.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| question | The user's input question to be answered. TYPE: str |
| meta_data_filters | Optional list of metadata filters. TYPE: list[MetadataFilter] or None |
| language_model | Identifier for the language model to use. TYPE: str |
| request_timestamp | Unix timestamp indicating when the request was made. TYPE: int or None |
| max_chunks_to_use | Optional upper limit on the number of text chunks used for the response. TYPE: int or None |
Source code in docs/microservices/rag/src/models/api_input.py
class RAGInput(BaseModel):
"""Model defining the input of a valid RAG request.
A RAG request can include one or more metadata filters. Each filter acts as an OR condition:
a document is considered a match if it satisfies *any* of the given filters.
Attributes:
question (str): The user's input question to be answered.
meta_data_filters (list[MetadataFilter] | None): Optional list of metadata filters.
language_model (str): Identifier for the language model to use.
request_timestamp (int): Unix timestamp indicating when the request was made.
max_chunks_to_use (int | None): Optional upper limit on the number of text chunks used for the response.
"""
question: str
meta_data_filters: list[MetadataFilter] | None = None
language_model: Annotated[
str, StringConstraints(strip_whitespace=True, min_length=1)
]
request_timestamp: int | None = Field(
None,
description="Unix timestamp indicating when the request was made.",
deprecated=True,
)
max_chunks_to_use: int | None = None
model_config = ConfigDict(
json_schema_extra={
"openapi_examples": {
"simple": {
"summary": "Simple RAG input",
"description": "A minimal example of a RAG request with a question and default model, no filters.",
"value": {
"question": "What did the political parties decide regarding the pension system?",
"meta_data_filters": [],
"language_model": "test_model_mock",
"request_timestamp": 1731252767,
"max_chunks_to_use": None,
},
},
"with_filters": {
"summary": "RAG input with a single metadata filter",
"description": "Example of a RAG request that applies a metadata filter to restrict the sources.",
"value": {
"question": "Between which parties was the coalition agreement concluded?",
"meta_data_filters": [
{"source": "Coalition Agreement"},
],
"language_model": "test_model_mock",
"request_timestamp": 1731252767,
"max_chunks_to_use": 5,
},
},
}
}
)
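A sketch of constructing a RAGInput with two filters, illustrating the OR semantics described above; the import path is an assumption.

```python
# Hypothetical construction of a RAGInput (import path is assumed).
import datetime

from src.models.api_input import MetadataFilter, RAGInput

rag_input = RAGInput(
    question="Between which parties was the coalition agreement concluded?",
    language_model="test_model_mock",
    meta_data_filters=[
        # A chunk matches if it satisfies either of these filters (OR condition).
        MetadataFilter(source="Coalition Agreement"),
        MetadataFilter(
            source="Press Releases",
            start_date=datetime.date(2025, 1, 1),
            end_date=datetime.date(2025, 12, 31),
        ),
    ],
    max_chunks_to_use=5,
)
```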
api_output
Pydantic models for API output parameters of the RAG service.
| CLASS | DESCRIPTION |
|---|---|
| RAGOutput | RAG response model defining RAG output. |
| Source | Model defining source object of RAG output. |
RAGOutput
Bases: BaseModel
RAG response model defining RAG output.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| question | The user question that triggered the retrieval. TYPE: str |
| answer | The generated answer based on the retrieved sources. TYPE: str |
| sources | List of source documents used in the response. TYPE: list[Source] |
Source code in docs/microservices/rag/src/models/api_output.py
class RAGOutput(BaseModel):
"""RAG response model defining RAG output.
Attributes:
question (str): The user question that triggered the retrieval.
answer (str): The generated answer based on the retrieved sources.
sources (list[RAGSourceDocument]): List of source documents used in the response.
"""
question: str
answer: str
sources: list[Source]
model_config = ConfigDict(
json_schema_extra={
"openapi_examples": {
"simple": {
"summary": "RAG output example",
"description": (
"Example of a RAG response including the user question, ",
"answer and retrieved sources.",
),
"value": {
"question": "What did the political parties decide regarding the pension system?",
"answer": "The parties decided to improve the pension system.",
"sources": [
{
"content": "We decided to improve the pension system.",
"meta": {
"source": "Coalition Agreement",
"date": "2025-11-07",
},
"url": "https://example.com/coalitionagreement.pdf",
}
],
},
}
}
}
)
Source
Bases: BaseModel
Model defining source object of RAG output.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| content | The text content of the retrieved document. TYPE: str |
| meta | Metadata about the document (e.g. title, date, source type). TYPE: dict |
| url | Optional URL pointing to the original document. TYPE: str or None |
Source code in docs/microservices/rag/src/models/api_output.py
class Source(BaseModel):
"""Model defining source object of RAG output.
Attributes:
content (str): The text content of the retrieved document.
meta (dict): Metadata about the document (e.g. title, date, source type).
url (str | None): Optional URL pointing to the original document.
"""
content: str
meta: dict
url: str | None = None
general
Load and check Settings from yml.
| CLASS | DESCRIPTION |
|---|---|
| ActiveLLMs | Selects the available models for the respective use cases. |
| LogLevel | Enum class specifying possible log levels. |
| Settings | Specifies general settings for the service. |
ActiveLLMs
Bases: BaseModel
Selects the available models for the respective use cases.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| model_config | Used to ignore other services, which are defined in the config. TYPE: ConfigDict |
| rag | Names of the available LLMs for the RAG service. TYPE: list[str] |
| embedding | Names of the available embedding models. TYPE: list[str] |
Source code in docs/microservices/rag/src/models/general.py
class ActiveLLMs(BaseModel):
"""Selects the available models for the respective use cases.
Attributes:
model_config (ConfigDict): Used to ignore other services, which are defined in the config.
summary (List(str)): List the names of available LLMs for the summary service.
"""
model_config = ConfigDict(extra="ignore")
rag: list[str]
embedding: list[str]
LogLevel
Bases: StrEnum
Enum class specifying possible log levels.
Source code in docs/microservices/rag/src/models/general.py
class LogLevel(StrEnum):
"""Enum class specifying possible log levels."""
CRITICAL = "CRITICAL"
ERROR = "ERROR"
WARNING = "WARNING"
INFO = "INFO"
DEBUG = "DEBUG"
@classmethod
def _missing_(cls, value: object) -> None:
"""Convert strings to uppercase and recheck for existence."""
if isinstance(value, str):
value = value.upper()
for level in cls:
if level == value:
return level
return None
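A short sketch of the case-insensitive lookup that _missing_ enables; the import path is an assumption.

```python
# Hypothetical usage of LogLevel (import path is assumed).
from src.models.general import LogLevel

assert LogLevel("debug") is LogLevel.DEBUG   # lowercase input is upper-cased and re-checked
assert LogLevel("Info") is LogLevel.INFO     # mixed case resolves the same way
```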
Settings
Bases: BaseModel
Specifies general settings for the service.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| model_config | Ignores extra configuration keys not used by this service. TYPE: ConfigDict |
| service_name | Name of the current service (e.g., "rag"). TYPE: str |
| n_uvicorn_workers | Number of parallel Uvicorn worker processes. TYPE: PositiveInt |
| service_endpoints | URLs to required dependent services (e.g., parser). TYPE: dict[str, AnyHttpUrl] |
| active_llms | Configuration of LLMs available for different use cases. TYPE: ActiveLLMs |
| log_level | Minimum logging level for general logs. TYPE: LogLevel |
| log_file_max_bytes | Maximum size (in bytes) of a single log file before rotation. TYPE: PositiveInt |
| log_file_backup_count | Number of rotated log files to retain. TYPE: PositiveInt |
| log_file | File path where logs will be written. TYPE: FilePath |
| check_llm_api_interval_in_s | Interval (in seconds) to check LLM API health. TYPE: PositiveInt |
| llm_api_timeout | Timeout (in seconds) for LLM API requests. TYPE: int |
| haystack_log_level | Logging level specific to Haystack components. TYPE: LogLevel |
| debug_haystack_pipelines | If True, activates debug logging for Haystack pipelines. TYPE: bool |

| METHOD | DESCRIPTION |
|---|---|
| ensure_log_dir | Create the log directory after validation. |
Source code in docs/microservices/rag/src/models/general.py
class Settings(BaseModel):
"""Specifies general settings for the service.
Attributes:
model_config (ConfigDict): Ignores extra configuration keys not used by this service.
service_name (str): Name of the current service (e.g., "rag").
n_uvicorn_workers (PositiveInt): Number of parallel Uvicorn worker processes.
service_endpoints (dict[str, AnyHttpUrl]): URLs to required dependent services (e.g., parser).
active_llms (ActiveLLMs): Configuration of LLMs available for different use cases.
log_level (LogLevel): Minimum logging level for general logs.
log_file_max_bytes (PositiveInt): Maximum size (in bytes) of a single log file before rotation.
log_file_backup_count (PositiveInt): Number of rotated log files to retain.
log_file (FilePath): File path where logs will be written.
check_llm_api_interval_in_s (PositiveInt): Interval (in seconds) to check LLM API health.
llm_api_timeout (int): Timeout (in seconds) for LLM API requests.
haystack_log_level (LogLevel): Logging level specific to Haystack components.
debug_haystack_pipelines (bool): If True, activates debug logging for Haystack pipelines.
"""
model_config = ConfigDict(extra="ignore")
service_name: str = "RAG"
service_description: str = (
"Retrival-Augmented-Generation for file and database using LLMs."
)
n_uvicorn_workers: PositiveInt = 1
service_endpoints: dict[str, AnyHttpUrl]
active_llms: ActiveLLMs
log_level: LogLevel = LogLevel.INFO
log_file_max_bytes: PositiveInt = 1 * 1024 * 1024
log_file_backup_count: PositiveInt = 3
log_file: FilePath = Path("/rag/logs/log")
check_llm_api_interval_in_s: PositiveInt = 120
llm_api_timeout: int = 60
haystack_log_level: LogLevel = LogLevel.WARNING
debug_haystack_pipelines: bool = False
@model_validator(mode="after")
def ensure_log_dir(self) -> "Settings":
"""Create the log directory after validation."""
self.log_file.parent.mkdir(parents=True, exist_ok=True)
return self
ensure_log_dir
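A minimal sketch of instantiating the settings model directly; the field values are illustrative assumptions and the import path is assumed.

```python
# Hypothetical Settings instantiation (import path and values are assumptions).
from src.models.general import ActiveLLMs, Settings

settings = Settings(
    service_endpoints={"parser": "http://parser:8000"},
    active_llms=ActiveLLMs(rag=["test_model_mock"], embedding=["embedding_mock"]),
    log_level="info",   # resolved case-insensitively via LogLevel._missing_
)
# ensure_log_dir has created settings.log_file.parent at this point
# (the default /rag/logs path is normally only writable inside the service container).
print(settings.service_name, settings.check_llm_api_interval_in_s)
```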
llms
pydantic model for LLM config.
| CLASS | DESCRIPTION |
|---|---|
| APIAuth | Defines authentication settings for the LLM. |
| EmbeddingModel | Defines the configuration of an embedding model instance. |
| LLM | Defines the basic structure of an LLM config. |
| LLMAPI | Defines the API connection to an LLM. |
| LLMConfig | Container for all LLM configurations used by the service. |
| LLMInference | Defines the inference parameters. |
| LLMPromptConfig | Defines the structure of an LLM prompt configuration. |
| LLMPromptMaps | Defines the prompt configurations for the LLMs used by the RAG service. |
| LLMPrompts | Defines the prompt parameters. |
APIAuth
Bases: BaseModel
Defines authentication settings for the LLM.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| type | Either 'token' or 'basic_auth'. TYPE: Literal["token", "basic_auth"] |
| secret_path | File path where the api token or credentials are stored. TYPE: FilePath |

| METHOD | DESCRIPTION |
|---|---|
| get_auth_header | Generate auth part of header for http request. |
Source code in docs/microservices/rag/src/models/llms.py
class APIAuth(BaseModel):
"""Defines Authentification settings for LLM.
Attributes:
type (Literal): Either 'token' or 'basic_auth'.
secret_path (FilePath): File path where the api token or credentials are stored.
"""
type: Literal["token", "basic_auth"]
secret_path: FilePath # file path where the api token or credentials are stored
@property
def secret(self) -> SecretStr:
"""Load secret variable as 'secret'."""
with open(self.secret_path) as file:
return SecretStr(file.read().strip())
def get_auth_header(self) -> str:
"""Generate auth part of header for http request.
Returns:
The auth header.
"""
auth_header = ""
if self.type == "basic_auth":
auth_header = f"Basic {base64.b64encode(self.secret.get_secret_value().encode()).decode()}"
elif self.type == "token":
auth_header = f"Bearer {self.secret.get_secret_value()}"
return auth_header
get_auth_header
Generate auth part of header for http request.
| RETURNS | DESCRIPTION |
|---|---|
| str | The auth header. |
Source code in docs/microservices/rag/src/models/llms.py
def get_auth_header(self) -> str:
"""Generate auth part of header for http request.
Returns:
The auth header.
"""
auth_header = ""
if self.type == "basic_auth":
auth_header = f"Basic {base64.b64encode(self.secret.get_secret_value().encode()).decode()}"
elif self.type == "token":
auth_header = f"Bearer {self.secret.get_secret_value()}"
return auth_header
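A hedged example of building an Authorization header with this model; the import path is assumed and the secret path is an illustrative assumption that must point to an existing file.

```python
# Hypothetical APIAuth usage (import path and secret path are assumptions).
from src.models.llms import APIAuth

auth = APIAuth(type="token", secret_path="/run/secrets/llm_token")
headers = {"Authorization": auth.get_auth_header()}   # "Bearer <token file content>"
```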
EmbeddingModel
Bases: BaseModel
Defines the configuration of an embedding model instance.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| model_config | Ignores extra configuration entries not relevant to this class. TYPE: ConfigDict |
| label | Human-readable name of the embedding model, suitable for UI display. TYPE: str |
| model | Internal model identifier, used in API calls (e.g., Ollama tag). TYPE: str |
| api | API details specifying how to interact with the embedding model. TYPE: LLMAPI |
| is_remote | Indicates whether the model is hosted on an external API. TYPE: bool |
Source code in docs/microservices/rag/src/models/llms.py
class EmbeddingModel(BaseModel):
"""Defines the configuration of an embedding model instance.
Attributes:
model_config (ConfigDict): Ignores extra configuration entries not relevant to this class.
label (str): Human-readable name of the embedding model, suitable for UI display.
model (str): Internal model identifier, used in API calls (e.g., Ollama tag).
api (LLMAPI): API details specifying how to interact with the embedding model.
is_remote (bool): Indicates whether the model is hosted on an external API.
"""
model_config = ConfigDict(extra="ignore")
label: str
model: str
api: LLMAPI
is_remote: bool
LLM
Bases: BaseModel
Defines the basic structure of a LLM config.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| label | Human-readable model name that can be presented to users. TYPE: str |
| model | Model name which is used in the API call, e.g. the Ollama tag. TYPE: str |
| prompt_map | Prompt map name to load LLMPromptMaps from. TYPE: str |
| is_remote | Whether this LLM is hosted at an external API. TYPE: bool |
| description | Human-readable description of the model that can be provided to the user. TYPE: str |
| api | API information. TYPE: LLMAPI |
| inference | Inference parameters. TYPE: LLMInference |
| max_chunks_to_use | Maximum number of chunks/documents to use for answer generation (default: 4). TYPE: int |
| prompt_config | Prompts (initially None; merged in a later step). TYPE: LLMPromptConfig |
Source code in docs/microservices/rag/src/models/llms.py
class LLM(BaseModel):
"""Defines the basic structure of a LLM config.
Attributes:
label (str): Human-readable model name that can be presented to users.
model (str): Model name which is used in API call, e.g. ollama tag.
prompt_map (str): Prompt map name to load LLMPromptMaps from.
is_remote (bool): Is this LLM hosted at an external API?
descrition (str): Human-readable description of the model that can be provided to the user.
api (LLMAPI): API information.
inference (LLMInference): Inference parameters.
max_chunks_to_use (int): Maximum number of chunks/documents to use for answer generation (Default: 4)
prompt_config (LLMPromptConfig): Prompts (initially None to merge it in another step).
"""
label: str
model: str
prompt_map: str
is_remote: bool
description: str = ""
api: LLMAPI
inference: LLMInference
max_chunks_to_use: int = None
prompt_config: LLMPromptConfig = None
LLMAPI
Bases: BaseModel
Defines API-Connection to LLM.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| url | URL of the LLM. TYPE: AnyHttpUrl |
| health_check | Relative path to the health check, e.g. '/models'. TYPE: str or None |
| auth | Pydantic model defining the authentication of the LLM. TYPE: APIAuth or None |

| METHOD | DESCRIPTION |
|---|---|
| get_health_check_url | Get the URL to check if the API is available. |
Source code in docs/microservices/rag/src/models/llms.py
class LLMAPI(BaseModel):
"""Defines API-Connection to LLM.
Attributes:
url (AnyHttpUrl): Url of the LLM.
health_check (str | None): Relative path to health check, i.e. '/models'.
auth (APIAuth | None): Pydantic Model defining the authentication of the LLM.
"""
url: AnyHttpUrl
health_check: str | None = None
auth: APIAuth | None = None
def get_health_check_url(self) -> str:
"""Get the URL to check if API is available."""
if self.health_check:
# make sure to remove trailing and leading slashes to not override path
return urljoin(
str(self.url).rstrip("/") + "/",
self.health_check.lstrip("/"),
)
return str(self.url)
get_health_check_url
Get the URL to check if API is available.
Source code in docs/microservices/rag/src/models/llms.py
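A short sketch of how the health check URL is assembled; the URL and path values are illustrative and the import path is an assumption.

```python
# Hypothetical LLMAPI usage (import path and URLs are assumptions).
from src.models.llms import LLMAPI

api = LLMAPI(url="http://llm-host:11434/v1", health_check="/models")
print(api.get_health_check_url())                 # http://llm-host:11434/v1/models

api_without_check = LLMAPI(url="http://llm-host:11434/v1")
print(api_without_check.get_health_check_url())   # falls back to the base URL
```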
LLMConfig
Bases: BaseModel
Container for all LLM configurations used by the service.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| model_config | Ignores extra configuration entries not relevant to this class. TYPE: ConfigDict |
| rag | Mapping of model names to LLM configurations for RAG use cases. TYPE: dict[str, LLM] |
| embedding | Mapping of model names to embedding model configurations. TYPE: dict[str, EmbeddingModel] |
Source code in docs/microservices/rag/src/models/llms.py
class LLMConfig(BaseModel):
"""Container for all LLM configurations used by the service.
Attributes:
model_config (ConfigDict): Ignores extra configuration entries not relevant to this class.
rag (dict[str, LLM]): Mapping of model names to LLM configurations for RAG use cases.
embedding (dict[str, EmbeddingModel]): Mapping of model names to embedding model configurations.
"""
model_config = ConfigDict(extra="ignore")
rag: dict[str, LLM]
embedding: dict[str, EmbeddingModel]
def __iter__(self) -> Iterator[str]:
"""Get 'keys' for automatic merge with i.e. LLMPromptConfig."""
return iter(self.__dict__.keys())
def __getitem__(self, service: str) -> dict[str, LLM]:
"""Get all LLMs for a given service (e.g. "chat", "rag").
Args:
service (str): The service name (e.g., "chat", "rag").
Returns:
All configered LLMs for the given service.
"""
return self.__getattribute__(service)
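A small sketch of how the container is typically accessed; llm_config is assumed to be an already loaded LLMConfig instance (e.g. from configs/llm_models.yml), and the import path is an assumption.

```python
# Hypothetical helper working on an already loaded LLMConfig instance.
from src.models.llms import LLM, LLMConfig

def list_rag_models(llm_config: LLMConfig) -> None:
    """Print the configured RAG LLMs."""
    rag_llms: dict[str, LLM] = llm_config["rag"]   # __getitem__ by service name
    for name, llm in rag_llms.items():
        print(name, llm.label, "remote" if llm.is_remote else "local")
```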
LLMInference
Bases: BaseModel
Defines the inference parameters.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| temperature | Randomness / variation of the output. High values indicate more creativity. Default is 0.7. TYPE: PositiveFloat or None |
| max_new_tokens | Maximum number of tokens of the generated response. Default is 2048. TYPE: PositiveInt or None |
| top_p | Threshold for sampling only from the most likely tokens. Default is 0.7. TYPE: PositiveFloat or None |
Source code in docs/microservices/rag/src/models/llms.py
class LLMInference(BaseModel):
"""Defines the inference parameters.
Attributes:
temperature (PositiveFloat | None): Randomness / variation of the output High values indicate more creativity.
Default is 0.7.
max_new_tokens (PositiveInt | None): Maximum number of tokens of the generated response. Default is 2048.
top_p (PositiveFloat | None): Threshold for sampling only from the most likely tokens. Default is 0.7.
"""
temperature: PositiveFloat | None = 0.7
max_new_tokens: PositiveInt | None = 2048
top_p: PositiveFloat | None = 0.7
LLMPromptConfig
Bases: BaseModel
Defines the structure of a LLM prompt configuration.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| model_config | Used to ignore other services, which are defined in the config. TYPE: ConfigDict |
| system | System prompts. TYPE: LLMPrompts |
| user | User prompts. TYPE: LLMPrompts or None |
| assistant | Assistant prompts. TYPE: LLMPrompts or None |
Source code in docs/microservices/rag/src/models/llms.py
class LLMPromptConfig(BaseModel):
"""Defines the structure of a LLM prompt configuration.
Attributes:
model_config (ConfigDict): Used to ignore other services, which are defined in the config.
system (LLMPrompts): System prompts.
user (LLMPrompts | None): User prompts.
assistant (LLMPrompts | None): Assistant prompts.
"""
# if there are more prompt types defined that are not used in this service: just ignore them
model_config = ConfigDict(extra="ignore")
system: LLMPrompts
user: LLMPrompts | None = None
assistant: LLMPrompts | None = None
LLMPromptMaps
Bases: BaseModel
Defines the prompt configurations for the LLMs used by the RAG service.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| model_config | Used to ignore other services, which are defined in the config. TYPE: ConfigDict |
| rag | Dictionary mapping names to the prompt configurations available for RAG. TYPE: dict[str, LLMPromptConfig] |
Source code in docs/microservices/rag/src/models/llms.py
class LLMPromptMaps(BaseModel):
"""Defines the LLMs used for summarization.
Attributes:
model_config (ConfigDict): Used to ignore other services, which are defined in the config.
rag (dict[str, LLM]): Dictionary containing a name and definition of LLMs's available for RAG.
"""
# if there are more services defined in the config: just ignore them
model_config = ConfigDict(extra="ignore")
rag: dict[str, LLMPromptConfig]
def __iter__(self) -> Iterator[str]:
"""Get 'keys' for automatic merge with i.e. LLMConfig."""
return iter(self.__dict__.keys())
LLMPrompts
Bases: BaseModel
Defines the prompt parameters.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| model_config | Used to ignore other services, which are defined in the config. TYPE: ConfigDict |
| generate | Prompt used to generate the response of the RAG. TYPE: str |
Source code in docs/microservices/rag/src/models/llms.py
class LLMPrompts(BaseModel):
"""Defines the prompt parameters.
Attributes:
model_config (ConfigDict): Used to ignore other services, which are defined in the config.
generate (str): Generate prompt generating response of the RAG.
"""
# if there are more prompts defined that are not used in this service: just ignore them
model_config = ConfigDict(extra="ignore")
generate: str = ""
rag_config
pydantic models describing a RAG pipeline.
| CLASS | DESCRIPTION |
|---|---|
| DB2UIMap | Defines the structure for the mapping from DB-metadata to UI strings. |
| GenerationConfig | Defines the structure of the configuration of a RAG generation component. |
| IndexingPipelineConfig | Defines the structure of the configuration of a RAG indexing component. |
| PipelineConfig | Defines the structure of RAG pipeline configuration. |
| PipelineSaveConfig | Defines the structure of the configuration of saving a RAG component. |
| RAGPipelineConfig | Defines the structure of a RAG pipeline configuration file. |
| RetrievalConfig | Defines the configuration parameter for the retrieval pipeline. |
| Source | Model defining Database-Sources. |
DB2UIMap
Bases: BaseModel
Defines the structure for the mapping from DB-metadata to UI strings.
| METHOD | DESCRIPTION |
|---|---|
| ensure_all_values_are_strings | Validates that all keys in the provided data, including additional ones, are strings. |
| validate_non_empty_source | Ensures that the 'source' field is not empty. |
Source code in docs/microservices/rag/src/models/rag_config.py
class DB2UIMap(BaseModel):
"""Defines the structure for the mapping from DB-metadata to UI strings."""
model_config = ConfigDict(extra="allow")
source: str
@field_validator("source")
@classmethod
def validate_non_empty_source(cls, value: str) -> str:
"""Ensures that the 'source' field is not empty.
This check intentionally runs after 'trim_strings' validator.
"""
if not value:
raise ValueError("Source cannot be empty")
return value
@model_validator(mode="before")
@classmethod
def ensure_all_values_are_strings(cls, values: dict[str, Any]) -> dict[str, Any]:
"""Validates that all keys in the provided data, including additional ones, are strings."""
for key, value in values.items():
if not isinstance(value, str):
raise ValueError(
f"Field '{key}' must be a string, got {type(value).__name__}"
)
return values
ensure_all_values_are_strings
classmethod
Validates that all keys in the provided data, including additional ones, are strings.
Source code in docs/microservices/rag/src/models/rag_config.py
@model_validator(mode="before")
@classmethod
def ensure_all_values_are_strings(cls, values: dict[str, Any]) -> dict[str, Any]:
"""Validates that all keys in the provided data, including additional ones, are strings."""
for key, value in values.items():
if not isinstance(value, str):
raise ValueError(
f"Field '{key}' must be a string, got {type(value).__name__}"
)
return values
validate_non_empty_source
classmethod
Ensures that the 'source' field is not empty.
This check intentionally runs after 'trim_strings' validator.
Source code in docs/microservices/rag/src/models/rag_config.py
GenerationConfig
Bases: BaseModel
Defines the structure of the configuration of a RAG generation component.
Source code in docs/microservices/rag/src/models/rag_config.py
IndexingPipelineConfig
PipelineConfig
Bases: BaseModel
Defines the structure of RAG pipeline configuration.
Source code in docs/microservices/rag/src/models/rag_config.py
class PipelineConfig(BaseModel):
"""Defines the structure of RAG pipeline configuration."""
# if there are more services defined in the config: just ignore them
model_config = ConfigDict(extra="ignore")
index: dict[str, str]
embedding_model_name: str
retrieval_config: RetrievalConfig = RetrievalConfig()
sources: list[Source]
metadata_title_separator: str = "----"
PipelineSaveConfig
Bases: BaseModel
Defines the structure of the configuration of saving a RAG component.
Source code in docs/microservices/rag/src/models/rag_config.py
RAGPipelineConfig
RetrievalConfig
Bases: BaseModel
Defines the configuration parameter for the retrieval pipeline.
retriever_top_k: The number of documents to retrieve per query.
include_ranker: Defines if a ranker should be included in the retrieval pipeline.
max_chunks_to_use: The number of documents to use for answer generation.
ranker_score_threshold: The score threshold to use for ranking.
ranker_model: Hugging Face repo_id where the re-ranker model is published.
ranker_model_path: Path to the local model.
Source code in docs/microservices/rag/src/models/rag_config.py
class RetrievalConfig(BaseModel):
"""Defines the configuration parameter for the retrieval pipeline.
retriever_top_k: The number of documents to retrieve per query.
include_reranker: Defines if a ranker should be included into the retrieval pipeline.
max_chunks_to_use: The number of documents to use for answer generation.
ranker_score_threshold: The score threshold to use for ranking.
ranker_model: Huggingface repo_id where the re-ranker model is published.
ranker_model_path: Path to local model.
"""
retriever_top_k: int = 50
include_ranker: bool = False
max_chunks_to_use: int = 20
ranker_score_threshold: float | None = None
ranker_model: str = "svalabs/cross-electra-ms-marco-german-uncased" # Huggingface model repo of ranker model to use
ranker_model_path: str = "/rag/models" # path for re-ranker model files
Source
Bases: BaseModel
Model defining Database-Sources.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| name | The name of the document source, e.g. 'Landesdrucksachen'. TYPE: str |
| date_filter | If set to true, this document source can be filtered by date range. Documents like 'Koalitionsvertrag' will probably not need a filtering by date; 'Pressemitteilungen' probably will need that filter. TYPE: bool |
Source code in docs/microservices/rag/src/models/rag_config.py
class Source(BaseModel):
"""Model defining Database-Sources.
Attributes:
name: The name of the document source. E.g. 'Landesdrucksachen'
date_filter: If set to true, this document source can be filtered by date range. Documents
like 'Koalitionsvertrag' will probably not need a filtering by date. 'Pressemitteilungen'
probably will need that filter.
"""
name: str
date_filter: bool
model_config = ConfigDict(
json_schema_extra={
"openapi_examples": {
"standard": {
"summary": "Standard database source",
"description": "A typical database source with date-based filtering enabled.",
"value": {
"name": "Coalition Agreement",
"date_filter": False,
},
},
}
}
)
rag
Implementation of the core logic and interaction flow of the RAG.
| MODULE | DESCRIPTION |
|---|---|
| pipeline_configuration | This module initializes the RAG pipelines (database-rag and file-rag). |
| pipelines | Implementation of the RAG pipelines. |
| rag_registry | RAG-Registry class for setting-up, storing and accessing RAG pipelines. |
| utils | This module contains helper functions for RAG pipelines. |
pipeline_configuration
This module initializes the RAG pipelines (database-rag and file-rag).
| FUNCTION | DESCRIPTION |
|---|---|
| setup_rag | Set up the file and database RAG pipelines. |
setup_rag
Set up the file and database RAG pipelines.
| RETURNS | DESCRIPTION |
|---|---|
| tuple[FileRagPipe, DatabaseRagPipe] | The file_rag and database_rag pipelines. |
Source code in docs/microservices/rag/src/rag/pipeline_configuration.py
def setup_rag(
*, debug_haystack_pipelines: bool = False
) -> tuple[FileRagPipe, DatabaseRagPipe]:
"""File and database rag pipelines.
Returns:
The file_rag and database_rag pipelines.
"""
if debug_haystack_pipelines:
tracing.tracer.is_content_tracing_enabled = (
True # to enable tracing/logging content (inputs/outputs)
)
tracing.enable_tracing(
LoggingTracer(
tags_color_strings={
"haystack.component.input": "\x1b[1;31m",
"haystack.component.name": "\x1b[1;34m",
}
)
)
logger.debug("Initialize database-rag pipelines.")
database_rag = DatabaseRagPipe(
pipe_name="database",
indexing_pipeline=pipeline_registry["database_indexing"],
rag_pipeline=pipeline_registry["database_rag"],
)
database_rag.deploy_pipelines()
logger.debug("Initialize file-rag pipelines.")
file_rag = FileRagPipe(
pipe_name="file",
indexing_pipeline=pipeline_registry["file_indexing"],
rag_pipeline=pipeline_registry["file_rag"],
file_deletion_pipeline=pipeline_registry["file_deleter"],
)
file_rag.deploy_pipelines()
return file_rag, database_rag
pipelines
Implementation of the RAG pipelines.
| MODULE | DESCRIPTION |
|---|---|
| baseclass_rag | This class serves as a foundation for creating RAG pipelines. |
| components | Implementation of the pipeline components of the RAG. |
| database_rag | This module implements a RAG system based on database retrievals as data source. |
| draw_pipeline | Visualizes haystack pipelines. |
| file_rag | This module implements a RAG system based on files as data source. |
| openai_custom_auth_client | This module provides an OpenAI client that supports Bearer-Token-Authentication and Basic-Authentication. |
| pipeline_definitions | Implementation of the pipeline definitions of the RAG. |
baseclass_rag
This class serves as a foundation for creating RAG pipelines.
| CLASS | DESCRIPTION |
|---|---|
| BaseRagPipe | Base class for RAG Pipelines based on haystack pipelines. |
BaseRagPipe
Bases: ABC
Base class for RAG Pipelines based on haystack pipelines.
This class serves as a foundation for creating RAG-based pipelines, leveraging the capabilities of the haystack library.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| name | The name of the pipeline instance. TYPE: str |
| rag_pipe | The haystack RAG pipeline. TYPE: AsyncPipeline |
| indexing_pipe | The haystack indexing pipeline. TYPE: AsyncPipeline |
| id_retriever | Component to retrieve document IDs. TYPE: DocumentIDRetriever |
| llm_generators | Dictionary of LLM generation configurations from pipeline metadata. TYPE: dict |
| _required_metadata_keys | Required metadata keys expected in the pipeline. TYPE: list[str] |
| _required_components | Required component names expected in the pipeline. TYPE: list[str] |

| METHOD | DESCRIPTION |
|---|---|
| add_llm_router_arg_inplace | Select the answer generation pipeline that corresponds to the language model. |
| deploy_indexing_pipe | Initializes and warms up the indexing pipeline. |
| deploy_pipelines | Initializes and warms up the indexing and rag pipeline. |
| deploy_rag_pipe | Initializes and warms up the rag pipeline. |
| generate_answer | Run the RAG pipeline to generate an answer based on a query. |
| indexing | Run the file indexing pipeline on the provided files and add ingestion_date as metadata. |
Source code in docs/microservices/rag/src/rag/pipelines/baseclass_rag.py
class BaseRagPipe(ABC):
"""Base class for RAG Pipelines based on haystack pipelines.
This class serves as a foundation for creating RAG-based pipelines,
leveraging the capabilities of the haystack library.
Attributes:
name (str): The name of the pipeline instance.
rag_pipe (AsyncPipeline): The haystack RAG pipeline.
indexing_pipe (AsyncPipeline): The haystack indexing pipeline.
id_retriever (DocumentIDRetriever): Component to retrieve document IDs.
llm_generators (dict): Dictionary of LLM generation configurations from pipeline metadata.
_required_metadata_keys (list[str]): Required metadata keys expected in the pipeline.
_required_components (list[str]): Required component names expected in the pipeline.
"""
def __init__(
self,
indexing_pipeline: AsyncPipeline,
rag_pipeline: AsyncPipeline,
pipe_name: str,
) -> None:
"""Initialize a new instance of BaseRagPipe.
Pipeline Metadata Requirements:
- `rag_answers_keys`: A list of relevant keys to access nested RAG answers.
- `retrieved_docs_keys`: A list of relevant keys to access nested retrieved documents.
- `llm_generators`: A dict of dictionaries with the available answer generation LLMs.
- The models are selected with a Haystack component called `llm_router`.
Pipeline Component Requirements:
The following components are required in the pipeline: ["query", "filters"].
Args:
indexing_pipeline (AsyncPipeline): haystack indexing pipeline.
rag_pipeline (AsyncPipeline): haystack RAG pipeline.
pipe_name (str): The name of this specific pipeline.
"""
self.name = pipe_name
self._required_metadata_keys = [
"rag_answers_keys",
"retrieved_docs_keys",
"llm_generators",
"max_chunks_to_use_component",
]
self._required_components = ["query", "filters"]
self.rag_pipe = rag_pipeline
self.indexing_pipe = indexing_pipeline
self._init_id_retriever()
self.llm_generators = self.rag_pipe.metadata["llm_generators"]
@abstractmethod
def indexing(self, files: list[bytes]) -> dict[str, object]:
"""Run the file indexing pipeline on the provided files and add ingestion_date as metadata.
Args:
files (list[bytes]): A list of byte streams representing files.
Returns:
The results of the indexing pipeline.
Raises:
AttributeError: If the indexing pipeline is not deployed.
"""
result = {"indexing_information": "some information"}
return result
@abstractmethod
def generate_answer(
self, query: str, language_model: str
) -> tuple[str, list[Document]]:
"""Run the RAG pipeline to generate an answer based on a query.
This method processes the input query through a Retrieval-Augmented Generation (RAG) pipeline,
which retrieves relevant documents and generates an answer based on that context.
Args:
query (str): The input query for which an answer is to be generated.
language_model (str): The model used for generating the answer.
Returns:
A tuple containing the generated answer and the retrieved documents.
"""
answer = "here is my answer"
documents = [
Document(
content="this are the retrieved documents",
meta={"source": "test", "title": "my_title"},
)
]
return answer, documents
def _init_id_retriever(self) -> None:
document_store = self.indexing_pipe.get_component(
"document_writer"
).document_store
self.id_retriever = DocumentIDRetriever(document_store=document_store)
def deploy_pipelines(self) -> None:
"""Initializes and warms up the indexing and rag pipeline."""
self.deploy_rag_pipe()
self.deploy_indexing_pipe()
def deploy_rag_pipe(self) -> None:
"""Initializes and warms up the rag pipeline."""
self.rag_pipe.warm_up()
def deploy_indexing_pipe(self) -> None:
"""Initializes and warms up the indexing pipeline."""
self.indexing_pipe.warm_up()
def _check_rag_config_requirements(self) -> None:
"""Validating rag configuration."""
self._check_rag_component_requirements()
self._check_rag_metadata_requirements()
def _check_rag_metadata_requirements(self) -> None:
"""Validating metadata in rag configuration."""
for key in self._required_metadata_keys:
if key not in self.rag_pipe.metadata:
raise KeyError(f"Missing {key=} in `self.rag_config['metadata']`")
def _check_rag_component_requirements(self) -> None:
"""Validating rag configuration."""
for key in self._required_components:
if key not in self.rag_pipe.graph.nodes():
raise KeyError(f"Missing component {key=} in `self.rag_pipe`")
def _compute_doc_hash(
self, document: Document | UploadFile | list[Document] | list[UploadFile]
) -> str | list[str]:
"""Compute a hash for the given document(s).
Args:
document (Document | UploadFile | list[Document] | list[UploadFile]):
The document or list of documents for which to compute the hash.
Returns:
str | list[str]: SHA256 hash or list of SHA256 hashes for each document.
Raises:
TypeError: If input is not a supported type.
"""
def _single_doc_hash(doc: object) -> str:
if isinstance(doc, Document):
return self._compute_doc_hash_haystack_document(doc)
elif isinstance(doc, UploadFile):
return self._compute_doc_hash_uploadfile(doc)
else:
raise TypeError(
f"Input must be of type Document or UploadFile or a list of these types "
f"but is {type(doc)}"
)
if isinstance(document, list):
return [_single_doc_hash(doc) for doc in document]
else:
return _single_doc_hash(document)
def _compute_doc_hash_haystack_document(
self, document: Document
) -> list[str] | str:
"""Compute a hash for the given documents.
Args:
document: IngestionDocument: The documents for which to compute the hash.
Returns:
str: List of SHA256 hashes for each document.
"""
try:
data = f"{document.content}"
doc_hash = hashlib.sha256(data.encode("utf-8")).hexdigest()
except Exception as e:
logger.error(f"Compute document hash for Haystack Document failed. {e}")
return doc_hash
def _compute_doc_hash_uploadfile(
self, document: list[UploadFile] | UploadFile
) -> list[str] | str:
"""Compute a hash for the given documents.
Args:
document: | UploadFile: The documents for which to compute the hash.
Returns:
str: List of SHA256 hashes for each document.
"""
try:
document.file.seek(0)
data = f"{document.file.read()}{document.filename}"
docs_hash = hashlib.sha256(data.encode("utf-8")).hexdigest()
except Exception as e:
logger.error(f"Compute document hash for Uploadfile failed. {e}")
return docs_hash
def add_llm_router_arg_inplace(
self,
data: dict,
language_model_key: str = None,
) -> None:
"""Select the answer generation pipeline that corresponds to the language model.
Args:
language_model_key (str): The language model to select.
data (dict): Data dictionary for the haystack pipeline.
Raises:
KeyError: If the key is not found in the valid LLM generator keys.
"""
self._validate_language_model_key(language_model_key)
# add language_model to data
if "llm_router" in self.rag_pipe.graph.nodes():
data.update({"llm_router": {"generator_model": language_model_key}})
def _validate_language_model_key(self, language_model_key: str) -> None:
"""Validates that the provided language model key exists in the list of valid LLM generator keys.
Args:
language_model_key (str): The key to validate.
Raises:
KeyError: If the key is not found in the valid LLM generator keys.
"""
if language_model_key not in self.llm_generators:
raise KeyError(f"{language_model_key=} is not in pipeline-llms")
def _check_if_file_id_exists(self, doc_id: str) -> tuple[bool, list[Document]]:
"""Check if a document with the given ID already exists in the document store.
Args:
doc_id (str): The ID of the document to check.
Returns:
tuple[bool, list[Document]]: Whether a document with the given ID exists, and the matching documents.
Raises:
AttributeError: If the document store is not initialized.
"""
filters = {"field": "meta.source_id", "operator": "==", "value": doc_id}
if self.rag_pipe is None:
raise AttributeError(
"`self.rag_pipe` is not deployed. Please run `self.deploy_rag_pipe`"
)
existing_docs = self.id_retriever.run(filters=filters)
return len(existing_docs["document_ids"]) > 0, existing_docs["documents"]
def _update_ingestion_date(
self, existing_docs: list[Document], ingestion_date: datetime
) -> None:
"""Update the ingestion date of existing documents in the document store."""
document_store = self.indexing_pipe.get_component(
"document_writer"
).document_store
es_client = document_store.client
for doc in existing_docs:
response = es_client.update(
index=document_store._index,
id=doc.id,
body={"doc": {"meta.ingestion_date": ingestion_date}},
)
if response["_shards"]["failed"] > 0:
logger.error("Error updating ingestion date. Nothing updated.")
def _update_all_meta_data(self, existing_docs: list[Document], meta: dict) -> None:
document_store = self.indexing_pipe.get_component(
"document_writer"
).document_store
es_client = document_store.client
for doc in existing_docs:
flattened_meta = {f"meta.{key}": value for key, value in meta.items()}
response = es_client.update(
index=document_store._index, id=doc.id, body={"doc": flattened_meta}
)
if response["_shards"]["failed"] > 0:
logger.error("Error updating meta data. Nothing updated.")
add_llm_router_arg_inplace
Select the answer generation pipeline that corresponds to the language model.
| PARAMETER | DESCRIPTION |
|---|---|
language_model_key
|
The language model to select.
TYPE:
|
data
|
Data dictionary for the haystack pipeline.
TYPE:
|
| RAISES | DESCRIPTION |
|---|---|
KeyError
|
If the key is not found in the valid LLM generator keys. |
Source code in docs/microservices/rag/src/rag/pipelines/baseclass_rag.py
def add_llm_router_arg_inplace(
self,
data: dict,
language_model_key: str = None,
) -> None:
"""Select the answer generation pipeline that corresponds to the language model.
Args:
language_model_key (str): The language model to select.
data (dict): Data dictionary for the haystack pipeline.
Raises:
KeyError: If the key is not found in the valid LLM generator keys.
"""
self._validate_language_model_key(language_model_key)
# add language_model to data
if "llm_router" in self.rag_pipe.graph.nodes():
data.update({"llm_router": {"generator_model": language_model_key}})
deploy_indexing_pipe
deploy_pipelines
deploy_rag_pipe
generate_answer
abstractmethod
Run the RAG pipeline to generate an answer based on a query.
This method processes the input query through a Retrieval-Augmented Generation (RAG) pipeline, which retrieves relevant documents and generates an answer based on that context.
| PARAMETER | DESCRIPTION |
|---|---|
query
|
The input query for which an answer is to be generated.
TYPE:
|
language_model
|
The model used for generating the answer.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
tuple[str, list[Document]]
|
A tuple containing the generated answer and the retrieved documents. |
Source code in docs/microservices/rag/src/rag/pipelines/baseclass_rag.py
@abstractmethod
def generate_answer(
self, query: str, language_model: str
) -> tuple[str, list[Document]]:
"""Run the RAG pipeline to generate an answer based on a query.
This method processes the input query through a Retrieval-Augmented Generation (RAG) pipeline,
which retrieves relevant documents and generates an answer based on that context.
Args:
query (str): The input query for which an answer is to be generated.
language_model (str): The model used for generating the answer.
Returns:
A tuple containing the generated answer and the retrieved documents.
"""
answer = "here is my answer"
documents = [
Document(
content="this are the retrieved documents",
meta={"source": "test", "title": "my_title"},
)
]
return answer, documents
indexing
abstractmethod
Run the file indexing pipeline on the provided files and add ingestion_date as metadata.
| PARAMETER | DESCRIPTION |
|---|---|
files
|
A list of byte streams representing files.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
dict[str, object]
|
The results of the indexing pipeline. |
| RAISES | DESCRIPTION |
|---|---|
AttributeError
|
If the indexing pipeline is not deployed. |
Source code in docs/microservices/rag/src/rag/pipelines/baseclass_rag.py
@abstractmethod
def indexing(self, files: list[bytes]) -> dict[str, object]:
"""Run the file indexing pipeline on the provided files and add ingestion_date as metadata.
Args:
files (list[bytes]): A list of byte streams representing files.
Returns:
The results of the indexing pipeline.
Raises:
AttributeError: If the indexing pipeline is not deployed.
"""
result = {"indexing_information": "some information"}
return result
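To make the metadata contract above concrete, the following sketch shows how a RAG pipeline's `metadata` could be populated before constructing a pipe; all component names and model keys are illustrative assumptions, not the shipped configuration.

```python
from haystack import AsyncPipeline

# Illustrative assumption of the metadata contract expected by BaseRagPipe.
rag_pipeline = AsyncPipeline(
    metadata={
        # nested keys used to pull the generated answer out of the pipeline result
        "rag_answers_keys": ["answer_builder", "answers"],
        # nested keys used to pull the retrieved documents out of the result
        "retrieved_docs_keys": ["retriever", "documents"],
        # component that receives the maximum number of chunks (top_k)
        "max_chunks_to_use_component": ["retriever"],
        # available answer-generation LLMs, selected via the `llm_router` component
        "llm_generators": {"gpt-x": {"max_chunks_to_use": 5}},
    }
)
```

The pipeline graph must additionally contain components named `query` and `filters`; otherwise `_check_rag_component_requirements` raises a `KeyError`.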
components
Implementation of the pipeline components of the RAG.
| MODULE | DESCRIPTION |
|---|---|
document_deleter |
This module implements a haystack pipeline component that deletes documents from a DocumentStore. |
document_id_retriever |
This module provides an implementation of a haystack pipeline component that retrieves IDs of documents. |
f13_openai_chat_generator |
This module provides an adaptation of the haystack OpenAIGenerator for F13. |
f13_openai_document_embedder |
This module provides an implementation of a haystack-pipeline-component for document embedding. |
f13_openai_text_embedder |
A component for embedding strings using OpenAI-conform API. |
id_setter |
This module implements a haystack pipeline component that sets the document_id hash. |
document_deleter
This module implements a haystack pipeline component that deletes documents from a DocumentStore.
| CLASS | DESCRIPTION |
|---|---|
DocumentDeleter |
Deletes documents from a DocumentStore. |
DocumentDeleter
Deletes documents from a DocumentStore.
This component provides deletion capabilities for documents stored in a Haystack DocumentStore.
| ATTRIBUTE | DESCRIPTION |
|---|---|
document_store |
The DocumentStore instance from which documents will be deleted.
TYPE:
|
| METHOD | DESCRIPTION |
|---|---|
from_dict |
Deserializes the component from a dictionary. |
run |
Run the DocumentDeleter on the given input data. |
to_dict |
Serializes the component to a dictionary. |
Source code in docs/microservices/rag/src/rag/pipelines/components/document_deleter.py
@component
class DocumentDeleter:
"""Deletes documents from a DocumentStore.
This component provides deletion capabilities for documents stored in a Haystack DocumentStore.
Attributes:
document_store (DocumentStore): The DocumentStore instance from which documents will be deleted.
"""
def __init__(self, document_store: DocumentStore) -> None:
"""Create a DocumentDeleter component.
Args:
document_store (DocumentStore): The DocumentStore from which documents will be deleted.
"""
self.document_store = document_store
def to_dict(self) -> dict[str, Any]:
"""Serializes the component to a dictionary.
Returns:
Dictionary with serialized data.
"""
return default_to_dict(self, document_store=self.document_store.to_dict())
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "DocumentDeleter":
"""Deserializes the component from a dictionary.
Args:
data (dict[str, Any]): The dictionary to deserialize from.
Returns:
The deserialized component.
Raises:
DeserializationError: If the document store is not properly specified in
the serialization data or its type cannot be imported.
"""
init_params = data.get("init_parameters", {})
if "document_store" not in init_params:
raise DeserializationError("Missing 'document_store' in serialization data")
if "type" not in init_params["document_store"]:
raise DeserializationError(
"Missing 'type' in document store's serialization data"
)
try:
module_name, type_ = init_params["document_store"]["type"].rsplit(".", 1)
logger.debug(f"Trying to import module '{module_name}'")
module = importlib.import_module(module_name)
except (ImportError, DeserializationError) as e:
raise DeserializationError(
f"DocumentStore of type '{init_params['document_store']['type']}' not correctly imported"
) from e
docstore_class = getattr(module, type_)
docstore = docstore_class.from_dict(init_params["document_store"])
data["init_parameters"]["document_store"] = docstore
return default_from_dict(cls, data)
@component.output_types(documents_deleted=int)
def run(self, document_ids: list[str]) -> dict[str, int]:
"""Run the DocumentWriter on the given input data.
Args:
document_ids (list[str]): A list of document IDs to delete from the store.
Returns:
A dictionary containing the number of documents deleted.
Raises:
ValueError: If the specified document store is not found.
"""
self.document_store.delete_documents(document_ids=document_ids)
return {"documents_deleted": len(document_ids)}
from_dict
classmethod
Deserializes the component from a dictionary.
| PARAMETER | DESCRIPTION |
|---|---|
data
|
The dictionary to deserialize from.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
DocumentDeleter
|
The deserialized component. |
| RAISES | DESCRIPTION |
|---|---|
DeserializationError
|
If the document store is not properly specified in the serialization data or its type cannot be imported. |
Source code in docs/microservices/rag/src/rag/pipelines/components/document_deleter.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "DocumentDeleter":
"""Deserializes the component from a dictionary.
Args:
data (dict[str, Any]): The dictionary to deserialize from.
Returns:
The deserialized component.
Raises:
DeserializationError: If the document store is not properly specified in
the serialization data or its type cannot be imported.
"""
init_params = data.get("init_parameters", {})
if "document_store" not in init_params:
raise DeserializationError("Missing 'document_store' in serialization data")
if "type" not in init_params["document_store"]:
raise DeserializationError(
"Missing 'type' in document store's serialization data"
)
try:
module_name, type_ = init_params["document_store"]["type"].rsplit(".", 1)
logger.debug(f"Trying to import module '{module_name}'")
module = importlib.import_module(module_name)
except (ImportError, DeserializationError) as e:
raise DeserializationError(
f"DocumentStore of type '{init_params['document_store']['type']}' not correctly imported"
) from e
docstore_class = getattr(module, type_)
docstore = docstore_class.from_dict(init_params["document_store"])
data["init_parameters"]["document_store"] = docstore
return default_from_dict(cls, data)
run
Run the DocumentDeleter on the given input data.
| PARAMETER | DESCRIPTION |
|---|---|
document_ids
|
A list of document IDs to delete from the store.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
dict[str, int]
|
A dictionary containing the number of documents deleted. |
| RAISES | DESCRIPTION |
|---|---|
ValueError
|
If the specified document store is not found. |
Source code in docs/microservices/rag/src/rag/pipelines/components/document_deleter.py
@component.output_types(documents_deleted=int)
def run(self, document_ids: list[str]) -> dict[str, int]:
"""Run the DocumentWriter on the given input data.
Args:
document_ids (list[str]): A list of document IDs to delete from the store.
Returns:
A dictionary containing the number of documents deleted.
Raises:
ValueError: If the specified document store is not found.
"""
self.document_store.delete_documents(document_ids=document_ids)
return {"documents_deleted": len(document_ids)}
to_dict
Serializes the component to a dictionary.
| RETURNS | DESCRIPTION |
|---|---|
dict[str, Any]
|
Dictionary with serialized data. |
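A minimal standalone usage sketch, assuming an `InMemoryDocumentStore` from Haystack; any other `DocumentStore` implementation should work the same way.

```python
from haystack import Document
from haystack.document_stores.in_memory import InMemoryDocumentStore

# Fill a small store with two documents, then delete one of them by ID.
store = InMemoryDocumentStore()
store.write_documents(
    [Document(id="doc-1", content="hello"), Document(id="doc-2", content="world")]
)

deleter = DocumentDeleter(document_store=store)
result = deleter.run(document_ids=["doc-1"])
print(result)  # {'documents_deleted': 1}
```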
document_id_retriever
This module provides an implementation of a haystack pipeline component that retrieves IDs of documents.
| CLASS | DESCRIPTION |
|---|---|
DocumentIDRetriever |
Retrieves document_ids from a DocumentStore. |
DocumentIDRetriever
Retrieves document_ids from a DocumentStore.
| ATTRIBUTE | DESCRIPTION |
|---|---|
document_store |
The DocumentStore instance from which document IDs are retrieved.
TYPE:
|
| METHOD | DESCRIPTION |
|---|---|
from_dict |
Deserializes the component from a dictionary. |
run |
Run the DocumentIDRetriever filters. |
to_dict |
Serializes the component to a dictionary. |
Source code in docs/microservices/rag/src/rag/pipelines/components/document_id_retriever.py
@component
class DocumentIDRetriever:
"""Retrieves document_ids from a DocumentStore.
Attributes:
document_store (DocumentStore): The DocumentStore instance from which document IDs are retrieved.
"""
def __init__(self, document_store: DocumentStore) -> None:
"""Create a DocumentIDRetriever component.
Args:
document_store (DocumentStore): The DocumentStore from which document IDs are retrieved.
"""
self.document_store = document_store
def to_dict(self) -> dict[str, Any]:
"""Serializes the component to a dictionary.
Returns:
Dictionary with serialized data.
"""
return default_to_dict(self, document_store=self.document_store.to_dict())
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "DocumentIDRetriever":
"""Deserializes the component from a dictionary.
Args:
data (dict[str, Any]): The dictionary to deserialize from.
Returns:
The deserialized component.
Raises:
DeserializationError: If the document store is not properly specified in the serialization data
or its type cannot be imported.
"""
init_params = data.get("init_parameters", {})
if "document_store" not in init_params:
raise DeserializationError("Missing 'document_store' in serialization data")
if "type" not in init_params["document_store"]:
raise DeserializationError(
"Missing 'type' in document store's serialization data"
)
try:
module_name, type_ = init_params["document_store"]["type"].rsplit(".", 1)
logger.debug(f"Trying to import module '{module_name}'")
module = importlib.import_module(module_name)
except (ImportError, DeserializationError) as e:
raise DeserializationError(
f"DocumentStore of type '{init_params['document_store']['type']}' not correctly imported"
) from e
docstore_class = getattr(module, type_)
docstore = docstore_class.from_dict(init_params["document_store"])
data["init_parameters"]["document_store"] = docstore
return default_from_dict(cls, data)
@component.output_types(document_ids=list[str])
def run(self, filters: dict[str, Any] | None = None) -> dict[str, list[str]]:
"""Run the DocumentIDRetriever filters.
Args:
filters (dict[str, Any] | None): Filters applied to the retrieved `Document`s.
Raises:
ValueError: If the specified document store is not found.
"""
docs = self.document_store.filter_documents(filters=filters)
document_ids = [doc.id for doc in docs]
return {"document_ids": document_ids, "documents": docs}
from_dict
classmethod
Deserializes the component from a dictionary.
| PARAMETER | DESCRIPTION |
|---|---|
data
|
The dictionary to deserialize from.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
DocumentIDRetriever
|
The deserialized component. |
| RAISES | DESCRIPTION |
|---|---|
DeserializationError
|
If the document store is not properly specified in the serialization data or its type cannot be imported. |
Source code in docs/microservices/rag/src/rag/pipelines/components/document_id_retriever.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "DocumentIDRetriever":
"""Deserializes the component from a dictionary.
Args:
data (dict[str, Any]): The dictionary to deserialize from.
Returns:
The deserialized component.
Raises:
DeserializationError: If the document store is not properly specified in the serialization data
or its type cannot be imported.
"""
init_params = data.get("init_parameters", {})
if "document_store" not in init_params:
raise DeserializationError("Missing 'document_store' in serialization data")
if "type" not in init_params["document_store"]:
raise DeserializationError(
"Missing 'type' in document store's serialization data"
)
try:
module_name, type_ = init_params["document_store"]["type"].rsplit(".", 1)
logger.debug(f"Trying to import module '{module_name}'")
module = importlib.import_module(module_name)
except (ImportError, DeserializationError) as e:
raise DeserializationError(
f"DocumentStore of type '{init_params['document_store']['type']}' not correctly imported"
) from e
docstore_class = getattr(module, type_)
docstore = docstore_class.from_dict(init_params["document_store"])
data["init_parameters"]["document_store"] = docstore
return default_from_dict(cls, data)
run
Run the DocumentIDRetriever filters.
| PARAMETER | DESCRIPTION |
|---|---|
filters
|
Filters applied to the retrieved `Document`s.
TYPE:
|
| RAISES | DESCRIPTION |
|---|---|
ValueError
|
If the specified document store is not found. |
Source code in docs/microservices/rag/src/rag/pipelines/components/document_id_retriever.py
@component.output_types(document_ids=list[str])
def run(self, filters: dict[str, Any] | None = None) -> dict[str, list[str]]:
"""Run the DocumentIDRetriever filters.
Args:
filters (dict[str, Any] | None): Filters applied to the retrieved `Document`s.
Raises:
ValueError: If the specified document store is not found.
"""
docs = self.document_store.filter_documents(filters=filters)
document_ids = [doc.id for doc in docs]
return {"document_ids": document_ids, "documents": docs}
to_dict
Serializes the component to a dictionary.
| RETURNS | DESCRIPTION |
|---|---|
dict[str, Any]
|
Dictionary with serialized data. |
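A minimal usage sketch with an `InMemoryDocumentStore`; the filter follows the Haystack 2.x filter syntax already used in `_check_if_file_id_exists` above.

```python
from haystack import Document
from haystack.document_stores.in_memory import InMemoryDocumentStore

store = InMemoryDocumentStore()
store.write_documents([Document(content="chunk", meta={"source_id": "abc123"})])

retriever = DocumentIDRetriever(document_store=store)
result = retriever.run(
    filters={"field": "meta.source_id", "operator": "==", "value": "abc123"}
)
print(result["document_ids"])  # IDs of the matching documents
```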
f13_openai_chat_generator
This module provides an adaptation of the haystack OpenAIGenerator for F13.
| CLASS | DESCRIPTION |
|---|---|
F13OpenAIChatGenerator |
This class is an adaptation of the haystack OpenAIGenerator for F13. |
F13OpenAIChatGenerator
Bases: OpenAIChatGenerator
This class is an adaptation of the haystack OpenAIGenerator for F13.
| ATTRIBUTE | DESCRIPTION |
|---|---|
timeout |
Timeout duration (in seconds) for OpenAI client calls.
TYPE:
|
max_retries |
Maximum number of retries on OpenAI internal errors.
TYPE:
|
client |
Synchronous OpenAI client with custom authentication.
TYPE:
|
async_client |
Asynchronous OpenAI client with custom authentication.
TYPE:
|
Source code in docs/microservices/rag/src/rag/pipelines/components/f13_openai_chat_generator.py
@component
class F13OpenAIChatGenerator(OpenAIChatGenerator):
"""This class is an adaption of the haystack OpenAIGenerator for F13.
Attributes:
timeout (float): Timeout duration (in seconds) for OpenAI client calls.
max_retries (int): Maximum number of retries on OpenAI internal errors.
client (OpenAI): Synchronous OpenAI client with custom authentication.
async_client (AsyncOpenAI): Asynchronous OpenAI client with custom authentication.
"""
def __init__(
self,
llm: LLM,
streaming_callback: Callable[[StreamingChunk], None] | None = None,
generation_kwargs: dict[str, Any] | None = None,
timeout: float = 180,
max_retries: int = 5,
) -> None:
"""Creates an instance of F13OpenAIGenerator.
By setting the 'OPENAI_TIMEOUT' and 'OPENAI_MAX_RETRIES' environment variables,
you can change the timeout and max_retries parameters in the OpenAI client.
Args:
llm (LLM): The language model (pydantic object) to use.
streaming_callback (Callable[[StreamingChunk], None], optional):
A callback function called when a new token is received from the stream.
generation_kwargs (dict[str, Any], optional): Additional parameters sent directly to the OpenAI endpoint.
Supported parameters include:
- max_tokens (int): Max tokens for output.
- temperature (float): Sampling temperature for creativity.
- top_p (float): Nucleus sampling probability mass.
- n (int): Number of completions per prompt.
- stop (str or list): Sequence(s) where generation stops.
- presence_penalty (float): Penalty for token presence to reduce repetition.
- frequency_penalty (float): Penalty for token frequency to reduce repetition.
- logit_bias (dict): Bias added to specific tokens.
timeout (float, optional): Timeout for OpenAI client calls.
Defaults to 180 seconds or is inferred from the `OPENAI_TIMEOUT` env variable.
max_retries (int, optional): Maximum retries on OpenAI internal errors.
Defaults to 5 or is inferred from `OPENAI_MAX_RETRIES` env variable.
"""
super(F13OpenAIChatGenerator, self).__init__( # noqa: UP008, because haystack components need this init
api_key=Secret.from_token("UNUSED_TOKEN"),
model=llm.model,
streaming_callback=streaming_callback,
api_base_url=str(llm.api.url),
generation_kwargs=generation_kwargs,
max_retries=max_retries,
timeout=timeout,
)
self.timeout = timeout
self.max_retries = max_retries
if llm.api.auth:
auth_header = llm.api.auth.get_auth_header()
auth_client = CustomAuthClient(auth_header=auth_header)
async_auth_client = CustomAsyncAuthClient(auth_header=auth_header)
else:
auth_client = CustomAuthClient(auth_header=None)
async_auth_client = CustomAsyncAuthClient(auth_header=None)
self.client = OpenAI(
http_client=auth_client,
api_key="",
base_url=str(llm.api.url),
timeout=self.timeout,
max_retries=self.max_retries,
)
self.async_client = AsyncOpenAI(
http_client=async_auth_client,
api_key="",
base_url=str(llm.api.url),
timeout=self.timeout,
max_retries=self.max_retries,
)
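A hedged instantiation sketch follows. The exact fields of the `LLM` pydantic model are assumptions inferred from the attributes accessed in `__init__` (`llm.model`, `llm.api.url`, `llm.api.auth`), and the endpoint URL is a placeholder.

```python
from haystack.dataclasses import ChatMessage

# Assumed construction of the LLM config object (field names inferred above).
llm_config = LLM(model="my-model", api={"url": "https://llm.example/v1", "auth": None})

generator = F13OpenAIChatGenerator(
    llm=llm_config,
    generation_kwargs={"temperature": 0.2, "max_tokens": 512},
)
result = generator.run(messages=[ChatMessage.from_user("What is RAG?")])
print(result["replies"][0])
```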
f13_openai_document_embedder
This module provides an implementation of a haystack-pipeline-component for document embedding.
| CLASS | DESCRIPTION |
|---|---|
F13OpenAIDocumentEmbedder |
A component for computing Document embeddings using OpenAI-conform API. |
F13OpenAIDocumentEmbedder
Bases: OpenAIDocumentEmbedder
A component for computing Document embeddings using OpenAI-conform API.
Usage example:
from haystack import Document
import F13OpenAIDocumentEmbedder
doc = Document(content="I love pizza!")
document_embedder = F13OpenAIDocumentEmbedder()
result = document_embedder.run([doc])
print(result["documents"][0].embedding)
# [0.017020374536514282, -0.023255806416273117, ...]
| ATTRIBUTE | DESCRIPTION |
|---|---|
timeout |
Timeout duration (in seconds) for OpenAI client calls.
TYPE:
|
max_retries |
Maximum number of retries on OpenAI internal errors.
TYPE:
|
client |
Synchronous OpenAI client with custom authentication.
TYPE:
|
async_client |
Asynchronous OpenAI client with custom authentication.
TYPE:
|
Source code in docs/microservices/rag/src/rag/pipelines/components/f13_openai_document_embedder.py
@component
class F13OpenAIDocumentEmbedder(OpenAIDocumentEmbedder):
"""A component for computing Document embeddings using OpenAI-conform API.
Usage example:
```python
from haystack import Document
import F13OpenAIDocumentEmbedder
doc = Document(content="I love pizza!")
document_embedder = F13OpenAIDocumentEmbedder()
result = document_embedder.run([doc])
print(result["documents"][0].embedding)
# [0.017020374536514282, -0.023255806416273117, ...]
```
Attributes:
timeout (float): Timeout duration (in seconds) for OpenAI client calls.
max_retries (int): Maximum number of retries on OpenAI internal errors.
client (OpenAI): Synchronous OpenAI client with custom authentication.
async_client (AsyncOpenAI): Asynchronous OpenAI client with custom authentication.
"""
def __init__(
self,
embedding_model: EmbeddingModel,
meta_fields_to_embed: list[str] | None = None,
embedding_separator: str = "\n",
timeout: float = 60,
max_retries: int = 5,
) -> None:
"""Create a F13OpenAIDocumentEmbedder component.
By setting the 'OPENAI_TIMEOUT' and 'OPENAI_MAX_RETRIES' environment variables,
you can change the timeout and max_retries parameters in the OpenAI client.
Args:
embedding_model (EmbeddingModel): EmbeddingModel pydantic object.
meta_fields_to_embed (list of str or None): List of metadata fields
to embed along with the document text.
embedding_separator (str): Separator used to concatenate metadata fields
to the document text.
timeout (float or None): Timeout for OpenAI client calls.
If not set, inferred from `OPENAI_TIMEOUT` env variable or defaults to 60.
max_retries (int or None): Maximum retries on OpenAI internal errors.
If not set, inferred from `OPENAI_MAX_RETRIES` env variable or defaults to 5.
"""
super(F13OpenAIDocumentEmbedder, self).__init__( # noqa: UP008, because haystack components need this init
api_key=Secret.from_token("UNUSED_TOKEN"),
model=embedding_model.model,
api_base_url=str(embedding_model.api.url),
meta_fields_to_embed=meta_fields_to_embed or [],
embedding_separator=embedding_separator,
timeout=timeout,
max_retries=max_retries,
)
if embedding_model.api.auth:
auth_header = embedding_model.api.auth.get_auth_header()
auth_client = CustomAuthClient(auth_header=auth_header)
async_auth_client = CustomAsyncAuthClient(auth_header=auth_header)
else:
auth_client = CustomAuthClient(auth_header=None)
async_auth_client = CustomAsyncAuthClient(auth_header=None)
self.client = OpenAI(
http_client=auth_client,
api_key="",
base_url=str(embedding_model.api.url),
timeout=self.timeout,
max_retries=self.max_retries,
)
self.async_client = AsyncOpenAI(
http_client=async_auth_client,
api_key="",
base_url=str(embedding_model.api.url),
timeout=self.timeout,
max_retries=self.max_retries,
)
f13_openai_text_embedder
A component for embedding strings using OpenAI-conform API.
| CLASS | DESCRIPTION |
|---|---|
F13OpenAITextEmbedder |
A component for embedding strings using OpenAI-conform API. |
F13OpenAITextEmbedder
Bases: OpenAITextEmbedder
A component for embedding strings using OpenAI-conform API.
Usage example:
import F13OpenAITextEmbedder
text_to_embed = "I love pizza!"
text_embedder = F13OpenAITextEmbedder()
print(text_embedder.run(text_to_embed))
# {'embedding': [0.017020374536514282, -0.023255806416273117, ...],
# 'meta': {'model': 'text-embedding-ada-002-v2',
# 'usage': {'prompt_tokens': 4, 'total_tokens': 4}}}
| ATTRIBUTE | DESCRIPTION |
|---|---|
timeout |
Timeout duration (in seconds) for OpenAI client calls.
TYPE:
|
max_retries |
Maximum number of retries on OpenAI internal errors.
TYPE:
|
client |
Synchronous OpenAI client with custom authentication.
TYPE:
|
async_client |
Asynchronous OpenAI client with custom authentication.
TYPE:
|
Source code in docs/microservices/rag/src/rag/pipelines/components/f13_openai_text_embedder.py
@component
class F13OpenAITextEmbedder(OpenAITextEmbedder):
"""A component for embedding strings using OpenAI-conform API.
Usage example:
```python
import F13OpenAITextEmbedder
text_to_embed = "I love pizza!"
text_embedder = F13OpenAITextEmbedder()
print(text_embedder.run(text_to_embed))
# {'embedding': [0.017020374536514282, -0.023255806416273117, ...],
# 'meta': {'model': 'text-embedding-ada-002-v2',
# 'usage': {'prompt_tokens': 4, 'total_tokens': 4}}}
```
Attributes:
timeout (float): Timeout duration (in seconds) for OpenAI client calls.
max_retries (int): Maximum number of retries on OpenAI internal errors.
client (OpenAI): Synchronous OpenAI client with custom authentication.
async_client (AsyncOpenAI): Asynchronous OpenAI client with custom authentication.
"""
def __init__(
self,
embedding_model: EmbeddingModel,
timeout: float = 60,
max_retries: int = 5,
) -> None:
"""Create an F13OpenAITextEmbedder component.
By setting the 'OPENAI_TIMEOUT' and 'OPENAI_MAX_RETRIES' environment variables,
you can change the timeout and max_retries parameters in the OpenAI client.
Args:
embedding_model (EmbeddingModel): EmbeddingModel pydantic object.
timeout (int, optional): Timeout for OpenAI client calls. If not set, inferred from
the `OPENAI_TIMEOUT` environment variable or defaults to 60.
max_retries (int, optional): Maximum retries to establish contact with OpenAI if it
returns an internal error. If not set, inferred from the `OPENAI_MAX_RETRIES`
environment variable or defaults to 5.
"""
super(F13OpenAITextEmbedder, self).__init__( # noqa: UP008, because haystack components need this init
model=embedding_model.model,
api_base_url=str(embedding_model.api.url),
api_key=Secret.from_token("UNUSED_TOKEN"),
max_retries=max_retries,
timeout=timeout,
)
if embedding_model.api.auth:
auth_header = embedding_model.api.auth.get_auth_header()
auth_client = CustomAuthClient(auth_header=auth_header)
async_auth_client = CustomAsyncAuthClient(auth_header=auth_header)
else:
auth_client = CustomAuthClient(auth_header=None)
async_auth_client = CustomAsyncAuthClient(auth_header=None)
self.client = OpenAI(
http_client=auth_client,
api_key="",
base_url=str(embedding_model.api.url),
timeout=self.timeout,
max_retries=self.max_retries,
)
self.async_client = AsyncOpenAI(
http_client=async_auth_client,
api_key="",
base_url=str(embedding_model.api.url),
timeout=self.timeout,
max_retries=self.max_retries,
)
id_setter
This module implements a haystack pipeline component that sets the document_id hash.
| CLASS | DESCRIPTION |
|---|---|
IDSetter |
Sets the document_id hash. |
IDSetter
Sets the document_id hash.
| ATTRIBUTE | DESCRIPTION |
|---|---|
ignore_keys_for_doc_hash |
Metadata keys to ignore when creating the document hash.
TYPE:
|
| METHOD | DESCRIPTION |
|---|---|
run |
Process documents, merge YAML metadata and return with IDs. |
Source code in docs/microservices/rag/src/rag/pipelines/components/id_setter.py
@component
class IDSetter:
"""Sets the document_id hash.
Attributes:
ignore_keys_for_doc_hash (list[str]): Metadata keys to ignore when creating the document hash.
"""
def __init__(self, ignore_keys_for_doc_hash: list[str] = None) -> None:
"""Initialize the IDSetter.
Args:
ignore_keys_for_doc_hash (list[str]): All metadata keys that should be ignored when creating the ID hash.
"""
self.ignore_keys_for_doc_hash = (
ignore_keys_for_doc_hash if ignore_keys_for_doc_hash is not None else []
)
@component.output_types(documents=list[Document], document_ids=list[str])
def run(
self, documents: list[Document], doc_ids: list[str] = None
) -> dict[str, list[Document] | list[str]]:
"""Process documents, merge YAML metadata and return with IDs.
Process the list of documents to merge YAML metadata and return processed documents along with
IDs of documents missing corresponding YAML metadata.
Args:
documents (list[Document]): List of Document objects to process.
doc_ids (list[str], optional): List of document IDs to set as the document IDs. Defaults to None.
Returns:
A dictionary containing:
- documents: List of Document objects with merged metadata.
- document_ids: List of document IDs.
"""
final_doc_ids = []
input_doc_ids = doc_ids or [None] * len(documents)
for doc, precomputed_id in zip(documents, input_doc_ids):
if precomputed_id:
doc.id = precomputed_id
else:
meta = {
k: v
for k, v in doc.meta.items()
if k not in self.ignore_keys_for_doc_hash
}
doc.id = self._create_id(content=doc.content, meta=meta)
final_doc_ids.append(doc.id)
return {"documents": documents, "document_ids": final_doc_ids}
@staticmethod
def _create_id(content: str, meta: dict[str, Any] | str, **kwargs: object) -> str:
"""Create a unique SHA-256 hash ID based on the given content, metadata, and additional keyword arguments.
Args:
content (str): The content to hash (usually the document content).
meta (dict or str): The metadata to hash.
**kwargs (object): Additional keyword arguments to include in the hash.
Returns:
A SHA-256 hash string uniquely identifying the input data.
"""
data = f"{content}{meta}{kwargs}"
return hashlib.sha256(data.encode("utf-8")).hexdigest()
run
Process documents, merge YAML metadata and return with IDs.
Process the list of documents to merge YAML metadata and return processed documents along with IDs of documents missing corresponding YAML metadata.
| PARAMETER | DESCRIPTION |
|---|---|
documents
|
List of Document objects to process.
TYPE:
|
doc_ids
|
List of document IDs to set as the document IDs. Defaults to None.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
dict[str, list[Document] | list[str]]
|
A dictionary containing: - documents: List of Document objects with merged metadata. - document_ids: List of document IDs. |
Source code in docs/microservices/rag/src/rag/pipelines/components/id_setter.py
@component.output_types(documents=list[Document], document_ids=list[str])
def run(
self, documents: list[Document], doc_ids: list[str] = None
) -> dict[str, list[Document] | list[str]]:
"""Process documents, merge YAML metadata and return with IDs.
Process the list of documents to merge YAML metadata and return processed documents along with
IDs of documents missing corresponding YAML metadata.
Args:
documents (list[Document]): List of Document objects to process.
doc_ids (list[str], optional): List of document IDs to set as the document IDs. Defaults to None.
Returns:
A dictionary containing:
- documents: List of Document objects with merged metadata.
- document_ids: List of document IDs.
"""
final_doc_ids = []
input_doc_ids = doc_ids or [None] * len(documents)
for doc, precomputed_id in zip(documents, input_doc_ids):
if precomputed_id:
doc.id = precomputed_id
else:
meta = {
k: v
for k, v in doc.meta.items()
if k not in self.ignore_keys_for_doc_hash
}
doc.id = self._create_id(content=doc.content, meta=meta)
final_doc_ids.append(doc.id)
return {"documents": documents, "document_ids": final_doc_ids}
database_rag
This module implements a RAG system based on database retrievals as data source.
| CLASS | DESCRIPTION |
|---|---|
DatabaseRagPipe |
Initializes an instance of the f13-server specific DatabaseRagPipe for database-based retrieval operations. |
DatabaseRagPipe
Bases: BaseRagPipe
Initializes an instance of the f13-server specific DatabaseRagPipe for database-based retrieval operations.
This class extends BaseRagPipe and loads pipelines defined in configs/rag_pipeline_config.yml.
| ATTRIBUTE | DESCRIPTION |
|---|---|
rag_type |
Identifier for the RAG type used in this pipeline ("database").
TYPE:
|
name |
The name of the pipeline instance (inherited from BaseRagPipe).
TYPE:
|
rag_pipe |
The haystack RAG pipeline (inherited).
TYPE:
|
indexing_pipe |
The haystack indexing pipeline (inherited).
TYPE:
|
llm_generators |
Dictionary of LLM generation configurations from pipeline metadata (inherited).
TYPE:
|
_required_metadata_keys |
Required metadata keys expected in the pipeline (inherited).
TYPE:
|
_required_components |
Required component names expected in the pipeline (inherited).
TYPE:
|
| METHOD | DESCRIPTION |
|---|---|
generate_answer |
Run the rag pipeline to generate an answer based on a query. |
indexing |
Run the indexing pipeline on the provided list of documents. |
Source code in docs/microservices/rag/src/rag/pipelines/database_rag.py
class DatabaseRagPipe(BaseRagPipe):
"""Initializes an instance of the f13-server specific DatabaseRagPipe for database-based retrieval operations.
This class extends BaseRagPipe and loads pipelines defined in `configs/rag_pipeline_config.yml`.
Attributes:
rag_type (str): Identifier for the RAG type used in this pipeline ("database").
name (str): The name of the pipeline instance (inherited from BaseRagPipe).
rag_pipe (AsyncPipeline): The haystack RAG pipeline (inherited).
indexing_pipe (AsyncPipeline): The haystack indexing pipeline (inherited).
llm_generators (dict): Dictionary of LLM generation configurations from pipeline metadata (inherited).
_required_metadata_keys (list[str]): Required metadata keys expected in the pipeline (inherited).
_required_components (list[str]): Required component names expected in the pipeline (inherited).
"""
def __init__(
self,
indexing_pipeline: AsyncPipeline,
rag_pipeline: AsyncPipeline,
pipe_name: str,
) -> None:
"""Initialize a new instance of DatabaseRagPipe.
Pipeline Metadata Requirements:
- `rag_answers_keys`: A list of relevant keys to access RAG answers.
- `retrieved_docs_keys`: A list of relevant keys to access retrieved documents.
- `llm_generators`: A dict of dictionaries with the available answer generation LLMs.
- The models are selected with a Haystack component called `llm_router`.
Pipeline Component Requirements:
The following components are required in the pipeline: ["query", "filters"]
Args:
indexing_pipeline (AsyncPipeline): Haystack indexing pipeline.
rag_pipeline (AsyncPipeline): Haystack RAG pipeline.
pipe_name (str): The name of this specific pipeline.
"""
self.rag_type = "database"
super().__init__(
indexing_pipeline=indexing_pipeline,
rag_pipeline=rag_pipeline,
pipe_name=pipe_name,
)
async def indexing(
self, files: list[Document], double_doc_handling: DoubleDocsHandling
) -> tuple[int, int]:
"""Run the indexing pipeline on the provided list of documents.
An ingestion timestamp is automatically added to the metadata of each document.
Args:
files (list[Document]): List of Document objects to be indexed.
double_doc_handling (DoubleDocsHandling):
Indicates whether embedding should be forced if the document is already in the database.
Returns:
A tuple with the number of ingested documents and the number of ingested chunks.
Raises:
AttributeError: If `self.indexing_pipe` is not initialized or deployed.
"""
if self.indexing_pipe is None:
raise AttributeError(
"`self.indexing_pipe` is not deployed. Please run `self.deploy_indexing_pipe`"
)
current_time = datetime.now(tz=pytz.timezone("Europe/Berlin"))
n_docs = 0
n_chunks = 0
for doc in files:
try:
doc_id = self._compute_doc_hash(doc)
doc.meta["ingestion_date"] = current_time
doc.id = doc_id
file_id_already_exists, existing_docs = self._check_if_file_id_exists(
doc_id
)
if file_id_already_exists:
logger.info(
f"Document with title {doc.meta['title']} already exists. Skipping embedding."
)
if double_doc_handling == DoubleDocsHandling.FORCE:
self._update_all_meta_data(existing_docs, doc.meta)
else:
continue
status = await self.indexing_pipe.run_async(
data={
"file_type_router": {
"sources": [],
},
"converted_docs_joiner": {"documents": [doc]},
"set_doc_id": {"doc_ids": [doc_id]},
}
)
n_chunks = n_chunks + status["document_writer"]["documents_written"]
logger.debug(status)
n_docs = n_docs + 1
except Exception as e:
logger.error(f"Failed to write document {doc.id}: {e}")
return n_docs, n_chunks
async def generate_answer(
self,
query: str,
language_model: str,
meta_data_filters: list[MetadataFilter] | None = None,
max_chunks_to_use: int = None,
) -> tuple[dict, dict]:
"""Run the rag pipeline to generate an answer based on a query.
This method processes the input query through a Retrieval-Augmented Generation (RAG) pipeline,
which retrieves relevant documents and generates an answer based on that context.
Args:
query (str): The input query for which an answer is to be generated.
language_model (str): The model used for generating the answer.
meta_data_filters (List[MetadataFilter] | None): List of metadata filters to apply during retrieval.
max_chunks_to_use (int): Maximum number of chunks to use for the retrieval query.
If None, it falls back to the default max_chunks_to_use value of the selected LLM.
Returns:
Tuple containing a dictionary with generated answers and a dictionary of documents
used for answer generation.
Raises:
KeyError: If the language_model is not found in the valid LLM generator keys.
"""
if self.rag_pipe is None:
raise AttributeError(
"`self.rag_pipe` is not deployed. Please run `self.deploy_rag_pipe`"
)
self._validate_language_model_key(language_model)
meta_data_filters = self._generate_meta_data_filters(
meta_data_filters=meta_data_filters
)
docs_component_name = self.rag_pipe.metadata["retrieved_docs_keys"][0]
max_chunks_to_use_component = self.rag_pipe.metadata[
"max_chunks_to_use_component"
][0]
data = {"query": {"value": query}, "filters": {"value": meta_data_filters}}
if (
max_chunks_to_use is None
and "max_chunks_to_use" in self.llm_generators[language_model]
):
max_chunks_to_use = self.llm_generators[language_model]["max_chunks_to_use"]
if max_chunks_to_use is not None:
data[max_chunks_to_use_component] = {"top_k": max_chunks_to_use}
self.add_llm_router_arg_inplace(language_model_key=language_model, data=data)
result = await self.rag_pipe.run_async(
data=data,
include_outputs_from=[docs_component_name],
)
documents = get_dict_value(
result, self.rag_pipe.metadata["retrieved_docs_keys"]
)
answer = get_dict_value(result, self.rag_pipe.metadata["rag_answers_keys"])
logger.debug(
"relevant sources for answer generation: "
+ "\n ####### \n".join([f"{s.meta} \n{s.content}" for s in documents])
)
return answer, documents
@staticmethod
def _generate_meta_data_filters(
meta_data_filters: list[MetadataFilter] | None = None,
) -> dict | None:
"""Generate a filter based on the provided metadata filtering options.
Args:
meta_data_filters (list | None): A list of pydantic dataclass objects with metadata filtering options.
Returns:
A dictionary representing the generated filter, or None if no filters are needed.
"""
if meta_data_filters is None:
return None
conditions = []
for meta_data_filter in meta_data_filters:
# add conditions for lists
conditions_per_filter = []
if meta_data_filter.end_date is not None:
condition = {
"field": "meta.date",
"operator": "<=",
"value": meta_data_filter.end_date,
}
conditions_per_filter.append(condition)
if meta_data_filter.start_date is not None:
condition = {
"field": "meta.date",
"operator": ">=",
"value": meta_data_filter.start_date,
}
conditions_per_filter.append(condition)
if meta_data_filter.source is not None:
condition = {
"field": "meta.source",
"operator": "==",
"value": meta_data_filter.source,
}
conditions_per_filter.append(condition)
if len(conditions_per_filter) == 0:
continue
else:
conditions.append(
{"operator": "AND", "conditions": conditions_per_filter}
)
filters = {"operator": "OR", "conditions": conditions}
logger.debug(f"Haystack filter for ES {filters}")
return filters
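For reference, `_generate_meta_data_filters` turns a single `MetadataFilter` with a source and a date range into a nested Haystack/Elasticsearch filter of roughly the following shape; the field values are illustrative assumptions.

```python
# Sketch of the resulting filter structure (illustrative values).
filters = {
    "operator": "OR",
    "conditions": [
        {
            "operator": "AND",
            "conditions": [
                {"field": "meta.date", "operator": "<=", "value": "2024-12-31"},
                {"field": "meta.date", "operator": ">=", "value": "2024-01-01"},
                {"field": "meta.source", "operator": "==", "value": "wikipedia"},
            ],
        }
    ],
}
```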
generate_answer
async
Run the rag pipeline to generate an answer based on a query.
This method processes the input query through a Retrieval-Augmented Generation (RAG) pipeline, which retrieves relevant documents and generates an answer based on that context.
| PARAMETER | DESCRIPTION |
|---|---|
query
|
The input query for which an answer is to be generated.
TYPE:
|
language_model
|
The model used for generating the answer.
TYPE:
|
meta_data_filters
|
List of metadata filters to apply during retrieval.
TYPE:
|
max_chunks_to_use
|
Maximum number of chunks to use for the retrieval query. If None, it falls back to the default max_chunks_to_use value of the selected LLM.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
tuple[dict, dict]
|
Tuple containing a dictionary with generated answers and a dictionary of documents used for answer generation. |
| RAISES | DESCRIPTION |
|---|---|
KeyError
|
If the language_model is not found in the valid LLM generator keys. |
Source code in docs/microservices/rag/src/rag/pipelines/database_rag.py
async def generate_answer(
self,
query: str,
language_model: str,
meta_data_filters: list[MetadataFilter] | None = None,
max_chunks_to_use: int = None,
) -> tuple[dict, dict]:
"""Run the rag pipeline to generate an answer based on a query.
This method processes the input query through a Retrieval-Augmented Generation (RAG) pipeline,
which retrieves relevant documents and generates an answer based on that context.
Args:
query (str): The input query for which an answer is to be generated.
language_model (str): The model used for generating the answer.
meta_data_filters (List[MetadataFilter] | None): List of metadata filters to apply during retrieval.
max_chunks_to_use (int): Maximum number of chunks to use for the retrieval query.
If None, it falls back to the default max_chunks_to_use value of the selected LLM.
Returns:
Tuple containing a dictionary with generated answers and a dictionary of documents
used for answer generation.
Raises:
KeyError: If the language_model is not found in the valid LLM generator keys.
"""
if self.rag_pipe is None:
raise AttributeError(
"`self.rag_pipe` is not deployed. Please run `self.deploy_rag_pipe`"
)
self._validate_language_model_key(language_model)
meta_data_filters = self._generate_meta_data_filters(
meta_data_filters=meta_data_filters
)
docs_component_name = self.rag_pipe.metadata["retrieved_docs_keys"][0]
max_chunks_to_use_component = self.rag_pipe.metadata[
"max_chunks_to_use_component"
][0]
data = {"query": {"value": query}, "filters": {"value": meta_data_filters}}
if (
max_chunks_to_use is None
and "max_chunks_to_use" in self.llm_generators[language_model]
):
max_chunks_to_use = self.llm_generators[language_model]["max_chunks_to_use"]
if max_chunks_to_use is not None:
data[max_chunks_to_use_component] = {"top_k": max_chunks_to_use}
self.add_llm_router_arg_inplace(language_model_key=language_model, data=data)
result = await self.rag_pipe.run_async(
data=data,
include_outputs_from=[docs_component_name],
)
documents = get_dict_value(
result, self.rag_pipe.metadata["retrieved_docs_keys"]
)
answer = get_dict_value(result, self.rag_pipe.metadata["rag_answers_keys"])
logger.debug(
"relevant sources for answer generation: "
+ "\n ####### \n".join([f"{s.meta} \n{s.content}" for s in documents])
)
return answer, documents
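A minimal async usage sketch; `pipe` is assumed to be a constructed and deployed `DatabaseRagPipe`, and `"gpt-x"` an assumed key in its `llm_generators`.

```python
import asyncio


async def ask(pipe: DatabaseRagPipe) -> None:
    answer, documents = await pipe.generate_answer(
        query="What does the ingestion pipeline do?",
        language_model="gpt-x",
        meta_data_filters=None,
        max_chunks_to_use=5,
    )
    print(answer)
    for doc in documents:
        print(doc.meta.get("title"), doc.meta.get("source"))


# asyncio.run(ask(pipe))  # requires a deployed pipe instance
```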
indexing
async
Run the indexing pipeline on the provided list of documents.
An ingestion timestamp is automatically added to the metadata of each document.
| PARAMETER | DESCRIPTION |
|---|---|
files
|
List of Document objects to be indexed.
TYPE:
|
double_doc_handling
|
Indicates whether embedding should be forced if the document is already in the database.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
tuple[int, int]
|
A tuple with the number of ingested documents and the number of ingested chunks. |
| RAISES | DESCRIPTION |
|---|---|
AttributeError
|
If `self.indexing_pipe` is not initialized or deployed. |
Source code in docs/microservices/rag/src/rag/pipelines/database_rag.py
async def indexing(
self, files: list[Document], double_doc_handling: DoubleDocsHandling
) -> tuple[int, int]:
"""Run the indexing pipeline on the provided list of documents.
An ingestion timestamp is automatically added to the metadata of each document.
Args:
files (list[Document]): List of Document objects to be indexed.
double_doc_handling (DoubleDocsHandling):
Indicates whether embedding should be forced if the document is already in the database.
Returns:
A tuple with the number of ingested documents and the number of ingested chunks.
Raises:
AttributeError: If `self.indexing_pipe` is not initialized or deployed.
"""
if self.indexing_pipe is None:
raise AttributeError(
"`self.indexing_pipe` is not deployed. Please run `self.deploy_indexing_pipe`"
)
current_time = datetime.now(tz=pytz.timezone("Europe/Berlin"))
n_docs = 0
n_chunks = 0
for doc in files:
try:
doc_id = self._compute_doc_hash(doc)
doc.meta["ingestion_date"] = current_time
doc.id = doc_id
file_id_already_exists, existing_docs = self._check_if_file_id_exists(
doc_id
)
if file_id_already_exists:
logger.info(
f"Document with title {doc.meta['title']} already exists. Skipping embedding."
)
if double_doc_handling == DoubleDocsHandling.FORCE:
self._update_all_meta_data(existing_docs, doc.meta)
else:
continue
status = await self.indexing_pipe.run_async(
data={
"file_type_router": {
"sources": [],
},
"converted_docs_joiner": {"documents": [doc]},
"set_doc_id": {"doc_ids": [doc_id]},
}
)
n_chunks = n_chunks + status["document_writer"]["documents_written"]
logger.debug(status)
n_docs = n_docs + 1
except Exception as e:
logger.error(f"Failed to write document {doc.id}: {e}")
return n_docs, n_chunks
draw_pipeline
Visualizes haystack pipelines.
| FUNCTION | DESCRIPTION |
|---|---|
draw_pipeline |
Create a Graphviz visualization of the pipeline. |
draw_pipeline
Create a Graphviz visualization of the pipeline.
| PARAMETER | DESCRIPTION |
|---|---|
graph
|
Graphviz visualization object of the pipeline.
TYPE:
|
path
|
The file path where the image will be saved.
TYPE:
|
Source code in docs/microservices/rag/src/rag/pipelines/draw_pipeline.py
def draw_pipeline(graph: networkx.MultiDiGraph, path: str) -> None:
"""Create a Graphviz visualization of the pipeline.
Args:
graph: Graphviz visualization object of the pipeline.
path: The file path where the image will be saved.
"""
graphviz = to_agraph(graph.copy())
graphviz.layout("dot")
graphviz.draw(path)
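A minimal usage sketch; it assumes `pygraphviz` is installed, since `to_agraph` relies on networkx's AGraph interface, and builds a tiny throwaway pipeline just to have a graph to render.

```python
from haystack import Pipeline
from haystack.components.joiners import DocumentJoiner

# Build a tiny pipeline and render its graph to a PNG file.
pipe = Pipeline()
pipe.add_component("joiner", DocumentJoiner())
draw_pipeline(pipe.graph, path="pipeline.png")
```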
file_rag
This module implements a RAG system based on files as data source.
| CLASS | DESCRIPTION |
|---|---|
FileRagPipe |
Initializes an instance of the f13-server specific FileRagPipe for file-based retrieval operations. |
FileRagPipe
Bases: BaseRagPipe
Initializes an instance of the f13-server specific FileRagPipe for file-based retrieval operations.
This class extends BaseRagPipe and is specifically configured for file-based pipelines on the f13-server.
| ATTRIBUTE | DESCRIPTION |
|---|---|
rag_type |
Identifier for the RAG type used in this pipeline ("file").
TYPE:
|
file_deletion_pipe |
Haystack pipeline for deleting indexed documents by ID.
TYPE:
|
name |
The name of the pipeline instance (inherited from BaseRagPipe).
TYPE:
|
rag_pipe |
The haystack RAG pipeline (inherited).
TYPE:
|
indexing_pipe |
The haystack indexing pipeline (inherited).
TYPE:
|
llm_generators |
Dictionary of LLM generation configurations from pipeline metadata (inherited).
TYPE:
|
_required_metadata_keys |
Required metadata keys expected in the pipeline (inherited).
TYPE:
|
_required_components |
Required component names expected in the pipeline (inherited).
TYPE:
|
| METHOD | DESCRIPTION |
|---|---|
deploy_rag_pipe |
Warms up the file deletion pipeline and the RAG pipeline. |
file_deletion |
Run the temporary file deleter pipeline to delete files older than a specified timestamp. |
generate_answer |
Run the RAG pipeline to generate an answer based on a query. |
indexing |
Run the file indexing pipeline on the provided files. |
Source code in docs/microservices/rag/src/rag/pipelines/file_rag.py
class FileRagPipe(BaseRagPipe):
"""Initializes an instance of the f13-server specific FileRagPipe for file-based retrieval operations.
This class extends BaseRagPipe and is specifically configured for file-based pipelines on the f13-server.
Attributes:
rag_type (str): Identifier for the RAG type used in this pipeline ("file").
file_deletion_pipe (AsyncPipeline): Haystack pipeline for deleting indexed documents by ID.
name (str): The name of the pipeline instance (inherited from BaseRagPipe).
rag_pipe (AsyncPipeline): The haystack RAG pipeline (inherited).
indexing_pipe (AsyncPipeline): The haystack indexing pipeline (inherited).
llm_generators (dict): Dictionary of LLM generation configurations from pipeline metadata (inherited).
_required_metadata_keys (list[str]): Required metadata keys expected in the pipeline (inherited).
_required_components (list[str]): Required component names expected in the pipeline (inherited).
"""
def __init__(
self,
indexing_pipeline: AsyncPipeline,
rag_pipeline: AsyncPipeline,
file_deletion_pipeline: AsyncPipeline,
pipe_name: str,
) -> None:
"""Initialize a new instance of BaseRagPipe.
Pipeline Metadata Requirements:
- `rag_answers_keys`: A list of relevant keys to access rag answers.
- `retrieved_docs_keys`: A list of relevant keys to access retrieved documents.
- `llm_generators`: A dict of dictionaries with the available answer generation LLMs.
- The models are selected with a Haystack component called `llm_router`.
Pipeline Component Requirements:
The following components are required in the pipeline: ["query", "filters"]
Args:
indexing_pipeline (AsyncPipeline): haystack indexing pipeline.
rag_pipeline (AsyncPipeline): haystack RAG pipeline.
file_deletion_pipeline (AsyncPipeline): haystack doc-id deletion pipeline.
pipe_name (str): The name of this specific pipeline.
"""
self.rag_type = "file"
super().__init__(
indexing_pipeline=indexing_pipeline,
rag_pipeline=rag_pipeline,
pipe_name=pipe_name,
)
self.file_deletion_pipe = file_deletion_pipeline
async def indexing(self, files: list[UploadFile]) -> list[str]:
"""Run the file indexing pipeline on the provided files.
Converts each uploaded file to a byte stream, runs the indexing pipeline on it,
and returns a list of source IDs.
Args:
files (list[UploadFile]): A list of fastapi.UploadFile objects.
Returns:
A list of source IDs generated by the indexing pipeline.
"""
if self.indexing_pipe is None:
raise AttributeError(
"`self.indexing_pipe` is not deployed. Please run `self.deploy_indexing_pipe`"
)
current_time = datetime.now(tz=pytz.timezone("Europe/Berlin"))
source_ids = []
for file in files:
try:
files_byte_stream = convert_uploadfile_to_byte_stream(file)
doc_id = self._compute_doc_hash(file)
source_ids.append(doc_id)
file_id_already_exists, existing_docs = self._check_if_file_id_exists(
doc_id
)
if file_id_already_exists:
logger.info(
f"File {file.filename} already exists. Skipping indexing. Updating ingestion date."
)
self._update_ingestion_date(existing_docs, current_time)
continue
result = await self.indexing_pipe.run_async(
data={
"file_type_router": {
"sources": [files_byte_stream],
"meta": {"ingestion_date": current_time},
},
"set_doc_id": {"doc_ids": [doc_id]},
}
)
if not result["set_doc_id"]["document_ids"]:
logger.error(
"Something during indexing went wrong. No Documents embedded."
)
except Exception as e:
logger.info(f"Error processing file {file.filename}: {e}")
continue
return source_ids
async def generate_answer(
self,
query: str,
language_model: str,
source_ids: list[str] | None = None,
max_chunks_to_use: int | None = None,
) -> tuple[dict, dict]:
"""Run the RAG pipeline to generate an answer based on a query.
This method processes the input query through a Retrieval-Augmented Generation (RAG) pipeline,
which retrieves relevant documents and generates an answer based on that context.
Args:
query (str): The input query for which an answer is to be generated.
language_model (str): The model used for generating the answer.
source_ids (list[str] | None): Document IDs of the files available for file retrieval.
max_chunks_to_use (int | None): Maximum number of chunks to use for the retrieval query.
Returns:
A tuple containing a dictionary with generated answers and a dictionary of
documents used for answer generation.
"""
if self.rag_pipe is None:
raise AttributeError(
"`self.rag_pipe` is not deployed. Please run `self.deploy_rag_pipe`"
)
if not source_ids:
answer = "Bitte stellen Sie ein Dokument bereit."
documents = []
return answer, documents
meta_data_filters = {
"field": "meta.source_id",
"operator": "in",
"value": source_ids,
}
docs_component_name = self.rag_pipe.metadata["retrieved_docs_keys"][0]
max_chunks_to_use_component = self.rag_pipe.metadata[
"max_chunks_to_use_component"
][0]
data = {
"query": {"value": query},
"filters": {"value": meta_data_filters},
}
if (
max_chunks_to_use is None
and "max_chunks_to_use" in self.llm_generators[language_model]
):
max_chunks_to_use = self.llm_generators[language_model]["max_chunks_to_use"]
if max_chunks_to_use is not None:
data[max_chunks_to_use_component] = {"top_k": max_chunks_to_use}
self.add_llm_router_arg_inplace(language_model_key=language_model, data=data)
result = await self.rag_pipe.run_async(
data=data,
include_outputs_from=[docs_component_name],
)
documents = get_dict_value(
result, self.rag_pipe.metadata["retrieved_docs_keys"]
)
answer = get_dict_value(result, self.rag_pipe.metadata["rag_answers_keys"])
logger.debug(
"relevant sources for answer generation: "
+ "\n ####### \n".join([f"{s.meta} \n{s.content}" for s in documents])
)
return answer, documents
async def file_deletion(self, timestamp: datetime) -> int:
"""Run the temporary file deleter pipeline to delete files older than a specified timestamp.
The pipeline uses a filter to delete documents with an ingestion date
older than or equal to the provided timestamp. It then runs the retrieval
pipeline to delete these documents.
Args:
timestamp (datetime): The maximum ingestion date for files to be deleted.
Returns:
The number of documents deleted by the pipeline.
"""
if self.file_deletion_pipe is None:
raise AttributeError(
"`self.file_deletion_pipe` is not deployed. Please run `self.file_deletion_pipe`"
)
filters = {"field": "meta.ingestion_date", "operator": "<=", "value": timestamp}
n_deleted_docs = await self.file_deletion_pipe.run_async(
data={
"document_id_retriever": {"filters": filters},
}
)
return n_deleted_docs.get("document_deleter", {}).get("documents_deleted", 0)
def deploy_rag_pipe(self) -> None:
"""This method loads the pipeline from yaml file and instantiates it as a haystack pipeline."""
self.file_deletion_pipe.warm_up()
return super().deploy_rag_pipe()
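A minimal usage sketch of this class, assuming the three pipelines were already built (e.g. by get_rag_pipelines) and that the code runs inside an async context; the pipe name, LLM key and query below are placeholders, not values from this project.
# Illustrative only - pipeline construction and deployment details are assumed.
file_rag = FileRagPipe(
    indexing_pipeline=indexing_pipeline,
    rag_pipeline=rag_pipeline,
    file_deletion_pipeline=deletion_pipeline,
    pipe_name="file_rag",
)
file_rag.deploy_rag_pipe()  # also warms up the deletion pipeline

source_ids = await file_rag.indexing(files=upload_files)  # list[UploadFile] from FastAPI
answer, documents = await file_rag.generate_answer(
    query="Was steht in dem Dokument?",  # placeholder query
    language_model="llm_large",          # placeholder key from the LLM config
    source_ids=source_ids,
)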
deploy_rag_pipe
This method loads the pipeline from yaml file and instantiates it as a haystack pipeline.
file_deletion
async
Run the temporary file deleter pipeline to delete files older than a specified timestamp.
The pipeline uses a filter to delete documents with an ingestion date older than or equal to the provided timestamp. It then runs the retrieval pipeline to delete these documents.
| PARAMETER | DESCRIPTION |
|---|---|
timestamp
|
The maximum ingestion date for files to be deleted.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
int
|
The number of documents deleted by the pipeline. |
Source code in docs/microservices/rag/src/rag/pipelines/file_rag.py
async def file_deletion(self, timestamp: datetime) -> int:
"""Run the temporary file deleter pipeline to delete files older than a specified timestamp.
The pipeline uses a filter to delete documents with an ingestion date
older than or equal to the provided timestamp. It then runs the retrieval
pipeline to delete these documents.
Args:
timestamp (datetime): The maximum ingestion date for files to be deleted.
Returns:
The number of documents deleted by the pipeline.
"""
if self.file_deletion_pipe is None:
raise AttributeError(
"`self.file_deletion_pipe` is not deployed. Please run `self.file_deletion_pipe`"
)
filters = {"field": "meta.ingestion_date", "operator": "<=", "value": timestamp}
n_deleted_docs = await self.file_deletion_pipe.run_async(
data={
"document_id_retriever": {"filters": filters},
}
)
return n_deleted_docs.get("document_deleter", {}).get("documents_deleted", 0)
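A short sketch of a typical call, e.g. from a scheduled cleanup task; the two-hour retention window is an illustrative choice, not a project setting.
from datetime import datetime, timedelta
import pytz

# Delete temporary documents ingested more than two hours ago (illustrative cutoff).
cutoff = datetime.now(tz=pytz.timezone("Europe/Berlin")) - timedelta(hours=2)
n_deleted = await file_rag.file_deletion(timestamp=cutoff)
logger.info(f"Deleted {n_deleted} temporary documents.")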
generate_answer
async
Run the RAG pipeline to generate an answer based on a query.
This method processes the input query through a Retrieval-Augmented Generation (RAG) pipeline, which retrieves relevant documents and generates an answer based on that context.
| PARAMETER | DESCRIPTION |
|---|---|
query
|
The input query for which an answer is to be generated.
TYPE:
|
language_model
|
The model used for generating the answer.
TYPE:
|
source_ids
|
Document IDs of the files available for file retrieval.
TYPE:
|
max_chunks_to_use
|
Maximum number of chunks to use for the retrieval query.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
tuple[dict, dict]
|
A tuple containing a dictionary with generated answers and a dictionary of documents used for answer generation. |
Source code in docs/microservices/rag/src/rag/pipelines/file_rag.py
async def generate_answer(
self,
query: str,
language_model: str,
source_ids: list[str] | None = None,
max_chunks_to_use: int | None = None,
) -> tuple[dict, dict]:
"""Run the RAG pipeline to generate an answer based on a query.
This method processes the input query through a Retrieval-Augmented Generation (RAG) pipeline,
which retrieves relevant documents and generates an answer based on that context.
Args:
query (str): The input query for which an answer is to be generated.
language_model (str): The model used for generating the answer.
source_ids (list[str] | None): Document IDs of the files available for file retrieval.
max_chunks_to_use (int | None): Maximum number of chunks to use for the retrieval query.
Returns:
A tuple containing a dictionary with generated answers and a dictionary of
documents used for answer generation.
"""
if self.rag_pipe is None:
raise AttributeError(
"`self.rag_pipe` is not deployed. Please run `self.deploy_rag_pipe`"
)
if not source_ids:
answer = "Bitte stellen Sie ein Dokument bereit."
documents = []
return answer, documents
meta_data_filters = {
"field": "meta.source_id",
"operator": "in",
"value": source_ids,
}
docs_component_name = self.rag_pipe.metadata["retrieved_docs_keys"][0]
max_chunks_to_use_component = self.rag_pipe.metadata[
"max_chunks_to_use_component"
][0]
data = {
"query": {"value": query},
"filters": {"value": meta_data_filters},
}
if (
max_chunks_to_use is None
and "max_chunks_to_use" in self.llm_generators[language_model]
):
max_chunks_to_use = self.llm_generators[language_model]["max_chunks_to_use"]
if max_chunks_to_use is not None:
data[max_chunks_to_use_component] = {"top_k": max_chunks_to_use}
self.add_llm_router_arg_inplace(language_model_key=language_model, data=data)
result = await self.rag_pipe.run_async(
data=data,
include_outputs_from=[docs_component_name],
)
documents = get_dict_value(
result, self.rag_pipe.metadata["retrieved_docs_keys"]
)
answer = get_dict_value(result, self.rag_pipe.metadata["rag_answers_keys"])
logger.debug(
"relevant sources for answer generation: "
+ "\n ####### \n".join([f"{s.meta} \n{s.content}" for s in documents])
)
return answer, documents
indexing
async
Run the file indexing pipeline on the provided files.
Takes a list of byte streams representing files and runs it with the provided data. Returns a list of source IDs.
| PARAMETER | DESCRIPTION |
|---|---|
files
|
A list of fastapi.UploadFile objects.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
list[str]
|
A list of source IDs generated by the indexing pipeline. |
Source code in docs/microservices/rag/src/rag/pipelines/file_rag.py
async def indexing(self, files: list[UploadFile]) -> list[str]:
"""Run the file indexing pipeline on the provided files.
Takes a list of byte streams representing files
and runs it with the provided data. Returns a list of source IDs.
Args:
files (list[UploadFile]): A list of fastapi.UploadFile objects.
Returns:
A list of source IDs generated by the indexing pipeline.
"""
if self.indexing_pipe is None:
raise AttributeError(
"`self.indexing_pipe` is not deployed. Please run `self.deploy_indexing_pipe`"
)
current_time = datetime.now(tz=pytz.timezone("Europe/Berlin"))
source_ids = []
for file in files:
try:
files_byte_stream = convert_uploadfile_to_byte_stream(file)
doc_id = self._compute_doc_hash(file)
source_ids.append(doc_id)
file_id_already_exists, existing_docs = self._check_if_file_id_exists(
doc_id
)
if file_id_already_exists:
logger.info(
f"File {file.filename} already exists. Skipping indexing. Updating ingestion date."
)
self._update_ingestion_date(existing_docs, current_time)
continue
result = await self.indexing_pipe.run_async(
data={
"file_type_router": {
"sources": [files_byte_stream],
"meta": {"ingestion_date": current_time},
},
"set_doc_id": {"doc_ids": [doc_id]},
}
)
if not result["set_doc_id"]["document_ids"]:
logger.error(
"Something during indexing went wrong. No Documents embedded."
)
except Exception as e:
logger.info(f"Error processing file {file.filename}: {e}")
continue
return source_ids
openai_custom_auth_client
This module provides an OpenAI client that supports Bearer-Token-Authentication and Basic-Authentication.
| CLASS | DESCRIPTION |
|---|---|
CustomAsyncAuthClient |
Custom HTTP transport for asynchronous OpenAI client. |
CustomAuthClient |
Custom HTTP transport for OpenAI client. |
CustomAsyncAuthClient
Bases: AsyncClient
Custom HTTP transport for asynchronous OpenAI client.
This class supports both Bearer Token Authentication and Basic Authentication.
If auth_type is 'token', the secret is expected to be the API key.
If auth_type is 'basic_auth', the secret is expected to be a base64-encoded string of 'username:password'.
| ATTRIBUTE | DESCRIPTION |
|---|---|
auth_header |
Authentication header for the httpx client.
TYPE:
|
| METHOD | DESCRIPTION |
|---|---|
send |
Asynchronous method for sending HTTP requests. |
Source code in docs/microservices/rag/src/rag/pipelines/openai_custom_auth_client.py
class CustomAsyncAuthClient(httpx.AsyncClient):
"""Custom HTTP transport for asynchronous OpenAI client.
This class supports both Bearer Token Authentication and Basic Authentication.
If `auth_type` is 'token', the `secret` is expected to be the API key.
If `auth_type` is 'basic_auth', the `secret` is expected to be a base64-encoded string of 'username:password'.
Attributes:
auth_header (str): Authentication header for the httpx client.
Methods:
send(request, *args, **kwargs): Asynchronous method for sending HTTP requests.
"""
def __init__(
self,
auth_header: str | None = "",
*args: object,
**kwargs: object,
) -> None:
"""Custom HTTP transport for OpenAI client that supports Bearer-Token and Basic Authentication.
Args:
auth_header (str | None): Header containing auth information
(bearer token or basic_auth `username:password`).
*args (object): Variable length argument list.
**kwargs (object): Arbitrary keyword arguments.
"""
super().__init__(*args, **kwargs)
self.auth_header = auth_header
async def send(
self, request: httpx.Request, *args: object, **kwargs: object
) -> httpx.Response:
"""Override the send method to remove the Authorization header if no authentication was set.
Args:
request (httpx.Request): The HTTP request to be sent.
*args (object): Variable length argument list.
**kwargs (object): Arbitrary keyword arguments.
Returns:
The HTTP response received.
"""
if "Authorization" in request.headers:
del request.headers["Authorization"]
if self.auth_header:
request.headers["Authorization"] = self.auth_header
return await super().send(request, *args, **kwargs)
send
async
Override the send method to remove the Authorization header if no authentication was set.
| PARAMETER | DESCRIPTION |
|---|---|
request
|
The HTTP request to be sent.
TYPE:
|
*args
|
Variable length argument list.
TYPE:
|
**kwargs
|
Arbitrary keyword arguments.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
Response
|
The HTTP response received. |
Source code in docs/microservices/rag/src/rag/pipelines/openai_custom_auth_client.py
async def send(
self, request: httpx.Request, *args: object, **kwargs: object
) -> httpx.Response:
"""Override the send method to remove the Authorization header if no authentication was set.
Args:
request (httpx.Request): The HTTP request to be sent.
*args (object): Variable length argument list.
**kwargs (object): Arbitrary keyword arguments.
Returns:
The HTTP response received.
"""
if "Authorization" in request.headers:
del request.headers["Authorization"]
if self.auth_header:
request.headers["Authorization"] = self.auth_header
return await super().send(request, *args, **kwargs)
CustomAuthClient
Bases: Client
Custom HTTP transport for OpenAI client.
This class supports both Bearer Token Authentication and Basic Authentication.
If auth_type is 'token', the secret is expected to be the API key.
If auth_type is 'basic_auth', the secret is expected to be a base64-encoded string of 'username:password'.
| ATTRIBUTE | DESCRIPTION |
|---|---|
auth_header |
Authentication header for the httpx client.
TYPE:
|
| METHOD | DESCRIPTION |
|---|---|
send |
Synchronous method for sending HTTP requests. |
Source code in docs/microservices/rag/src/rag/pipelines/openai_custom_auth_client.py
class CustomAuthClient(httpx.Client):
"""Custom HTTP transport for OpenAI client.
This class supports both Bearer Token Authentication and Basic Authentication.
If `auth_type` is 'token', the `secret` is expected to be the API key.
If `auth_type` is 'basic_auth', the `secret` is expected to be a base64-encoded string of 'username:password'.
Attributes:
auth_header (str): Authentication header for the httpx client.
Methods:
send(request, *args, **kwargs): Synchronous method for sending HTTP requests.
"""
def __init__(
self,
auth_header: str | None = "",
*args: object,
**kwargs: object,
) -> None:
"""Custom HTTP transport for OpenAI client that supports Bearer-Token and Basic Authentication.
Args:
auth_header (str | None): Header containing auth information
(bearer token or basic_auth `username:password`).
*args (object): Variable length argument list.
**kwargs (object): Arbitrary keyword arguments.
"""
super().__init__(*args, **kwargs)
self.auth_header = auth_header
def send(
self, request: httpx.Request, *args: object, **kwargs: object
) -> httpx.Response:
"""Override the send method to remove the Authorization header if no authentication was set.
Args:
request (httpx.Request): The HTTP request to be sent.
*args (object): Variable length argument list.
**kwargs (object): Arbitrary keyword arguments.
Returns:
The HTTP response received.
"""
if "Authorization" in request.headers:
del request.headers["Authorization"]
if self.auth_header:
request.headers["Authorization"] = self.auth_header
return super().send(request, *args, **kwargs)
send
Override the send method to remove the Authorization header if no authentication was set.
| PARAMETER | DESCRIPTION |
|---|---|
request
|
The HTTP request to be sent.
TYPE:
|
*args
|
Variable length argument list.
TYPE:
|
**kwargs
|
Arbitrary keyword arguments.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
Response
|
The HTTP response received. |
Source code in docs/microservices/rag/src/rag/pipelines/openai_custom_auth_client.py
def send(
self, request: httpx.Request, *args: object, **kwargs: object
) -> httpx.Response:
"""Override the send method to remove the Authorization header if no authentication was set.
Args:
request (httpx.Request): The HTTP request to be sent.
*args (object): Variable length argument list.
**kwargs (object): Arbitrary keyword arguments.
Returns:
The HTTP response received.
"""
if "Authorization" in request.headers:
del request.headers["Authorization"]
if self.auth_header:
request.headers["Authorization"] = self.auth_header
return super().send(request, *args, **kwargs)
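A sketch of how such a client might be handed to the OpenAI SDK, assuming an openai>=1.x style client that accepts an httpx client via http_client; the base URL and credentials are placeholders.
import base64
from openai import AsyncOpenAI  # assumes the openai>=1.x SDK

# Bearer-token variant
http_client = CustomAsyncAuthClient(auth_header="Bearer <api-token>")

# Basic-auth variant: the secret is the base64-encoded "username:password"
basic_secret = base64.b64encode(b"username:password").decode()
# http_client = CustomAsyncAuthClient(auth_header=f"Basic {basic_secret}")

client = AsyncOpenAI(
    base_url="https://llm.example.internal/v1",  # placeholder endpoint
    api_key="unused",  # real credentials are injected by the custom transport
    http_client=http_client,
)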
pipeline_definitions
Implementation of the pipeline definitions of the RAG.
| MODULE | DESCRIPTION |
|---|---|
haystack_pipelines |
This module sets up a hybrid RAG pipeline for file and database question answering. |
pipeline_moduls |
This module provides functions to generate haystack RAG pipelines. |
haystack_pipelines
This module sets up a hybrid RAG pipeline for file and database question answering.
| FUNCTION | DESCRIPTION |
|---|---|
create_indexing_pipeline |
Create and optionally save a Haystack indexing pipeline. |
create_rag_pipeline |
Create the complete RAG pipeline consisting of retrieval and answer generation. |
create_retrieval_pipeline |
Create Haystack retrieval pipeline. |
get_rag_pipelines |
Generate and save hybrid RAG pipeline. |
| ATTRIBUTE | DESCRIPTION |
|---|---|
args |
Be very careful when using pipeline visualizations with sensitive data, as the Mermaid graph service is an external server.
|
args
module-attribute
Be very careful when using pipeline visualizations with sensitive data, as the Mermaid graph service is an external server that does not have documented privacy policies or data retention guidelines. This increases the risk of unintended exposure of confidential information.
mermaid-draw-params (an illustrative parameter set is sketched after this list):
- format: Output format ('img', 'svg', or 'pdf'). Default: 'img'.
- type: Image type for /img endpoint ('jpeg', 'png', 'webp'). Default: 'png'.
- theme: Mermaid theme ('default', 'neutral', 'dark', 'forest'). Default: 'neutral'.
- bgColor: Background color in hexadecimal (e.g., 'FFFFFF') or named format (e.g., '!white').
- width: Width of the output image (integer).
- height: Height of the output image (integer).
- scale: Scaling factor (1–3). Only applicable if 'width' or 'height' is specified.
- fit: Whether to fit the diagram size to the page (PDF only, boolean).
- paper: Paper size for PDFs (e.g., 'a4', 'a3'). Ignored if 'fit' is true.
- landscape: Landscape orientation for PDFs (boolean). Ignored if 'fit' is true.
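For orientation, one plausible parameter set; the dictionary name and the concrete values are illustrative only and are not taken from this module.
mermaid_draw_params = {
    "format": "img",
    "type": "png",
    "theme": "neutral",
    "bgColor": "!white",
    "width": 1600,
    "scale": 2,  # only honored because "width" is set
}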
create_indexing_pipeline
Create and optionally save a Haystack indexing pipeline.
| PARAMETER | DESCRIPTION |
|---|---|
embedding_config
|
Embedding model configuration.
TYPE:
|
indexing_pipeline_config
|
Configuration for the indexing pipeline containing: - index (str): Elasticsearch index to store the documents to. - es_host (str): Elasticsearch host.
TYPE:
|
pipeline_save_config
|
Configuration for pipelines saving process: - file_path (str | Path): Path to save the pipeline to. - pipe_label (str): Pipeline label. - png (str or None): The type of PNG image to generate. Can be "mermaid", "local" or None.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
AsyncPipeline | None
|
The pipeline if not saved to disk, otherwise None. |
Source code in docs/microservices/rag/src/rag/pipelines/pipeline_definitions/haystack_pipelines.py
def create_indexing_pipeline(
embedding_config: EmbeddingModel,
indexing_pipeline_config: IndexingPipelineConfig,
pipeline_save_config: PipelineSaveConfig,
**_: object,
) -> AsyncPipeline | None:
"""Create and optionally save a Haystack indexing pipeline.
Args:
embedding_config (EmbeddingModel): Embedding model configuration.
indexing_pipeline_config (IndexingPipelineConfig): Configuration for the indexing pipeline containing:
- index (str): Elasticsearch index to store the documents to.
- es_host (str): Elasticsearch host.
pipeline_save_config (PipelineSaveConfig): Configuration for pipelines saving process:
- file_path (str | Path): Path to save the pipeline to.
- pipe_label (str): Pipeline label.
- png (str or None): The type of PNG image to generate. Can be "mermaid", "local" or None.
Returns:
The pipeline if not saved to disk, otherwise None.
"""
metadata = {"pipe_label": pipeline_save_config.pipe_label}
document_embedder = F13OpenAIDocumentEmbedder(
embedding_model=embedding_config,
timeout=settings.llm_api_timeout,
)
file_type_router = FileTypeRouter(
mime_types=[
"text/plain",
"application/pdf",
"application/vnd.openxmlformats-officedocument.wordprocessingml.document",
]
)
document_store = ElasticsearchDocumentStore(
hosts=indexing_pipeline_config.es_host,
index=indexing_pipeline_config.index,
)
pdf_converter = PyPDFToDocument()
text_file_converter = TextFileToDocument()
docx_converter = TikaDocumentConverter()
set_id = IDSetter(
ignore_keys_for_doc_hash=[
"ingestion_date",
"source_id",
"id",
"embedding",
]
)
document_splitter = DocumentSplitter(
split_by="word",
split_length=200,
split_overlap=20,
)
converted_docs_joiner = DocumentJoiner()
document_writer = DocumentWriter(document_store)
indexing_pipeline = AsyncPipeline(metadata=metadata)
indexing_pipeline.add_component(instance=file_type_router, name="file_type_router")
indexing_pipeline.add_component(
instance=text_file_converter, name="text_file_converter"
)
indexing_pipeline.add_component(instance=pdf_converter, name="pdf_converter")
indexing_pipeline.add_component(instance=docx_converter, name="tika_converter")
indexing_pipeline.add_component(instance=set_id, name="set_doc_id")
indexing_pipeline.add_component(
instance=converted_docs_joiner, name="converted_docs_joiner"
)
indexing_pipeline.add_component("document_splitter", document_splitter)
indexing_pipeline.add_component("document_embedder", document_embedder)
indexing_pipeline.add_component("document_writer", document_writer)
indexing_pipeline.connect(
"file_type_router.text/plain", "text_file_converter.sources"
)
indexing_pipeline.connect(
"file_type_router.application/pdf", "pdf_converter.sources"
)
indexing_pipeline.connect(
"file_type_router.application/vnd.openxmlformats-officedocument.wordprocessingml.document",
"tika_converter.sources",
)
indexing_pipeline.connect("text_file_converter", "converted_docs_joiner")
indexing_pipeline.connect("pdf_converter", "converted_docs_joiner")
indexing_pipeline.connect("tika_converter", "converted_docs_joiner")
indexing_pipeline.connect("converted_docs_joiner", "set_doc_id")
indexing_pipeline.connect("set_doc_id.documents", "document_splitter")
indexing_pipeline.connect("document_splitter", "document_embedder")
indexing_pipeline.connect("document_embedder", "document_writer")
# saving indexing pipeline
# saving pipeline only for the retrieval of documents
if pipeline_save_config.file_path:
save_pipe_to(
pipeline_save_config.file_path,
indexing_pipeline,
png=pipeline_save_config.png,
)
else:
return indexing_pipeline
create_rag_pipeline
create_rag_pipeline(embedding_config, retrieval_config, indexing_pipeline_config, pipeline_save_config, rag_llms=None, **_)
Create the complete RAG pipeline consisting of retrieval and answer generation.
| PARAMETER | DESCRIPTION |
|---|---|
retrieval_config
|
Reranker config.
TYPE:
|
embedding_config
|
Embedding model config.
TYPE:
|
indexing_pipeline_config
|
TYPE:
|
pipeline_save_config
|
Configuration for pipelines saving process: - file_path (str | Path): Path to save the pipeline to. - pipe_label (str): Pipeline label. - png (str or None): The type of PNG image to generate. Can be "mermaid", "local" or None.
TYPE:
|
rag_llms
|
RAG model config.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
AsyncPipeline
|
RAG pipeline. |
Source code in docs/microservices/rag/src/rag/pipelines/pipeline_definitions/haystack_pipelines.py
def create_rag_pipeline(
embedding_config: EmbeddingModel,
retrieval_config: RetrievalConfig,
indexing_pipeline_config: IndexingPipelineConfig,
pipeline_save_config: PipelineSaveConfig,
rag_llms: dict[str, LLM] = None,
**_: object,
) -> AsyncPipeline:
"""Create the complete RAG pipeline consisting of retrieval and answer generation.
Args:
retrieval_config (RetrievalConfig): Reranker config.
embedding_config (EmbeddingModel): Embedding model config.
indexing_pipeline_config (IndexingPipelineConfig):
- es_host (str): Elasticsearch host to connect to.
- index (str): Elasticsearch index to use for retrieval.
pipeline_save_config (PipelineSaveConfig): Configuration for pipelines saving process:
- file_path (str | Path): Path to save the pipeline to.
- pipe_label (str): Pipeline label.
- png (str or None): The type of PNG image to generate. Can be "mermaid", "local" or None.
rag_llms (dict[str, LLM]): RAG model config.
Returns:
RAG pipeline.
"""
retrieval_pipe = create_retrieval_pipeline(
retrieval_config=retrieval_config,
embedding_config=embedding_config,
indexing_pipeline_config=indexing_pipeline_config,
pipeline_save_config=pipeline_save_config,
)
pipe = add_answer_generation_pipeline(
retrieval_pipe=retrieval_pipe,
es_host=indexing_pipeline_config.es_host,
index=indexing_pipeline_config.index,
pipeline_save_config=pipeline_save_config,
rag_llms=rag_llms,
)
return pipe
create_retrieval_pipeline
create_retrieval_pipeline(embedding_config, retrieval_config, indexing_pipeline_config, pipeline_save_config, **_)
Create Haystack retrieval pipeline.
| PARAMETER | DESCRIPTION |
|---|---|
embedding_config
|
Embedding model config.
TYPE:
|
retrieval_config
|
Retrieval config.
TYPE:
|
indexing_pipeline_config
|
TYPE:
|
pipeline_save_config
|
Configuration for pipelines saving process: - file_path (str | Path): Path to save the pipeline to. - pipe_label (str): Pipeline label. - png (str or None): The type of PNG image to generate. Can be "mermaid", "local" or None.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
AsyncPipeline
|
Retrieval pipeline. |
Source code in docs/microservices/rag/src/rag/pipelines/pipeline_definitions/haystack_pipelines.py
def create_retrieval_pipeline(
embedding_config: EmbeddingModel,
retrieval_config: RetrievalConfig,
indexing_pipeline_config: IndexingPipelineConfig,
pipeline_save_config: PipelineSaveConfig,
**_: object,
) -> AsyncPipeline:
"""Create Haystack retrieval pipeline.
Args:
embedding_config (EmbeddingModel): Embedding model config.
retrieval_config (RetrievalConfig): Retrieval config.
indexing_pipeline_config (IndexingPipelineConfig):
- es_host (str): Elasticsearch host to connect to.
- index (str): Elasticsearch index to use for retrieval.
pipeline_save_config (PipelineSaveConfig): Configuration for pipelines saving process:
- file_path (str | Path): Path to save the pipeline to.
- pipe_label (str): Pipeline label.
- png (str or None): The type of PNG image to generate. Can be "mermaid", "local" or None.
Returns:
Retrieval pipeline.
"""
metadata = {"pipe_label": pipeline_save_config.pipe_label}
text_embedder = F13OpenAITextEmbedder(
embedding_model=embedding_config,
timeout=settings.llm_api_timeout,
)
document_store = ElasticsearchDocumentStore(
hosts=indexing_pipeline_config.es_host, index=indexing_pipeline_config.index
)
embedding_retriever = EmbeddingRetriever(
document_store=document_store, top_k=retrieval_config.retriever_top_k
)
bm25_retriever = BM25Retriever(
document_store=document_store, top_k=retrieval_config.retriever_top_k
)
if retrieval_config.include_ranker:
document_joiner_top_k = 2 * retrieval_config.retriever_top_k
else:
document_joiner_top_k = retrieval_config.max_chunks_to_use
result_document_joiner = DocumentJoiner(
join_mode="reciprocal_rank_fusion",
sort_by_score=True,
top_k=document_joiner_top_k,
)
query_branch = BranchJoiner(str)
filter_branch = BranchJoiner(dict[str, Any])
grouping = MetaFieldGroupingRanker(group_by="source_id")
retrieval_pipeline = AsyncPipeline(metadata=metadata)
retrieval_pipeline.add_component("query", query_branch)
retrieval_pipeline.add_component("filters", filter_branch)
retrieval_pipeline.add_component("text_embedder", text_embedder)
retrieval_pipeline.add_component("embedding_retriever", embedding_retriever)
retrieval_pipeline.add_component("bm25_retriever", bm25_retriever)
retrieval_pipeline.add_component(
instance=result_document_joiner, name="retrieved_docs_joiner"
)
retrieval_pipeline.add_component("document_grouping", grouping)
retrieval_pipeline.connect("query.value", "text_embedder")
retrieval_pipeline.connect(
"text_embedder.embedding", "embedding_retriever.query_embedding"
)
retrieval_pipeline.connect("filters.value", "embedding_retriever.filters")
retrieval_pipeline.connect("query.value", "bm25_retriever.query")
retrieval_pipeline.connect("filters.value", "bm25_retriever.filters")
retrieval_pipeline.connect("bm25_retriever", "retrieved_docs_joiner")
retrieval_pipeline.connect("embedding_retriever", "retrieved_docs_joiner")
# add ranker component
if retrieval_config.include_ranker:
ranker = SentenceTransformersSimilarityRanker(
model=retrieval_config.ranker_model_path,
meta_fields_to_embed=["title"],
score_threshold=retrieval_config.ranker_score_threshold,
top_k=retrieval_config.max_chunks_to_use,
)
retrieval_pipeline.add_component("ranker", ranker)
retrieval_pipeline.connect(
"retrieved_docs_joiner.documents", "ranker.documents"
)
retrieval_pipeline.connect("query.value", "ranker.query")
retrieval_pipeline.connect("ranker.documents", "document_grouping.documents")
retrieval_pipeline.metadata["max_chunks_to_use_component"] = ["ranker"]
else:
result_document_joiner.top_k = retrieval_config.max_chunks_to_use
retrieval_pipeline.connect(
"retrieved_docs_joiner.documents", "document_grouping.documents"
)
retrieval_pipeline.metadata["max_chunks_to_use_component"] = [
"retrieved_docs_joiner"
]
retrieval_pipeline.metadata["retrieved_docs_keys"] = [
"document_grouping",
"documents",
]
# saving pipeline only for the retrieval of documents
if pipeline_save_config.file_path:
save_pipe_to(
pipeline_save_config.file_path,
retrieval_pipeline,
png=pipeline_save_config.png,
)
else:
return retrieval_pipeline
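A minimal sketch of running the retrieval pipeline on its own, assuming it was returned directly (file_path=None) and that get_dict_value from src.rag.utils is imported; the query string and document ID are placeholders.
result = await retrieval_pipeline.run_async(
    data={
        "query": {"value": "Wie beantrage ich Urlaub?"},  # placeholder query
        "filters": {
            "value": {"field": "meta.source_id", "operator": "in", "value": ["<doc-id>"]}
        },
    },
    include_outputs_from=["document_grouping"],
)
# The metadata recorded above tells callers where to find the grouped documents.
documents = get_dict_value(result, retrieval_pipeline.metadata["retrieved_docs_keys"])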
get_rag_pipelines
Generate and save hybrid RAG pipeline.
| PARAMETER | DESCRIPTION |
|---|---|
es_host
|
Elasticsearch host, e.g., "http://localhost:9200".
TYPE:
|
Source code in docs/microservices/rag/src/rag/pipelines/pipeline_definitions/haystack_pipelines.py
def get_rag_pipelines(es_host: str, **_: object) -> dict[str, AsyncPipeline]:
"""Generate and save hybrid RAG pipeline.
Args:
es_host (str): Elasticsearch host, e.g., "http://localhost:9200".
Returns:
A registry mapping pipeline labels to the created pipelines.
"""
logger.debug("Creating Haystack Pipelines.")
pipe_config = rag_pipeline_config.model_dump()["pipeline"]
# load configuration parameters of the LLMs to use from yaml file
rag_llms = llm_config.rag
embedding_config = llm_config.embedding.get(pipe_config["embedding_model_name"])
creation_dict = {
"deleter": create_deletion_pipeline,
"indexing": create_indexing_pipeline,
"rag": create_rag_pipeline,
}
pipeline_registry = {}
for rag_type in ["file", "database"]:
for task, creation_f in creation_dict.items():
if rag_type == "database" and task == "deleter":
continue
pipe_label = f"{rag_type}_{task}"
pipeline_save_config = PipelineSaveConfig(
file_path=None,
pipe_label=pipe_label,
png="local",
)
indexing_pipeline_config = IndexingPipelineConfig(
es_host=es_host,
index=pipe_config["index"][rag_type],
)
pipeline_registry[pipe_label] = creation_f(
embedding_config=embedding_config,
retrieval_config=rag_pipeline_config.pipeline.retrieval_config,
indexing_pipeline_config=indexing_pipeline_config,
pipeline_save_config=pipeline_save_config,
rag_llms=rag_llms,
)
return pipeline_registry
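Given the label scheme above (f"{rag_type}_{task}", with the database deleter skipped), the returned registry should expose five pipelines; a lookup sketch, assuming the configuration files and the Elasticsearch host are available:
registry = get_rag_pipelines(es_host="http://localhost:9200")
# Expected labels, derived from the loop above:
#   file_deleter, file_indexing, file_rag, database_indexing, database_rag
file_rag_pipeline = registry["file_rag"]
database_rag_pipeline = registry["database_rag"]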
pipeline_moduls
This module provides functions to generate haystack RAG pipelines.
| FUNCTION | DESCRIPTION |
|---|---|
add_answer_generation_pipeline |
Create a Haystack pipeline for answer generation. |
create_deletion_pipeline |
Create a Haystack pipeline to delete documents from temporary storage. |
openai_generation_pipe_component |
Create the generation pipeline using an OpenAI-conform LLM API. |
add_answer_generation_pipeline
Create a Haystack pipeline for answer generation.
| PARAMETER | DESCRIPTION |
|---|---|
retrieval_pipe
|
The retrieval pipeline to use.
TYPE:
|
rag_llms
|
The RAG config to use.
TYPE:
|
pipeline_save_config
|
Configuration for pipelines saving process: - file_path (str | Path): Path to save the pipeline to. - pipe_label (str): Pipeline label. - png (str or None): The type of PNG image to generate. Can be "mermaid", "local" or None.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
AsyncPipeline
|
The Haystack pipeline. |
Source code in docs/microservices/rag/src/rag/pipelines/pipeline_definitions/pipeline_moduls.py
def add_answer_generation_pipeline(
retrieval_pipe: AsyncPipeline,
rag_llms: dict[str, LLM],
pipeline_save_config: PipelineSaveConfig,
**_: object,
) -> AsyncPipeline:
"""Create a Haystack pipeline for answer generation.
Args:
retrieval_pipe (AsyncPipeline): The retrieval pipeline to use.
rag_llms (dict[str, LLM]): The RAG config to use.
pipeline_save_config (PipelineSaveConfig): Configuration for pipelines saving process:
- file_path (str | Path): Path to save the pipeline to.
- pipe_label (str): Pipeline label.
- png (str or None): The type of PNG image to generate. Can be "mermaid", "local" or None.
Returns:
The Haystack pipeline.
"""
pipe = retrieval_pipe
metadata = {"pipe_label": pipeline_save_config.pipe_label}
generator_info = {}
# initialize conditional LLM routing
llm_routes = [
{
"condition": "{{ generator_model| string == '" + llm_name + "' }}",
"output": "{{query}}",
"output_name": llm_name,
"output_type": str,
}
for llm_name, llm in rag_llms.items()
]
joiner = BranchJoiner(str)
rag_answer_component_name = "rag_answers"
pipe.add_component(rag_answer_component_name, joiner)
pipe.add_component("llm_router", ConditionalRouter(routes=llm_routes))
pipe.connect("query", "llm_router.query")
docs_component_name = pipe.metadata["retrieved_docs_keys"][0]
metadata["rag_answers_keys"] = [rag_answer_component_name, "value"]
# initialize LLMs from LLMConfig File (RAG section)
if rag_llms:
for llm_key, llm in rag_llms.items():
generator_info[llm_key] = llm.model_dump(
include=["label", "model", "max_chunks_to_use"]
)
generation_config = GenerationConfig(
docs_sender_name=docs_component_name,
routed_query_name=f"llm_router.{llm_key}",
name_prefix=llm_key,
rag_answer_component_name=rag_answer_component_name,
)
pipe = openai_generation_pipe_component(
pipe=pipe,
llm=llm,
config=generation_config,
)
metadata["llm_generators"] = generator_info
pipe.metadata.update(metadata)
# saving pipeline only for the retrieval of documents
if pipeline_save_config.file_path:
save_pipe_to(pipeline_save_config.file_path, pipe, png=pipeline_save_config.png)
return pipe
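To illustrate the conditional routing, the routes built for two hypothetical LLM keys would look roughly like this; the keys are placeholders, and the note on the run-time input is an assumption about add_llm_router_arg_inplace.
# For rag_llms with the hypothetical keys "llm_small" and "llm_large":
llm_routes = [
    {
        "condition": "{{ generator_model| string == 'llm_small' }}",
        "output": "{{query}}",
        "output_name": "llm_small",
        "output_type": str,
    },
    {
        "condition": "{{ generator_model| string == 'llm_large' }}",
        "output": "{{query}}",
        "output_name": "llm_large",
        "output_type": str,
    },
]
# At run time the llm_router component therefore needs a "generator_model" input
# (presumably supplied by add_llm_router_arg_inplace) so that exactly one
# per-LLM branch - e.g. "llm_small_prompt_builder" - receives the query.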
create_deletion_pipeline
Create a Haystack pipeline to delete documents from temporary storage.
| PARAMETER | DESCRIPTION |
|---|---|
indexing_pipeline_config
|
TYPE:
|
pipeline_save_config
|
Configuration for pipelines saving process: - file_path (str | Path): Path to save the pipeline to. - pipe_label (str): Pipeline label. - png (str or None): The type of PNG image to generate. Can be "mermaid", "local" or None.
TYPE:
|
Source code in docs/microservices/rag/src/rag/pipelines/pipeline_definitions/pipeline_moduls.py
def create_deletion_pipeline(
indexing_pipeline_config: IndexingPipelineConfig,
pipeline_save_config: PipelineSaveConfig,
**_: object,
) -> AsyncPipeline | None:
"""Create a Haystack pipeline to delete documents from temporary storage.
Args:
indexing_pipeline_config (IndexingPipelineConfig):
- es_host (str): Elasticsearch host to connect to.
- index (str): Elasticsearch index to use for retrieval.
pipeline_save_config (PipelineSaveConfig): Configuration for pipelines saving process:
- file_path (str | Path): Path to save the pipeline to.
- pipe_label (str): Pipeline label.
- png (str or None): The type of PNG image to generate. Can be "mermaid", "local" or None.
"""
metadata = {"pipe_label": pipeline_save_config.pipe_label}
document_store = DocumentStore(
hosts=indexing_pipeline_config.es_host, index=indexing_pipeline_config.index
)
document_deleter = DocumentDeleter(document_store=document_store)
document_id_retriever = DocumentIDRetriever(document_store=document_store)
delete_temporary_pipe = AsyncPipeline(metadata=metadata)
delete_temporary_pipe.add_component(
instance=document_deleter, name="document_deleter"
)
delete_temporary_pipe.add_component(
instance=document_id_retriever, name="document_id_retriever"
)
delete_temporary_pipe.connect("document_id_retriever", "document_deleter")
# saving pipeline only for the retrieval of documents
if pipeline_save_config.file_path:
save_pipe_to(
pipeline_save_config.file_path,
delete_temporary_pipe,
png=pipeline_save_config.png,
)
else:
return delete_temporary_pipe
openai_generation_pipe_component
Create the generation pipeline using an OpenAI-conform LLM API.
| PARAMETER | DESCRIPTION |
|---|---|
pipe
|
Pipeline to which the answer generation components are added.
TYPE:
|
llm
|
Language model object used for response generation.
TYPE:
|
config
|
Configuration object containing: - docs_sender_name (str): Name of the document sender component. - routed_query_name (str): Name of the routed query component. - name_prefix (str): Prefix used for naming pipeline components. - rag_answer_component_name (str): Name of the final RAG answer component.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
AsyncPipeline
|
Generated Haystack pipeline. |
Source code in docs/microservices/rag/src/rag/pipelines/pipeline_definitions/pipeline_moduls.py
def openai_generation_pipe_component(
pipe: AsyncPipeline,
llm: LLM,
config: GenerationConfig,
) -> AsyncPipeline:
"""Create the generation pipeline using an OpenAI-conform LLM API.
Args:
pipe (AsyncPipeline): Pipeline to which the answer generation components are added.
llm (LLM): Language model object used for response generation.
config (GenerationConfig): Configuration object containing:
- docs_sender_name (str): Name of the document sender component.
- routed_query_name (str): Name of the routed query component.
- name_prefix (str): Prefix used for naming pipeline components.
- rag_answer_component_name (str): Name of the final RAG answer component.
Returns:
Generated Haystack pipeline.
"""
name_prefix = f"{config.name_prefix}_" if config.name_prefix else ""
generation_kwargs = {
"max_completion_tokens": llm.inference.max_new_tokens,
"temperature": llm.inference.temperature,
"top_p": llm.inference.top_p,
}
generator = F13OpenAIChatGenerator(
llm=llm,
generation_kwargs=generation_kwargs,
timeout=settings.llm_api_timeout,
)
adapter = OutputAdapter(template="{{ replies[0].text }}", output_type=str)
pipe.add_component(
f"{name_prefix}prompt_builder",
ChatPromptBuilder(
template=[
ChatMessage.from_system(llm.prompt_config.system.generate),
ChatMessage.from_user(llm.prompt_config.user.generate),
],
required_variables=["query", "documents"],
),
)
pipe.add_component(f"{name_prefix}adapter", adapter)
pipe.add_component(f"{name_prefix}llm", generator)
pipe.connect(config.docs_sender_name, f"{name_prefix}prompt_builder.documents")
pipe.connect(config.routed_query_name, f"{name_prefix}prompt_builder.query")
pipe.connect(f"{name_prefix}prompt_builder", f"{name_prefix}llm")
pipe.connect(f"{name_prefix}llm.replies", f"{name_prefix}adapter")
pipe.connect(f"{name_prefix}adapter", config.rag_answer_component_name)
return pipe
rag_registry
RAG-Registry class for setting-up, storing and accessing RAG pipelines.
| CLASS | DESCRIPTION |
|---|---|
RAGRegistry |
Sets up and stores RAG pipelines and makes access possible. |
RAGRegistry
Sets up and stores RAG pipelines and makes access possible.
| ATTRIBUTE | DESCRIPTION |
|---|---|
file_rag |
Instance of the FileRag-Pipeline.
TYPE:
|
database_rag |
Instance of the DatabaseRag-Pipeline.
TYPE:
|
| METHOD | DESCRIPTION |
|---|---|
file_rag_tempfile_deletion |
Runs the temporary file deleter pipeline to delete files older than a specified timestamp. |
index_database_rag |
Run the database indexing pipeline on the provided files. |
run_database_rag |
Run the database-rag pipeline to generate an answer based on a query. |
run_file_rag |
Run the file-rag pipeline to generate an answer based on a query. |
Source code in docs/microservices/rag/src/rag/rag_registry.py
class RAGRegistry:
"""Sets up and stores RAG pipelines and makes access possible.
Attributes:
file_rag (FileRagPipe): Instance of the FileRag-Pipeline.
database_rag (DatabaseRagPipe): Instance of the DatabaseRag-Pipeline.
"""
def __init__(self) -> None:
"""Initializes the RAG Pipelines."""
self.file_rag: FileRagPipe
self.database_rag: DatabaseRagPipe
self._check_for_ranker_model()
self.file_rag, self.database_rag = self._initialize_pipelines()
def _check_for_ranker_model(self) -> None:
"""Checks if the RAG pipeline has a ranker model defined and tries to download it."""
if rag_pipeline_config.pipeline.retrieval_config.include_ranker:
logger.info("RAG pipeline has a ranker model defined.")
reranker_model_path = download_reranker_model()
if not reranker_model_path:
logger.critical(
"Re-ranker model could not be downloaded. Creating Retrieval-Pipeline without Re-ranker."
)
rag_pipeline_config.pipeline.retrieval_config.include_ranker = False
def _initialize_pipelines(self) -> tuple[FileRagPipe, DatabaseRagPipe]:
try:
file_rag, database_rag = setup_rag(
debug_haystack_pipelines=settings.debug_haystack_pipelines
)
return file_rag, database_rag
except Exception as e:
logger.exception(e)
sys.exit(-1) # Terminate application if pipeline not available
async def run_database_rag(self, rag_input: RAGInput) -> list[RAGOutput]:
"""Run the database-rag pipeline to generate an answer based on a query.
This method processes the input query through a Retrieval-Augmented Generation (RAG) pipeline,
which retrieves relevant documents and generates an answer based on that context.
Args:
rag_input (RAGInput): A dataclass containing the question and RAG parameters.
Returns:
A list of dictionaries, each containing a `'question'`, `'answer'`, and `'sources'`.
"""
if rag_input.language_model not in settings.active_llms.rag:
logger.error(f"Invalid language model selected: {rag_input.language_model}")
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=(
f"Es wurde ein ungültiges Sprachmodell ausgewählt ({rag_input.language_model})."
" Bitte versuchen Sie es mit einem anderen Modell."
),
)
question = rag_input.question
try:
logger.info("Dokumente werden gesucht und eine Antwort wird generiert.")
meta_data_filters = rag_input.meta_data_filters
answer, sources = await self.database_rag.generate_answer(
query=question,
language_model=rag_input.language_model,
meta_data_filters=meta_data_filters,
max_chunks_to_use=rag_input.max_chunks_to_use,
)
if len(sources) == 0:
logger.warning("No matching documents retrieved.")
except Exception as e:
logger.error(f"Answer generation failed: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Die Beantwortung der Frage ist fehlgeschlagen.",
)
result = apply_output_format(question, answer, sources)
logger.debug(f"Answer of the Database-RAG: {result}")
return [result]
async def index_database_rag(
self, db_ingestion: DBIngestionInput
) -> dict[str, Any]:
"""Run the database indexing pipeline on the provided files.
Args:
db_ingestion (DBIngestionInput): A client-defined connection input containing the ingestion data.
Returns:
A status and message response, for example:
{
"status": "success",
"message": "Ingested <n> documents and <m> chunks into the database."
}
"""
logger.info("Dokumente werden hochgeladen.")
docs = [Document(**doc.model_dump()) for doc in db_ingestion.documents]
n_docs = 0
n_chunks = 0
failed_batches = []
batches = split_list_into_batches(docs, db_ingestion.batch_size)
for batch in batches:
try:
n_docs_batch, n_batch_chunks = await self.database_rag.indexing(
files=batch, double_doc_handling=db_ingestion.double_doc_handling
)
n_docs += n_docs_batch
n_chunks += n_batch_chunks
if n_docs_batch == 0:
failed_batches.append(
[batch_item.to_dict() for batch_item in batch]
)
except Exception as e:
logger.exception(
f"Error during indexing of the current batch. Error: {e}"
)
failed_batches.append([batch_item.to_dict() for batch_item in batch])
result = {
"status": "success",
"message": f"Ingested {n_docs} documents and {n_chunks} chunks into the database.",
}
if failed_batches:
result["status"] = "failed"
result["failed_batches"] = str(failed_batches)
return result
async def run_file_rag(
self, rag_input: RAGInput, files: list[UploadFile]
) -> list[RAGOutput]:
"""Run the file-rag pipeline to generate an answer based on a query.
This method processes the input query through a Retrieval-Augmented Generation (RAG) pipeline,
which retrieves relevant documents and generates an answer based on that context.
Args:
rag_input (RAGInput): A dataclass containing the question and RAG parameters.
files (list[UploadFile]): A list of fastapi.UploadFile objects.
Returns:
A list of dictionaries, each containing a `'question'`, `'answer'`, and `'sources'`.
"""
if rag_input.language_model not in settings.active_llms.rag:
logger.error(f"Invalid language model selected: {rag_input.language_model}")
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=(
f"Es wurde ein ungültiges Sprachmodell ausgewählt ({rag_input.language_model})."
" Bitte versuchen Sie es mit einem anderen Modell."
),
)
source_ids = await self._index_file_rag(files_byte_stream=files)
return [await self._answer_file_rag(rag_input, source_ids=source_ids)]
async def _answer_file_rag(
self, rag_input: RAGInput, source_ids: list[str] = None
) -> RAGOutput:
"""Run the file-rag pipeline to generate an answer based on a query.
This method processes the input query through a Retrieval-Augmented Generation (RAG) pipeline,
which retrieves relevant documents and generates an answer based on that context.
Args:
rag_input (RAGInput): A dataclass containing the question and RAG parameters.
source_ids (list[str]): A list of document IDs to restrict the retrieval context.
Returns:
The question, generated answer, and sources.
"""
question = rag_input.question
try:
logger.info(
"Dokumente werden gesucht und eine Antwort wird generiert.",
)
answer, sources = await self.file_rag.generate_answer(
query=question,
language_model=rag_input.language_model,
source_ids=source_ids,
max_chunks_to_use=rag_input.max_chunks_to_use,
)
if len(sources) == 0:
logger.warning(
"No matching documents retrieved.",
)
except Exception as e:
logger.error(f"Answer generation failed: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Die Beantwortung dieser Frage ist fehlgeschlagen.",
)
result = apply_output_format(question, answer, sources)
logger.debug(f"Answer of the File-RAG: {result}")
return result
async def _index_file_rag(self, files_byte_stream: list) -> list[str]:
"""Run the file indexing pipeline on the provided files.
Takes a list of byte streams representing files, loads and warms up the indexing pipeline,
and runs it with the provided data. Returns a list of source IDs.
Args:
files_byte_stream (list): A list of byte streams representing files.
Returns:
A list of source IDs generated by the indexing pipeline.
"""
logger.info("{%d}} Dokumente werden hochgeladen.", len(files_byte_stream))
try:
doc_ids = await self.file_rag.indexing(files=files_byte_stream)
except Exception:
logger.exception("Unexpected error during indexing.")
doc_ids = []
return doc_ids
async def file_rag_tempfile_deletion(self, timestamp: datetime.datetime) -> int:
"""Runs the temporary file deleter pipeline to delete files older than a specified timestamp.
The pipeline uses a filter to delete documents with an ingestion date
older than or equal to the provided timestamp.
It then runs the retrieval pipeline to delete these documents.
Args:
timestamp (datetime): The maximum ingestion datetime for files to be deleted.
Returns:
The number of documents deleted by the pipeline.
Raises:
Exception: If any error occurs during pipeline execution.
"""
try:
n_deleted_files = await self.file_rag.file_deletion(timestamp=timestamp)
except Exception:
logger.exception("Unexpected error during temporary file deletion.")
n_deleted_files = 0
return n_deleted_files
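A minimal usage sketch, assuming a reachable Elasticsearch instance and LLM API; the RAGInput field names are inferred from the attribute accesses in this class, and the concrete values are placeholders.
registry = RAGRegistry()  # may terminate the process if pipeline setup fails

rag_input = RAGInput(
    question="Wie beantrage ich Urlaub?",  # placeholder question
    language_model="llm_large",            # placeholder key from settings.active_llms.rag
    meta_data_filters=None,
    max_chunks_to_use=5,
)
results = await registry.run_database_rag(rag_input)  # inside an async context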
file_rag_tempfile_deletion
async
Runs the temporary file deleter pipeline to delete files older than a specified timestamp.
The pipeline uses a filter to delete documents with an ingestion date older than or equal to the provided timestamp. It then runs the retrieval pipeline to delete these documents.
| PARAMETER | DESCRIPTION |
|---|---|
timestamp
|
The maximum ingestion datetime for files to be deleted.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
int
|
The number of documents deleted by the pipeline. |
| RAISES | DESCRIPTION |
|---|---|
Exception
|
If any error occurs during pipeline execution. |
Source code in docs/microservices/rag/src/rag/rag_registry.py
async def file_rag_tempfile_deletion(self, timestamp: datetime.datetime) -> int:
"""Runs the temporary file deleter pipeline to delete files older than a specified timestamp.
The pipeline uses a filter to delete documents with an ingestion date
older than or equal to the provided timestamp.
It then runs the retrieval pipeline to delete these documents.
Args:
timestamp (datetime): The maximum ingestion datetime for files to be deleted.
Returns:
The number of documents deleted by the pipeline.
Raises:
Exception: If any error occurs during pipeline execution.
"""
try:
n_deleted_files = await self.file_rag.file_deletion(timestamp=timestamp)
except Exception:
logger.exception("Unexpected error during temporary file deletion.")
n_deleted_files = 0
return n_deleted_files
index_database_rag
async
Run the database indexing pipeline on the provided files.
| PARAMETER | DESCRIPTION |
|---|---|
db_ingestion
|
A client-defined connection input containing the ingestion data.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
dict[str, Any]
|
A status and message response, for example: {"status": "success", "message": "Ingested <n> documents and <m> chunks into the database."} |
Source code in docs/microservices/rag/src/rag/rag_registry.py
async def index_database_rag(
self, db_ingestion: DBIngestionInput
) -> dict[str, Any]:
"""Run the database indexing pipeline on the provided files.
Args:
db_ingestion (DBIngestionInput): A client-defined connection input containing the ingestion data.
Returns:
A status and message response, for example:
{
"status": "success",
"message": "Ingested <n> documents and <m> chunks into the database."
}
"""
logger.info("Dokumente werden hochgeladen.")
docs = [Document(**doc.model_dump()) for doc in db_ingestion.documents]
n_docs = 0
n_chunks = 0
failed_batches = []
batches = split_list_into_batches(docs, db_ingestion.batch_size)
for batch in batches:
try:
n_docs_batch, n_batch_chunks = await self.database_rag.indexing(
files=batch, double_doc_handling=db_ingestion.double_doc_handling
)
n_docs += n_docs_batch
n_chunks += n_batch_chunks
if n_docs_batch == 0:
failed_batches.append(
[batch_item.to_dict() for batch_item in batch]
)
except Exception as e:
logger.exception(
f"Error during indexing of the current batch. Error: {e}"
)
failed_batches.append([batch_item.to_dict() for batch_item in batch])
result = {
"status": "success",
"message": f"Ingested {n_docs} documents and {n_chunks} chunks into the database.",
}
if failed_batches:
result["status"] = "failed"
result["failed_batches"] = str(failed_batches)
return result
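For orientation, a sketch of the kind of payload this method consumes; the field names follow the attribute accesses above (documents, batch_size, double_doc_handling), while the document shape and the allowed double_doc_handling values are assumptions, not the validated schema.
db_ingestion = DBIngestionInput(
    documents=[
        {
            "content": "Urlaubsantraege sind zwei Wochen im Voraus zu stellen.",
            "meta": {"title": "HR-Richtlinie", "source_id": "hr-001"},
        },
    ],
    batch_size=50,
    double_doc_handling="skip",  # placeholder value
)
result = await registry.index_database_rag(db_ingestion)
# e.g. {"status": "success", "message": "Ingested 1 documents and ... chunks into the database."}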
run_database_rag
async
Run the database-rag pipeline to generate an answer based on a query.
This method processes the input query through a Retrieval-Augmented Generation (RAG) pipeline, which retrieves relevant documents and generates an answer based on that context.
| PARAMETER | DESCRIPTION |
|---|---|
rag_input
|
A dataclass containing the question and RAG parameters.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
list[RAGOutput]
|
A list of dictionaries, each containing a 'question', 'answer', and 'sources'. |
Source code in docs/microservices/rag/src/rag/rag_registry.py
async def run_database_rag(self, rag_input: RAGInput) -> list[RAGOutput]:
"""Run the database-rag pipeline to generate an answer based on a query.
This method processes the input query through a Retrieval-Augmented Generation (RAG) pipeline,
which retrieves relevant documents and generates an answer based on that context.
Args:
rag_input (RAGInput): A dataclass containing the question and RAG parameters.
Returns:
A list of dictionaries, each containing a `'question'`, `'answer'`, and `'sources'`.
"""
if rag_input.language_model not in settings.active_llms.rag:
logger.error(f"Invalid language model selected: {rag_input.language_model}")
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=(
f"Es wurde ein ungültiges Sprachmodell ausgewählt ({rag_input.language_model})."
" Bitte versuchen Sie es mit einem anderen Modell."
),
)
question = rag_input.question
try:
logger.info("Dokumente werden gesucht und eine Antwort wird generiert.")
meta_data_filters = rag_input.meta_data_filters
answer, sources = await self.database_rag.generate_answer(
query=question,
language_model=rag_input.language_model,
meta_data_filters=meta_data_filters,
max_chunks_to_use=rag_input.max_chunks_to_use,
)
if len(sources) == 0:
logger.warning("No matching documents retrieved.")
except Exception as e:
logger.error(f"Answer generation failed: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Die Beantwortung der Frage ist fehlgeschlagen.",
)
result = apply_output_format(question, answer, sources)
logger.debug(f"Answer of the Database-RAG: {result}")
return [result]
run_file_rag
async
Run the file-rag pipeline to generate an answer based on a query.
This method processes the input query through a Retrieval-Augmented Generation (RAG) pipeline, which retrieves relevant documents and generates an answer based on that context.
| PARAMETER | DESCRIPTION |
|---|---|
rag_input
|
A dataclass containing the question and RAG parameters.
TYPE:
|
files
|
A list of fastapi.UploadFile objects.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
list[RAGOutput]
|
A list of dictionaries, each containing a 'question', 'answer', and 'sources'. |
Source code in docs/microservices/rag/src/rag/rag_registry.py
async def run_file_rag(
self, rag_input: RAGInput, files: list[UploadFile]
) -> list[RAGOutput]:
"""Run the file-rag pipeline to generate an answer based on a query.
This method processes the input query through a Retrieval-Augmented Generation (RAG) pipeline,
which retrieves relevant documents and generates an answer based on that context.
Args:
rag_input (RAGInput): A dataclass containing the question and RAG parameters.
files (list[UploadFile]): A list of fastapi.UploadFile objects.
Returns:
A list of dictionaries, each containing a `'question'`, `'answer'`, and `'sources'`.
"""
if rag_input.language_model not in settings.active_llms.rag:
logger.error(f"Invalid language model selected: {rag_input.language_model}")
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=(
f"Es wurde ein ungültiges Sprachmodell ausgewählt ({rag_input.language_model})."
" Bitte versuchen Sie es mit einem anderen Modell."
),
)
source_ids = await self._index_file_rag(files_byte_stream=files)
return [await self._answer_file_rag(rag_input, source_ids=source_ids)]
utils
This module contains helper functions for RAG pipelines.
| FUNCTION | DESCRIPTION |
|---|---|
apply_output_format |
Apply the output format to a question, its answer, and the associated sources with metadata. |
convert_uploadfile_to_byte_stream |
Convert an uploaded file object into a `ByteStream` instance. |
format_content_text |
Format the content text for display in the UI. |
get_dict_value |
Extract a nested value from a dictionary. |
save_pipe_to |
Serialize the pipe configuration to a YAML file and optionally generate a PNG image representing the pipe. |
save_to_yaml |
Save data to a YAML file. |
split_list_into_batches |
Split a list into batches of a given batch size. |
apply_output_format
Apply the output format to a question, its answer, and the associated sources with metadata.
| PARAMETER | DESCRIPTION |
|---|---|
question
|
The question to be formatted.
TYPE:
|
answer
|
The answer to the question.
TYPE:
|
sources_raw
|
A list of source objects with associated metadata.
TYPE:
|
db2ui_map
|
Mapping from database metadata_keys to the displayed strings in the UI. Default mapping is defined in the rag_pipeline_config
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
RAGOutput
|
The formatted question, answer, and sources. |
Source code in docs/microservices/rag/src/rag/utils.py
def apply_output_format(
question: str, answer: str, sources_raw: list, db2ui_map: dict | None = None
) -> RAGOutput:
"""Apply the output format to a question, its answer, and the associated sources with metadata.
Args:
question (str): The question to be formatted.
answer (str): The answer to the question.
sources_raw (list[Any]): A list of source objects with associated metadata.
db2ui_map (dict[str,str] | None): Mapping from database metadata_keys to the displayed strings in the UI.
Default mapping is defined in the rag_pipeline_config
Returns:
The formatted question, answer, and sources.
"""
if db2ui_map is None:
db2ui_map = rag_pipeline_config.db2ui_map.model_dump()
sources = []
for source in sources_raw:
# extract metadata listed in db2ui-map and apply human-readable name
metadata = {db2ui_map[k]: v for k, v in source.meta.items() if k in db2ui_map}
# extract url if set
source_url = source.meta.get("url", None)
# apply content-formatting
formatted_content = format_content_text(source.content)
sources.append(
{"content": formatted_content, "meta": metadata, "url": source_url}
)
out = RAGOutput(question=question, answer=answer, sources=sources)
return out
convert_uploadfile_to_byte_stream
Convert an uploaded file object into a ByteStream instance.
Reads the file content and wraps it in a ByteStream along with metadata
such as the file name and MIME type.
| PARAMETER | DESCRIPTION |
|---|---|
upload_file
|
The uploaded file object (e.g., an instance of FastAPI's `UploadFile`).
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
ByteStream
|
An object containing the file's bytes, file name, and MIME type. |
Source code in docs/microservices/rag/src/rag/utils.py
def convert_uploadfile_to_byte_stream(upload_file: UploadFile) -> ByteStream:
"""Convert an uploaded file object into a `ByteStream` instance.
Reads the file content and wraps it in a `ByteStream` along with metadata
such as the file name and MIME type.
Args:
upload_file: The uploaded file object (e.g., an instance of FastAPI's `UploadFile`).
Returns:
An object containing the file's bytes, file name, and MIME type.
"""
byte_stream = ByteStream(
data=upload_file.file.read(),
meta={"file_name": upload_file.filename},
mime_type=upload_file.content_type,
)
return byte_stream
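For context, this is roughly how the helper might be called from a FastAPI endpoint; the route and import path below are illustrative, not part of the service's API.
from fastapi import FastAPI, UploadFile

from src.rag.utils import convert_uploadfile_to_byte_stream  # assumed import path

app = FastAPI()


@app.post("/inspect-upload")
async def inspect_upload(file: UploadFile) -> dict:
    # wrap the upload into a ByteStream and report its basic metadata
    stream = convert_uploadfile_to_byte_stream(file)
    return {
        "file_name": stream.meta["file_name"],
        "mime_type": stream.mime_type,
        "n_bytes": len(stream.data),
    }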
format_content_text
Format the content text for display in the UI.
| PARAMETER | DESCRIPTION |
|---|---|
| `content_text` | The content text to be formatted. TYPE: `str` |

| RETURNS | DESCRIPTION |
|---|---|
| `str` | The formatted content text. |
Source code in docs/microservices/rag/src/rag/utils.py
def format_content_text(content_text: str) -> str:
"""Format the content text for display in the UI.
Args:
content_text (str): The content text to be formatted.
Returns:
The formatted content text.
"""
# if content is None return empty string
if not content_text:
return ""
# remove single linebreaks
text = content_text.replace("\n\n", "<|LINEBREAK|>")
text = text.replace("-\n", "")
text = text.replace("\n", " ")
text = text.replace("<|LINEBREAK|>", "\n")
# remove leading metadata
text = text.split(rag_pipeline_config.pipeline.metadata_title_separator)[-1]
text = text.strip()
return text
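A small before/after sketch of the line-break handling (assuming the function is importable and the configured metadata/title separator does not occur in the sample text, so the final split is a no-op):
raw = "A hyphen-\nated word, a single\nline break, and a\n\nparagraph break."
print(format_content_text(raw))
# -> "A hyphenated word, a single line break, and a\nparagraph break."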
get_dict_value
Extract a nested value from a dictionary.
| PARAMETER | DESCRIPTION |
|---|---|
| `dd` | The dictionary from which the value will be extracted. TYPE: `dict` |
| `keys` | A list representing the path of keys to the desired value. TYPE: `list[str]` |

| RETURNS | DESCRIPTION |
|---|---|
| `dict` | The extracted value from the dictionary. |
Source code in docs/microservices/rag/src/rag/utils.py
def get_dict_value(dd: dict, keys: list[str]) -> dict:
"""Extract a nested value from a dictionary.
Args:
dd (dict): The dictionary from which the value will be extracted.
keys (list): A list representing the path of keys to the desired value.
Returns:
The extracted value from the dictionary.
"""
d = deepcopy(dd)
for k in keys:
d = d[k]
return d
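A quick example (values are arbitrary); note that the input dictionary is not modified, because the helper walks a deep copy:
cfg = {"pipeline": {"retrieval_config": {"top_k": 5}}}
print(get_dict_value(cfg, ["pipeline", "retrieval_config", "top_k"]))  # 5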
save_pipe_to
Serialize the pipe configuration to a YAML file and optionally generate a PNG image representing the pipe.
| PARAMETER | DESCRIPTION |
|---|---|
| `file_path` | The path where the pipe configuration will be saved (as a YAML file). TYPE: `str` |
| `pipe` | The pipe to serialize and optionally generate an image for. TYPE: `AsyncPipeline` |
| `png` | The type of PNG image to generate. Can be "mermaid", "local", or `None`. TYPE: `str` or `None` |

| RAISES | DESCRIPTION |
|---|---|
| `ImportError` | If local PNG export fails due to a missing or faulty dependency. |
Source code in docs/microservices/rag/src/rag/utils.py
def save_pipe_to(file_path: str, pipe: AsyncPipeline, png: str | None = None) -> None:
"""Serialize the pipe configuration to a YAML file and optionally generate a PNG image representing the pipe.
Args:
file_path (str): The path where the pipe configuration will be saved (as a YAML file).
pipe (AsyncPipeline): The pipe to serialize and optionally generate an image for.
png (str or None): The type of PNG image to generate. Can be "mermaid", "local", or `None`.
Raises:
ImportError: If local PNG export fails due to a missing or faulty dependency.
"""
pipe_config = pipe.to_dict()
save_to_yaml(file_path, pipe_config)
png_path = Path(file_path).with_suffix(".png")
if png == "mermaid":
try:
pipe.draw(png_path)
except Exception as e:
logger.warning(
f"There was an issues with mermaid.ink to draw the pipeline: {e}"
)
elif png == "local":
spec = importlib.util.find_spec("pygraphviz")
if spec is not None:
draw_pipeline(graph=pipe.graph, path=png_path)
else:
logger.warning(
"Pipeline PNG-Export failed. Could not import `pygraphviz`. Please install via: \n"
"'pip install pygraphviz'\n"
"(You might need to run this first: 'apt install libgraphviz-dev graphviz')"
)
elif png is None:
pass
else:
logger.warning(f"no png generated. Option {png=} does not exist")
save_to_yaml
Save data to a YAML file.
| PARAMETER | DESCRIPTION |
|---|---|
| `file_path` | Path to the YAML file. TYPE: `str` |
| `data_to_write` | Data to write to the YAML file. TYPE: `dict` |
Source code in docs/microservices/rag/src/rag/utils.py
def save_to_yaml(file_path: str, data_to_write: dict) -> None:
"""Save data to a YAML file.
Args:
file_path (str): Path to the YAML file.
data_to_write (dict): Data to write to the YAML file.
"""
with open(file_path, "w") as outfile:
yaml.dump(data_to_write, outfile, default_flow_style=False, allow_unicode=True)
split_list_into_batches
Split a list into batches of n elements.
| PARAMETER | DESCRIPTION |
|---|---|
| `lst` | The list to be split. TYPE: `list` |
| `n` | The size of each batch. TYPE: `int` |

| RETURNS | DESCRIPTION |
|---|---|
| `list` | A list of batches, where each batch is a sublist of length `n` (except possibly the last one). |
Source code in docs/microservices/rag/src/rag/utils.py
def split_list_into_batches(lst: list, n: int) -> list:
"""Split a list into batches of `n` elements.
Args:
lst (list): The list to be split.
n (int): The size of each batch.
Returns:
A list of batches, where each batch is a sublist of length `n` (except possibly the last one).
"""
return [lst[i : i + n] for i in range(0, len(lst), n)]
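For example:
print(split_list_into_batches([1, 2, 3, 4, 5], n=2))  # [[1, 2], [3, 4], [5]]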
settings
Load all settings from a central place, not hidden in utils.
utils
Utils functions for logging, LLM availability check and configuration processing.
| MODULE | DESCRIPTION |
|---|---|
| `base_logger` | Set up the root logger for the entire application. This logger will log messages to the console and a file. |
| `check_model_api_availability` | This module provides functions to check LLM-APIs for availability. |
| `delete_elasticsearch_tempfiles` | This module contains a function to delete documents in the tempfile index in the Elasticsearch database. |
| `download_reranker_model` | Download re-ranker model defined in rag_pipeline_config.yml from huggingface. |
| `process_configs` | Methods to load configs and run checks of config integrity. |
base_logger
Set up the root logger for the entire application. This logger will log messages to the console and a file.
| FUNCTION | DESCRIPTION |
|---|---|
| `setup_logger` | Initialize the logger with the desired log level and add handlers. |
setup_logger
Initialize the logger with the desired log level and add handlers.
Sets up the root logger, which all other loggers inherit from. Adds file, console and exit handlers to the logger and sets the format.
Source code in docs/microservices/rag/src/utils/base_logger.py
def setup_logger() -> None:
"""Initialize the logger with the desired log level and add handlers.
Sets up the root logger, which all other loggers inherit from.
Adds file, console and exit handlers to the logger and sets the format.
"""
# root logger, all other loggers inherit from this
logger = logging.getLogger()
# create different handlers for log file and console
file_handler = logging.handlers.RotatingFileHandler(
filename=settings.log_file,
maxBytes=settings.log_file_max_bytes,
backupCount=settings.log_file_backup_count,
)
console_handler = logging.StreamHandler()
# define log format and set for each handler
formatter = logging.Formatter(
fmt="%(asctime)s - %(levelname)8s - %(module)s - %(funcName)s: %(message)s",
datefmt="%Y-%m-%d %H:%M:%S%z",
)
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)
# add handlers to the logger
logger.addHandler(file_handler)
logger.addHandler(console_handler)
# Set level for 'haystack' logger
logging.getLogger("haystack").setLevel(settings.haystack_log_level)
# Set global logger level
logger.setLevel(settings.log_level)
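Typical startup usage might look like this (sketch; the import path is assumed and the configured log file must be writable):
import logging

from src.utils.base_logger import setup_logger  # assumed import path

setup_logger()  # configure the root logger once at application startup
logger = logging.getLogger(__name__)  # module loggers inherit the handlers and level
logger.info("Logging is configured.")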
check_model_api_availability
This module provides functions to check LLM-APIs for availability.
To check a specific LLM, use await is_model_api_available(llm_api, llm_name).
To get all LLMs that are activated in configs/general.yml, use await get_available_llms().
| FUNCTION | DESCRIPTION |
|---|---|
| `get_available_llms` | Returns a list of available LLMs. |
| `is_model_api_available` | Check if API is available using credentials. |
get_available_llms
async
Returns a list of available LLMs.
| RETURNS | DESCRIPTION |
|---|---|
| `list[dict[str, str]]` | List of available LLMs with selected infos. |
Source code in docs/microservices/rag/src/utils/check_model_api_availability.py
async def get_available_llms() -> list[dict[str, str]]:
"""Returns a list of available LLMs.
Returns:
List of available LLMs with selected infos.
"""
available_llms = {}
# iterate over model_groups (services), i.e. chat, RAG, embedding, ...
for model_group_key in llm_config:
available_llms[model_group_key] = []
logger.debug(f"Checking APIs for {model_group_key}-LLMs.")
model_group = llm_config[model_group_key]
for llm_name, llm in model_group.items():
logger.debug(f"Checking availability of {llm_name}")
if await is_model_api_available(llm_api=llm.api, llm_name=llm_name):
llm_dict = llm.model_dump(
include=["label", "is_remote", "max_chunks_to_use"]
)
llm_dict["name"] = llm_name
available_llms[model_group_key].append(llm_dict)
if not available_llms["embedding"]:
return [] # if no embedding model available, then RAG-service is unavailable
return available_llms["rag"]
is_model_api_available
async
Check if API is available using credentials.
Availability is checked by sending a HEAD, GET, or POST request. If a health_check endpoint is provided, the request is sent to that endpoint; otherwise, it is sent to the main API URL.
| PARAMETER | DESCRIPTION |
|---|---|
| `llm_api` | The LLMAPI instance to check. TYPE: `LLMAPI` |
| `llm_name` | ID of the LLM as used in the config file as reference. TYPE: `str` |
| `timeout_in_s` | HTTP timeout in seconds; defaults to 10. TYPE: `int` |

| RETURNS | DESCRIPTION |
|---|---|
| `bool` | True if the model API is available. |
Source code in docs/microservices/rag/src/utils/check_model_api_availability.py
async def is_model_api_available(
llm_api: LLMAPI,
llm_name: str,
timeout_in_s: int = 10,
) -> bool:
"""Check if API is available using credentials.
Availability is checked by sending a HEAD, GET, or POST request. If a health_check endpoint is provided,
the request is sent to that endpoint; otherwise, it is sent to the main API URL.
Args:
llm_api (LLMAPI): the LLMAPI instance to check
llm_name (str): ID of the LLM as used in the config file as reference
timeout_in_s (int): http timeout in seconds; defaults to 10
Returns:
True if the model API is available.
"""
headers = {"Content-type": "application/json"}
# Authorization is not always needed
if llm_api.auth:
headers["Authorization"] = llm_api.auth.get_auth_header()
url = llm_api.get_health_check_url()
# test health check endpoint with GET, HEAD and POST
try:
async with httpx.AsyncClient() as client:
response = await client.get(
url,
headers=headers,
timeout=timeout_in_s,
)
logger.debug(
f"{url} health check via GET request: {response.status_code=}, LLM: '{llm_name}"
)
# test with HEAD
if response.status_code != HTTPStatus.OK:
async with httpx.AsyncClient() as client:
response = await client.head(
url,
headers=headers,
timeout=timeout_in_s,
)
logger.debug(
f"{url} health check via HEAD request: {response.status_code=}, LLM: '{llm_name}"
)
# test with POST
if response.status_code != HTTPStatus.OK:
async with httpx.AsyncClient() as client:
response = await client.post(
url,
headers=headers,
timeout=timeout_in_s,
)
logger.debug(
f"{url} health check via POST request: {response.status_code=}, LLM: '{llm_name}"
)
except Exception as e:
logger.warning(
f"Exception when trying to reach LLM API. Error: {e}, LLM: '{llm_name}"
)
return False
if response.status_code != HTTPStatus.OK:
logger.warning(
f"LLM unavailable: Could not establish connection to LLM-API. LLM: '{llm_name}"
)
return response.status_code == HTTPStatus.OK
delete_elasticsearch_tempfiles
This module contains a function to delete documents in the tempfile index in the Elasticsearch database.
| FUNCTION | DESCRIPTION |
|---|---|
| `cleanup_elasticsearch_tempindex_task` | Cleanup temp-file index in Elasticsearch database. |
cleanup_elasticsearch_tempindex_task
Cleanup temp-file index in Elasticsearch database.
| RETURNS | DESCRIPTION |
|---|---|
| `bool` | True if cleanup was successful. |
Source code in docs/microservices/rag/src/utils/delete_elasticsearch_tempfiles.py
def cleanup_elasticsearch_tempindex_task() -> bool:
"""Cleanup temp-file index in Elasticsearch database.
Returns:
True if cleanup was successful.
"""
timestamp = datetime.now(tz=timezone("Europe/Berlin")) - timedelta(minutes=15)
try:
n_docs = rag_registry.rag_registry.file_rag_tempfile_deletion(timestamp)
logger.info(
f"Cleaned uploaded documents in elasticsearch temporary index, "
f"that were older than one hour. {n_docs} were deleted."
)
return True
except Exception:
logger.exception("Failed to cleanup elasticsearch temporary index")
return False
download_reranker_model
Download re-ranker model defined in rag_pipeline_config.yml from huggingface.
| FUNCTION | DESCRIPTION |
|---|---|
| `download_reranker_model` | Check if re-ranker model needs to be obtained from Huggingface and download it. |
download_reranker_model
Check if re-ranker model needs to be obtained from Huggingface and download it.
| RETURNS | DESCRIPTION |
|---|---|
| `str` or `None` | Local path of file or None if model could not be loaded. |
Source code in docs/microservices/rag/src/utils/download_reranker_model.py
def download_reranker_model() -> str | None:
"""Check if re-ranker model needs to be obtained from Huggingface and download it.
Returns:
Local path of file or None if model could not be loaded.
"""
config = rag_pipeline_config.pipeline.retrieval_config
logger.info(
f"Downloading re-ranker model. {config.ranker_model} and storing it in {config.ranker_model_path}"
)
try:
reranker_model_path = snapshot_download(
repo_id=config.ranker_model, local_dir=config.ranker_model_path
)
logger.info(f"Saved re-ranker model to {reranker_model_path}")
return reranker_model_path
except Exception:
logger.exception("Failed to download re-ranker model")
return None
process_configs
Methods to load configs and run checks of config integrity.
| FUNCTION | DESCRIPTION |
|---|---|
| `load_all_configs` | Load config settings from respective paths. |
| `load_from_yml_in_pydantic_model` | Load config from 'yaml_path' into the given pydantic model. |
| `load_yaml` | Load yaml. |
| `merge_specific_cfgs_in_place` | Copy Prompt-config to appropriate section in general llm_config. Edit in-place! |
| `postprocess_configs` | Post-Process loaded configs. |
| `remove_inactive_models` | Remove models from all use cases, if they are not in 'active_llms'. Edit in-place! |
load_all_configs
load_all_configs(general_config_paths, path_to_llm_prompts, path_to_llm_model_configs, path_to_rag_config)
Load config settings from respective paths.
| PARAMETER | DESCRIPTION |
|---|---|
| `general_config_paths` | Path to config, matching 'Settings'. TYPE: `Path` |
| `path_to_llm_prompts` | Path to config, matching 'LLMPromptMaps'. TYPE: `Path` |
| `path_to_llm_model_configs` | Path to config, matching 'LLMConfig'. TYPE: `Path` |
| `path_to_rag_config` | Path to config, matching 'RAGPipelineConfig'. TYPE: `Path` |

| RETURNS | DESCRIPTION |
|---|---|
| `tuple[Settings, LLMConfig, RAGPipelineConfig]` | Config loaded into their Pydantic Model. |
Source code in docs/microservices/rag/src/utils/process_configs.py
def load_all_configs(
general_config_paths: Path,
path_to_llm_prompts: Path,
path_to_llm_model_configs: Path,
path_to_rag_config: Path,
) -> tuple[Settings, LLMConfig, RAGPipelineConfig]:
"""Load config settings from respective paths.
Args:
general_config_paths (Path): Path to config, matching 'Settings'
path_to_llm_prompts (Path): Path to config, matching 'LLMPromptMaps'
path_to_llm_model_configs (Path): Path to config, matching 'LLMConfig'
path_to_rag_config (Path): Path to config, matching 'RAGPipelineConfig'
Returns:
Config loaded into their Pydantic Model.
"""
settings = load_from_yml_in_pydantic_model(general_config_paths, Settings)
llm_prompts = load_from_yml_in_pydantic_model(path_to_llm_prompts, LLMPromptMaps)
llm_config = load_from_yml_in_pydantic_model(path_to_llm_model_configs, LLMConfig)
rag_config = load_from_yml_in_pydantic_model(path_to_rag_config, RAGPipelineConfig)
postprocess_configs(settings, llm_prompts, llm_config)
return settings, llm_config, rag_config
load_from_yml_in_pydantic_model
Load config from 'yaml_path' into the given pydantic model.
| PARAMETER | DESCRIPTION |
|---|---|
| `yaml_path` | Yaml to load. TYPE: `Path` |
| `pydantic_reference_model` | pydantic model to load yaml into. TYPE: `BaseModel` |

| RETURNS | DESCRIPTION |
|---|---|
| `BaseModel` | BaseModel derived pydantic data class. |
Source code in docs/microservices/rag/src/utils/process_configs.py
def load_from_yml_in_pydantic_model(
yaml_path: Path, pydantic_reference_model: BaseModel
) -> BaseModel:
"""Load config from 'list_of_yaml_paths' into given pydantic-Model.
Args:
yaml_path (Path): Yaml to load
pydantic_reference_model (BaseModel): pydantic model to load yaml into
Returns:
BaseModel derived pydantic data class.
"""
data = load_yaml(yaml_path)
try:
pydantic_class = pydantic_reference_model(**data)
logger.info(f"Config loaded from: '{yaml_path}'")
return pydantic_class
except ValidationError as e:
logger.critical(f"Error loading config: '{e}'")
raise e
load_yaml
Load yaml.
| PARAMETER | DESCRIPTION |
|---|---|
| `yaml_path` | Path to yaml. TYPE: `Path` |

| RETURNS | DESCRIPTION |
|---|---|
| `dict[str, Any]` | Content of loaded yaml. |
Source code in docs/microservices/rag/src/utils/process_configs.py
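The source listing for load_yaml is not included above; the following is only a minimal sketch of what such a loader typically looks like (assuming it simply wraps yaml.safe_load), not the service's actual implementation:
from pathlib import Path
from typing import Any

import yaml


def load_yaml(yaml_path: Path) -> dict[str, Any]:
    """Load a YAML file and return its content as a dictionary (sketch)."""
    with open(yaml_path) as infile:
        return yaml.safe_load(infile)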
merge_specific_cfgs_in_place
Copy Prompt-config to appropriate section in general llm_config. Edit in-place!
The merge only happens if the 'prompt_map' referenced in LLMConfig can be found in LLMPromptMaps, i.e. it generalizes something like this:
cfg["test_model:local"].prompt_config = prompt[cfg["test_model:local"].prompt_map]
| PARAMETER | DESCRIPTION |
|---|---|
| `llm_config` | Target for merge of Prompt parameter. TYPE: `LLMConfig` |
| `llm_prompts` | Source to merge Prompt parameter from. TYPE: `LLMPromptMaps` |

| RETURNS | DESCRIPTION |
|---|---|
| `bool` | True if no problems occurred. |
Source code in docs/microservices/rag/src/utils/process_configs.py
def merge_specific_cfgs_in_place(
llm_config: LLMConfig, llm_prompts: LLMPromptMaps
) -> bool:
"""Copy Prompt-config to appropriate section in general llm_config. Edit in-place!
The merge only happens if the 'prompt_map' referenced in LLMConfig can be found in LLMPromptMaps,
i.e. it generalizes something like this:
cfg["test_model:local"].prompt_config = prompt[cfg["test_model:local"].prompt_map]
Args:
llm_config (LLMConfig): Target for merge of Prompt parameter
llm_prompts (LLMPromptMaps): Source to merge Prompt parameter from
Returns:
True if no problems occurred.
"""
no_issues_occurred = True
for usecase in llm_config:
if usecase == "embedding":
continue
# load identical use-cases, i.e. chat, RAG
try:
cfg = getattr(llm_config, usecase)
prompt = getattr(llm_prompts, usecase)
except AttributeError as e:
logger.exception(e)
logger.warning(
f"Usecase '{usecase}' not matching between prompt- and general llm config. \
Skipping cfg-merge for '{usecase}' .."
)
no_issues_occurred = False
continue
# copy prompt config to its use-case- and model-counterpart
for model in cfg:
prompt_map_to_use = cfg[model].prompt_map
if prompt_map_to_use in prompt:
cfg[model].prompt_config = prompt[prompt_map_to_use]
else:
logger.warning(
f"'prompt_map: {prompt_map_to_use}' from LLM-config not in prompt-config for '{usecase}'. \
Skipping .."
)
no_issues_occurred = False
continue
return no_issues_occurred
postprocess_configs
Post-Process loaded configs.
Remove unused models (from settings.active_llms), merge LLMPromptMaps into LLMConfig.
| PARAMETER | DESCRIPTION |
|---|---|
| `settings` | Config matching pydantic 'Settings'. TYPE: `Settings` |
| `llm_prompts` | Config matching pydantic 'LLMPromptMaps'. TYPE: `LLMPromptMaps` |
| `llm_config` | Config matching pydantic 'LLMConfig'. TYPE: `LLMConfig` |

| RETURNS | DESCRIPTION |
|---|---|
| `LLMConfig` | Merged and filtered LLM configuration. |
Source code in docs/microservices/rag/src/utils/process_configs.py
def postprocess_configs(
settings: Settings, llm_prompts: LLMPromptMaps, llm_config: LLMConfig
) -> LLMConfig:
"""Post-Process loaded configs.
Remove unused models (from settings.active_llms), merge LLMPromptMaps into LLMConfig.
Args:
settings (Settings): Config matching pydantic 'Settings'.
llm_prompts (LLMPromptMaps): Config matching pydantic 'LLMPromptMaps'.
llm_config (LLMConfig): Config matching pydantic 'LLMConfig'.
Returns:
Merged and filtered LLM configuration.
"""
remove_inactive_models(llm_config, settings.active_llms)
merge_specific_cfgs_in_place(llm_config, llm_prompts)
return llm_config
remove_inactive_models
Remove models from all use cases, if they are not in 'active_llms'. Edit in-place!
| PARAMETER | DESCRIPTION |
|---|---|
| `input_config` | Config to change. TYPE: `LLMConfig` |
| `active_llms` | Models to keep - remove other. TYPE: `list[str]` |
Source code in docs/microservices/rag/src/utils/process_configs.py
def remove_inactive_models(input_config: LLMConfig, active_llms: list[str]) -> None:
"""Remove models from all use cases, if they are not in 'active_llms'. Edit in-place!
Args:
input_config (LLMConfig): Config to change
active_llms (list[str]): Models to keep - remove other
"""
for usecase in input_config:
if usecase == "embedding":
continue
cfg = getattr(input_config, usecase)
active_llms_for_usecase = getattr(active_llms, usecase)
for model in list(cfg):
if model not in active_llms_for_usecase:
cfg.pop(model)