
RAG

rag

MODULE DESCRIPTION
main

Main module of the application.

src

Source code of the chat containing core components and utilities.

main

Main module of the application.

This module serves as the entry point for the program. It imports necessary modules, sets up any initial configuration or data structures, and possibly defines main functions or classes that are used throughout the application.

src

Source code of the chat containing core components and utilities.

MODULE DESCRIPTION
app

Initialize the app.

endpoints

This module provides API endpoints for RAG.

models

Data model classes for loading and validating API and configuration parameters.

rag

Implementation of the core logic and interaction flow of the RAG.

settings

Load all settings from a central place, not hidden in utils.

utils

Utility functions for logging, LLM availability checks, and configuration processing.

app

Initialize the app.

FUNCTION DESCRIPTION
lifespan

Sets up a scheduler and updates the available LLMs.

lifespan async
lifespan(_app)

Sets up a scheduler and updates the available LLMs.

This lifespan function runs on FastAPI startup. The first part, up to yield, is executed on startup and initializes a scheduler that regularly checks the LLM API. The second part is executed on shutdown and cleans up the scheduler.

The available LLMs - i.e. the LLMs whose API checks passed - are cached in the FastAPI state object as app.state.available_llms.

PARAMETER DESCRIPTION
_app

fastapi.applications.FastAPI object

TYPE: FastAPI

Source code in docs/microservices/rag/src/app.py
@asynccontextmanager
async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
    """Sets up a scheduler and updates available llms.

    This lifespan function is started on startup of FastAPI. The first part
    - till `yield` is executed on startup and initializes a scheduler to regulary
    check the LLM-API. The second part is executed on shutdown and is used to
    clean up the scheduler.

    The available LLMs - i.e. the LLMs where API-checks passed - are cached in
    FastAPI state object as `app.state.available_llms`.

    Args:
        _app (FastAPI): fastapi.applications.FastAPI object

    """

    async def update_llm_state() -> None:
        _app.state.available_llms = await get_available_llms()

    # store available LLMs in FastAPI app state
    _app.state.available_llms = await get_available_llms()

    # setup a scheduler
    scheduler = AsyncIOScheduler()
    scheduler.add_job(
        update_llm_state,
        "interval",
        seconds=settings.check_llm_api_interval_in_s,
    )
    scheduler.add_job(cleanup_elasticsearch_tempindex_task, "interval", seconds=120)
    scheduler.start()

    yield

    # cleanup
    scheduler.shutdown()
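
Since lifespan is defined as an async context manager, it is registered by passing it to the FastAPI constructor. A minimal sketch of the wiring, assuming the module is importable as src.app:

from fastapi import FastAPI

from src.app import lifespan  # assumed import path

# Registering the lifespan starts the scheduler on startup and
# shuts it down cleanly when the application stops.
app = FastAPI(lifespan=lifespan)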

endpoints

This module provides API endpoints for RAG.

FUNCTION DESCRIPTION
fetch_database_rag

Endpoint for question answering (RAG) on a vector database.

fetch_file_rag

Endpoint for question answering (RAG) on a given list of files.

get_llms

Returns model information of available LLMs.

get_sources_for_rag

This endpoint returns a list of available sources for database RAG.

health

Return a health check message.

ingest_docs_to_database

Endpoint to ingest a list of documents into the vector database.

fetch_database_rag async
fetch_database_rag(rag_input)

Endpoint for question answering (RAG) on a vector database.

PARAMETER DESCRIPTION
rag_input

A client-defined connection input containing the query data.

TYPE: RAGInput

RETURNS DESCRIPTION
list[RAGOutput]

Inference results returned by the vector database pipeline.

Source code in docs/microservices/rag/src/endpoints.py
@router.post(
    "/database",
    response_model=list[RAGOutput],
    summary="RAG knowledge database endpoint.",
    description=(
        "Performs a RAG (Retrieval-Augmented Generation) query on the knowledge database.\n\n"
        "The endpoint retrieves relevant documents based on the input filters and question, "
        "then generates answers using the specified language model."
    ),
    openapi_extra={
        "requestBody": {
            "content": {
                "application/json": {
                    "examples": {}  # empty example added to avoid CI issues
                }
            }
        }
    },
    responses={
        200: {
            "description": "Successful RAG response with retrieved documents.",
            "content": {
                "application/json": {
                    "examples": RAGOutput.model_config["json_schema_extra"][
                        "openapi_examples"
                    ],
                }
            },
        },
        400: {
            "description": "Invalid language model.",
            "content": {
                "application/json": {
                    "schema": {
                        "type": "object",
                        "properties": {"detail": {"type": "string"}},
                        "example": {
                            "detail": (
                                "Es wurde ein ungültiges Sprachmodell ausgewählt. "
                                "Bitte versuchen Sie es mit einem anderen Modell."
                            )
                        },
                    }
                }
            },
        },
        500: {"description": "Internal server error."},
    },
)
async def fetch_database_rag(rag_input: RAGInput) -> list[RAGOutput]:
    """Endpoint for question answering (RAG) on a vector database.

    Args:
        rag_input (RAGInput): A client-defined connection input containing the query data.

    Returns:
        Inference results returned by the vector database pipeline.
    """
    return await rag_registry.run_database_rag(rag_input)
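
A minimal sketch of calling this endpoint with httpx; the base URL and router prefix are assumptions that depend on the deployment:

import httpx

BASE_URL = "http://localhost:8000"  # hypothetical base URL, adjust to your deployment

payload = {
    "question": "What did the political parties decide regarding the pension system?",
    "meta_data_filters": [{"source": "Coalition Agreement"}],
    "language_model": "test_model_mock",
    "max_chunks_to_use": 5,
}

response = httpx.post(f"{BASE_URL}/database", json=payload, timeout=60.0)
response.raise_for_status()
for result in response.json():  # list of RAGOutput objects
    print(result["answer"])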
fetch_file_rag async
fetch_file_rag(request_timestamp=Form(None, description='Unix timestamp of the request.', example=1731252767, deprecated=True), question=Form(..., description="The user's input question to be answered.", example='What did the parties decide?'), language_model=Form(..., description='Identifier for the language model to use.', example='test_model_mock'), max_chunks_to_use=Form(None, description='Optional upper limit on the number of text chunks used for the response.', example=5), files=File(..., description='One or more files (pdf, txt, docx).'))

Endpoint for question answering (RAG) on a given list of files.

PARAMETER DESCRIPTION
request_timestamp

The timestamp of the request.

TYPE: int DEFAULT: Form(None, description='Unix timestamp of the request.', example=1731252767, deprecated=True)

question

The question to be answered, encoded as FormData.

TYPE: str DEFAULT: Form(..., description="The user's input question to be answered.", example='What did the parties decide?')

language_model

The selected language model. LLMs are defined in configs/llm_models.yml.

TYPE: str DEFAULT: Form(..., description='Identifier for the language model to use.', example='test_model_mock')

max_chunks_to_use

The number of chunks to use for answer generation.

TYPE: int DEFAULT: Form(None, description='Optional upper limit on the number of text chunks used for the response.', example=5)

files

List of binary-encoded uploaded files (e.g., PDF, DOCX/ODT, TXT), sent as FormData.

TYPE: list DEFAULT: File(..., description='One or more files (pdf, txt, docx).')

RETURNS DESCRIPTION
list[RAGOutput]

The generated answer as an inference result.

Source code in docs/microservices/rag/src/endpoints.py
@router.post(
    "/file",
    response_model=list[RAGOutput],
    summary="RAG file endpoint with multiple uploads.",
    description=(
        "Performs a RAG (Retrieval-Augmented Generation) query based on uploaded files.\n\n"
        "The endpoint parses and chunks the uploaded files, retrieves relevant documents, "
        "and generates answers using the specified language model."
    ),
    responses={
        200: {
            "description": "Successful RAG response with retrieved documents.",
            "content": {
                "application/json": {
                    "examples": RAGOutput.model_config["json_schema_extra"][
                        "openapi_examples"
                    ],
                }
            },
        },
        400: {"description": "Invalid language model or request."},
        500: {"description": "Internal server error."},
    },
)
async def fetch_file_rag(
    request_timestamp: int | None = Form(
        None,
        description="Unix timestamp of the request.",
        example=1731252767,
        deprecated=True,
    ),
    question: str = Form(
        ...,
        description="The user's input question to be answered.",
        example="What did the parties decide?",
    ),
    language_model: str = Form(
        ...,
        description="Identifier for the language model to use.",
        example="test_model_mock",
    ),
    max_chunks_to_use: int | None = Form(
        None,
        description="Optional upper limit on the number of text chunks used for the response.",
        example=5,
    ),
    files: list[UploadFile] = File(
        ...,
        description="One or more files (pdf, txt, docx).",
    ),
) -> list[RAGOutput]:
    """Endpoint for question answering (RAG) on a given list of files.

    Args:
        request_timestamp (int): The timestamp of the request.
        question (str): The question to be answered, encoded as FormData.
        language_model (str): The selected language model. LLMs are defined in configs/llm_models.yml.
        max_chunks_to_use (int): The number of chunks to use for answer generation.
        files (list): List of binary-encoded uploaded files (e.g., PDF, DOCX/ODT, TXT), sent as FormData.

    Returns:
        The generated answer as an inference result.
    """
    rag_input = RAGInput(
        request_timestamp=request_timestamp,
        question=question,
        meta_data_filters=None,
        max_chunks_to_use=max_chunks_to_use,
        language_model=language_model,
    )
    return await rag_registry.run_file_rag(rag_input, files)
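
A minimal sketch of calling this endpoint with httpx and a multipart upload; the base URL, router prefix and file name are assumptions:

import httpx

BASE_URL = "http://localhost:8000"  # hypothetical base URL, adjust to your deployment

data = {
    "question": "What did the parties decide?",
    "language_model": "test_model_mock",
    "max_chunks_to_use": 5,
}

# "contract.pdf" is a placeholder; the field name "files" matches the endpoint parameter.
with open("contract.pdf", "rb") as fh:
    files = [("files", ("contract.pdf", fh, "application/pdf"))]
    response = httpx.post(f"{BASE_URL}/file", data=data, files=files, timeout=120.0)

response.raise_for_status()
print(response.json()[0]["answer"])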
get_llms async
get_llms(request)

Returns model information of available LLMs.

PARAMETER DESCRIPTION
request

Request-Data.

TYPE: Request

RETURNS DESCRIPTION
list[dict]

List with information for each LLM.

Source code in docs/microservices/rag/src/endpoints.py
@router.get(
    "/llms",
    summary="List available language models.",
    description=("Returns a list of available language models (LLMs).\n\n"),
    responses={
        200: {
            "description": "List of available LLMs.",
            "content": {
                "application/json": {
                    "example": [
                        {
                            "label": "test_model:mock",
                            "is_remote": False,
                            "name": "test_model_mock",
                        },
                    ]
                }
            },
        },
        500: {"description": "Internal server error accessing microservice"},
    },
)
async def get_llms(request: Request) -> list[dict]:
    """Returns model information of available LLMs.

    Args:
        request (Request): Request-Data.

    Returns:
        List with information for each LLM.
    """
    app = request.app  # indirectly access the FastAPI app object
    return app.state.available_llms
get_sources_for_rag async
get_sources_for_rag()

This endpoint returns a list of available sources for database RAG.

RETURNS DESCRIPTION
list[Source]

List of available sources.

Source code in docs/microservices/rag/src/endpoints.py
@router.get(
    "/sources",
    response_model=list[Source],
    summary="List available RAG database sources.",
    description=(
        "Returns a list of all available sources in the knowledge database.\n\n"
        "Each source contains its name and whether date-based filtering is applied."
    ),
    responses={
        200: {
            "description": "List of available database sources.",
            "content": {
                "application/json": {
                    "examples": Source.model_config["json_schema_extra"][
                        "openapi_examples"
                    ],
                }
            },
        }
    },
)
async def get_sources_for_rag() -> list[Source]:
    """This endpoint returns a list of available sources for database RAG.

    Returns:
        List of available sources.
    """
    return rag_pipeline_config.pipeline.sources
health async
health()

Return a health check message.

RETURNS DESCRIPTION
dict[str, str]

The health check message as a dictionary.

Source code in docs/microservices/rag/src/endpoints.py
@router.get(
    "/",
    summary="Health check endpoint",
    description=(
        "Returns a simple message indicating that the RAG service is running.\n\n"
        "Use this endpoint to verify that the service is alive and responsive."
    ),
    responses={
        200: {
            "description": "Health check successful",
            "content": {"application/json": {"example": {"status": "RAG is running"}}},
        },
        500: {"description": "Internal server error"},
    },
)
@router.get(
    "/health",
    summary="Health check endpoint",
    description=(
        "Returns a simple message indicating that the RAG service is running.\n\n"
        "Use this endpoint to verify that the service is alive and responsive."
    ),
    responses={
        200: {
            "description": "Health check successful",
            "content": {"application/json": {"example": {"status": "RAG is running"}}},
        },
        500: {"description": "Internal server error"},
    },
)
async def health() -> dict[str, str]:
    """Return a health check message.

    Returns:
        The health check message as a dictionary.
    """
    return {"message": f"{settings.service_name} is running"}
ingest_docs_to_database async
ingest_docs_to_database(db_ingestion)

Endpoint to ingest a list of documents into the vector database.

Each document must be an instance of IngestionDocument and include the required content and metadata fields. The endpoint processes the documents and inserts them into the database asynchronously.

For contents containing page breaks, the metadata field page_number is calculated automatically during ingestion, starting with page_number=1. Page breaks are represented as \f in the text, e.g. "content": "Text on page 1. \f Text on page 2. \f Text on page 3."

Input structure:

{
    "documents": [
        {
            "content": "string",
            "metadata": {
                "source": "string",   # Required; must not be empty after trimming.
                "title": "string",    # Optional.
                "date": "YYYY-MM-DD", # Optional.
                "url": "string",      # Optional.
            }
        }
    ],
    "batch_size": 1 # Optional; defaults to 1.
}

PARAMETER DESCRIPTION
db_ingestion

Client-defined ingestion input, including connection and document data.

TYPE: DBIngestionInput

RETURNS DESCRIPTION
dict[str, Any]

A status and message response, for example:

{
    "status": "success",
    "message": "Ingested documents and chunks into the database."
}

Notes

Large document batches may cause connection timeouts. In such cases, ingestion may not complete successfully. Small batch sizes or sequential submissions are recommended.

Source code in docs/microservices/rag/src/endpoints.py
@router.post(
    "/database_ingestion",
    response_model=dict[str, Any],
    summary="Ingest a list of documents into the vector database.",
    description=(
        "Each document must be an instance of `IngestionDocument` and include the required"
        "`content` and `metadata` fields. The endpoint processes the documents and inserts"
        "them into the database asynchronously."
        ""
        "For contents containing page breaks, the meta data `page_number` is calculated automatically"
        "during ingestion starting with `page_number=1`. Page breaks are represented as `\\f` in the text."
        "E.g. `content`: `Text on page 1. \\f Text on page 2. \\f Text on page 3.`"
    ),
    openapi_extra={
        "requestBody": {
            "content": {
                "application/json": {
                    "examples": {}  # empty example added to avoid CI issues
                }
            }
        }
    },
)
async def ingest_docs_to_database(
    db_ingestion: DBIngestionInput,
) -> dict[str, Any]:
    r"""Endpoint to ingest a list of documents into the vector database.

    Each document must be an instance of `IngestionDocument` and include the required
    `content` and `metadata` fields. The endpoint processes the documents and inserts
    them into the database asynchronously.

    For contents containing page breaks, the meta data `page_number` is calculated automatically
    during ingestion starting with `page_number=1`. Page breaks are represented as `\f` in the text.
    E.g. `"content": "Text on page 1. \f Text on page 2. \f Text on page 3.`

    Input structure:
    {
        "documents": [
            {
                "content": "string",
                "metadata": {
                    "source": "string",   # Required; must not be empty after trimming.
                    "title": "string",    # Optional.
                    "date": "YYYY-MM-DD", # Optional.
                    "url": "string",      # Optional.
                }
            }
        ],
        "batch_size": 1 # Optional; defaults to 1.
    }

    Args:
        db_ingestion (DBIngestionInput): Client-defined ingestion input, including connection
            and document data.

    Returns:
        A status and message response, for example:

        {
            "status": "success",
            "message": "Ingested <n> documents and <m> chunks into the database."
        }

    Notes:
        Large document batches may cause connection timeouts. In such cases, ingestion
        may not complete successfully. Small batch sizes or sequential submissions are
        recommended.
    """
    status = await rag_registry.index_database_rag(db_ingestion=db_ingestion)

    return status
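
A minimal sketch of an ingestion request with httpx, following the IngestionDocument model (which names the metadata field meta); the base URL is an assumption:

import httpx

BASE_URL = "http://localhost:8000"  # hypothetical base URL, adjust to your deployment

payload = {
    "documents": [
        {
            # "\f" marks a page break, so page_number 1 and 2 are derived automatically.
            "content": "Text on page 1.\fText on page 2.",
            "meta": {
                "source": "Coalition Agreement",
                "title": "Example document",
                "date": "2025-11-07",
            },
        }
    ],
    "batch_size": 1,
    "double_doc_handling": "ignore",
}

response = httpx.post(f"{BASE_URL}/database_ingestion", json=payload, timeout=300.0)
response.raise_for_status()
print(response.json())  # e.g. {"status": "success", "message": "..."}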

models

Data model classes for loading and validating API and configuration parameters.

MODULE DESCRIPTION
api_input

Pydantic models for input parameters of the RAG API.

api_output

Pydantic models for API output parameters of the RAG service.

general

Load and check Settings from yml.

llms

Pydantic model for the LLM config.

rag_config

Pydantic models describing a RAG pipeline.

api_input

Pydantic models for input parameters of the RAG API.

CLASS DESCRIPTION
DBIngestionInput

Defines the structure of valid input for document ingestion into the database.

DocumentMetadata

Defines the metadata associated with a document.

DoubleDocsHandling

Defines options for handling already embedded docs during input indexing.

IngestionDocument

Defines the structure of a document to be ingested into the database.

MetadataFilter

Defines metadata-based filters for RAG document retrieval.

RAGInput

Model defining the input of a valid RAG request.

DBIngestionInput

Bases: BaseModel

Defines the structure of valid input for document ingestion into the database.

ATTRIBUTE DESCRIPTION
documents

List of documents to be ingested.

TYPE: list[IngestionDocument]

batch_size

Number of documents per upload batch. (optional, defaults to 1)

TYPE: PositiveInt

Source code in docs/microservices/rag/src/models/api_input.py
class DBIngestionInput(BaseModel):
    """Defines the structure of valid input for document ingestion into the database.

    Attributes:
        documents (list[IngestionDocument]): List of documents to be ingested.
        batch_size (PositiveInt): Number of documents per upload batch. (optional, defaults to 1)
    """

    documents: list[IngestionDocument]
    batch_size: PositiveInt = Field(1, description="Split Upload in Batches.")
    double_doc_handling: DoubleDocsHandling = DoubleDocsHandling.IGNORE
DocumentMetadata

Bases: BaseModel

Defines the metadata associated with a document.

ATTRIBUTE DESCRIPTION
source

Required source identifier (must not be empty).

TYPE: str

title

Optional document title.

TYPE: str | None

date

Optional date of the document.

TYPE: date | None

url

Optional source URL.

TYPE: str | None

Notes

Extra fields are allowed to support flexible metadata extension.

METHOD DESCRIPTION
trim_strings

Trim leading and trailing whitespace from string fields.

validate_non_empty_source

Ensure that the 'source' field is not empty after trimming.

Source code in docs/microservices/rag/src/models/api_input.py
class DocumentMetadata(BaseModel):
    """Defines the metadata associated with a document.

    Attributes:
        source (str): Required source identifier (must not be empty).
        title (str | None): Optional document title.
        date (datetime.date | None): Optional date of the document.
        url (str | None): Optional source URL.

    Notes:
        Extra fields are allowed to support flexible metadata extension.
    """

    model_config = ConfigDict(extra="allow")

    source: str
    title: str | None = None
    date: datetime.date | None = None
    url: str | None = None

    @field_validator("title", "source")
    @classmethod
    def trim_strings(cls, value: str) -> str:
        """Trim leading and trailing whitespace from string fields."""
        if value is None:
            return value
        return value.strip()

    @field_validator("source")
    @classmethod
    def validate_non_empty_source(cls, value: str) -> str:
        """Ensure that the 'source' field is not empty after trimming."""
        if not value:
            raise ValueError("Source cannot be empty")
        return value
trim_strings classmethod
trim_strings(value)

Trim leading and trailing whitespace from string fields.

Source code in docs/microservices/rag/src/models/api_input.py
@field_validator("title", "source")
@classmethod
def trim_strings(cls, value: str) -> str:
    """Trim leading and trailing whitespace from string fields."""
    if value is None:
        return value
    return value.strip()
validate_non_empty_source classmethod
validate_non_empty_source(value)

Ensure that the 'source' field is not empty after trimming.

Source code in docs/microservices/rag/src/models/api_input.py
@field_validator("source")
@classmethod
def validate_non_empty_source(cls, value: str) -> str:
    """Ensure that the 'source' field is not empty after trimming."""
    if not value:
        raise ValueError("Source cannot be empty")
    return value
DoubleDocsHandling

Bases: StrEnum

Defines options for handling already embedded docs during input indexing.

Documents can either be overwritten or skipped in the embedding process.

Values

FORCE: "force" – Forces new embedding of the file into the database.
IGNORE: "ignore" – Ignores the duplicate document, no embedding.

Source code in docs/microservices/rag/src/models/api_input.py
class DoubleDocsHandling(StrEnum):
    """Defines options for handling already embedded docs during input indexing.

    Documents can either be overwritten or skipped in the embedding process.

    Values:
        FORCE: "force" – Forces new embedding of the file into the database.
        IGNORE: "ignore" – Ignores the duplicate document, no embedding.
    """

    FORCE = "force"
    IGNORE = "ignore"
IngestionDocument

Bases: BaseModel

Defines the structure of a document to be ingested into the database.

ATTRIBUTE DESCRIPTION
content

The document's text content (must not be empty).

TYPE: str

meta

Associated metadata, including the required source field.

TYPE: DocumentMetadata

METHOD DESCRIPTION
trim_strings

Trim leading and trailing whitespace from content.

validate_non_empty_source

Ensure that the 'content' field is not empty after trimming.

Source code in docs/microservices/rag/src/models/api_input.py
class IngestionDocument(BaseModel):
    """Defines the structure of a document to be ingested into the database.

    Attributes:
        content (str): The document's text content (must not be empty).
        meta (DocumentMetadata): Associated metadata, including the required source field.
    """

    content: str = Field(..., description="Document content text")
    meta: DocumentMetadata = Field(
        ..., description="Document metadata. Must include source."
    )

    @field_validator("content")
    @classmethod
    def trim_strings(cls, value: str) -> str:
        """Trim leading and trailing whitespace from content."""
        if value is None:
            return value
        return value.strip()

    @field_validator("content")
    @classmethod
    def validate_non_empty_source(cls, value: str) -> str:
        """Ensure that the 'content' field is not empty after trimming."""
        if not value:
            raise ValueError("Source cannot be empty")
        return value
trim_strings classmethod
trim_strings(value)

Trim leading and trailing whitespace from content.

Source code in docs/microservices/rag/src/models/api_input.py
@field_validator("content")
@classmethod
def trim_strings(cls, value: str) -> str:
    """Trim leading and trailing whitespace from content."""
    if value is None:
        return value
    return value.strip()
validate_non_empty_source classmethod
validate_non_empty_source(value)

Ensure that the 'content' field is not empty after trimming.

Source code in docs/microservices/rag/src/models/api_input.py
@field_validator("content")
@classmethod
def validate_non_empty_source(cls, value: str) -> str:
    """Ensure that the 'content' field is not empty after trimming."""
    if not value:
        raise ValueError("Source cannot be empty")
    return value
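
A short sketch of how these validators behave, assuming the models are importable from src.models.api_input:

from pydantic import ValidationError

from src.models.api_input import DocumentMetadata, IngestionDocument  # assumed import path

# Leading and trailing whitespace is trimmed from content, title and source.
doc = IngestionDocument(
    content="  Some text.  ",
    meta=DocumentMetadata(source=" Coalition Agreement "),
)
assert doc.content == "Some text."
assert doc.meta.source == "Coalition Agreement"

# Whitespace-only content (or source) is rejected after trimming.
try:
    IngestionDocument(content="   ", meta={"source": "Coalition Agreement"})
except ValidationError as err:
    print(err)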
MetadataFilter

Bases: BaseModel

Defines metadata-based filters for RAG document retrieval.

ATTRIBUTE DESCRIPTION
source

Optional source identifier. If None, all sources are considered.

TYPE: str | None

start_date

Optional start date for filtering by date.

TYPE: date | None

end_date

Optional end date for filtering by date.

TYPE: date | None

Source code in docs/microservices/rag/src/models/api_input.py
class MetadataFilter(BaseModel):
    """Defines metadata-based filters for RAG document retrieval.

    Attributes:
        source (str | None): Optional source identifier. If None, all sources are considered.
        start_date (datetime.date | None): Optional start date for filtering by date.
        end_date (datetime.date | None): Optional end date for filtering by date.
    """

    source: str | None = None
    start_date: datetime.date | None = None
    end_date: datetime.date | None = None
RAGInput

Bases: BaseModel

Model defining the input of a valid RAG request.

A RAG request can include one or more metadata filters. Each filter acts as an OR condition: a document is considered a match if it satisfies any of the given filters.

ATTRIBUTE DESCRIPTION
question

The user's input question to be answered.

TYPE: str

meta_data_filters

Optional list of metadata filters.

TYPE: list[MetadataFilter] | None

language_model

Identifier for the language model to use.

TYPE: str

request_timestamp

Unix timestamp indicating when the request was made.

TYPE: int

max_chunks_to_use

Optional upper limit on the number of text chunks used for the response.

TYPE: int | None

Source code in docs/microservices/rag/src/models/api_input.py
class RAGInput(BaseModel):
    """Model defining the input of a valid RAG request.

    A RAG request can include one or more metadata filters. Each filter acts as an OR condition:
    a document is considered a match if it satisfies *any* of the given filters.

    Attributes:
        question (str): The user's input question to be answered.
        meta_data_filters (list[MetadataFilter] | None): Optional list of metadata filters.
        language_model (str): Identifier for the language model to use.
        request_timestamp (int): Unix timestamp indicating when the request was made.
        max_chunks_to_use (int | None): Optional upper limit on the number of text chunks used for the response.
    """

    question: str
    meta_data_filters: list[MetadataFilter] | None = None
    language_model: Annotated[
        str, StringConstraints(strip_whitespace=True, min_length=1)
    ]
    request_timestamp: int | None = Field(
        None,
        description="Unix timestamp indicating when the request was made.",
        deprecated=True,
    )
    max_chunks_to_use: int | None = None

    model_config = ConfigDict(
        json_schema_extra={
            "openapi_examples": {
                "simple": {
                    "summary": "Simple RAG input",
                    "description": "A minimal example of a RAG request with a question and default model, no filters.",
                    "value": {
                        "question": "What did the political parties decide regarding the pension system?",
                        "meta_data_filters": [],
                        "language_model": "test_model_mock",
                        "request_timestamp": 1731252767,
                        "max_chunks_to_use": None,
                    },
                },
                "with_filters": {
                    "summary": "RAG input with a single metadata filter",
                    "description": "Example of a RAG request that applies a metadata filter to restrict the sources.",
                    "value": {
                        "question": "Between which parties was the coalition agreement concluded?",
                        "meta_data_filters": [
                            {"source": "Coalition Agreement"},
                        ],
                        "language_model": "test_model_mock",
                        "request_timestamp": 1731252767,
                        "max_chunks_to_use": 5,
                    },
                },
            }
        }
    )
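
A short sketch of building a RAGInput in code; the import path is an assumption. The two filters act as an OR condition, as described above:

from src.models.api_input import MetadataFilter, RAGInput  # assumed import path

rag_input = RAGInput(
    question="Between which parties was the coalition agreement concluded?",
    meta_data_filters=[
        # OR semantics: a document matches if it satisfies either filter.
        MetadataFilter(source="Coalition Agreement"),
        MetadataFilter(start_date="2025-01-01", end_date="2025-12-31"),
    ],
    language_model="test_model_mock",
    max_chunks_to_use=5,
)
print(rag_input.model_dump_json(indent=2))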
api_output

Pydantic models for API output parameters of the RAG service.

CLASS DESCRIPTION
RAGOutput

RAG response model defining RAG output.

Source

Model defining source object of RAG output.

RAGOutput

Bases: BaseModel

RAG response model defining RAG output.

ATTRIBUTE DESCRIPTION
question

The user question that triggered the retrieval.

TYPE: str

answer

The generated answer based on the retrieved sources.

TYPE: str

sources

List of source documents used in the response.

TYPE: list[RAGSourceDocument]

Source code in docs/microservices/rag/src/models/api_output.py
class RAGOutput(BaseModel):
    """RAG response model defining RAG output.

    Attributes:
        question (str): The user question that triggered the retrieval.
        answer (str): The generated answer based on the retrieved sources.
        sources (list[RAGSourceDocument]): List of source documents used in the response.
    """

    question: str
    answer: str
    sources: list[Source]

    model_config = ConfigDict(
        json_schema_extra={
            "openapi_examples": {
                "simple": {
                    "summary": "RAG output example",
                    "description": (
                        "Example of a RAG response including the user question, ",
                        "answer and retrieved sources.",
                    ),
                    "value": {
                        "question": "What did the political parties decide regarding the pension system?",
                        "answer": "The parties decided to improve the pension system.",
                        "sources": [
                            {
                                "content": "We decided to improve the pension system.",
                                "meta": {
                                    "source": "Coalition Agreement",
                                    "date": "2025-11-07",
                                },
                                "url": "https://example.com/coalitionagreement.pdf",
                            }
                        ],
                    },
                }
            }
        }
    )
Source

Bases: BaseModel

Model defining source object of RAG output.

ATTRIBUTE DESCRIPTION
content

The text content of the retrieved document.

TYPE: str

meta

Metadata about the document (e.g. title, date, source type).

TYPE: dict

url

Optional URL pointing to the original document.

TYPE: str | None

Source code in docs/microservices/rag/src/models/api_output.py
class Source(BaseModel):
    """Model defining source object of RAG output.

    Attributes:
        content (str): The text content of the retrieved document.
        meta (dict): Metadata about the document (e.g. title, date, source type).
        url (str | None): Optional URL pointing to the original document.
    """

    content: str
    meta: dict
    url: str | None = None
general

Load and check Settings from yml.

CLASS DESCRIPTION
ActiveLLMs

Selects the available models for the respective use cases.

LogLevel

Enum class specifying possible log levels.

Settings

Specifies general settings for the service.

ActiveLLMs

Bases: BaseModel

Selects the available models for the respective use cases.

ATTRIBUTE DESCRIPTION
model_config

Used to ignore other services, which are defined in the config.

TYPE: ConfigDict

rag

Names of the LLMs available for RAG.

TYPE: list[str]

embedding

Names of the embedding models available for RAG.

TYPE: list[str]

Source code in docs/microservices/rag/src/models/general.py
class ActiveLLMs(BaseModel):
    """Selects the available models for the respective use cases.

    Attributes:
        model_config (ConfigDict): Used to ignore other services, which are defined in the config.
        rag (list[str]): Names of the LLMs available for RAG.
        embedding (list[str]): Names of the embedding models available for RAG.
    """

    model_config = ConfigDict(extra="ignore")

    rag: list[str]
    embedding: list[str]
LogLevel

Bases: StrEnum

Enum class specifying possible log levels.

Source code in docs/microservices/rag/src/models/general.py
class LogLevel(StrEnum):
    """Enum class specifying possible log levels."""

    CRITICAL = "CRITICAL"
    ERROR = "ERROR"
    WARNING = "WARNING"
    INFO = "INFO"
    DEBUG = "DEBUG"

    @classmethod
    def _missing_(cls, value: object) -> None:
        """Convert strings to uppercase and recheck for existence."""
        if isinstance(value, str):
            value = value.upper()
            for level in cls:
                if level == value:
                    return level
        return None
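
Because _missing_ upper-cases unknown values and retries the lookup, log levels can be given in any case. A short sketch:

# Case-insensitive lookup: "info" is upper-cased and resolved to LogLevel.INFO.
assert LogLevel("info") is LogLevel.INFO
assert LogLevel("Warning") is LogLevel.WARNING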
Settings

Bases: BaseModel

Specifies general settings for the service.

ATTRIBUTE DESCRIPTION
model_config

Ignores extra configuration keys not used by this service.

TYPE: ConfigDict

service_name

Name of the current service (e.g., "rag").

TYPE: str

n_uvicorn_workers

Number of parallel Uvicorn worker processes.

TYPE: PositiveInt

service_endpoints

URLs to required dependent services (e.g., parser).

TYPE: dict[str, AnyHttpUrl]

active_llms

Configuration of LLMs available for different use cases.

TYPE: ActiveLLMs

log_level

Minimum logging level for general logs.

TYPE: LogLevel

log_file_max_bytes

Maximum size (in bytes) of a single log file before rotation.

TYPE: PositiveInt

log_file_backup_count

Number of rotated log files to retain.

TYPE: PositiveInt

log_file

File path where logs will be written.

TYPE: FilePath

check_llm_api_interval_in_s

Interval (in seconds) to check LLM API health.

TYPE: PositiveInt

llm_api_timeout

Timeout (in seconds) for LLM API requests.

TYPE: int

haystack_log_level

Logging level specific to Haystack components.

TYPE: LogLevel

debug_haystack_pipelines

If True, activates debug logging for Haystack pipelines.

TYPE: bool

METHOD DESCRIPTION
ensure_log_dir

Create the log directory after validation.

Source code in docs/microservices/rag/src/models/general.py
class Settings(BaseModel):
    """Specifies general settings for the service.

    Attributes:
        model_config (ConfigDict): Ignores extra configuration keys not used by this service.
        service_name (str): Name of the current service (e.g., "rag").
        n_uvicorn_workers (PositiveInt): Number of parallel Uvicorn worker processes.
        service_endpoints (dict[str, AnyHttpUrl]): URLs to required dependent services (e.g., parser).
        active_llms (ActiveLLMs): Configuration of LLMs available for different use cases.
        log_level (LogLevel): Minimum logging level for general logs.
        log_file_max_bytes (PositiveInt): Maximum size (in bytes) of a single log file before rotation.
        log_file_backup_count (PositiveInt): Number of rotated log files to retain.
        log_file (FilePath): File path where logs will be written.
        check_llm_api_interval_in_s (PositiveInt): Interval (in seconds) to check LLM API health.
        llm_api_timeout (int): Timeout (in seconds) for LLM API requests.
        haystack_log_level (LogLevel): Logging level specific to Haystack components.
        debug_haystack_pipelines (bool): If True, activates debug logging for Haystack pipelines.
    """

    model_config = ConfigDict(extra="ignore")

    service_name: str = "RAG"
    service_description: str = (
        "Retrival-Augmented-Generation for file and database using LLMs."
    )

    n_uvicorn_workers: PositiveInt = 1

    service_endpoints: dict[str, AnyHttpUrl]

    active_llms: ActiveLLMs

    log_level: LogLevel = LogLevel.INFO
    log_file_max_bytes: PositiveInt = 1 * 1024 * 1024
    log_file_backup_count: PositiveInt = 3
    log_file: FilePath = Path("/rag/logs/log")

    check_llm_api_interval_in_s: PositiveInt = 120

    llm_api_timeout: int = 60

    haystack_log_level: LogLevel = LogLevel.WARNING
    debug_haystack_pipelines: bool = False

    @model_validator(mode="after")
    def ensure_log_dir(self) -> "Settings":
        """Create the log directory after validation."""
        self.log_file.parent.mkdir(parents=True, exist_ok=True)
        return self
ensure_log_dir
ensure_log_dir()

Create the log directory after validation.

Source code in docs/microservices/rag/src/models/general.py
@model_validator(mode="after")
def ensure_log_dir(self) -> "Settings":
    """Create the log directory after validation."""
    self.log_file.parent.mkdir(parents=True, exist_ok=True)
    return self
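
A minimal sketch of loading these settings from a YAML file via pydantic's model_validate; the file path is a placeholder and the YAML must provide the required fields such as service_endpoints and active_llms:

from pathlib import Path

import yaml

from src.models.general import Settings  # assumed import path

# Hypothetical config file; the real location is defined by the settings module.
raw = yaml.safe_load(Path("configs/config.yml").read_text())
settings = Settings.model_validate(raw)
print(settings.service_name, settings.check_llm_api_interval_in_s)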
llms

Pydantic model for the LLM config.

CLASS DESCRIPTION
APIAuth

Defines authentication settings for the LLM.

EmbeddingModel

Defines the configuration of an embedding model instance.

LLM

Defines the basic structure of a LLM config.

LLMAPI

Defines API-Connection to LLM.

LLMConfig

Container for all LLM configurations used by the service.

LLMInference

Defines the inference parameters.

LLMPromptConfig

Defines the structure of a LLM prompt configuration.

LLMPromptMaps

Defines the prompt maps for the LLMs used by the RAG service.

LLMPrompts

Defines the prompt parameters.

APIAuth

Bases: BaseModel

Defines authentication settings for the LLM.

ATTRIBUTE DESCRIPTION
type

Either 'token' or 'basic_auth'.

TYPE: Literal

secret_path

File path where the api token or credentials are stored.

TYPE: FilePath

METHOD DESCRIPTION
get_auth_header

Generate auth part of header for http request.

Source code in docs/microservices/rag/src/models/llms.py
class APIAuth(BaseModel):
    """Defines Authentification settings for LLM.

    Attributes:
        type (Literal): Either 'token' or 'basic_auth'.
        secret_path (FilePath): File path where the api token or credentials are stored.
    """

    type: Literal["token", "basic_auth"]
    secret_path: FilePath  # file path where the api token or credentials are stored

    @property
    def secret(self) -> SecretStr:
        """Load secret variable as 'secret'."""
        with open(self.secret_path) as file:
            return SecretStr(file.read().strip())

    def get_auth_header(self) -> str:
        """Generate auth part of header for http request.

        Returns:
            The auth header.
        """
        auth_header = ""

        if self.type == "basic_auth":
            auth_header = f"Basic {base64.b64encode(self.secret.get_secret_value().encode()).decode()}"
        elif self.type == "token":
            auth_header = f"Bearer {self.secret.get_secret_value()}"

        return auth_header
secret property
secret

Load secret variable as 'secret'.

get_auth_header
get_auth_header()

Generate auth part of header for http request.

RETURNS DESCRIPTION
str

The auth header.

Source code in docs/microservices/rag/src/models/llms.py
def get_auth_header(self) -> str:
    """Generate auth part of header for http request.

    Returns:
        The auth header.
    """
    auth_header = ""

    if self.type == "basic_auth":
        auth_header = f"Basic {base64.b64encode(self.secret.get_secret_value().encode()).decode()}"
    elif self.type == "token":
        auth_header = f"Bearer {self.secret.get_secret_value()}"

    return auth_header
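
A short sketch of how the two auth types translate into an Authorization header; the secret path is a placeholder and must point to an existing file:

from src.models.llms import APIAuth  # assumed import path

# Token auth: the file content is sent as "Bearer <token>".
auth = APIAuth(type="token", secret_path="/run/secrets/llm_token")
headers = {"Authorization": auth.get_auth_header()}

# With type="basic_auth" the file content (e.g. "user:password") is
# base64-encoded and sent as "Basic <encoded credentials>" instead.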
EmbeddingModel

Bases: BaseModel

Defines the configuration of an embedding model instance.

ATTRIBUTE DESCRIPTION
model_config

Ignores extra configuration entries not relevant to this class.

TYPE: ConfigDict

label

Human-readable name of the embedding model, suitable for UI display.

TYPE: str

model

Internal model identifier, used in API calls (e.g., Ollama tag).

TYPE: str

api

API details specifying how to interact with the embedding model.

TYPE: LLMAPI

is_remote

Indicates whether the model is hosted on an external API.

TYPE: bool

Source code in docs/microservices/rag/src/models/llms.py
class EmbeddingModel(BaseModel):
    """Defines the configuration of an embedding model instance.

    Attributes:
        model_config (ConfigDict): Ignores extra configuration entries not relevant to this class.
        label (str): Human-readable name of the embedding model, suitable for UI display.
        model (str): Internal model identifier, used in API calls (e.g., Ollama tag).
        api (LLMAPI): API details specifying how to interact with the embedding model.
        is_remote (bool): Indicates whether the model is hosted on an external API.
    """

    model_config = ConfigDict(extra="ignore")

    label: str
    model: str
    api: LLMAPI
    is_remote: bool
LLM

Bases: BaseModel

Defines the basic structure of a LLM config.

ATTRIBUTE DESCRIPTION
label

Human-readable model name that can be presented to users.

TYPE: str

model

Model name which is used in API call, e.g. ollama tag.

TYPE: str

prompt_map

Prompt map name to load LLMPromptMaps from.

TYPE: str

is_remote

Is this LLM hosted at an external API?

TYPE: bool

description

Human-readable description of the model that can be provided to the user.

TYPE: str

api

API information.

TYPE: LLMAPI

inference

Inference parameters.

TYPE: LLMInference

max_chunks_to_use

Maximum number of chunks/documents to use for answer generation (Default: 4)

TYPE: int

prompt_config

Prompts (initially None to merge it in another step).

TYPE: LLMPromptConfig

Source code in docs/microservices/rag/src/models/llms.py
class LLM(BaseModel):
    """Defines the basic structure of a LLM config.

    Attributes:
        label (str): Human-readable model name that can be presented to users.
        model (str): Model name which is used in API call, e.g. ollama tag.
        prompt_map (str): Prompt map name to load LLMPromptMaps from.
        is_remote (bool): Is this LLM hosted at an external API?
        description (str): Human-readable description of the model that can be provided to the user.
        api (LLMAPI): API information.
        inference (LLMInference): Inference parameters.
        max_chunks_to_use (int): Maximum number of chunks/documents to use for answer generation (Default: 4)
        prompt_config (LLMPromptConfig): Prompts (initially None to merge it in another step).
    """

    label: str
    model: str
    prompt_map: str
    is_remote: bool
    description: str = ""
    api: LLMAPI
    inference: LLMInference
    max_chunks_to_use: int | None = None
    prompt_config: LLMPromptConfig | None = None
LLMAPI

Bases: BaseModel

Defines API-Connection to LLM.

ATTRIBUTE DESCRIPTION
url

Url of the LLM.

TYPE: AnyHttpUrl

health_check

Relative path to the health check, e.g. '/models'.

TYPE: str | None

auth

Pydantic Model defining the authentication of the LLM.

TYPE: APIAuth | None

METHOD DESCRIPTION
get_health_check_url

Get the URL to check if API is available.

Source code in docs/microservices/rag/src/models/llms.py
class LLMAPI(BaseModel):
    """Defines API-Connection to LLM.

    Attributes:
        url (AnyHttpUrl): Url of the LLM.
        health_check (str | None): Relative path to health check, i.e. '/models'.
        auth (APIAuth | None): Pydantic Model defining the authentication of the LLM.
    """

    url: AnyHttpUrl
    health_check: str | None = None
    auth: APIAuth | None = None

    def get_health_check_url(self) -> str:
        """Get the URL to check if API is available."""
        if self.health_check:
            # make sure to remove trailing and leading slashes to not override path
            return urljoin(
                str(self.url).rstrip("/") + "/",
                self.health_check.lstrip("/"),
            )
        return str(self.url)
get_health_check_url
get_health_check_url()

Get the URL to check if API is available.

Source code in docs/microservices/rag/src/models/llms.py
def get_health_check_url(self) -> str:
    """Get the URL to check if API is available."""
    if self.health_check:
        # make sure to remove trailing and leading slashes to not override path
        return urljoin(
            str(self.url).rstrip("/") + "/",
            self.health_check.lstrip("/"),
        )
    return str(self.url)
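
A short sketch of the URL joining behaviour; host and path are placeholders:

from src.models.llms import LLMAPI  # assumed import path

api = LLMAPI(url="http://llm-host:11434/v1", health_check="/models")
print(api.get_health_check_url())  # http://llm-host:11434/v1/models

# Without a health_check path, the base URL itself is returned.
print(LLMAPI(url="http://llm-host:11434").get_health_check_url())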
LLMConfig

Bases: BaseModel

Container for all LLM configurations used by the service.

ATTRIBUTE DESCRIPTION
model_config

Ignores extra configuration entries not relevant to this class.

TYPE: ConfigDict

rag

Mapping of model names to LLM configurations for RAG use cases.

TYPE: dict[str, LLM]

embedding

Mapping of model names to embedding model configurations.

TYPE: dict[str, EmbeddingModel]

Source code in docs/microservices/rag/src/models/llms.py
class LLMConfig(BaseModel):
    """Container for all LLM configurations used by the service.

    Attributes:
        model_config (ConfigDict): Ignores extra configuration entries not relevant to this class.
        rag (dict[str, LLM]): Mapping of model names to LLM configurations for RAG use cases.
        embedding (dict[str, EmbeddingModel]): Mapping of model names to embedding model configurations.
    """

    model_config = ConfigDict(extra="ignore")

    rag: dict[str, LLM]
    embedding: dict[str, EmbeddingModel]

    def __iter__(self) -> Iterator[str]:
        """Get 'keys' for automatic merge with i.e. LLMPromptConfig."""
        return iter(self.__dict__.keys())

    def __getitem__(self, service: str) -> dict[str, LLM]:
        """Get all LLMs for a given service (e.g. "chat", "rag").

        Args:
            service (str): The service name (e.g., "chat", "rag").

        Returns:
            All configured LLMs for the given service.
        """
        return self.__getattribute__(service)
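
The __iter__ and __getitem__ methods let the config be traversed like a mapping over services. A short sketch, assuming llm_config is a loaded LLMConfig instance:

# Iterate over the configured services ("rag", "embedding") and their models.
for service in llm_config:
    for name, model in llm_config[service].items():
        print(service, name, model.label)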
LLMInference

Bases: BaseModel

Defines the inference parameters.

ATTRIBUTE DESCRIPTION
temperature

Randomness / variation of the output. High values indicate more creativity. Default is 0.7.

TYPE: PositiveFloat | None

max_new_tokens

Maximum number of tokens of the generated response. Default is 2048.

TYPE: PositiveInt | None

top_p

Threshold for sampling only from the most likely tokens. Default is 0.7.

TYPE: PositiveFloat | None

Source code in docs/microservices/rag/src/models/llms.py
class LLMInference(BaseModel):
    """Defines the inference parameters.

    Attributes:
        temperature (PositiveFloat | None): Randomness / variation of the output. High values indicate more creativity.
                                    Default is 0.7.
        max_new_tokens (PositiveInt | None): Maximum number of tokens of the generated response. Default is 2048.
        top_p (PositiveFloat | None): Threshold for sampling only from the most likely tokens. Default is 0.7.
    """

    temperature: PositiveFloat | None = 0.7
    max_new_tokens: PositiveInt | None = 2048
    top_p: PositiveFloat | None = 0.7
LLMPromptConfig

Bases: BaseModel

Defines the structure of a LLM prompt configuration.

ATTRIBUTE DESCRIPTION
model_config

Used to ignore other services, which are defined in the config.

TYPE: ConfigDict

system

System prompts.

TYPE: LLMPrompts

user

User prompts.

TYPE: LLMPrompts | None

assistant

Assistant prompts.

TYPE: LLMPrompts | None

Source code in docs/microservices/rag/src/models/llms.py
class LLMPromptConfig(BaseModel):
    """Defines the structure of a LLM prompt configuration.

    Attributes:
        model_config (ConfigDict): Used to ignore other services, which are defined in the config.
        system (LLMPrompts): System prompts.
        user (LLMPrompts | None): User prompts.
        assistant (LLMPrompts | None): Assistant prompts.
    """

    # if there are more prompt types defined that are not used in this service: just ignore them
    model_config = ConfigDict(extra="ignore")

    system: LLMPrompts
    user: LLMPrompts | None = None
    assistant: LLMPrompts | None = None
LLMPromptMaps

Bases: BaseModel

Defines the prompt maps for the LLMs used by the RAG service.

ATTRIBUTE DESCRIPTION
model_config

Used to ignore other services, which are defined in the config.

TYPE: ConfigDict

rag

Dictionary mapping prompt map names to the prompt configurations available for RAG.

TYPE: dict[str, LLMPromptConfig]

Source code in docs/microservices/rag/src/models/llms.py
class LLMPromptMaps(BaseModel):
    """Defines the LLMs used for summarization.

    Attributes:
        model_config (ConfigDict): Used to ignore other services, which are defined in the config.
        rag (dict[str, LLMPromptConfig]): Dictionary mapping prompt map names to the prompt configurations available for RAG.
    """

    # if there are more services defined in the config: just ignore them
    model_config = ConfigDict(extra="ignore")

    rag: dict[str, LLMPromptConfig]

    def __iter__(self) -> Iterator[str]:
        """Get 'keys' for automatic merge with i.e. LLMConfig."""
        return iter(self.__dict__.keys())
LLMPrompts

Bases: BaseModel

Defines the prompt parameters.

ATTRIBUTE DESCRIPTION
model_config

Used to ignore other services, which are defined in the config.

TYPE: ConfigDict

generate

Prompt used to generate the response of the RAG.

TYPE: str

Source code in docs/microservices/rag/src/models/llms.py
class LLMPrompts(BaseModel):
    """Defines the prompt parameters.

    Attributes:
        model_config (ConfigDict): Used to ignore other services, which are defined in the config.
        generate (str): Prompt used to generate the response of the RAG.
    """

    # if there are more prompts defined that are not used in this service: just ignore them
    model_config = ConfigDict(extra="ignore")

    generate: str = ""
rag_config

Pydantic models describing a RAG pipeline.

CLASS DESCRIPTION
DB2UIMap

Defines the structure for the mapping from DB-metadata to UI strings.

GenerationConfig

Defines the structure of the configuration of a RAG generation component.

IndexingPipelineConfig

Defines the structure of the configuration of a RAG indexing component.

PipelineConfig

Defines the structure of RAG pipeline configuration.

PipelineSaveConfig

Defines the structure of the configuration of saving a RAG component.

RAGPipelineConfig

Defines the structure of a RAG pipeline configuration file.

RetrievalConfig

Defines the configuration parameters for the retrieval pipeline.

Source

Model defining Database-Sources.

DB2UIMap

Bases: BaseModel

Defines the structure for the mapping from DB-metadata to UI strings.

METHOD DESCRIPTION
ensure_all_values_are_strings

Validates that the values of all fields in the provided data, including additional ones, are strings.

validate_non_empty_source

Ensures that the 'source' field is not empty.

Source code in docs/microservices/rag/src/models/rag_config.py
class DB2UIMap(BaseModel):
    """Defines the structure for the mapping from DB-metadata to UI strings."""

    model_config = ConfigDict(extra="allow")

    source: str

    @field_validator("source")
    @classmethod
    def validate_non_empty_source(cls, value: str) -> str:
        """Ensures that the 'source' field is not empty.

        This check intentionally runs after 'trim_strings' validator.
        """
        if not value:
            raise ValueError("Source cannot be empty")
        return value

    @model_validator(mode="before")
    @classmethod
    def ensure_all_values_are_strings(cls, values: dict[str, Any]) -> dict[str, Any]:
        """Validates that all keys in the provided data, including additional ones, are strings."""
        for key, value in values.items():
            if not isinstance(value, str):
                raise ValueError(
                    f"Field '{key}' must be a string, got {type(value).__name__}"
                )
        return values
ensure_all_values_are_strings classmethod
ensure_all_values_are_strings(values)

Validates that the values of all fields in the provided data, including additional ones, are strings.

Source code in docs/microservices/rag/src/models/rag_config.py
@model_validator(mode="before")
@classmethod
def ensure_all_values_are_strings(cls, values: dict[str, Any]) -> dict[str, Any]:
    """Validates that all keys in the provided data, including additional ones, are strings."""
    for key, value in values.items():
        if not isinstance(value, str):
            raise ValueError(
                f"Field '{key}' must be a string, got {type(value).__name__}"
            )
    return values
validate_non_empty_source classmethod
validate_non_empty_source(value)

Ensures that the 'source' field is not empty.

This check intentionally runs after 'trim_strings' validator.

Source code in docs/microservices/rag/src/models/rag_config.py
@field_validator("source")
@classmethod
def validate_non_empty_source(cls, value: str) -> str:
    """Ensures that the 'source' field is not empty.

    This check intentionally runs after 'trim_strings' validator.
    """
    if not value:
        raise ValueError("Source cannot be empty")
    return value
GenerationConfig

Bases: BaseModel

Defines the structure of the configuration of a RAG generation component.

Source code in docs/microservices/rag/src/models/rag_config.py
class GenerationConfig(BaseModel):
    """Defines the structure of the configuration of a RAG generation component."""

    docs_sender_name: str
    routed_query_name: str
    name_prefix: str
    rag_answer_component_name: str
IndexingPipelineConfig

Bases: BaseModel

Defines the structure of the configuration of a RAG indexing component.

Source code in docs/microservices/rag/src/models/rag_config.py
class IndexingPipelineConfig(BaseModel):
    """Defines the structure of the configuration of a RAG indexing component."""

    es_host: str
    index: str
PipelineConfig

Bases: BaseModel

Defines the structure of RAG pipeline configuration.

Source code in docs/microservices/rag/src/models/rag_config.py
class PipelineConfig(BaseModel):
    """Defines the structure of RAG pipeline configuration."""

    # if there are more services defined in the config: just ignore them
    model_config = ConfigDict(extra="ignore")

    index: dict[str, str]
    embedding_model_name: str

    retrieval_config: RetrievalConfig = RetrievalConfig()

    sources: list[Source]
    metadata_title_separator: str = "----"
PipelineSaveConfig

Bases: BaseModel

Defines the structure of the configuration of saving a RAG component.

Source code in docs/microservices/rag/src/models/rag_config.py
class PipelineSaveConfig(BaseModel):
    """Defines the structure of the configuration of saving a RAG component."""

    file_path: str | Path | None = None
    pipe_label: str | None = None
    png: str | None = None
RAGPipelineConfig

Bases: BaseModel

Defines the structure of a RAG pipeline configuration file.

Source code in docs/microservices/rag/src/models/rag_config.py
class RAGPipelineConfig(BaseModel):
    """Defines the structure of a RAG pipeline configuration file."""

    pipeline: PipelineConfig
    db2ui_map: DB2UIMap
RetrievalConfig

Bases: BaseModel

Defines the configuration parameters for the retrieval pipeline.

retriever_top_k: The number of documents to retrieve per query.
include_ranker: Defines whether a re-ranker should be included in the retrieval pipeline.
max_chunks_to_use: The number of documents to use for answer generation.
ranker_score_threshold: The score threshold to use for ranking.
ranker_model: Hugging Face repo_id where the re-ranker model is published.
ranker_model_path: Path to the local model.

Source code in docs/microservices/rag/src/models/rag_config.py
class RetrievalConfig(BaseModel):
    """Defines the configuration parameter for the retrieval pipeline.

    retriever_top_k: The number of documents to retrieve per query.
    include_reranker: Defines if a ranker should be included into the retrieval pipeline.
    max_chunks_to_use: The number of documents to use for answer generation.
    ranker_score_threshold: The score threshold to use for ranking.
    ranker_model: Huggingface repo_id where the re-ranker model is published.
    ranker_model_path: Path to local model.
    """

    retriever_top_k: int = 50
    include_ranker: bool = False
    max_chunks_to_use: int = 20
    ranker_score_threshold: float | None = None
    ranker_model: str = "svalabs/cross-electra-ms-marco-german-uncased"  # Huggingface model repo of ranker model to use
    ranker_model_path: str = "/rag/models"  # path for re-ranker model files
Source

Bases: BaseModel

Model defining Database-Sources.

ATTRIBUTE DESCRIPTION
name

The name of the document source. E.g. 'Landesdrucksachen'

TYPE: str

date_filter

If set to true, this document source can be filtered by date range. Documents like 'Koalitionsvertrag' will probably not need a filtering by date. 'Pressemitteilungen' probably will need that filter.

TYPE: bool

Source code in docs/microservices/rag/src/models/rag_config.py
class Source(BaseModel):
    """Model defining Database-Sources.

    Attributes:
        name: The name of the document source. E.g. 'Landesdrucksachen'
        date_filter: If set to true, this document source can be filtered by date range. Documents
            like 'Koalitionsvertrag' will probably not need a filtering by date. 'Pressemitteilungen'
            probably will need that filter.
    """

    name: str
    date_filter: bool

    model_config = ConfigDict(
        json_schema_extra={
            "openapi_examples": {
                "standard": {
                    "summary": "Standard database source",
                    "description": "A typical database source with date-based filtering enabled.",
                    "value": {
                        "name": "Coalition Agreement",
                        "date_filter": False,
                    },
                },
            }
        }
    )
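
Since the configuration models above are plain Pydantic classes, a pipeline configuration can also be constructed and validated directly in Python. The following is a minimal, illustrative sketch; the import path and all field values are assumptions chosen for demonstration and are not taken from the shipped configuration file:

# Illustrative only: build a PipelineConfig programmatically instead of loading it from YAML.
from src.models.rag_config import PipelineConfig, RetrievalConfig, Source  # import path assumed

config = PipelineConfig(
    index={"database": "rag-documents"},        # example index mapping (values assumed)
    embedding_model_name="my-embedding-model",  # placeholder model name
    retrieval_config=RetrievalConfig(retriever_top_k=30, include_ranker=True),
    sources=[
        Source(name="Pressemitteilungen", date_filter=True),
        Source(name="Koalitionsvertrag", date_filter=False),
    ],
)
print(config.retrieval_config.max_chunks_to_use)  # 20 (default)

Unknown keys in the input are ignored because PipelineConfig sets model_config = ConfigDict(extra="ignore").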

rag

Implementation of the core logic and interaction flow of the RAG.

MODULE DESCRIPTION
pipeline_configuration

This module initializes the RAG pipelines (database-rag and file-rag).

pipelines

Implementation of the RAG pipelines.

rag_registry

RAG-Registry class for setting-up, storing and accessing RAG pipelines.

utils

This module contains helper functions for RAG pipelines.

pipeline_configuration

This module initializes the RAG pipelines (database-rag and file-rag).

FUNCTION DESCRIPTION
setup_rag

Set up the file and database RAG pipelines.

setup_rag
setup_rag(*, debug_haystack_pipelines=False)

Set up the file and database RAG pipelines.

RETURNS DESCRIPTION
tuple[FileRagPipe, DatabaseRagPipe]

The file_rag and database_rag pipelines.

Source code in docs/microservices/rag/src/rag/pipeline_configuration.py
def setup_rag(
    *, debug_haystack_pipelines: bool = False
) -> tuple[FileRagPipe, DatabaseRagPipe]:
    """File and database rag pipelines.

    Returns:
       The file_rag and database_rag pipelines.
    """
    if debug_haystack_pipelines:
        tracing.tracer.is_content_tracing_enabled = (
            True  # to enable tracing/logging content (inputs/outputs)
        )
        tracing.enable_tracing(
            LoggingTracer(
                tags_color_strings={
                    "haystack.component.input": "\x1b[1;31m",
                    "haystack.component.name": "\x1b[1;34m",
                }
            )
        )

    logger.debug("Initialize database-rag pipelines.")
    database_rag = DatabaseRagPipe(
        pipe_name="database",
        indexing_pipeline=pipeline_registry["database_indexing"],
        rag_pipeline=pipeline_registry["database_rag"],
    )
    database_rag.deploy_pipelines()
    logger.debug("Initialize file-rag pipelines.")
    file_rag = FileRagPipe(
        pipe_name="file",
        indexing_pipeline=pipeline_registry["file_indexing"],
        rag_pipeline=pipeline_registry["file_rag"],
        file_deletion_pipeline=pipeline_registry["file_deleter"],
    )
    file_rag.deploy_pipelines()

    return file_rag, database_rag
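
A hypothetical call site (the import path is assumed; in the running service the pipelines are created once during application startup):

from src.rag.pipeline_configuration import setup_rag  # import path assumed

file_rag, database_rag = setup_rag(debug_haystack_pipelines=False)
# Both returned pipelines are already deployed (warmed up) and ready to handle requests.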
pipelines

Implementation of the RAG pipelines.

MODULE DESCRIPTION
baseclass_rag

This class serves as a foundation for creating RAG pipelines.

components

Implementation of the pipeline components of the RAG.

database_rag

This module implements a RAG system based on database retrievals as data source.

draw_pipeline

Visualizes haystack pipelines.

file_rag

This module implements a RAG system based on files as data source.

openai_custom_auth_client

This module provides an OpenAI client that supports Bearer-Token-Authentication and Basic-Authentication.

pipeline_definitions

Implementation of the pipeline definitions of the RAG.

baseclass_rag

This class serves as a foundation for creating RAG pipelines.

CLASS DESCRIPTION
BaseRagPipe

Base class for RAG Pipelines based on haystack pipelines.

BaseRagPipe

Bases: ABC

Base class for RAG Pipelines based on haystack pipelines.

This class serves as a foundation for creating RAG-based pipelines, leveraging the capabilities of the haystack library.

ATTRIBUTE DESCRIPTION
name

The name of the pipeline instance.

TYPE: str

rag_pipe

The haystack RAG pipeline.

TYPE: AsyncPipeline

indexing_pipe

The haystack indexing pipeline.

TYPE: AsyncPipeline

id_retriever

Component to retrieve document IDs.

TYPE: DocumentIDRetriever

llm_generators

Dictionary of LLM generation configurations from pipeline metadata.

TYPE: dict

_required_metadata_keys

Required metadata keys expected in the pipeline.

TYPE: list[str]

_required_components

Required component names expected in the pipeline.

TYPE: list[str]

METHOD DESCRIPTION
add_llm_router_arg_inplace

Select the answer generation pipeline that corresponds to the language model.

deploy_indexing_pipe

Initializes and warms up the indexing pipeline.

deploy_pipelines

Initializes and warms up the indexing and rag pipeline.

deploy_rag_pipe

Initializes and warms up the rag pipeline.

generate_answer

Run the RAG pipeline to generate an answer based on a query.

indexing

Run the file indexing pipeline on the provided files and add ingestion_date as metadata.

Source code in docs/microservices/rag/src/rag/pipelines/baseclass_rag.py
class BaseRagPipe(ABC):
    """Base class for RAG Pipelines based on haystack pipelines.

    This class serves as a foundation for creating RAG-based pipelines,
    leveraging the capabilities of the haystack library.

    Attributes:
        name (str): The name of the pipeline instance.
        rag_pipe (AsyncPipeline): The haystack RAG pipeline.
        indexing_pipe (AsyncPipeline): The haystack indexing pipeline.
        id_retriever (DocumentIDRetriever): Component to retrieve document IDs.
        llm_generators (dict): Dictionary of LLM generation configurations from pipeline metadata.
        _required_metadata_keys (list[str]): Required metadata keys expected in the pipeline.
        _required_components (list[str]): Required component names expected in the pipeline.
    """

    def __init__(
        self,
        indexing_pipeline: AsyncPipeline,
        rag_pipeline: AsyncPipeline,
        pipe_name: str,
    ) -> None:
        """Initialize a new instance of BaseRagPipe.

        Pipeline Metadata Requirements:
            - `rag_answers_keys`: A list of relevant keys to access nested RAG answers.
            - `retrieved_docs_keys`: A list of relevant keys to access nested retrieved documents.
            - `llm_generators`: A dict of dictionaries with the available answer generation LLMs.
            - The models are selected with a Haystack component called `llm_router`.

        Pipeline Component Requirements:
            The following components are required in the pipeline: ["query", "filters"].

        Args:
            indexing_pipeline (AsyncPipeline): haystack indexing pipeline.
            rag_pipeline (AsyncPipeline): haystack RAG pipeline.
            pipe_name (str): The name of this specific pipeline.
        """
        self.name = pipe_name
        self._required_metadata_keys = [
            "rag_answers_keys",
            "retrieved_docs_keys",
            "llm_generators",
            "max_chunks_to_use_component",
        ]
        self._required_components = ["query", "filters"]

        self.rag_pipe = rag_pipeline
        self.indexing_pipe = indexing_pipeline

        self._init_id_retriever()

        self.llm_generators = self.rag_pipe.metadata["llm_generators"]

    @abstractmethod
    def indexing(self, files: list[bytes]) -> dict[str, object]:
        """Run the file indexing pipeline on the provided files and add ingestion_date as metadata.

        Args:
            files (list[bytes]): A list of byte streams representing files.

        Returns:
            The results of the indexing pipeline.

        Raises:
            AttributeError: If the indexing pipeline is not deployed.
        """
        result = {"indexing_information": "some information"}
        return result

    @abstractmethod
    def generate_answer(
        self, query: str, language_model: str
    ) -> tuple[str, list[Document]]:
        """Run the RAG pipeline to generate an answer based on a query.

        This method processes the input query through a Retrieval-Augmented Generation (RAG) pipeline,
        which retrieves relevant documents and generates an answer based on that context.

        Args:
            query (str): The input query for which an answer is to be generated.
            language_model (str): The model used for generating the answer.

        Returns:
            A tuple containing the generated answer and the retrieved documents.
        """
        answer = "here is my answer"
        documents = [
            Document(
                content="this are the retrieved documents",
                meta={"source": "test", "title": "my_title"},
            )
        ]
        return answer, documents

    def _init_id_retriever(self) -> None:
        document_store = self.indexing_pipe.get_component(
            "document_writer"
        ).document_store

        self.id_retriever = DocumentIDRetriever(document_store=document_store)

    def deploy_pipelines(self) -> None:
        """Initializes and warms up the indexing and rag pipeline."""
        self.deploy_rag_pipe()
        self.deploy_indexing_pipe()

    def deploy_rag_pipe(self) -> None:
        """Initializes and warms up the rag pipeline."""
        self.rag_pipe.warm_up()

    def deploy_indexing_pipe(self) -> None:
        """Initializes and warms up the indexing pipeline."""
        self.indexing_pipe.warm_up()

    def _check_rag_config_requirements(self) -> None:
        """Validating rag configuration."""
        self._check_rag_component_requirements()
        self._check_rag_metadata_requirements()

    def _check_rag_metadata_requirements(self) -> None:
        """Validating metadata in rag configuration."""
        for key in self._required_metadata_keys:
            if key not in self.rag_pipe.metadata:
                raise KeyError(f"Missing {key=} in `self.rag_config['metadata']`")

    def _check_rag_component_requirements(self) -> None:
        """Validating rag configuration."""
        for key in self._required_components:
            if key not in self.rag_pipe.graph.nodes():
                raise KeyError(f"Missing component {key=} in `self.rag_pipe`")

    def _compute_doc_hash(
        self, document: Document | UploadFile | list[Document] | list[UploadFile]
    ) -> str | list[str]:
        """Compute a hash for the given document(s).

        Args:
            document (Document | UploadFile | list[Document] | list[UploadFile]):
                The document or list of documents for which to compute the hash.

        Returns:
            str | list[str]: SHA256 hash or list of SHA256 hashes for each document.

        Raises:
            TypeError: If input is not a supported type.
        """

        def _single_doc_hash(doc: object) -> str:
            if isinstance(doc, Document):
                return self._compute_doc_hash_haystack_document(doc)
            elif isinstance(doc, UploadFile):
                return self._compute_doc_hash_uploadfile(doc)
            else:
                raise TypeError(
                    f"Input must be of type Document or UploadFile or a list of these types "
                    f"but is {type(doc)}"
                )

        if isinstance(document, list):
            return [_single_doc_hash(doc) for doc in document]
        else:
            return _single_doc_hash(document)

    def _compute_doc_hash_haystack_document(self, document: Document) -> str:
        """Compute a hash for the given document.

        Args:
            document (Document): The document for which to compute the hash.

        Returns:
            str: SHA-256 hash of the document content.
        """
        try:
            data = f"{document.content}"
            doc_hash = hashlib.sha256(data.encode("utf-8")).hexdigest()
        except Exception as e:
            logger.error(f"Compute document hash for Haystack Document failed. {e}")

        return doc_hash

    def _compute_doc_hash_uploadfile(self, document: UploadFile) -> str:
        """Compute a hash for the given uploaded file.

        Args:
            document (UploadFile): The uploaded file for which to compute the hash.

        Returns:
            str: SHA-256 hash of the file content and filename.
        """
        try:
            document.file.seek(0)
            data = f"{document.file.read()}{document.filename}"
            docs_hash = hashlib.sha256(data.encode("utf-8")).hexdigest()
        except Exception as e:
            logger.error(f"Compute document hash for Uploadfile failed. {e}")

        return docs_hash

    def add_llm_router_arg_inplace(
        self,
        data: dict,
        language_model_key: str = None,
    ) -> None:
        """Select the answer generation pipeline that corresponds to the language model.

        Args:
            language_model_key (str): The language model to select.
            data (dict): Data dictionary for the haystack pipeline.

        Raises:
            KeyError: If the key is not found in the valid LLM generator keys.
        """
        self._validate_language_model_key(language_model_key)

        # add language_model to data
        if "llm_router" in self.rag_pipe.graph.nodes():
            data.update({"llm_router": {"generator_model": language_model_key}})

    def _validate_language_model_key(self, language_model_key: str) -> None:
        """Validates that the provided language model key exists in the list of valid LLM generator keys.

        Args:
            language_model_key (str): The key to validate.

        Raises:
            KeyError: If the key is not found in the valid LLM generator keys.
        """
        if language_model_key not in self.llm_generators:
            raise KeyError(f"{language_model_key=} is not in pipeline-llms")

    def _check_if_file_id_exists(self, doc_id: str) -> tuple[bool, list[Document]]:
        """Check if a document with the given ID already exists in the document store.

        Args:
            doc_id (str): The ID of the document to check.

        Returns:
            tuple[bool, list[Document]]: Whether a document with the given ID exists,
                and the list of matching documents.

        Raises:
            AttributeError: If the RAG pipeline is not deployed.
        """
        filters = {"field": "meta.source_id", "operator": "==", "value": doc_id}

        if self.rag_pipe is None:
            raise AttributeError(
                "`self.rag_pipe` is not deployed. Please run `self.deploy_rag_pipe`"
            )

        existing_docs = self.id_retriever.run(filters=filters)
        return len(existing_docs["document_ids"]) > 0, existing_docs["documents"]

    def _update_ingestion_date(
        self, existing_docs: list[Document], ingestion_date: datetime
    ) -> None:
        """Update the ingestion date of existing documents in the document store."""
        document_store = self.indexing_pipe.get_component(
            "document_writer"
        ).document_store

        es_client = document_store.client

        for doc in existing_docs:
            response = es_client.update(
                index=document_store._index,
                id=doc.id,
                body={"doc": {"meta.ingestion_date": ingestion_date}},
            )
            if response["_shards"]["failed"] > 0:
                logger.error("Error updating ingestion date. Nothing updated.")

    def _update_all_meta_data(self, existing_docs: list[Document], meta: dict) -> None:
        document_store = self.indexing_pipe.get_component(
            "document_writer"
        ).document_store

        es_client = document_store.client

        for doc in existing_docs:
            flattened_meta = {f"meta.{key}": value for key, value in meta.items()}
            response = es_client.update(
                index=document_store._index, id=doc.id, body={"doc": flattened_meta}
            )
            if response["_shards"]["failed"] > 0:
                logger.error("Error updating meta data. Nothing updated.")
add_llm_router_arg_inplace
add_llm_router_arg_inplace(data, language_model_key=None)

Select the answer generation pipeline that corresponds to the language model.

PARAMETER DESCRIPTION
language_model_key

The language model to select.

TYPE: str DEFAULT: None

data

Data dictionary for the haystack pipeline.

TYPE: dict

RAISES DESCRIPTION
KeyError

If the key is not found in the valid LLM generator keys.

Source code in docs/microservices/rag/src/rag/pipelines/baseclass_rag.py
def add_llm_router_arg_inplace(
    self,
    data: dict,
    language_model_key: str = None,
) -> None:
    """Select the answer generation pipeline that corresponds to the language model.

    Args:
        language_model_key (str): The language model to select.
        data (dict): Data dictionary for the haystack pipeline.

    Raises:
        KeyError: If the key is not found in the valid LLM generator keys.
    """
    self._validate_language_model_key(language_model_key)

    # add language_model to data
    if "llm_router" in self.rag_pipe.graph.nodes():
        data.update({"llm_router": {"generator_model": language_model_key}})
deploy_indexing_pipe
deploy_indexing_pipe()

Initializes and warms up the indexing pipeline.

Source code in docs/microservices/rag/src/rag/pipelines/baseclass_rag.py
def deploy_indexing_pipe(self) -> None:
    """Initializes and warms up the indexing pipeline."""
    self.indexing_pipe.warm_up()
deploy_pipelines
deploy_pipelines()

Initializes and warms up the indexing and rag pipeline.

Source code in docs/microservices/rag/src/rag/pipelines/baseclass_rag.py
def deploy_pipelines(self) -> None:
    """Initializes and warms up the indexing and rag pipeline."""
    self.deploy_rag_pipe()
    self.deploy_indexing_pipe()
deploy_rag_pipe
deploy_rag_pipe()

Initializes and warms up the rag pipeline.

Source code in docs/microservices/rag/src/rag/pipelines/baseclass_rag.py
def deploy_rag_pipe(self) -> None:
    """Initializes and warms up the rag pipeline."""
    self.rag_pipe.warm_up()
generate_answer abstractmethod
generate_answer(query, language_model)

Run the RAG pipeline to generate an answer based on a query.

This method processes the input query through a Retrieval-Augmented Generation (RAG) pipeline, which retrieves relevant documents and generates an answer based on that context.

PARAMETER DESCRIPTION
query

The input query for which an answer is to be generated.

TYPE: str

language_model

The model used for generating the answer.

TYPE: str

RETURNS DESCRIPTION
tuple[str, list[Document]]

A tuple containing the generated answer and the retrieved documents.

Source code in docs/microservices/rag/src/rag/pipelines/baseclass_rag.py
@abstractmethod
def generate_answer(
    self, query: str, language_model: str
) -> tuple[str, list[Document]]:
    """Run the RAG pipeline to generate an answer based on a query.

    This method processes the input query through a Retrieval-Augmented Generation (RAG) pipeline,
    which retrieves relevant documents and generates an answer based on that context.

    Args:
        query (str): The input query for which an answer is to be generated.
        language_model (str): The model used for generating the answer.

    Returns:
        A tuple containing the generated answer and the retrieved documents.
    """
    answer = "here is my answer"
    documents = [
        Document(
            content="this are the retrieved documents",
            meta={"source": "test", "title": "my_title"},
        )
    ]
    return answer, documents
indexing abstractmethod
indexing(files)

Run the file indexing pipeline on the provided files and add ingestion_date as metadata.

PARAMETER DESCRIPTION
files

A list of byte streams representing files.

TYPE: list[bytes]

RETURNS DESCRIPTION
dict[str, object]

The results of the indexing pipeline.

RAISES DESCRIPTION
AttributeError

If the indexing pipeline is not deployed.

Source code in docs/microservices/rag/src/rag/pipelines/baseclass_rag.py
@abstractmethod
def indexing(self, files: list[bytes]) -> dict[str, object]:
    """Run the file indexing pipeline on the provided files and add ingestion_date as metadata.

    Args:
        files (list[bytes]): A list of byte streams representing files.

    Returns:
        The results of the indexing pipeline.

    Raises:
        AttributeError: If the indexing pipeline is not deployed.
    """
    result = {"indexing_information": "some information"}
    return result
components

Implementation of the pipeline components of the RAG.

MODULE DESCRIPTION
document_deleter

This module implements a haystack pipeline component that deletes documents from a DocumentStore.

document_id_retriever

This module provides an implementation of a haystack pipeline component that retrieves IDs of documents.

f13_openai_chat_generator

This module provides an adaption of the haystack OpenAIGenerator for F13.

f13_openai_document_embedder

This module provides an implementation of a haystack-pipeline-component for document embedding.

f13_openai_text_embedder

A component for embedding strings using OpenAI-conform API.

id_setter

This module implements a haystack pipeline component that sets the document_id hash.

document_deleter

This module implements a haystack pipeline component that deletes documents from a DocumentStore.

CLASS DESCRIPTION
DocumentDeleter

Deletes documents from a DocumentStore.

DocumentDeleter

Deletes documents from a DocumentStore.

This component provides deletion capabilities for documents stored in a Haystack DocumentStore.

ATTRIBUTE DESCRIPTION
document_store

The DocumentStore instance from which documents will be deleted.

TYPE: DocumentStore

METHOD DESCRIPTION
from_dict

Deserializes the component from a dictionary.

run

Run the DocumentDeleter on the given input data.

to_dict

Serializes the component to a dictionary.

Source code in docs/microservices/rag/src/rag/pipelines/components/document_deleter.py
@component
class DocumentDeleter:
    """Deletes documents from a DocumentStore.

    This component provides deletion capabilities for documents stored in a Haystack DocumentStore.

    Attributes:
        document_store (DocumentStore): The DocumentStore instance from which documents will be deleted.
    """

    def __init__(self, document_store: DocumentStore) -> None:
        """Create a DocumentDeleter component.

        Args:
            document_store (DocumentStore): The DocumentStore where the documents are to be written.
        """
        self.document_store = document_store

    def to_dict(self) -> dict[str, Any]:
        """Serializes the component to a dictionary.

        Returns:
            Dictionary with serialized data.
        """
        return default_to_dict(self, document_store=self.document_store.to_dict())

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "DocumentDeleter":
        """Deserializes the component from a dictionary.

        Args:
            data (dict[str, Any]): The dictionary to deserialize from.

        Returns:
            The deserialized component.

        Raises:
            DeserializationError: If the document store is not properly specified in
                the serialization data or its type cannot be imported.
        """
        init_params = data.get("init_parameters", {})
        if "document_store" not in init_params:
            raise DeserializationError("Missing 'document_store' in serialization data")
        if "type" not in init_params["document_store"]:
            raise DeserializationError(
                "Missing 'type' in document store's serialization data"
            )

        try:
            module_name, type_ = init_params["document_store"]["type"].rsplit(".", 1)
            logger.debug(f"Trying to import module '{module_name}'")
            module = importlib.import_module(module_name)
        except (ImportError, DeserializationError) as e:
            raise DeserializationError(
                f"DocumentStore of type '{init_params['document_store']['type']}' not correctly imported"
            ) from e

        docstore_class = getattr(module, type_)
        docstore = docstore_class.from_dict(init_params["document_store"])

        data["init_parameters"]["document_store"] = docstore
        return default_from_dict(cls, data)

    @component.output_types(documents_deleted=int)
    def run(self, document_ids: list[str]) -> dict[str, int]:
        """Run the DocumentWriter on the given input data.

        Args:
            document_ids (list[str]): A list of document IDs to delete from the store.

        Returns:
            A dictionary containing the number of documents deleted.

        Raises:
            ValueError: If the specified document store is not found.
        """
        self.document_store.delete_documents(document_ids=document_ids)

        return {"documents_deleted": len(document_ids)}
from_dict classmethod
from_dict(data)

Deserializes the component from a dictionary.

PARAMETER DESCRIPTION
data

The dictionary to deserialize from.

TYPE: dict[str, Any]

RETURNS DESCRIPTION
DocumentDeleter

The deserialized component.

RAISES DESCRIPTION
DeserializationError

If the document store is not properly specified in the serialization data or its type cannot be imported.

Source code in docs/microservices/rag/src/rag/pipelines/components/document_deleter.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "DocumentDeleter":
    """Deserializes the component from a dictionary.

    Args:
        data (dict[str, Any]): The dictionary to deserialize from.

    Returns:
        The deserialized component.

    Raises:
        DeserializationError: If the document store is not properly specified in
            the serialization data or its type cannot be imported.
    """
    init_params = data.get("init_parameters", {})
    if "document_store" not in init_params:
        raise DeserializationError("Missing 'document_store' in serialization data")
    if "type" not in init_params["document_store"]:
        raise DeserializationError(
            "Missing 'type' in document store's serialization data"
        )

    try:
        module_name, type_ = init_params["document_store"]["type"].rsplit(".", 1)
        logger.debug(f"Trying to import module '{module_name}'")
        module = importlib.import_module(module_name)
    except (ImportError, DeserializationError) as e:
        raise DeserializationError(
            f"DocumentStore of type '{init_params['document_store']['type']}' not correctly imported"
        ) from e

    docstore_class = getattr(module, type_)
    docstore = docstore_class.from_dict(init_params["document_store"])

    data["init_parameters"]["document_store"] = docstore
    return default_from_dict(cls, data)
run
run(document_ids)

Run the DocumentDeleter on the given input data.

PARAMETER DESCRIPTION
document_ids

A list of document IDs to delete from the store.

TYPE: list[str]

RETURNS DESCRIPTION
dict[str, int]

A dictionary containing the number of documents deleted.

RAISES DESCRIPTION
ValueError

If the specified document store is not found.

Source code in docs/microservices/rag/src/rag/pipelines/components/document_deleter.py
@component.output_types(documents_deleted=int)
def run(self, document_ids: list[str]) -> dict[str, int]:
    """Run the DocumentWriter on the given input data.

    Args:
        document_ids (list[str]): A list of document IDs to delete from the store.

    Returns:
        A dictionary containing the number of documents deleted.

    Raises:
        ValueError: If the specified document store is not found.
    """
    self.document_store.delete_documents(document_ids=document_ids)

    return {"documents_deleted": len(document_ids)}
to_dict
to_dict()

Serializes the component to a dictionary.

RETURNS DESCRIPTION
dict[str, Any]

Dictionary with serialized data.

Source code in docs/microservices/rag/src/rag/pipelines/components/document_deleter.py
def to_dict(self) -> dict[str, Any]:
    """Serializes the component to a dictionary.

    Returns:
        Dictionary with serialized data.
    """
    return default_to_dict(self, document_store=self.document_store.to_dict())
document_id_retriever

This module provides an implementation of a haystack pipeline component that retrieves IDs of documents.

CLASS DESCRIPTION
DocumentIDRetriever

Retrieves document_ids from a DocumentStore.

DocumentIDRetriever

Retrieves document_ids from a DocumentStore.

ATTRIBUTE DESCRIPTION
document_store

The DocumentStore instance from which document IDs are retrieved.

TYPE: DocumentStore

METHOD DESCRIPTION
from_dict

Deserializes the component from a dictionary.

run

Run the DocumentIDRetriever with the given filters.

to_dict

Serializes the component to a dictionary.

Source code in docs/microservices/rag/src/rag/pipelines/components/document_id_retriever.py
@component
class DocumentIDRetriever:
    """Retrieves document_ids from a DocumentStore.

    Attributes:
        document_store (DocumentStore): The DocumentStore instance from which document IDs are retrieved.
    """

    def __init__(self, document_store: DocumentStore) -> None:
        """Create a DocumentIDRetriever component.

        Args:
            document_store (DocumentStore): The DocumentStore where the documents are to be written.
        """
        self.document_store = document_store

    def to_dict(self) -> dict[str, Any]:
        """Serializes the component to a dictionary.

        Returns:
            Dictionary with serialized data.
        """
        return default_to_dict(self, document_store=self.document_store.to_dict())

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "DocumentIDRetriever":
        """Deserializes the component from a dictionary.

        Args:
            data (dict[str, Any]): The dictionary to deserialize from.

        Returns:
            The deserialized component.

        Raises:
            DeserializationError: If the document store is not properly specified in the serialization data
                or its type cannot be imported.
        """
        init_params = data.get("init_parameters", {})
        if "document_store" not in init_params:
            raise DeserializationError("Missing 'document_store' in serialization data")
        if "type" not in init_params["document_store"]:
            raise DeserializationError(
                "Missing 'type' in document store's serialization data"
            )

        try:
            module_name, type_ = init_params["document_store"]["type"].rsplit(".", 1)
            logger.debug(f"Trying to import module '{module_name}'")
            module = importlib.import_module(module_name)
        except (ImportError, DeserializationError) as e:
            raise DeserializationError(
                f"DocumentStore of type '{init_params['document_store']['type']}' not correctly imported"
            ) from e

        docstore_class = getattr(module, type_)
        docstore = docstore_class.from_dict(init_params["document_store"])

        data["init_parameters"]["document_store"] = docstore
        return default_from_dict(cls, data)

    @component.output_types(document_ids=list[str])
    def run(self, filters: dict[str, Any] | None = None) -> dict[str, list[str]]:
        """Run the DocumentIDRetriever filters.

        Args:
            filters (dict[str, Any] | None): Filters applied to the retrieved `Document`s.

        Raises:
            ValueError: If the specified document store is not found.
        """
        docs = self.document_store.filter_documents(filters=filters)
        document_ids = [doc.id for doc in docs]
        return {"document_ids": document_ids, "documents": docs}
from_dict classmethod
from_dict(data)

Deserializes the component from a dictionary.

PARAMETER DESCRIPTION
data

The dictionary to deserialize from.

TYPE: dict[str, Any]

RETURNS DESCRIPTION
DocumentIDRetriever

The deserialized component.

RAISES DESCRIPTION
DeserializationError

If the document store is not properly specified in the serialization data or its type cannot be imported.

Source code in docs/microservices/rag/src/rag/pipelines/components/document_id_retriever.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "DocumentIDRetriever":
    """Deserializes the component from a dictionary.

    Args:
        data (dict[str, Any]): The dictionary to deserialize from.

    Returns:
        The deserialized component.

    Raises:
        DeserializationError: If the document store is not properly specified in the serialization data
            or its type cannot be imported.
    """
    init_params = data.get("init_parameters", {})
    if "document_store" not in init_params:
        raise DeserializationError("Missing 'document_store' in serialization data")
    if "type" not in init_params["document_store"]:
        raise DeserializationError(
            "Missing 'type' in document store's serialization data"
        )

    try:
        module_name, type_ = init_params["document_store"]["type"].rsplit(".", 1)
        logger.debug(f"Trying to import module '{module_name}'")
        module = importlib.import_module(module_name)
    except (ImportError, DeserializationError) as e:
        raise DeserializationError(
            f"DocumentStore of type '{init_params['document_store']['type']}' not correctly imported"
        ) from e

    docstore_class = getattr(module, type_)
    docstore = docstore_class.from_dict(init_params["document_store"])

    data["init_parameters"]["document_store"] = docstore
    return default_from_dict(cls, data)
run
run(filters=None)

Run the DocumentIDRetriever with the given filters.

PARAMETER DESCRIPTION
filters

Filters applied to the retrieved Documents.

TYPE: dict[str, Any] | None DEFAULT: None

RAISES DESCRIPTION
ValueError

If the specified document store is not found.

Source code in docs/microservices/rag/src/rag/pipelines/components/document_id_retriever.py
@component.output_types(document_ids=list[str])
def run(self, filters: dict[str, Any] | None = None) -> dict[str, list[str]]:
    """Run the DocumentIDRetriever filters.

    Args:
        filters (dict[str, Any] | None): Filters applied to the retrieved `Document`s.

    Raises:
        ValueError: If the specified document store is not found.
    """
    docs = self.document_store.filter_documents(filters=filters)
    document_ids = [doc.id for doc in docs]
    return {"document_ids": document_ids, "documents": docs}
to_dict
to_dict()

Serializes the component to a dictionary.

RETURNS DESCRIPTION
dict[str, Any]

Dictionary with serialized data.

Source code in docs/microservices/rag/src/rag/pipelines/components/document_id_retriever.py
def to_dict(self) -> dict[str, Any]:
    """Serializes the component to a dictionary.

    Returns:
        Dictionary with serialized data.
    """
    return default_to_dict(self, document_store=self.document_store.to_dict())
f13_openai_chat_generator

This module provides an adaption of the haystack OpenAIGenerator for F13.

CLASS DESCRIPTION
F13OpenAIChatGenerator

This class is an adaption of the haystack OpenAIGenerator for F13.

F13OpenAIChatGenerator

Bases: OpenAIChatGenerator

This class is an adaption of the haystack OpenAIGenerator for F13.

ATTRIBUTE DESCRIPTION
timeout

Timeout duration (in seconds) for OpenAI client calls.

TYPE: float

max_retries

Maximum number of retries on OpenAI internal errors.

TYPE: int

client

Synchronous OpenAI client with custom authentication.

TYPE: OpenAI

async_client

Asynchronous OpenAI client with custom authentication.

TYPE: AsyncOpenAI

Source code in docs/microservices/rag/src/rag/pipelines/components/f13_openai_chat_generator.py
@component
class F13OpenAIChatGenerator(OpenAIChatGenerator):
    """This class is an adaption of the haystack OpenAIGenerator for F13.

    Attributes:
        timeout (float): Timeout duration (in seconds) for OpenAI client calls.
        max_retries (int): Maximum number of retries on OpenAI internal errors.
        client (OpenAI): Synchronous OpenAI client with custom authentication.
        async_client (AsyncOpenAI): Asynchronous OpenAI client with custom authentication.
    """

    def __init__(
        self,
        llm: LLM,
        streaming_callback: Callable[[StreamingChunk], None] | None = None,
        generation_kwargs: dict[str, Any] | None = None,
        timeout: float = 180,
        max_retries: int = 5,
    ) -> None:
        """Creates an instance of F13OpenAIGenerator.

        By setting the 'OPENAI_TIMEOUT' and 'OPENAI_MAX_RETRIES' environment variables,
        you can change the timeout and max_retries parameters in the OpenAI client.

        Args:
            llm (LLM): The language model (pydantic object) to use.
            streaming_callback (Callable[[StreamingChunk], None], optional):
                A callback function called when a new token is received from the stream.
            generation_kwargs (dict[str, Any], optional): Additional parameters sent directly to the OpenAI endpoint.
                Supported parameters include:
                    - max_tokens (int): Max tokens for output.
                    - temperature (float): Sampling temperature for creativity.
                    - top_p (float): Nucleus sampling probability mass.
                    - n (int): Number of completions per prompt.
                    - stop (str or list): Sequence(s) where generation stops.
                    - presence_penalty (float): Penalty for token presence to reduce repetition.
                    - frequency_penalty (float): Penalty for token frequency to reduce repetition.
                    - logit_bias (dict): Bias added to specific tokens.
            timeout (float, optional): Timeout for OpenAI client calls.
                Defaults to 180 seconds or is inferred from `OPENAI_TIMEOUT` env variable.
            max_retries (int, optional): Maximum retries on OpenAI internal errors.
                Defaults to 5 or is inferred from `OPENAI_MAX_RETRIES` env variable.
        """
        super(F13OpenAIChatGenerator, self).__init__(  # noqa: UP008, because haystack components need this init
            api_key=Secret.from_token("UNUSED_TOKEN"),
            model=llm.model,
            streaming_callback=streaming_callback,
            api_base_url=str(llm.api.url),
            generation_kwargs=generation_kwargs,
            max_retries=max_retries,
            timeout=timeout,
        )

        self.timeout = timeout
        self.max_retries = max_retries

        if llm.api.auth:
            auth_header = llm.api.auth.get_auth_header()
            auth_client = CustomAuthClient(auth_header=auth_header)
            async_auth_client = CustomAsyncAuthClient(auth_header=auth_header)
        else:
            auth_client = CustomAuthClient(auth_header=None)
            async_auth_client = CustomAsyncAuthClient(auth_header=None)

        self.client = OpenAI(
            http_client=auth_client,
            api_key="",
            base_url=str(llm.api.url),
            timeout=self.timeout,
            max_retries=self.max_retries,
        )

        self.async_client = AsyncOpenAI(
            http_client=async_auth_client,
            api_key="",
            base_url=str(llm.api.url),
            timeout=self.timeout,
            max_retries=self.max_retries,
        )
f13_openai_document_embedder

This module provides an implementation of a haystack-pipeline-component for document embedding.

CLASS DESCRIPTION
F13OpenAIDocumentEmbedder

A component for computing Document embeddings using OpenAI-conform API.

F13OpenAIDocumentEmbedder

Bases: OpenAIDocumentEmbedder

A component for computing Document embeddings using OpenAI-conform API.

Usage example:

from haystack import Document
import F13OpenAIDocumentEmbedder

doc = Document(content="I love pizza!")

document_embedder = F13OpenAIDocumentEmbedder()

result = document_embedder.run([doc])
print(result["documents"][0].embedding)

# [0.017020374536514282, -0.023255806416273117, ...]

ATTRIBUTE DESCRIPTION
timeout

Timeout duration (in seconds) for OpenAI client calls.

TYPE: float

max_retries

Maximum number of retries on OpenAI internal errors.

TYPE: int

client

Synchronous OpenAI client with custom authentication.

TYPE: OpenAI

async_client

Asynchronous OpenAI client with custom authentication.

TYPE: AsyncOpenAI

Source code in docs/microservices/rag/src/rag/pipelines/components/f13_openai_document_embedder.py
@component
class F13OpenAIDocumentEmbedder(OpenAIDocumentEmbedder):
    """A component for computing Document embeddings using OpenAI-conform API.

    Usage example:
    ```python
    from haystack import Document
    import F13OpenAIDocumentEmbedder

    doc = Document(content="I love pizza!")

    document_embedder = F13OpenAIDocumentEmbedder()

    result = document_embedder.run([doc])
    print(result["documents"][0].embedding)

    # [0.017020374536514282, -0.023255806416273117, ...]
    ```

    Attributes:
        timeout (float): Timeout duration (in seconds) for OpenAI client calls.
        max_retries (int): Maximum number of retries on OpenAI internal errors.
        client (OpenAI): Synchronous OpenAI client with custom authentication.
        async_client (AsyncOpenAI): Asynchronous OpenAI client with custom authentication.
    """

    def __init__(
        self,
        embedding_model: EmbeddingModel,
        meta_fields_to_embed: list[str] | None = None,
        embedding_separator: str = "\n",
        timeout: float = 60,
        max_retries: int = 5,
    ) -> None:
        """Create a F13OpenAIDocumentEmbedder component.

        By setting the 'OPENAI_TIMEOUT' and 'OPENAI_MAX_RETRIES' environment variables,
        you can change the timeout and max_retries parameters in the OpenAI client.

        Args:
            embedding_model (EmbeddingModel): EmbeddingModel pydantic object.
            meta_fields_to_embed (list of str or None): List of metadata fields
                to embed along with the document text.
            embedding_separator (str): Separator used to concatenate metadata fields
                to the document text.
            timeout (float or None): Timeout for OpenAI client calls.
                If not set, inferred from `OPENAI_TIMEOUT` env variable or defaults to 60.
            max_retries (int or None): Maximum retries on OpenAI internal errors.
                            If not set, inferred from `OPENAI_MAX_RETRIES` env variable or defaults to 5.
        """
        super(F13OpenAIDocumentEmbedder, self).__init__(  # noqa: UP008, because haystack components need this init
            api_key=Secret.from_token("UNUSED_TOKEN"),
            model=embedding_model.model,
            api_base_url=str(embedding_model.api.url),
            meta_fields_to_embed=meta_fields_to_embed or [],
            embedding_separator=embedding_separator,
            timeout=timeout,
            max_retries=max_retries,
        )

        if embedding_model.api.auth:
            auth_header = embedding_model.api.auth.get_auth_header()
            auth_client = CustomAuthClient(auth_header=auth_header)
            async_auth_client = CustomAsyncAuthClient(auth_header=auth_header)
        else:
            auth_client = CustomAuthClient(auth_header=None)
            async_auth_client = CustomAsyncAuthClient(auth_header=None)

        self.client = OpenAI(
            http_client=auth_client,
            api_key="",
            base_url=str(embedding_model.api.url),
            timeout=self.timeout,
            max_retries=self.max_retries,
        )

        self.async_client = AsyncOpenAI(
            http_client=async_auth_client,
            api_key="",
            base_url=str(embedding_model.api.url),
            timeout=self.timeout,
            max_retries=self.max_retries,
        )
f13_openai_text_embedder

A component for embedding strings using OpenAI-conform API.

CLASS DESCRIPTION
F13OpenAITextEmbedder

A component for embedding strings using OpenAI-conform API.

F13OpenAITextEmbedder

Bases: OpenAITextEmbedder

A component for embedding strings using OpenAI-conform API.

Usage example:

import F13OpenAITextEmbedder

text_to_embed = "I love pizza!"

text_embedder = F13OpenAITextEmbedder()

print(text_embedder.run(text_to_embed))

# {'embedding': [0.017020374536514282, -0.023255806416273117, ...],
# 'meta': {'model': 'text-embedding-ada-002-v2',
#          'usage': {'prompt_tokens': 4, 'total_tokens': 4}}}

ATTRIBUTE DESCRIPTION
timeout

Timeout duration (in seconds) for OpenAI client calls.

TYPE: float

max_retries

Maximum number of retries on OpenAI internal errors.

TYPE: int

client

Synchronous OpenAI client with custom authentication.

TYPE: OpenAI

async_client

Asynchronous OpenAI client with custom authentication.

TYPE: AsyncOpenAI

Source code in docs/microservices/rag/src/rag/pipelines/components/f13_openai_text_embedder.py
@component
class F13OpenAITextEmbedder(OpenAITextEmbedder):
    """A component for embedding strings using OpenAI-conform API.

    Usage example:
    ```python
    import F13OpenAITextEmbedder

    text_to_embed = "I love pizza!"

    text_embedder = F13OpenAITextEmbedder()

    print(text_embedder.run(text_to_embed))

    # {'embedding': [0.017020374536514282, -0.023255806416273117, ...],
    # 'meta': {'model': 'text-embedding-ada-002-v2',
    #          'usage': {'prompt_tokens': 4, 'total_tokens': 4}}}
    ```

    Attributes:
        timeout (float): Timeout duration (in seconds) for OpenAI client calls.
        max_retries (int): Maximum number of retries on OpenAI internal errors.
        client (OpenAI): Synchronous OpenAI client with custom authentication.
        async_client (AsyncOpenAI): Asynchronous OpenAI client with custom authentication.
    """

    def __init__(
        self,
        embedding_model: EmbeddingModel,
        timeout: float = 60,
        max_retries: int = 5,
    ) -> None:
        """Create an F13OpenAITextEmbedder component.

        By setting the 'OPENAI_TIMEOUT' and 'OPENAI_MAX_RETRIES' environment variables,
        you can change the timeout and max_retries parameters in the OpenAI client.

        Args:
            embedding_model (EmbeddingModel): EmbeddingModel pydantic object.
            timeout (float, optional): Timeout for OpenAI client calls. If not set, inferred from
                the `OPENAI_TIMEOUT` environment variable or defaults to 60.
            max_retries (int, optional): Maximum retries to establish contact with OpenAI if it
                returns an internal error. If not set, inferred from the `OPENAI_MAX_RETRIES`
                environment variable or defaults to 5.
        """
        super(F13OpenAITextEmbedder, self).__init__(  # noqa: UP008, because haystack components need this init
            model=embedding_model.model,
            api_base_url=str(embedding_model.api.url),
            api_key=Secret.from_token("UNUSED_TOKEN"),
            max_retries=max_retries,
            timeout=timeout,
        )

        if embedding_model.api.auth:
            auth_header = embedding_model.api.auth.get_auth_header()
            auth_client = CustomAuthClient(auth_header=auth_header)
            async_auth_client = CustomAsyncAuthClient(auth_header=auth_header)
        else:
            auth_client = CustomAuthClient(auth_header=None)
            async_auth_client = CustomAsyncAuthClient(auth_header=None)

        self.client = OpenAI(
            http_client=auth_client,
            api_key="",
            base_url=str(embedding_model.api.url),
            timeout=self.timeout,
            max_retries=self.max_retries,
        )

        self.async_client = AsyncOpenAI(
            http_client=async_auth_client,
            api_key="",
            base_url=str(embedding_model.api.url),
            timeout=self.timeout,
            max_retries=self.max_retries,
        )
id_setter

This module implements a haystack pipeline component that sets the document_id hash.

CLASS DESCRIPTION
IDSetter

Sets the document_id hash.

IDSetter

Sets the document_id hash.

ATTRIBUTE DESCRIPTION
ignore_keys_for_doc_hash

Metadata keys to ignore when creating the document hash.

TYPE: list[str]

METHOD DESCRIPTION
run

Process documents, assign IDs and return them together with the IDs.

Source code in docs/microservices/rag/src/rag/pipelines/components/id_setter.py
@component
class IDSetter:
    """Sets the document_id hash.

    Attributes:
        ignore_keys_for_doc_hash (list[str]): Metadata keys to ignore when creating the document hash.
    """

    def __init__(self, ignore_keys_for_doc_hash: list[str] = None) -> None:
        """Initialize the IDSetter.

        Args:
            ignore_keys_for_doc_hash (list[str]): All metadata keys that should be ignored when creating the ID hash.
        """
        self.ignore_keys_for_doc_hash = (
            ignore_keys_for_doc_hash if ignore_keys_for_doc_hash is not None else []
        )

    @component.output_types(documents=list[Document], document_ids=list[str])
    def run(
        self, documents: list[Document], doc_ids: list[str] = None
    ) -> dict[str, list[Document] | list[str]]:
        """Process documents, merge YAML metadata and return with IDs.

        Process the list of documents to merge YAML metadata and return processed documents along with
        IDs of documents missing corresponding YAML metadata.

        Args:
            documents (list[Document]): List of Document objects to process.
            doc_ids (list[str], optional): List of document IDs to set as the document IDs. Defaults to None.

        Returns:
            A dictionary containing:
                - documents: List of Document objects with IDs set.
                - document_ids: List of assigned document IDs.
        """
        final_doc_ids = []
        input_doc_ids = doc_ids or [None] * len(documents)
        for doc, precomputed_id in zip(documents, input_doc_ids):
            if precomputed_id:
                doc.id = precomputed_id
            else:
                meta = {
                    k: v
                    for k, v in doc.meta.items()
                    if k not in self.ignore_keys_for_doc_hash
                }
                doc.id = self._create_id(content=doc.content, meta=meta)
            final_doc_ids.append(doc.id)

        return {"documents": documents, "document_ids": final_doc_ids}

    @staticmethod
    def _create_id(content: str, meta: dict[str, Any] | str, **kwargs: object) -> str:
        """Create a unique SHA-256 hash ID based on the given content, metadata, and additional keyword arguments.

        Args:
            content (str): The content to hash (usually the document content).
            meta (dict or str): The metadata to hash.
            **kwargs (object): Additional keyword arguments to include in the hash.

        Returns:
            A SHA-256 hash string uniquely identifying the input data.
        """
        data = f"{content}{meta}{kwargs}"
        return hashlib.sha256(data.encode("utf-8")).hexdigest()
run
run(documents, doc_ids=None)

Process documents, assign IDs and return them together with the IDs.

For each document, the corresponding precomputed ID is used if provided; otherwise an ID is derived as a SHA-256 hash of the document content and its metadata (excluding ignore_keys_for_doc_hash).

PARAMETER DESCRIPTION
documents

List of Document objects to process.

TYPE: list[Document]

doc_ids

List of document IDs to set as the document IDs. Defaults to None.

TYPE: list[str] DEFAULT: None

RETURNS DESCRIPTION
dict[str, list[Document] | list[str]]

A dictionary containing: - documents: List of Document objects with IDs set. - document_ids: List of assigned document IDs.

Source code in docs/microservices/rag/src/rag/pipelines/components/id_setter.py
@component.output_types(documents=list[Document], document_ids=list[str])
def run(
    self, documents: list[Document], doc_ids: list[str] = None
) -> dict[str, list[Document] | list[str]]:
    """Process documents, merge YAML metadata and return with IDs.

    Process the list of documents to merge YAML metadata and return processed documents along with
    IDs of documents missing corresponding YAML metadata.

    Args:
        documents (list[Document]): List of Document objects to process.
        doc_ids (list[str], optional): List of document IDs to set as the document IDs. Defaults to None.

    Returns:
        A dictionary containing:
            - documents: List of Document objects with IDs set.
            - document_ids: List of assigned document IDs.
    """
    final_doc_ids = []
    input_doc_ids = doc_ids or [None] * len(documents)
    for doc, precomputed_id in zip(documents, input_doc_ids):
        if precomputed_id:
            doc.id = precomputed_id
        else:
            meta = {
                k: v
                for k, v in doc.meta.items()
                if k not in self.ignore_keys_for_doc_hash
            }
            doc.id = self._create_id(content=doc.content, meta=meta)
        final_doc_ids.append(doc.id)

    return {"documents": documents, "document_ids": final_doc_ids}
database_rag

This module implements a RAG system based on database retrievals as data source.

CLASS DESCRIPTION
DatabaseRagPipe

Initializes an instance of the f13-server specific DatabaseRagPipe for database-based retrieval operations.

DatabaseRagPipe

Bases: BaseRagPipe

Initializes an instance of the f13-server specific DatabaseRagPipe for database-based retrieval operations.

This class extends BaseRagPipe and loads pipelines defined in configs/rag_pipeline_config.yml.

ATTRIBUTE DESCRIPTION
rag_type

Identifier for the RAG type used in this pipeline ("database").

TYPE: str

name

The name of the pipeline instance (inherited from BaseRagPipe).

TYPE: str

rag_pipe

The haystack RAG pipeline (inherited).

TYPE: AsyncPipeline

indexing_pipe

The haystack indexing pipeline (inherited).

TYPE: AsyncPipeline

llm_generators

Dictionary of LLM generation configurations from pipeline metadata (inherited).

TYPE: dict

_required_metadata_keys

Required metadata keys expected in the pipeline (inherited).

TYPE: list[str]

_required_components

Required component names expected in the pipeline (inherited).

TYPE: list[str]

METHOD DESCRIPTION
generate_answer

Run the rag pipeline to generate an answer based on a query.

indexing

Run the indexing pipeline on the provided list of documents.

Source code in docs/microservices/rag/src/rag/pipelines/database_rag.py
class DatabaseRagPipe(BaseRagPipe):
    """Initializes an instance of the f13-server specific DatabaseRagPipe for database-based retrieval operations.

    This class extends BaseRagPipe and loads pipelines defined in `configs/rag_pipeline_config.yml`.

    Attributes:
        rag_type (str): Identifier for the RAG type used in this pipeline ("database").
        name (str): The name of the pipeline instance (inherited from BaseRagPipe).
        rag_pipe (AsyncPipeline): The haystack RAG pipeline (inherited).
        indexing_pipe (AsyncPipeline): The haystack indexing pipeline (inherited).
        llm_generators (dict): Dictionary of LLM generation configurations from pipeline metadata (inherited).
        _required_metadata_keys (list[str]): Required metadata keys expected in the pipeline (inherited).
        _required_components (list[str]): Required component names expected in the pipeline (inherited).
    """

    def __init__(
        self,
        indexing_pipeline: AsyncPipeline,
        rag_pipeline: AsyncPipeline,
        pipe_name: str,
    ) -> None:
        """Initialize a new instance of DatabaseRagPipe.

        Pipeline Metadata Requirements:
            - `rag_answers_keys`: A list of relevant keys to access RAG answers.
            - `retrieved_docs_keys`: A list of relevant keys to access retrieved documents.
            - `llm_generators`: A dict of dictionaries with the available answer generation LLMs.
            - The models are selected with a Haystack component called `llm_router`.

        Pipeline Component Requirements:
            The following components are required in the pipeline: ["query", "filters"]

        Args:
            indexing_pipeline (AsyncPipeline): Haystack indexing pipeline.
            rag_pipeline (AsyncPipeline): Haystack RAG pipeline.
            pipe_name (str): The name of this specific pipeline.
        """
        self.rag_type = "database"
        super().__init__(
            indexing_pipeline=indexing_pipeline,
            rag_pipeline=rag_pipeline,
            pipe_name=pipe_name,
        )

    async def indexing(
        self, files: list[Document], double_doc_handling: DoubleDocsHandling
    ) -> tuple[int, int]:
        """Run the indexing pipeline on the provided list of documents.

        An ingestion timestamp is automatically added to the metadata of each document.

        Args:
            files (list[Document]): List of Document objects to be indexed.
            double_doc_handling (DoubleDocsHandling):
                Indicates whether embedding should be forced if the document is already in the DB.

        Returns:
            Tuple containing the number of ingested documents and the number of ingested chunks.

        Raises:
            AttributeError: If `self.indexing_pipe` is not initialized or deployed.
        """
        if self.indexing_pipe is None:
            raise AttributeError(
                "`self.indexing_pipe` is not deployed. Please run `self.deploy_indexing_pipe`"
            )

        current_time = datetime.now(tz=pytz.timezone("Europe/Berlin"))
        n_docs = 0
        n_chunks = 0
        for doc in files:
            try:
                doc_id = self._compute_doc_hash(doc)
                doc.meta["ingestion_date"] = current_time

                doc.id = doc_id

                file_id_already_exists, existing_docs = self._check_if_file_id_exists(
                    doc_id
                )

                if file_id_already_exists:
                    logger.info(
                        f"Document with title {doc.meta['title']} already exists. Skipping embedding."
                    )
                    if double_doc_handling == DoubleDocsHandling.FORCE:
                        self._update_all_meta_data(existing_docs, doc.meta)
                    else:
                        continue

                status = await self.indexing_pipe.run_async(
                    data={
                        "file_type_router": {
                            "sources": [],
                        },
                        "converted_docs_joiner": {"documents": [doc]},
                        "set_doc_id": {"doc_ids": [doc_id]},
                    }
                )
                n_chunks = n_chunks + status["document_writer"]["documents_written"]
                logger.debug(status)
                n_docs = n_docs + 1
            except Exception as e:
                logger.error(f"Failed to write document {doc.id}: {e}")

        return n_docs, n_chunks

    async def generate_answer(
        self,
        query: str,
        language_model: str,
        meta_data_filters: list[MetadataFilter] | None = None,
        max_chunks_to_use: int | None = None,
    ) -> tuple[dict, dict]:
        """Run the rag pipeline to generate an answer based on a query.

        This method processes the input query through a Retrieval-Augmented Generation (RAG) pipeline,
        which retrieves relevant documents and generates an answer based on that context.

        Args:
            query (str): The input query for which an answer is to be generated.
            language_model (str): The model used for generating the answer.
            meta_data_filters (List[MetadataFilter] | None): List of metadata filters to apply during retrieval.
            max_chunks_to_use (int | None): Maximum number of chunks to use for the retrieval query.
                If None, it defaults to the `max_chunks_to_use` value configured for the selected LLM.

        Returns:
            Tuple containing a dictionary with generated answers and a dictionary of documents
            used for answer generation.

        Raises:
            KeyError: If the language_model is not found in the valid LLM generator keys.
        """
        if self.rag_pipe is None:
            raise AttributeError(
                "`self.rag_pipe` is not deployed. Please run `self.deploy_rag_pipe`"
            )

        self._validate_language_model_key(language_model)

        meta_data_filters = self._generate_meta_data_filters(
            meta_data_filters=meta_data_filters
        )

        docs_component_name = self.rag_pipe.metadata["retrieved_docs_keys"][0]
        max_chunks_to_use_component = self.rag_pipe.metadata[
            "max_chunks_to_use_component"
        ][0]

        data = {"query": {"value": query}, "filters": {"value": meta_data_filters}}

        if (
            max_chunks_to_use is None
            and "max_chunks_to_use" in self.llm_generators[language_model]
        ):
            max_chunks_to_use = self.llm_generators[language_model]["max_chunks_to_use"]

        if max_chunks_to_use is not None:
            data[max_chunks_to_use_component] = {"top_k": max_chunks_to_use}

        self.add_llm_router_arg_inplace(language_model_key=language_model, data=data)

        result = await self.rag_pipe.run_async(
            data=data,
            include_outputs_from=[docs_component_name],
        )

        documents = get_dict_value(
            result, self.rag_pipe.metadata["retrieved_docs_keys"]
        )
        answer = get_dict_value(result, self.rag_pipe.metadata["rag_answers_keys"])

        logger.debug(
            "relevant sources for answer generation: "
            + "\n ####### \n".join([f"{s.meta} \n{s.content}" for s in documents])
        )

        return answer, documents

    @staticmethod
    def _generate_meta_data_filters(
        meta_data_filters: list[MetadataFilter] | None = None,
    ) -> dict | None:
        """Generate a filter based on the provided metadata filtering options.

        Args:
            meta_data_filters (list | None): A list of pydantic dataclass objects with metadata filtering options.

        Returns:
            A dictionary representing the generated filter, or None if no filters are needed.
        """
        if meta_data_filters is None:
            return None

        conditions = []

        for meta_data_filter in meta_data_filters:
            # add conditions for lists
            conditions_per_filter = []

            if meta_data_filter.end_date is not None:
                condition = {
                    "field": "meta.date",
                    "operator": "<=",
                    "value": meta_data_filter.end_date,
                }
                conditions_per_filter.append(condition)

            if meta_data_filter.start_date is not None:
                condition = {
                    "field": "meta.date",
                    "operator": ">=",
                    "value": meta_data_filter.start_date,
                }
                conditions_per_filter.append(condition)

            if meta_data_filter.source is not None:
                condition = {
                    "field": "meta.source",
                    "operator": "==",
                    "value": meta_data_filter.source,
                }
                conditions_per_filter.append(condition)

            if len(conditions_per_filter) == 0:
                continue
            else:
                conditions.append(
                    {"operator": "AND", "conditions": conditions_per_filter}
                )

        filters = {"operator": "OR", "conditions": conditions}
        logger.debug(f"Haystack filter for ES {filters}")
        return filters
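For illustration, a single MetadataFilter carrying a source and a date range would be translated by _generate_meta_data_filters into the following Haystack filter structure (the field values below are placeholders):

meta_data_filters = [
    MetadataFilter(
        source="example_source",   # placeholder values
        start_date="2024-01-01",
        end_date="2024-12-31",
    )
]

# DatabaseRagPipe._generate_meta_data_filters(meta_data_filters) returns:
haystack_filter = {
    "operator": "OR",
    "conditions": [
        {
            "operator": "AND",
            "conditions": [
                {"field": "meta.date", "operator": "<=", "value": "2024-12-31"},
                {"field": "meta.date", "operator": ">=", "value": "2024-01-01"},
                {"field": "meta.source", "operator": "==", "value": "example_source"},
            ],
        }
    ],
}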
generate_answer async
generate_answer(query, language_model, meta_data_filters=None, max_chunks_to_use=None)

Run the rag pipeline to generate an answer based on a query.

This method processes the input query through a Retrieval-Augmented Generation (RAG) pipeline, which retrieves relevant documents and generates an answer based on that context.

PARAMETER DESCRIPTION
query

The input query for which an answer is to be generated.

TYPE: str

language_model

The model used for generating the answer.

TYPE: str

meta_data_filters

List of metadata filters to apply during retrieval.

TYPE: List[MetadataFilter] | None DEFAULT: None

max_chunks_to_use

Maximum number of chunks to use for the retrieval query. If None, it defaults to the max_chunks_to_use value configured for the selected LLM.

TYPE: int | None DEFAULT: None

RETURNS DESCRIPTION
tuple[dict, dict]

Tuple containing a dictionary with generated answers and a dictionary of documents used for answer generation.

RAISES DESCRIPTION
KeyError

If the language_model is not found in the valid LLM generator keys.

Source code in docs/microservices/rag/src/rag/pipelines/database_rag.py
async def generate_answer(
    self,
    query: str,
    language_model: str,
    meta_data_filters: list[MetadataFilter] | None = None,
    max_chunks_to_use: int | None = None,
) -> tuple[dict, dict]:
    """Run the rag pipeline to generate an answer based on a query.

    This method processes the input query through a Retrieval-Augmented Generation (RAG) pipeline,
    which retrieves relevant documents and generates an answer based on that context.

    Args:
        query (str): The input query for which an answer is to be generated.
        language_model (str): The model used for generating the answer.
        meta_data_filters (List[MetadataFilter] | None): List of metadata filters to apply during retrieval.
        max_chunks_to_use (int | None): Maximum number of chunks to use for the retrieval query.
            If None, it defaults to the `max_chunks_to_use` value configured for the selected LLM.

    Returns:
        Tuple containing a dictionary with generated answers and a dictionary of documents
        used for answer generation.

    Raises:
        KeyError: If the language_model is not found in the valid LLM generator keys.
    """
    if self.rag_pipe is None:
        raise AttributeError(
            "`self.rag_pipe` is not deployed. Please run `self.deploy_rag_pipe`"
        )

    self._validate_language_model_key(language_model)

    meta_data_filters = self._generate_meta_data_filters(
        meta_data_filters=meta_data_filters
    )

    docs_component_name = self.rag_pipe.metadata["retrieved_docs_keys"][0]
    max_chunks_to_use_component = self.rag_pipe.metadata[
        "max_chunks_to_use_component"
    ][0]

    data = {"query": {"value": query}, "filters": {"value": meta_data_filters}}

    if (
        max_chunks_to_use is None
        and "max_chunks_to_use" in self.llm_generators[language_model]
    ):
        max_chunks_to_use = self.llm_generators[language_model]["max_chunks_to_use"]

    if max_chunks_to_use is not None:
        data[max_chunks_to_use_component] = {"top_k": max_chunks_to_use}

    self.add_llm_router_arg_inplace(language_model_key=language_model, data=data)

    result = await self.rag_pipe.run_async(
        data=data,
        include_outputs_from=[docs_component_name],
    )

    documents = get_dict_value(
        result, self.rag_pipe.metadata["retrieved_docs_keys"]
    )
    answer = get_dict_value(result, self.rag_pipe.metadata["rag_answers_keys"])

    logger.debug(
        "relevant sources for answer generation: "
        + "\n ####### \n".join([f"{s.meta} \n{s.content}" for s in documents])
    )

    return answer, documents
indexing async
indexing(files, double_doc_handling)

Run the indexing pipeline on the provided list of documents.

An ingestion timestamp is automatically added to the metadata of each document.

PARAMETER DESCRIPTION
files

List of Document objects to be indexed.

TYPE: list[Document]

double_doc_handling

Indicates whether embedding should be forced if the document is already in the DB.

TYPE: DoubleDocsHandling

RETURNS DESCRIPTION
tuple[int, int]

Tuple containing the number of ingested documents and the number of ingested chunks.

RAISES DESCRIPTION
AttributeError

If self.indexing_pipe is not initialized or deployed.

Source code in docs/microservices/rag/src/rag/pipelines/database_rag.py
async def indexing(
    self, files: list[Document], double_doc_handling: DoubleDocsHandling
) -> tuple[int, int]:
    """Run the indexing pipeline on the provided list of documents.

    An ingestion timestamp is automatically added to the metadata of each document.

    Args:
        files (list[Document]): List of Document objects to be indexed.
        double_doc_handling (DoubleDocsHandling):
            Indicates whether embedding should be forced if the document is already in the DB.

    Returns:
        Tuple containing the number of ingested documents and the number of ingested chunks.

    Raises:
        AttributeError: If `self.indexing_pipe` is not initialized or deployed.
    """
    if self.indexing_pipe is None:
        raise AttributeError(
            "`self.indexing_pipe` is not deployed. Please run `self.deploy_indexing_pipe`"
        )

    current_time = datetime.now(tz=pytz.timezone("Europe/Berlin"))
    n_docs = 0
    n_chunks = 0
    for doc in files:
        try:
            doc_id = self._compute_doc_hash(doc)
            doc.meta["ingestion_date"] = current_time

            doc.id = doc_id

            file_id_already_exists, existing_docs = self._check_if_file_id_exists(
                doc_id
            )

            if file_id_already_exists:
                logger.info(
                    f"Document with title {doc.meta['title']} already exists. Skipping embedding."
                )
                if double_doc_handling == DoubleDocsHandling.FORCE:
                    self._update_all_meta_data(existing_docs, doc.meta)
                else:
                    continue

            status = await self.indexing_pipe.run_async(
                data={
                    "file_type_router": {
                        "sources": [],
                    },
                    "converted_docs_joiner": {"documents": [doc]},
                    "set_doc_id": {"doc_ids": [doc_id]},
                }
            )
            n_chunks = n_chunks + status["document_writer"]["documents_written"]
            logger.debug(status)
            n_docs = n_docs + 1
        except Exception as e:
            logger.error(f"Failed to write document {doc.id}: {e}")

    return n_docs, n_chunks
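A minimal async usage sketch for DatabaseRagPipe, assuming the indexing and RAG pipelines have already been constructed (e.g. by get_rag_pipelines further below); DatabaseRagPipe and DoubleDocsHandling are project-internal classes, and the model key and document contents are placeholders:

import asyncio

from haystack import Document


async def demo(pipe: DatabaseRagPipe) -> None:
    # Deploy the pipelines before use (methods inherited from BaseRagPipe).
    pipe.deploy_indexing_pipe()
    pipe.deploy_rag_pipe()

    docs = [
        Document(
            content="Example report about renewable energy.",
            meta={"title": "example report", "source": "example_source", "date": "2024-01-01"},
        )
    ]
    # FORCE updates the metadata of documents that are already in the database.
    n_docs, n_chunks = await pipe.indexing(docs, double_doc_handling=DoubleDocsHandling.FORCE)

    answer, documents = await pipe.generate_answer(
        query="What does the example report say about renewable energy?",
        language_model="example-llm",  # placeholder; must be a key of pipe.llm_generators
        meta_data_filters=None,
        max_chunks_to_use=5,
    )
    print(n_docs, n_chunks, len(documents), answer)


# asyncio.run(demo(database_rag_pipe))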
draw_pipeline

Visualizes haystack pipelines.

FUNCTION DESCRIPTION
draw_pipeline

Create a Graphviz visualization of the pipeline.

draw_pipeline
draw_pipeline(graph, path)

Create a Graphviz visualization of the pipeline.

PARAMETER DESCRIPTION
graph

The pipeline graph as a networkx MultiDiGraph.

TYPE: MultiDiGraph

path

The file path where the image will be saved.

TYPE: str

Source code in docs/microservices/rag/src/rag/pipelines/draw_pipeline.py
def draw_pipeline(graph: networkx.MultiDiGraph, path: str) -> None:
    """Create a Graphviz visualization of the pipeline.

    Args:
        graph: The pipeline graph as a networkx.MultiDiGraph.
        path: The file path where the image will be saved.
    """
    graphviz = to_agraph(graph.copy())
    graphviz.layout("dot")
    graphviz.draw(path)
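A short sketch of draw_pipeline; the toy graph below merely stands in for a pipeline's networkx graph, and rendering requires pygraphviz (the "dot" layout) to be installed:

import networkx

# Toy graph standing in for a pipeline graph (illustrative only).
graph = networkx.MultiDiGraph()
graph.add_edge("query", "text_embedder")
graph.add_edge("text_embedder", "embedding_retriever")
graph.add_edge("embedding_retriever", "retrieved_docs_joiner")

draw_pipeline(graph, "retrieval_pipeline.png")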
file_rag

This module implements a RAG system that uses files as its data source.

CLASS DESCRIPTION
FileRagPipe

Initializes an instance of the f13-server specific FileRagPipe for file-based retrieval operations.

FileRagPipe

Bases: BaseRagPipe

Initializes an instance of the f13-server specific FileRagPipe for file-based retrieval operations.

This class extends BaseRagPipe and is specifically configured for file-based pipelines on the f13-server.

ATTRIBUTE DESCRIPTION
rag_type

Identifier for the RAG type used in this pipeline ("file").

TYPE: str

file_deletion_pipe

Haystack pipeline for deleting indexed documents by ID.

TYPE: AsyncPipeline

name

The name of the pipeline instance (inherited from BaseRagPipe).

TYPE: str

rag_pipe

The haystack RAG pipeline (inherited).

TYPE: AsyncPipeline

indexing_pipe

The haystack indexing pipeline (inherited).

TYPE: AsyncPipeline

llm_generators

Dictionary of LLM generation configurations from pipeline metadata (inherited).

TYPE: dict

_required_metadata_keys

Required metadata keys expected in the pipeline (inherited).

TYPE: list[str]

_required_components

Required component names expected in the pipeline (inherited).

TYPE: list[str]

METHOD DESCRIPTION
deploy_rag_pipe

This method loads the pipeline from the YAML file and instantiates it as a Haystack pipeline.

file_deletion

Run the temporary file deleter pipeline to delete files older than a specified timestamp.

generate_answer

Run the RAG pipeline to generate an answer based on a query.

indexing

Run the file indexing pipeline on the provided files.

Source code in docs/microservices/rag/src/rag/pipelines/file_rag.py
class FileRagPipe(BaseRagPipe):
    """Initializes an instance of the f13-server specific FileRagPipe for file-based retrieval operations.

    This class extends BaseRagPipe and is specifically configured for file-based pipelines on the f13-server.

    Attributes:
        rag_type (str): Identifier for the RAG type used in this pipeline ("file").
        file_deletion_pipe (AsyncPipeline): Haystack pipeline for deleting indexed documents by ID.
        name (str): The name of the pipeline instance (inherited from BaseRagPipe).
        rag_pipe (AsyncPipeline): The haystack RAG pipeline (inherited).
        indexing_pipe (AsyncPipeline): The haystack indexing pipeline (inherited).
        llm_generators (dict): Dictionary of LLM generation configurations from pipeline metadata (inherited).
        _required_metadata_keys (list[str]): Required metadata keys expected in the pipeline (inherited).
        _required_components (list[str]): Required component names expected in the pipeline (inherited).
    """

    def __init__(
        self,
        indexing_pipeline: AsyncPipeline,
        rag_pipeline: AsyncPipeline,
        file_deletion_pipeline: AsyncPipeline,
        pipe_name: str,
    ) -> None:
        """Initialize a new instance of BaseRagPipe.

        Pipeline Metadata Requirements:
            - `rag_answers_keys`: A list of relevant keys to access rag answers.
            - `retrieved_docs_keys`: A list of relevant keys to access retrieved documents.
            - `llm_generators`: A dict of dictionaries with the available answer generation LLMs.
            - The models are selected with a Haystack component called `llm_router`.

        Pipeline Component Requirements:
            The following components are required in the pipeline: ["query", "filters"]

        Args:
            indexing_pipeline (AsyncPipeline): haystack indexing pipeline.
            rag_pipeline (AsyncPipeline): haystack RAG pipeline.
            file_deletion_pipeline (AsyncPipeline): haystack doc-id deletion pipeline.
            pipe_name (str): The name of this specific pipeline.
        """
        self.rag_type = "file"
        super().__init__(
            indexing_pipeline=indexing_pipeline,
            rag_pipeline=rag_pipeline,
            pipe_name=pipe_name,
        )

        self.file_deletion_pipe = file_deletion_pipeline

    async def indexing(self, files: list[UploadFile]) -> list[str]:
        """Run the file indexing pipeline on the provided files.

        Converts the uploaded files to byte streams and runs the indexing
        pipeline on them. Returns a list of source IDs.

        Args:
            files (list[UploadFile]): A list of fastapi.UploadFile objects.

        Returns:
            A list of source IDs generated by the indexing pipeline.
        """
        if self.indexing_pipe is None:
            raise AttributeError(
                "`self.indexing_pipe` is not deployed. Please run `self.deploy_indexing_pipe`"
            )

        current_time = datetime.now(tz=pytz.timezone("Europe/Berlin"))

        source_ids = []
        for file in files:
            try:
                files_byte_stream = convert_uploadfile_to_byte_stream(file)
                doc_id = self._compute_doc_hash(file)
                source_ids.append(doc_id)

                file_id_already_exists, existing_docs = self._check_if_file_id_exists(
                    doc_id
                )
                if file_id_already_exists:
                    logger.info(
                        f"File {file.filename} already exists. Skipping indexing. Updating ingestion date."
                    )
                    self._update_ingestion_date(existing_docs, current_time)
                    continue

                result = await self.indexing_pipe.run_async(
                    data={
                        "file_type_router": {
                            "sources": [files_byte_stream],
                            "meta": {"ingestion_date": current_time},
                        },
                        "set_doc_id": {"doc_ids": [doc_id]},
                    }
                )
                if not result["set_doc_id"]["document_ids"]:
                    logger.error(
                        "Something during indexing went wrong. No Documents embedded."
                    )

            except Exception as e:
                logger.info(f"Error processing file {file.filename}: {e}")
                continue
        return source_ids

    async def generate_answer(
        self,
        query: str,
        language_model: str,
        source_ids: list[str] | None = None,
        max_chunks_to_use: int | None = None,
    ) -> tuple[dict, dict]:
        """Run the RAG pipeline to generate an answer based on a query.

        This method processes the input query through a Retrieval-Augmented Generation (RAG) pipeline,
        which retrieves relevant documents and generates an answer based on that context.

        Args:
            query (str): The input query for which an answer is to be generated.
            language_model (str): The model used for generating the answer.
            source_ids (list[str] | None): Document IDs of the files available for file retrieval.
            max_chunks_to_use (int | None): Maximum number of chunks to use for the retrieval query.

        Returns:
            A tuple containing a dictionary with generated answers and a dictionary of
            documents used for answer generation.
        """
        if self.rag_pipe is None:
            raise AttributeError(
                "`self.rag_pipe` is not deployed. Please run `self.deploy_rag_pipe`"
            )

        if not source_ids:
            answer = "Bitte stellen Sie ein Dokument bereit."
            documents = []
            return answer, documents

        meta_data_filters = {
            "field": "meta.source_id",
            "operator": "in",
            "value": source_ids,
        }
        docs_component_name = self.rag_pipe.metadata["retrieved_docs_keys"][0]
        max_chunks_to_use_component = self.rag_pipe.metadata[
            "max_chunks_to_use_component"
        ][0]
        data = {
            "query": {"value": query},
            "filters": {"value": meta_data_filters},
        }

        if (
            max_chunks_to_use is None
            and "max_chunks_to_use" in self.llm_generators[language_model]
        ):
            max_chunks_to_use = self.llm_generators[language_model]["max_chunks_to_use"]

        if max_chunks_to_use is not None:
            data[max_chunks_to_use_component] = {"top_k": max_chunks_to_use}

        self.add_llm_router_arg_inplace(language_model_key=language_model, data=data)

        result = await self.rag_pipe.run_async(
            data=data,
            include_outputs_from=[docs_component_name],
        )

        documents = get_dict_value(
            result, self.rag_pipe.metadata["retrieved_docs_keys"]
        )
        answer = get_dict_value(result, self.rag_pipe.metadata["rag_answers_keys"])

        logger.debug(
            "relevant sources for answer generation: "
            + "\n ####### \n".join([f"{s.meta} \n{s.content}" for s in documents])
        )

        return answer, documents

    async def file_deletion(self, timestamp: datetime) -> int:
        """Run the temporary file deleter pipeline to delete files older than a specified timestamp.

        The pipeline uses a filter to delete documents with an ingestion date
        older than or equal to the provided timestamp. It then runs the retrieval
        pipeline to delete these documents.

        Args:
            timestamp (datetime): The maximum ingestion date for files to be deleted.

        Returns:
            The number of documents deleted by the pipeline.
        """
        if self.file_deletion_pipe is None:
            raise AttributeError(
                "`self.file_deletion_pipe` is not deployed. Please run `self.file_deletion_pipe`"
            )

        filters = {"field": "meta.ingestion_date", "operator": "<=", "value": timestamp}

        n_deleted_docs = await self.file_deletion_pipe.run_async(
            data={
                "document_id_retriever": {"filters": filters},
            }
        )

        return n_deleted_docs.get("document_deleter", {}).get("documents_deleted", 0)

    def deploy_rag_pipe(self) -> None:
        """This method loads the pipeline from yaml file and instantiates it as a haystack pipeline."""
        self.file_deletion_pipe.warm_up()
        return super().deploy_rag_pipe()
deploy_rag_pipe
deploy_rag_pipe()

This method loads the pipeline from the YAML file and instantiates it as a Haystack pipeline.

Source code in docs/microservices/rag/src/rag/pipelines/file_rag.py
def deploy_rag_pipe(self) -> None:
    """This method loads the pipeline from yaml file and instantiates it as a haystack pipeline."""
    self.file_deletion_pipe.warm_up()
    return super().deploy_rag_pipe()
file_deletion async
file_deletion(timestamp)

Run the temporary file deleter pipeline to delete files older than a specified timestamp.

The pipeline uses a filter to delete documents with an ingestion date older than or equal to the provided timestamp. It then runs the retrieval pipeline to delete these documents.

PARAMETER DESCRIPTION
timestamp

The maximum ingestion date for files to be deleted.

TYPE: datetime

RETURNS DESCRIPTION
int

The number of documents deleted by the pipeline.

Source code in docs/microservices/rag/src/rag/pipelines/file_rag.py
async def file_deletion(self, timestamp: datetime) -> int:
    """Run the temporary file deleter pipeline to delete files older than a specified timestamp.

    The pipeline uses a filter to delete documents with an ingestion date
    older than or equal to the provided timestamp. It then runs the retrieval
    pipeline to delete these documents.

    Args:
        timestamp (datetime): The maximum ingestion date for files to be deleted.

    Returns:
        The number of documents deleted by the pipeline.
    """
    if self.file_deletion_pipe is None:
        raise AttributeError(
            "`self.file_deletion_pipe` is not deployed. Please run `self.file_deletion_pipe`"
        )

    filters = {"field": "meta.ingestion_date", "operator": "<=", "value": timestamp}

    n_deleted_docs = await self.file_deletion_pipe.run_async(
        data={
            "document_id_retriever": {"filters": filters},
        }
    )

    return n_deleted_docs.get("document_deleter", {}).get("documents_deleted", 0)
generate_answer async
generate_answer(query, language_model, source_ids=None, max_chunks_to_use=None)

Run the RAG pipeline to generate an answer based on a query.

This method processes the input query through a Retrieval-Augmented Generation (RAG) pipeline, which retrieves relevant documents and generates an answer based on that context.

PARAMETER DESCRIPTION
query

The input query for which an answer is to be generated.

TYPE: str

language_model

The model used for generating the answer.

TYPE: str

source_ids

Document IDs of the files available for file retrieval.

TYPE: list[str] | None DEFAULT: None

max_chunks_to_use

Maximum number of chunks to use for the retrieval query.

TYPE: int | None DEFAULT: None

RETURNS DESCRIPTION
tuple[dict, dict]

A tuple containing a dictionary with generated answers and a dictionary of documents used for answer generation.

Source code in docs/microservices/rag/src/rag/pipelines/file_rag.py
async def generate_answer(
    self,
    query: str,
    language_model: str,
    source_ids: list[str] | None = None,
    max_chunks_to_use: int | None = None,
) -> tuple[dict, dict]:
    """Run the RAG pipeline to generate an answer based on a query.

    This method processes the input query through a Retrieval-Augmented Generation (RAG) pipeline,
    which retrieves relevant documents and generates an answer based on that context.

    Args:
        query (str): The input query for which an answer is to be generated.
        language_model (str): The model used for generating the answer.
        source_ids (list[str] | None): Document IDs of the files available for file retrieval.
        max_chunks_to_use (int | None): Maximum number of chunks to use for the retrieval query.

    Returns:
        A tuple containing a dictionary with generated answers and a dictionary of
        documents used for answer generation.
    """
    if self.rag_pipe is None:
        raise AttributeError(
            "`self.rag_pipe` is not deployed. Please run `self.deploy_rag_pipe`"
        )

    if not source_ids:
        answer = "Bitte stellen Sie ein Dokument bereit."
        documents = []
        return answer, documents

    meta_data_filters = {
        "field": "meta.source_id",
        "operator": "in",
        "value": source_ids,
    }
    docs_component_name = self.rag_pipe.metadata["retrieved_docs_keys"][0]
    max_chunks_to_use_component = self.rag_pipe.metadata[
        "max_chunks_to_use_component"
    ][0]
    data = {
        "query": {"value": query},
        "filters": {"value": meta_data_filters},
    }

    if (
        max_chunks_to_use is None
        and "max_chunks_to_use" in self.llm_generators[language_model]
    ):
        max_chunks_to_use = self.llm_generators[language_model]["max_chunks_to_use"]

    if max_chunks_to_use is not None:
        data[max_chunks_to_use_component] = {"top_k": max_chunks_to_use}

    self.add_llm_router_arg_inplace(language_model_key=language_model, data=data)

    result = await self.rag_pipe.run_async(
        data=data,
        include_outputs_from=[docs_component_name],
    )

    documents = get_dict_value(
        result, self.rag_pipe.metadata["retrieved_docs_keys"]
    )
    answer = get_dict_value(result, self.rag_pipe.metadata["rag_answers_keys"])

    logger.debug(
        "relevant sources for answer generation: "
        + "\n ####### \n".join([f"{s.meta} \n{s.content}" for s in documents])
    )

    return answer, documents
indexing async
indexing(files)

Run the file indexing pipeline on the provided files.

Converts the uploaded files to byte streams and runs the indexing pipeline on them. Returns a list of source IDs.

PARAMETER DESCRIPTION
files

A list of fastapi.UploadFile objects.

TYPE: list[UploadFile]

RETURNS DESCRIPTION
list[str]

A list of source IDs generated by the indexing pipeline.

Source code in docs/microservices/rag/src/rag/pipelines/file_rag.py
async def indexing(self, files: list[UploadFile]) -> list[str]:
    """Run the file indexing pipeline on the provided files.

    Converts the uploaded files to byte streams and runs the indexing
    pipeline on them. Returns a list of source IDs.

    Args:
        files (list[UploadFile]): A list of fastapi.UploadFile objects.

    Returns:
        A list of source IDs generated by the indexing pipeline.
    """
    if self.indexing_pipe is None:
        raise AttributeError(
            "`self.indexing_pipe` is not deployed. Please run `self.deploy_indexing_pipe`"
        )

    current_time = datetime.now(tz=pytz.timezone("Europe/Berlin"))

    source_ids = []
    for file in files:
        try:
            files_byte_stream = convert_uploadfile_to_byte_stream(file)
            doc_id = self._compute_doc_hash(file)
            source_ids.append(doc_id)

            file_id_already_exists, existing_docs = self._check_if_file_id_exists(
                doc_id
            )
            if file_id_already_exists:
                logger.info(
                    f"File {file.filename} already exists. Skipping indexing. Updating ingestion date."
                )
                self._update_ingestion_date(existing_docs, current_time)
                continue

            result = await self.indexing_pipe.run_async(
                data={
                    "file_type_router": {
                        "sources": [files_byte_stream],
                        "meta": {"ingestion_date": current_time},
                    },
                    "set_doc_id": {"doc_ids": [doc_id]},
                }
            )
            if not result["set_doc_id"]["document_ids"]:
                logger.error(
                    "Something during indexing went wrong. No Documents embedded."
                )

        except Exception as e:
            logger.info(f"Error processing file {file.filename}: {e}")
            continue
    return source_ids
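A minimal async sketch of the file-RAG flow with FileRagPipe: index uploaded files, answer a query restricted to those files, and later delete temporary documents. FileRagPipe is the project-internal class documented above; the model key and the two-hour cutoff are placeholders:

import asyncio
from datetime import datetime, timedelta

import pytz
from fastapi import UploadFile


async def demo(pipe: FileRagPipe, uploads: list[UploadFile]) -> None:
    pipe.deploy_indexing_pipe()
    pipe.deploy_rag_pipe()

    # Index the uploaded files and keep their source IDs for retrieval.
    source_ids = await pipe.indexing(uploads)

    answer, documents = await pipe.generate_answer(
        query="Summarize the uploaded document.",
        language_model="example-llm",  # placeholder; must be a key of pipe.llm_generators
        source_ids=source_ids,
        max_chunks_to_use=5,
    )

    # Remove temporary documents ingested more than two hours ago.
    cutoff = datetime.now(tz=pytz.timezone("Europe/Berlin")) - timedelta(hours=2)
    n_deleted = await pipe.file_deletion(timestamp=cutoff)
    print(answer, len(documents), n_deleted)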
openai_custom_auth_client

This module provides an OpenAI client that supports Bearer token authentication and basic authentication.

CLASS DESCRIPTION
CustomAsyncAuthClient

Custom HTTP transport for asynchronous OpenAI client.

CustomAuthClient

Custom HTTP transport for OpenAI client.

CustomAsyncAuthClient

Bases: AsyncClient

Custom HTTP transport for asynchronous OpenAI client.

This class supports both Bearer Token Authentication and Basic Authentication. If auth_type is 'token', the secret is expected to be the API key. If auth_type is 'basic_auth', the secret is expected to be a base64-encoded string of 'username:password'.

ATTRIBUTE DESCRIPTION
auth_header

Authentication header for the httpx client.

TYPE: str

METHOD DESCRIPTION
send

Asynchronous method for sending HTTP requests.

Source code in docs/microservices/rag/src/rag/pipelines/openai_custom_auth_client.py
class CustomAsyncAuthClient(httpx.AsyncClient):
    """Custom HTTP transport for asynchronous OpenAI client.

    This class supports both Bearer Token Authentication and Basic Authentication.
    If `auth_type` is 'token', the `secret` is expected to be the API key.
    If `auth_type` is 'basic_auth', the `secret` is expected to be a base64-encoded string of 'username:password'.

    Attributes:
        auth_header (str): Authentication header for the httpx client.

    Methods:
        send(request, *args, **kwargs): Asynchronous method for sending HTTP requests.
    """

    def __init__(
        self,
        auth_header: str | None = "",
        *args: object,
        **kwargs: object,
    ) -> None:
        """Custom HTTP transport for OpenAI client that supports Bearer-Token and Basic Authentication.

        Args:
            auth_header (str | None):   Header containing auth information
                                        (bearer token or basic_auth `username:password`).
            *args (object): Variable length argument list.
            **kwargs (object): Arbitrary keyword arguments.
        """
        super().__init__(*args, **kwargs)
        self.auth_header = auth_header

    async def send(
        self, request: httpx.Request, *args: object, **kwargs: object
    ) -> httpx.Response:
        """Override the send method to remove the Authorization header if no authentication was set.

        Args:
            request (httpx.Request): The HTTP request to be sent.
            *args (object): Variable length argument list.
            **kwargs (object): Arbitrary keyword arguments.

        Returns:
            The HTTP response received.
        """
        if "Authorization" in request.headers:
            del request.headers["Authorization"]
        if self.auth_header:
            request.headers["Authorization"] = self.auth_header
        return await super().send(request, *args, **kwargs)
send async
send(request, *args, **kwargs)

Override the send method to remove the Authorization header if no authentication was set.

PARAMETER DESCRIPTION
request

The HTTP request to be sent.

TYPE: Request

*args

Variable length argument list.

TYPE: object DEFAULT: ()

**kwargs

Arbitrary keyword arguments.

TYPE: object DEFAULT: {}

RETURNS DESCRIPTION
Response

The HTTP response received.

Source code in docs/microservices/rag/src/rag/pipelines/openai_custom_auth_client.py
async def send(
    self, request: httpx.Request, *args: object, **kwargs: object
) -> httpx.Response:
    """Override the send method to remove the Authorization header if no authentication was set.

    Args:
        request (httpx.Request): The HTTP request to be sent.
        *args (object): Variable length argument list.
        **kwargs (object): Arbitrary keyword arguments.

    Returns:
        The HTTP response received.
    """
    if "Authorization" in request.headers:
        del request.headers["Authorization"]
    if self.auth_header:
        request.headers["Authorization"] = self.auth_header
    return await super().send(request, *args, **kwargs)
CustomAuthClient

Bases: Client

Custom HTTP transport for OpenAI client.

This class supports both Bearer Token Authentication and Basic Authentication. If auth_type is 'token', the secret is expected to be the API key. If auth_type is 'basic_auth', the secret is expected to be a base64-encoded string of 'username:password'.

ATTRIBUTE DESCRIPTION
auth_header

Authentication header for the httpx client.

TYPE: str

METHOD DESCRIPTION
send

Synchronous method for sending HTTP requests.

Source code in docs/microservices/rag/src/rag/pipelines/openai_custom_auth_client.py
class CustomAuthClient(httpx.Client):
    """Custom HTTP transport for OpenAI client.

    This class supports both Bearer Token Authentication and Basic Authentication.
    If `auth_type` is 'token', the `secret` is expected to be the API key.
    If `auth_type` is 'basic_auth', the `secret` is expected to be a base64-encoded string of 'username:password'.

    Attributes:
        auth_header (str): Authentication header for the httpx client.

    Methods:
        send(request, *args, **kwargs): Synchronous method for sending HTTP requests.
    """

    def __init__(
        self,
        auth_header: str | None = "",
        *args: object,
        **kwargs: object,
    ) -> None:
        """Custom HTTP transport for OpenAI client that supports Bearer-Token and Basic Authentication.

        Args:
            auth_header (str | None):   Header containing auth information
                                        (bearer token or basic_auth `username:password`).
            *args (object): Variable length argument list.
            **kwargs (object): Arbitrary keyword arguments.
        """
        super().__init__(*args, **kwargs)
        self.auth_header = auth_header

    def send(
        self, request: httpx.Request, *args: object, **kwargs: object
    ) -> httpx.Response:
        """Override the send method to remove the Authorization header if no authentication was set.

        Args:
            request (httpx.Request): The HTTP request to be sent.
            *args (object): Variable length argument list.
            **kwargs (object): Arbitrary keyword arguments.

        Returns:
            The HTTP response received.
        """
        if "Authorization" in request.headers:
            del request.headers["Authorization"]
        if self.auth_header:
            request.headers["Authorization"] = self.auth_header
        return super().send(request, *args, **kwargs)
send
send(request, *args, **kwargs)

Override the send method to remove the Authorization header if no authentication was set.

PARAMETER DESCRIPTION
request

The HTTP request to be sent.

TYPE: Request

*args

Variable length argument list.

TYPE: object DEFAULT: ()

**kwargs

Arbitrary keyword arguments.

TYPE: object DEFAULT: {}

RETURNS DESCRIPTION
Response

The HTTP response received.

Source code in docs/microservices/rag/src/rag/pipelines/openai_custom_auth_client.py
def send(
    self, request: httpx.Request, *args: object, **kwargs: object
) -> httpx.Response:
    """Override the send method to remove the Authorization header if no authentication was set.

    Args:
        request (httpx.Request): The HTTP request to be sent.
        *args (object): Variable length argument list.
        **kwargs (object): Arbitrary keyword arguments.

    Returns:
        The HTTP response received.
    """
    if "Authorization" in request.headers:
        del request.headers["Authorization"]
    if self.auth_header:
        request.headers["Authorization"] = self.auth_header
    return super().send(request, *args, **kwargs)
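A sketch of how the async variant could be wired into an OpenAI-compatible client. The endpoint and token are placeholders, and passing a custom httpx client through the http_client argument follows the OpenAI Python SDK (>= 1.x) pattern:

from openai import AsyncOpenAI

# Bearer token authentication; for basic auth the header would instead be
# "Basic <base64(username:password)>". All values here are placeholders.
http_client = CustomAsyncAuthClient(auth_header="Bearer example-token")

client = AsyncOpenAI(
    base_url="https://llm.example.org/v1",  # placeholder OpenAI-compatible endpoint
    api_key="unused",  # effectively ignored: the custom client replaces the Authorization header
    http_client=http_client,
)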
pipeline_definitions

Implementation of the pipeline definitions of the RAG.

MODULE DESCRIPTION
haystack_pipelines

This module sets up a hybrid RAG pipeline for file and database question answering.

pipeline_moduls

This module provides functions to generate haystack RAG pipelines.

haystack_pipelines

This module sets up a hybrid RAG pipeline for file and database question answering.

FUNCTION DESCRIPTION
create_indexing_pipeline

Create and optionally save a Haystack indexing pipeline.

create_rag_pipeline

Create the complete RAG pipeline consisting of retrieval and answer generation.

create_retrieval_pipeline

Create Haystack retrieval pipeline.

get_rag_pipelines

Generate and save hybrid RAG pipeline.

ATTRIBUTE DESCRIPTION
args

Be very careful when using pipeline visualizations with sensitive data.

args module-attribute
args = parse_args()

Be very careful when using pipeline visualizations with sensitive data, as the Mermaid graph service is an external server that does not have documented privacy policies or data retention guidelines. This increases the risk of unintended exposure of confidential information.

mermaid-draw-params:
- format: Output format ('img', 'svg', or 'pdf'). Default: 'img'.
- type: Image type for /img endpoint ('jpeg', 'png', 'webp'). Default: 'png'.
- theme: Mermaid theme ('default', 'neutral', 'dark', 'forest'). Default: 'neutral'.
- bgColor: Background color in hexadecimal (e.g., 'FFFFFF') or named format (e.g., '!white').
- width: Width of the output image (integer).
- height: Height of the output image (integer).
- scale: Scaling factor (1–3). Only applicable if 'width' or 'height' is specified.
- fit: Whether to fit the diagram size to the page (PDF only, boolean).
- paper: Paper size for PDFs (e.g., 'a4', 'a3'). Ignored if 'fit' is true.
- landscape: Landscape orientation for PDFs (boolean). Ignored if 'fit' is true.
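For reference, these draw parameters map onto a simple key/value structure such as the following sketch (how they are actually supplied, e.g. via the parsed command-line arguments, is not shown here; all values are examples only):

# Example mermaid-draw-params mapping (values are illustrative).
mermaid_draw_params = {
    "format": "img",
    "type": "png",
    "theme": "neutral",
    "bgColor": "!white",
    "width": 1200,
    "scale": 2,  # only honoured because "width" is set
}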

create_indexing_pipeline
create_indexing_pipeline(embedding_config, indexing_pipeline_config, pipeline_save_config, **_)

Create and optionally save a Haystack indexing pipeline.

PARAMETER DESCRIPTION
embedding_config

Embedding model configuration.

TYPE: EmbeddingModel

indexing_pipeline_config

Configuration for the indexing pipeline containing:
  • index (str): Elasticsearch index to store the documents to.
  • es_host (str): Elasticsearch host.

TYPE: IndexingPipelineConfig

pipeline_save_config

Configuration for pipelines saving process:
  • file_path (str | Path): Path to save the pipeline to.
  • pipe_label (str): Pipeline label.
  • png (str or None): The type of PNG image to generate. Can be "mermaid", "local" or None.

TYPE: PipelineSaveConfig

RETURNS DESCRIPTION
AsyncPipeline | None

The pipeline if not saved to disk, otherwise None.

Source code in docs/microservices/rag/src/rag/pipelines/pipeline_definitions/haystack_pipelines.py
def create_indexing_pipeline(
    embedding_config: EmbeddingModel,
    indexing_pipeline_config: IndexingPipelineConfig,
    pipeline_save_config: PipelineSaveConfig,
    **_: object,
) -> AsyncPipeline | None:
    """Create and optionally save a Haystack indexing pipeline.

    Args:
        embedding_config (EmbeddingModel): Embedding model configuration.
        indexing_pipeline_config (IndexingPipelineConfig): Configuration for the indexing pipeline containing:
            - index (str): Elasticsearch index to store the documents to.
            - es_host (str): Elasticsearch host.
        pipeline_save_config (PipelineSaveConfig): Configuration for pipelines saving process:
            - file_path (str | Path): Path to save the pipeline to.
            - pipe_label (str): Pipeline label.
            - png (str or None): The type of PNG image to generate. Can be "mermaid", "local" or None.

    Returns:
        The pipeline if not saved to disk, otherwise None.
    """
    metadata = {"pipe_label": pipeline_save_config.pipe_label}

    document_embedder = F13OpenAIDocumentEmbedder(
        embedding_model=embedding_config,
        timeout=settings.llm_api_timeout,
    )

    file_type_router = FileTypeRouter(
        mime_types=[
            "text/plain",
            "application/pdf",
            "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
        ]
    )

    document_store = ElasticsearchDocumentStore(
        hosts=indexing_pipeline_config.es_host,
        index=indexing_pipeline_config.index,
    )

    pdf_converter = PyPDFToDocument()
    text_file_converter = TextFileToDocument()
    docx_converter = TikaDocumentConverter()

    set_id = IDSetter(
        ignore_keys_for_doc_hash=[
            "ingestion_date",
            "source_id",
            "id",
            "embedding",
        ]
    )

    document_splitter = DocumentSplitter(
        split_by="word",
        split_length=200,
        split_overlap=20,
    )
    converted_docs_joiner = DocumentJoiner()

    document_writer = DocumentWriter(document_store)

    indexing_pipeline = AsyncPipeline(metadata=metadata)
    indexing_pipeline.add_component(instance=file_type_router, name="file_type_router")
    indexing_pipeline.add_component(
        instance=text_file_converter, name="text_file_converter"
    )
    indexing_pipeline.add_component(instance=pdf_converter, name="pdf_converter")
    indexing_pipeline.add_component(instance=docx_converter, name="tika_converter")
    indexing_pipeline.add_component(instance=set_id, name="set_doc_id")
    indexing_pipeline.add_component(
        instance=converted_docs_joiner, name="converted_docs_joiner"
    )
    indexing_pipeline.add_component("document_splitter", document_splitter)
    indexing_pipeline.add_component("document_embedder", document_embedder)
    indexing_pipeline.add_component("document_writer", document_writer)

    indexing_pipeline.connect(
        "file_type_router.text/plain", "text_file_converter.sources"
    )
    indexing_pipeline.connect(
        "file_type_router.application/pdf", "pdf_converter.sources"
    )
    indexing_pipeline.connect(
        "file_type_router.application/vnd.openxmlformats-officedocument.wordprocessingml.document",
        "tika_converter.sources",
    )
    indexing_pipeline.connect("text_file_converter", "converted_docs_joiner")
    indexing_pipeline.connect("pdf_converter", "converted_docs_joiner")
    indexing_pipeline.connect("tika_converter", "converted_docs_joiner")
    indexing_pipeline.connect("converted_docs_joiner", "set_doc_id")
    indexing_pipeline.connect("set_doc_id.documents", "document_splitter")
    indexing_pipeline.connect("document_splitter", "document_embedder")
    indexing_pipeline.connect("document_embedder", "document_writer")
    # saving indexing pipeline
    # saving pipeline only for the retrieval of documents
    if pipeline_save_config.file_path:
        save_pipe_to(
            pipeline_save_config.file_path,
            indexing_pipeline,
            png=pipeline_save_config.png,
        )
    else:
        return indexing_pipeline
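A sketch of building the indexing pipeline directly. The config objects are filled with the fields documented above; the concrete host and index name are placeholders, and embedding_config is assumed to come from the LLM configuration (as in get_rag_pipelines below):

indexing_pipeline_config = IndexingPipelineConfig(
    es_host="http://localhost:9200",   # placeholder Elasticsearch host
    index="database_documents",        # placeholder index name
)
pipeline_save_config = PipelineSaveConfig(
    file_path=None,                    # None: return the pipeline instead of saving it
    pipe_label="database_indexing",
    png=None,
)

indexing_pipe = create_indexing_pipeline(
    embedding_config=embedding_config,  # an EmbeddingModel from the LLM configuration
    indexing_pipeline_config=indexing_pipeline_config,
    pipeline_save_config=pipeline_save_config,
)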
create_rag_pipeline
create_rag_pipeline(embedding_config, retrieval_config, indexing_pipeline_config, pipeline_save_config, rag_llms=None, **_)

Create the complete RAG pipeline consisting of retrieval and answer generation.

PARAMETER DESCRIPTION
retrieval_config

Reranker config.

TYPE: RetrievalConfig

embedding_config

Embedding model config.

TYPE: EmbeddingModel

indexing_pipeline_config
  • es_host (str): Elasticsearch host to connect to.
  • index (str): Elasticsearch index to use for retrieval.

TYPE: IndexingPipelineConfig

pipeline_save_config

Configuration for pipelines saving process:
  • file_path (str | Path): Path to save the pipeline to.
  • pipe_label (str): Pipeline label.
  • png (str or None): The type of PNG image to generate. Can be "mermaid", "local" or None.

TYPE: PipelineSaveConfig

rag_llms

RAG model config.

TYPE: dict[str, LLM] DEFAULT: None

RETURNS DESCRIPTION
AsyncPipeline

RAG pipeline.

Source code in docs/microservices/rag/src/rag/pipelines/pipeline_definitions/haystack_pipelines.py
def create_rag_pipeline(
    embedding_config: EmbeddingModel,
    retrieval_config: RetrievalConfig,
    indexing_pipeline_config: IndexingPipelineConfig,
    pipeline_save_config: PipelineSaveConfig,
    rag_llms: dict[str, LLM] = None,
    **_: object,
) -> AsyncPipeline:
    """Create the complete RAG pipeline consisting of retrieval and answer generation.

    Args:
        retrieval_config (RetrievalConfig): Reranker config.
        embedding_config (EmbeddingModel): Embedding model config.
        indexing_pipeline_config (IndexingPipelineConfig):
            - es_host (str): Elasticsearch host to connect to.
            - index (str): Elasticsearch index to use for retrieval.
        pipeline_save_config (PipelineSaveConfig): Configuration for pipelines saving process:
            - file_path (str | Path): Path to save the pipeline to.
            - pipe_label (str): Pipeline label.
            - png (str or None): The type of PNG image to generate. Can be "mermaid", "local" or None.
        rag_llms (dict[str, LLM]): RAG model config.

    Returns:
        RAG pipeline.
    """
    retrieval_pipe = create_retrieval_pipeline(
        retrieval_config=retrieval_config,
        embedding_config=embedding_config,
        indexing_pipeline_config=indexing_pipeline_config,
        pipeline_save_config=pipeline_save_config,
    )

    pipe = add_answer_generation_pipeline(
        retrieval_pipe=retrieval_pipe,
        es_host=indexing_pipeline_config.es_host,
        index=indexing_pipeline_config.index,
        pipeline_save_config=pipeline_save_config,
        rag_llms=rag_llms,
    )
    return pipe
create_retrieval_pipeline
create_retrieval_pipeline(embedding_config, retrieval_config, indexing_pipeline_config, pipeline_save_config, **_)

Create Haystack retrieval pipeline.

PARAMETER DESCRIPTION
embedding_config

Embedding model config.

TYPE: EmbeddingModel

retrieval_config

Retrieval config.

TYPE: RetrievalConfig

indexing_pipeline_config
  • es_host (str): Elasticsearch host to connect to.
  • index (str): Elasticsearch index to use for retrieval.

TYPE: IndexingPipelineConfig

pipeline_save_config

Configuration for pipelines saving process:
  • file_path (str | Path): Path to save the pipeline to.
  • pipe_label (str): Pipeline label.
  • png (str or None): The type of PNG image to generate. Can be "mermaid", "local" or None.

TYPE: PipelineSaveConfig

RETURNS DESCRIPTION
AsyncPipeline

Retrieval pipeline.

Source code in docs/microservices/rag/src/rag/pipelines/pipeline_definitions/haystack_pipelines.py
def create_retrieval_pipeline(
    embedding_config: EmbeddingModel,
    retrieval_config: RetrievalConfig,
    indexing_pipeline_config: IndexingPipelineConfig,
    pipeline_save_config: PipelineSaveConfig,
    **_: object,
) -> AsyncPipeline:
    """Create Haystack retrieval pipeline.

    Args:
        embedding_config (EmbeddingModel): Embedding model config.
        retrieval_config (RetrievalConfig): Retrieval config.
        indexing_pipeline_config (IndexingPipelineConfig):
            - es_host (str): Elasticsearch host to connect to.
            - index (str): Elasticsearch index to use for retrieval.
        pipeline_save_config (PipelineSaveConfig): Configuration for pipelines saving process:
            - file_path (str | Path): Path to save the pipeline to.
            - pipe_label (str): Pipeline label.
            - png (str or None): The type of PNG image to generate. Can be "mermaid", "local" or None.

    Returns:
        Retrieval pipeline.
    """
    metadata = {"pipe_label": pipeline_save_config.pipe_label}

    text_embedder = F13OpenAITextEmbedder(
        embedding_model=embedding_config,
        timeout=settings.llm_api_timeout,
    )

    document_store = ElasticsearchDocumentStore(
        hosts=indexing_pipeline_config.es_host, index=indexing_pipeline_config.index
    )
    embedding_retriever = EmbeddingRetriever(
        document_store=document_store, top_k=retrieval_config.retriever_top_k
    )
    bm25_retriever = BM25Retriever(
        document_store=document_store, top_k=retrieval_config.retriever_top_k
    )

    if retrieval_config.include_ranker:
        document_joiner_top_k = 2 * retrieval_config.retriever_top_k
    else:
        document_joiner_top_k = retrieval_config.max_chunks_to_use

    result_document_joiner = DocumentJoiner(
        join_mode="reciprocal_rank_fusion",
        sort_by_score=True,
        top_k=document_joiner_top_k,
    )

    query_branch = BranchJoiner(str)
    filter_branch = BranchJoiner(dict[str, Any])

    grouping = MetaFieldGroupingRanker(group_by="source_id")
    retrieval_pipeline = AsyncPipeline(metadata=metadata)

    retrieval_pipeline.add_component("query", query_branch)
    retrieval_pipeline.add_component("filters", filter_branch)
    retrieval_pipeline.add_component("text_embedder", text_embedder)
    retrieval_pipeline.add_component("embedding_retriever", embedding_retriever)
    retrieval_pipeline.add_component("bm25_retriever", bm25_retriever)
    retrieval_pipeline.add_component(
        instance=result_document_joiner, name="retrieved_docs_joiner"
    )
    retrieval_pipeline.add_component("document_grouping", grouping)

    retrieval_pipeline.connect("query.value", "text_embedder")
    retrieval_pipeline.connect(
        "text_embedder.embedding", "embedding_retriever.query_embedding"
    )
    retrieval_pipeline.connect("filters.value", "embedding_retriever.filters")
    retrieval_pipeline.connect("query.value", "bm25_retriever.query")
    retrieval_pipeline.connect("filters.value", "bm25_retriever.filters")
    retrieval_pipeline.connect("bm25_retriever", "retrieved_docs_joiner")
    retrieval_pipeline.connect("embedding_retriever", "retrieved_docs_joiner")

    # add ranker component
    if retrieval_config.include_ranker:
        ranker = SentenceTransformersSimilarityRanker(
            model=retrieval_config.ranker_model_path,
            meta_fields_to_embed=["title"],
            score_threshold=retrieval_config.ranker_score_threshold,
            top_k=retrieval_config.max_chunks_to_use,
        )
        retrieval_pipeline.add_component("ranker", ranker)
        retrieval_pipeline.connect(
            "retrieved_docs_joiner.documents", "ranker.documents"
        )
        retrieval_pipeline.connect("query.value", "ranker.query")
        retrieval_pipeline.connect("ranker.documents", "document_grouping.documents")
        retrieval_pipeline.metadata["max_chunks_to_use_component"] = ["ranker"]
    else:
        result_document_joiner.top_k = retrieval_config.max_chunks_to_use
        retrieval_pipeline.connect(
            "retrieved_docs_joiner.documents", "document_grouping.documents"
        )
        retrieval_pipeline.metadata["max_chunks_to_use_component"] = [
            "retrieved_docs_joiner"
        ]

    retrieval_pipeline.metadata["retrieved_docs_keys"] = [
        "document_grouping",
        "documents",
    ]

    # saving pipeline only for the retrieval of documents
    if pipeline_save_config.file_path:
        save_pipe_to(
            pipeline_save_config.file_path,
            retrieval_pipeline,
            png=pipeline_save_config.png,
        )
    else:
        return retrieval_pipeline
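Once built, the retrieval pipeline is driven through its "query" and "filters" branch components, and a top_k override goes to the component recorded in the pipeline metadata. A minimal sketch, assuming Haystack's AsyncPipeline and an async context:

from haystack import AsyncPipeline


async def retrieve(retrieval_pipe: AsyncPipeline) -> list:
    top_k_component = retrieval_pipe.metadata["max_chunks_to_use_component"][0]

    result = await retrieval_pipe.run_async(
        data={
            "query": {"value": "What does the example report say?"},
            "filters": {"value": None},  # or a Haystack filter dict
            top_k_component: {"top_k": 5},
        },
        include_outputs_from=["document_grouping"],
    )
    # "retrieved_docs_keys" is ["document_grouping", "documents"].
    return result["document_grouping"]["documents"]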
get_rag_pipelines
get_rag_pipelines(es_host, **_)

Generate and save hybrid RAG pipeline.

PARAMETER DESCRIPTION
es_host

Elasticsearch host, e.g., "http://localhost:9200".

TYPE: str

RETURNS DESCRIPTION
dict[str, AsyncPipeline]

Mapping of pipeline labels (e.g. "file_rag") to the created pipelines.

Source code in docs/microservices/rag/src/rag/pipelines/pipeline_definitions/haystack_pipelines.py
def get_rag_pipelines(es_host: str, **_: object) -> dict[str, AsyncPipeline]:
    """Generate the hybrid RAG pipelines and return them in a registry.

    Args:
        es_host (str): Elasticsearch host, e.g., "http://localhost:9200".

    Returns:
        Mapping of pipeline labels (e.g. "file_rag") to the created pipelines.
    """
    logger.debug("Creating Haystack Pipelines.")
    pipe_config = rag_pipeline_config.model_dump()["pipeline"]

    # load configuration parameters of the LLMs to use from yaml file
    rag_llms = llm_config.rag
    embedding_config = llm_config.embedding.get(pipe_config["embedding_model_name"])

    creation_dict = {
        "deleter": create_deletion_pipeline,
        "indexing": create_indexing_pipeline,
        "rag": create_rag_pipeline,
    }

    pipeline_registry = {}
    for rag_type in ["file", "database"]:
        for task, creation_f in creation_dict.items():
            if rag_type == "database" and task == "deleter":
                continue

            pipe_label = f"{rag_type}_{task}"
            pipeline_save_config = PipelineSaveConfig(
                file_path=None,
                pipe_label=pipe_label,
                png="local",
            )
            indexing_pipeline_config = IndexingPipelineConfig(
                es_host=es_host,
                index=pipe_config["index"][rag_type],
            )
            pipeline_registry[pipe_label] = creation_f(
                embedding_config=embedding_config,
                retrieval_config=rag_pipeline_config.pipeline.retrieval_config,
                indexing_pipeline_config=indexing_pipeline_config,
                pipeline_save_config=pipeline_save_config,
                rag_llms=rag_llms,
            )

    return pipeline_registry
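Illustrative call (not part of the module source), assuming the configs are loaded and an Elasticsearch instance is reachable; the registry keys follow the f"{rag_type}_{task}" labels built in the loop above:

registry = get_rag_pipelines(es_host="http://localhost:9200")

# Expected labels ("database_deleter" is skipped on purpose):
# ['database_indexing', 'database_rag', 'file_deleter', 'file_indexing', 'file_rag']
print(sorted(registry))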
pipeline_moduls

This module provides functions to generate haystack RAG pipelines.

FUNCTION DESCRIPTION
add_answer_generation_pipeline

Create a Haystack pipeline for answer generation.

create_deletion_pipeline

Create a Haystack pipeline to delete documents from temporary storage.

openai_generation_pipe_component

Create the generation pipeline using an OpenAI-conform LLM API.

add_answer_generation_pipeline
add_answer_generation_pipeline(retrieval_pipe, rag_llms, pipeline_save_config, **_)

Create a Haystack pipeline for answer generation.

PARAMETER DESCRIPTION
retrieval_pipe

The retrieval pipeline to use.

TYPE: AsyncPipeline

rag_llms

The RAG LLM configurations to use.

TYPE: dict[str, LLM]

pipeline_save_config

Configuration for the pipeline saving process:
  • file_path (str | Path): Path to save the pipeline to.
  • pipe_label (str): Pipeline label.
  • png (str or None): The type of PNG image to generate. Can be "mermaid", "local" or None.

TYPE: PipelineSaveConfig

RETURNS DESCRIPTION
AsyncPipeline

The Haystack pipeline.

Source code in docs/microservices/rag/src/rag/pipelines/pipeline_definitions/pipeline_moduls.py
def add_answer_generation_pipeline(
    retrieval_pipe: AsyncPipeline,
    rag_llms: dict[str, LLM],
    pipeline_save_config: PipelineSaveConfig,
    **_: object,
) -> AsyncPipeline:
    """Create a Haystack pipeline for answer generation.

    Args:
        retrieval_pipe (AsyncPipeline): The retrieval pipeline to use.
        rag_llms (dict[str, LLM]): The RAG LLM configurations to use.
        pipeline_save_config (PipelineSaveConfig): Configuration for pipelines saving process:
            - file_path (str | Path): Path to save the pipeline to.
            - pipe_label (str): Pipeline label.
            - png (str or None): The type of PNG image to generate. Can be "mermaid", "local" or None.

    Returns:
        The Haystack pipeline.
    """
    pipe = retrieval_pipe
    metadata = {"pipe_label": pipeline_save_config.pipe_label}
    generator_info = {}

    # initialize conditional LLM routing
    llm_routes = [
        {
            "condition": "{{ generator_model| string == '" + llm_name + "' }}",
            "output": "{{query}}",
            "output_name": llm_name,
            "output_type": str,
        }
        for llm_name, llm in rag_llms.items()
    ]

    joiner = BranchJoiner(str)

    rag_answer_component_name = "rag_answers"
    pipe.add_component(rag_answer_component_name, joiner)
    pipe.add_component("llm_router", ConditionalRouter(routes=llm_routes))
    pipe.connect("query", "llm_router.query")
    docs_component_name = pipe.metadata["retrieved_docs_keys"][0]

    metadata["rag_answers_keys"] = [rag_answer_component_name, "value"]
    # initialize LLMs from LLMConfig File (RAG section)
    if rag_llms:
        for llm_key, llm in rag_llms.items():
            generator_info[llm_key] = llm.model_dump(
                include=["label", "model", "max_chunks_to_use"]
            )
            generation_config = GenerationConfig(
                docs_sender_name=docs_component_name,
                routed_query_name=f"llm_router.{llm_key}",
                name_prefix=llm_key,
                rag_answer_component_name=rag_answer_component_name,
            )
            pipe = openai_generation_pipe_component(
                pipe=pipe,
                llm=llm,
                config=generation_config,
            )

    metadata["llm_generators"] = generator_info

    pipe.metadata.update(metadata)

    # save the full RAG pipeline (retrieval + generation) if a file path is configured
    if pipeline_save_config.file_path:
        save_pipe_to(pipeline_save_config.file_path, pipe, png=pipeline_save_config.png)

    return pipe
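For illustration (not part of the module source), with two hypothetical LLM keys "llama3" and "mistral" in the RAG config, the llm_routes list handed to the ConditionalRouter expands to:

llm_routes = [
    {
        "condition": "{{ generator_model| string == 'llama3' }}",
        "output": "{{query}}",
        "output_name": "llama3",
        "output_type": str,
    },
    {
        "condition": "{{ generator_model| string == 'mistral' }}",
        "output": "{{query}}",
        "output_name": "mistral",
        "output_type": str,
    },
]
# At run time the router forwards the query only to the branch whose output_name
# matches the selected generator_model, so exactly one generator sub-pipeline
# feeds the shared "rag_answers" branch joiner.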
create_deletion_pipeline
create_deletion_pipeline(indexing_pipeline_config, pipeline_save_config, **_)

Create a Haystack pipeline to delete documents from temporary storage.

PARAMETER DESCRIPTION
indexing_pipeline_config
  • es_host (str): Elasticsearch host to connect to.
  • index (str): Elasticsearch index to use for retrieval.

TYPE: IndexingPipelineConfig

pipeline_save_config

Configuration for the pipeline saving process:
  • file_path (str | Path): Path to save the pipeline to.
  • pipe_label (str): Pipeline label.
  • png (str or None): The type of PNG image to generate. Can be "mermaid", "local" or None.

TYPE: PipelineSaveConfig

Source code in docs/microservices/rag/src/rag/pipelines/pipeline_definitions/pipeline_moduls.py
def create_deletion_pipeline(
    indexing_pipeline_config: IndexingPipelineConfig,
    pipeline_save_config: PipelineSaveConfig,
    **_: object,
) -> AsyncPipeline | None:
    """Create a Haystack pipeline to delete documents from temporary storage.

    Args:
        indexing_pipeline_config (IndexingPipelineConfig):
            - es_host (str): Elasticsearch host to connect to.
            - index (str): Elasticsearch index to use for retrieval.
        pipeline_save_config (PipelineSaveConfig): Configuration for pipelines saving process:
            - file_path (str | Path): Path to save the pipeline to.
            - pipe_label (str): Pipeline label.
            - png (str or None): The type of PNG image to generate. Can be "mermaid", "local" or None.

    """
    metadata = {"pipe_label": pipeline_save_config.pipe_label}

    document_store = DocumentStore(
        hosts=indexing_pipeline_config.es_host, index=indexing_pipeline_config.index
    )

    document_deleter = DocumentDeleter(document_store=document_store)
    document_id_retriever = DocumentIDRetriever(document_store=document_store)

    delete_temporary_pipe = AsyncPipeline(metadata=metadata)
    delete_temporary_pipe.add_component(
        instance=document_deleter, name="document_deleter"
    )
    delete_temporary_pipe.add_component(
        instance=document_id_retriever, name="document_id_retriever"
    )
    delete_temporary_pipe.connect("document_id_retriever", "document_deleter")

    # save the deletion pipeline if a file path is configured; otherwise return it
    if pipeline_save_config.file_path:
        save_pipe_to(
            pipeline_save_config.file_path,
            delete_temporary_pipe,
            png=pipeline_save_config.png,
        )
    else:
        return delete_temporary_pipe
openai_generation_pipe_component
openai_generation_pipe_component(pipe, llm, config)

Create the generation pipeline using an OpenAI-conform LLM API.

PARAMETER DESCRIPTION
pipe

Pipeline to add the generation components to.

TYPE: AsyncPipeline

llm

Language model object used for response generation.

TYPE: LLM

config

Configuration object containing:
  • docs_sender_name (str): Name of the document sender component.
  • routed_query_name (str): Name of the routed query component.
  • name_prefix (str): Prefix used for naming pipeline components.
  • rag_answer_component_name (str): Name of the final RAG answer component.

TYPE: GenerationConfig

RETURNS DESCRIPTION
AsyncPipeline

Generated Haystack pipeline.

Source code in docs/microservices/rag/src/rag/pipelines/pipeline_definitions/pipeline_moduls.py
def openai_generation_pipe_component(
    pipe: AsyncPipeline,
    llm: LLM,
    config: GenerationConfig,
) -> AsyncPipeline:
    """Create the generation pipeline using an OpenAI-conform LLM API.

    Args:
        pipe (AsyncPipeline): Pipeline to add the generation components to.
        llm (LLM): Language model object used for response generation.
        config (GenerationConfig): Configuration object containing:
            - docs_sender_name (str): Name of the document sender component.
            - routed_query_name (str): Name of the routed query component.
            - name_prefix (str): Prefix used for naming pipeline components.
            - rag_answer_component_name (str): Name of the final RAG answer component.

    Returns:
        Generated Haystack pipeline.
    """
    name_prefix = f"{config.name_prefix}_" if config.name_prefix else ""

    generation_kwargs = {
        "max_completion_tokens": llm.inference.max_new_tokens,
        "temperature": llm.inference.temperature,
        "top_p": llm.inference.top_p,
    }

    generator = F13OpenAIChatGenerator(
        llm=llm,
        generation_kwargs=generation_kwargs,
        timeout=settings.llm_api_timeout,
    )

    adapter = OutputAdapter(template="{{ replies[0].text }}", output_type=str)

    pipe.add_component(
        f"{name_prefix}prompt_builder",
        ChatPromptBuilder(
            template=[
                ChatMessage.from_system(llm.prompt_config.system.generate),
                ChatMessage.from_user(llm.prompt_config.user.generate),
            ],
            required_variables=["query", "documents"],
        ),
    )
    pipe.add_component(f"{name_prefix}adapter", adapter)
    pipe.add_component(f"{name_prefix}llm", generator)

    pipe.connect(config.docs_sender_name, f"{name_prefix}prompt_builder.documents")
    pipe.connect(config.routed_query_name, f"{name_prefix}prompt_builder.query")
    pipe.connect(f"{name_prefix}prompt_builder", f"{name_prefix}llm")
    pipe.connect(f"{name_prefix}llm.replies", f"{name_prefix}adapter")
    pipe.connect(f"{name_prefix}adapter", config.rag_answer_component_name)

    return pipe
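A hedged sketch (not part of the module source) of how add_answer_generation_pipeline wires in one generator branch; the LLM key "llama3" is a hypothetical example:

generation_config = GenerationConfig(
    docs_sender_name="document_grouping",    # component emitting the retrieved documents
    routed_query_name="llm_router.llama3",   # router output for this LLM
    name_prefix="llama3",
    rag_answer_component_name="rag_answers",
)
pipe = openai_generation_pipe_component(
    pipe=pipe,
    llm=rag_llms["llama3"],
    config=generation_config,
)
# Adds "llama3_prompt_builder", "llama3_llm" and "llama3_adapter" and connects
# the adapter output to the shared "rag_answers" branch joiner.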
rag_registry

RAG-Registry class for setting-up, storing and accessing RAG pipelines.

CLASS DESCRIPTION
RAGRegistry

Sets up and stores RAG pipelines and makes access possible.

RAGRegistry

Sets up and stores RAG pipelines and makes access possible.

ATTRIBUTE DESCRIPTION
file_rag

Instance of the FileRag-Pipeline.

TYPE: FileRagPipe

database_rag

Instance of the DatabaseRag-Pipeline.

TYPE: DatabaseRagPipe

METHOD DESCRIPTION
file_rag_tempfile_deletion

Runs the temporary file deleter pipeline to delete files older than a specified timestamp.

index_database_rag

Run the database indexing pipeline on the provided files.

run_database_rag

Run the database-rag pipeline to generate an answer based on a query.

run_file_rag

Run the file-rag pipeline to generate an answer based on a query.

Source code in docs/microservices/rag/src/rag/rag_registry.py
class RAGRegistry:
    """Sets up and stores RAG pipelines and makes access possible.

    Attributes:
        file_rag (FileRagPipe): Instance of the FileRag-Pipeline.
        database_rag (DatabaseRagPipe): Instance of the DatabaseRag-Pipeline.
    """

    def __init__(self) -> None:
        """Initializes the RAG Pipelines."""
        self.file_rag: FileRagPipe
        self.database_rag: DatabaseRagPipe

        self._check_for_ranker_model()
        self.file_rag, self.database_rag = self._initialize_pipelines()

    def _check_for_ranker_model(self) -> None:
        """Checks if the RAG pipeline has a ranker model defined and tries to download it."""
        if rag_pipeline_config.pipeline.retrieval_config.include_ranker:
            logger.info("RAG pipeline has a ranker model defined.")
            reranker_model_path = download_reranker_model()

            if not reranker_model_path:
                logger.critical(
                    "Re-ranker model could not be downloaded. Creating Retrieval-Pipeline without Re-ranker."
                )
                rag_pipeline_config.pipeline.retrieval_config.include_ranker = False

    def _initialize_pipelines(self) -> tuple[FileRagPipe, DatabaseRagPipe]:
        try:
            file_rag, database_rag = setup_rag(
                debug_haystack_pipelines=settings.debug_haystack_pipelines
            )
            return file_rag, database_rag
        except Exception as e:
            logger.exception(e)
            sys.exit(-1)  # Terminate application if pipeline not available

    async def run_database_rag(self, rag_input: RAGInput) -> list[RAGOutput]:
        """Run the database-rag pipeline to generate an answer based on a query.

        This method processes the input query through a Retrieval-Augmented Generation (RAG) pipeline,
        which retrieves relevant documents and generates an answer based on that context.

        Args:
            rag_input (RAGInput): A dataclass containing the list of questions.

        Returns:
            A list of dictionaries, each containing a `'question'`, `'answer'`, and `'sources'`.
        """
        if rag_input.language_model not in settings.active_llms.rag:
            logger.error(f"Invalid language model selected: {rag_input.language_model}")
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=(
                    f"Es wurde ein ungültiges Sprachmodell ausgewählt ({rag_input.language_model})."
                    " Bitte versuchen Sie es mit einem anderen Modell."
                ),
            )

        question = rag_input.question
        try:
            logger.info("Dokumente werden gesucht und eine Antwort wird generiert.")
            meta_data_filters = rag_input.meta_data_filters
            answer, sources = await self.database_rag.generate_answer(
                query=question,
                language_model=rag_input.language_model,
                meta_data_filters=meta_data_filters,
                max_chunks_to_use=rag_input.max_chunks_to_use,
            )

            if len(sources) == 0:
                logger.warning("No matching documents retrieved.")

        except Exception as e:
            logger.error(f"Answer generation failed: {e}")
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="Die Beantwortung der Frage ist fehlgeschlagen.",
            )

        result = apply_output_format(question, answer, sources)
        logger.debug(f"Answer of the Database-RAG: {result}")

        return [result]

    async def index_database_rag(
        self, db_ingestion: DBIngestionInput
    ) -> dict[str, Any]:
        """Run the database indexing pipeline on the provided files.

        Args:
            db_ingestion (DBIngestionInput): A client-defined connection input containing the ingestion data.

        Returns:
            A status and message response, for example:

            {
                "status": "success",
                "message": "Ingested <n> documents and <m> chunks into the database."
            }
        """
        logger.info("Dokumente werden hochgeladen.")

        docs = [Document(**doc.model_dump()) for doc in db_ingestion.documents]

        n_docs = 0
        n_chunks = 0
        failed_batches = []
        batches = split_list_into_batches(docs, db_ingestion.batch_size)
        for batch in batches:
            try:
                n_docs_batch, n_batch_chunks = await self.database_rag.indexing(
                    files=batch, double_doc_handling=db_ingestion.double_doc_handling
                )
                n_docs += n_docs_batch
                n_chunks += n_batch_chunks
                if n_docs_batch == 0:
                    failed_batches.append(
                        [batch_item.to_dict() for batch_item in batch]
                    )
            except Exception as e:
                logger.exception(
                    f"Error during indexing of the current batch. Error: {e}"
                )
                failed_batches.append([batch_item.to_dict() for batch_item in batch])

        result = {
            "status": "success",
            "message": f"Ingested {n_docs} documents and {n_chunks} chunks into the database.",
        }

        if failed_batches:
            result["status"] = "failed"
            result["failed_batches"] = str(failed_batches)

        return result

    async def run_file_rag(
        self, rag_input: RAGInput, files: list[UploadFile]
    ) -> list[RAGOutput]:
        """Run the file-rag pipeline to generate an answer based on a query.

        This method processes the input query through a Retrieval-Augmented Generation (RAG) pipeline,
        which retrieves relevant documents and generates an answer based on that context.

        Args:
            rag_input (RAGInput): A dataclass containing the list of questions.
            files (list[UploadFile]): A list of fastapi.UploadFile objects.

        Returns:
            A list of dictionaries, each containing a `'question'`, `'answer'`, and `'sources'`.
        """
        if rag_input.language_model not in settings.active_llms.rag:
            logger.error(f"Invalid language model selected: {rag_input.language_model}")
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=(
                    f"Es wurde ein ungültiges Sprachmodell ausgewählt ({rag_input.language_model})."
                    " Bitte versuchen Sie es mit einem anderen Modell."
                ),
            )

        source_ids = await self._index_file_rag(files_byte_stream=files)

        return [await self._answer_file_rag(rag_input, source_ids=source_ids)]

    async def _answer_file_rag(
        self, rag_input: RAGInput, source_ids: list[str] | None = None
    ) -> RAGOutput:
        """Run the file-rag pipeline to generate an answer based on a query.

        This method processes the input query through a Retrieval-Augmented Generation (RAG) pipeline,
        which retrieves relevant documents and generates an answer based on that context.

        Args:
            rag_input (RAGInput): A dataclass containing the list of questions.
            source_ids (list[str]): A list of document IDs to restrict the retrieval context.

        Returns:
            The question, generated answer, and sources.
        """
        question = rag_input.question
        try:
            logger.info(
                "Dokumente werden gesucht und eine Antwort wird generiert.",
            )
            answer, sources = await self.file_rag.generate_answer(
                query=question,
                language_model=rag_input.language_model,
                source_ids=source_ids,
                max_chunks_to_use=rag_input.max_chunks_to_use,
            )

            if len(sources) == 0:
                logger.warning(
                    "No matching documents retrieved.",
                )

        except Exception as e:
            logger.error(f"Answer generation failed: {e}")
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="Die Beantwortung dieser Frage ist fehlgeschlagen.",
            )

        result = apply_output_format(question, answer, sources)
        logger.debug(f"Answer of the File-RAG: {result}")
        return result

    async def _index_file_rag(self, files_byte_stream: list) -> list[str]:
        """Run the file indexing pipeline on the provided files.

        Takes a list of byte streams representing files, loads and warms up the indexing pipeline,
        and runs it with the provided data. Returns a list of source IDs.

        Args:
            files_byte_stream (list): A list of byte streams representing files.

        Returns:
            A list of source IDs generated by the indexing pipeline.
        """
        logger.info("{%d}} Dokumente werden hochgeladen.", len(files_byte_stream))

        try:
            doc_ids = await self.file_rag.indexing(files=files_byte_stream)
        except Exception:
            logger.exception("Unexpected error during indexing.")
            doc_ids = []

        return doc_ids

    async def file_rag_tempfile_deletion(self, timestamp: datetime.datetime) -> int:
        """Runs the temporary file deleter pipeline to delete files older than a specified timestamp.

        The pipeline uses a filter to delete documents with an ingestion date
        older than or equal to the provided timestamp.
        It then runs the retrieval pipeline to delete these documents.

        Args:
            timestamp (datetime): The maximum ingestion datetime for files to be deleted.

        Returns:
            The number of documents deleted by the pipeline.

        Raises:
            Exception: If any error occurs during pipeline execution.
        """
        try:
            n_deleted_files = await self.file_rag.file_deletion(timestamp=timestamp)

        except Exception:
            logger.exception("Unexpected error during temporary file deletion.")
            n_deleted_files = 0
        return n_deleted_files
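An illustrative end-to-end call against the registry (not part of the class source); the RAGInput field values are assumptions for demonstration:

import asyncio

async def ask() -> None:
    registry = RAGRegistry()  # builds the file and database RAG pipelines
    rag_input = RAGInput(
        question="Which documents are required for a residence permit?",
        language_model="llama3",   # must be listed in settings.active_llms.rag
        meta_data_filters=None,    # illustrative: no metadata filters
        max_chunks_to_use=5,
    )
    results = await registry.run_database_rag(rag_input)
    for result in results:
        print(result.answer)

# asyncio.run(ask())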
file_rag_tempfile_deletion async
file_rag_tempfile_deletion(timestamp)

Runs the temporary file deleter pipeline to delete files older than a specified timestamp.

The pipeline uses a filter to delete documents with an ingestion date older than or equal to the provided timestamp. It then runs the retrieval pipeline to delete these documents.

PARAMETER DESCRIPTION
timestamp

The maximum ingestion datetime for files to be deleted.

TYPE: datetime

RETURNS DESCRIPTION
int

The number of documents deleted by the pipeline.

RAISES DESCRIPTION
Exception

If any error occurs during pipeline execution.

Source code in docs/microservices/rag/src/rag/rag_registry.py
async def file_rag_tempfile_deletion(self, timestamp: datetime.datetime) -> int:
    """Runs the temporary file deleter pipeline to delete files older than a specified timestamp.

    The pipeline uses a filter to delete documents with an ingestion date
    older than or equal to the provided timestamp.
    It then runs the retrieval pipeline to delete these documents.

    Args:
        timestamp (datetime): The maximum ingestion datetime for files to be deleted.

    Returns:
        The number of documents deleted by the pipeline.

    Raises:
        Exception: If any error occurs during pipeline execution.
    """
    try:
        n_deleted_files = await self.file_rag.file_deletion(timestamp=timestamp)

    except Exception:
        logger.exception("Unexpected error during temporary file deletion.")
        n_deleted_files = 0
    return n_deleted_files
index_database_rag async
index_database_rag(db_ingestion)

Run the database indexing pipeline on the provided files.

PARAMETER DESCRIPTION
db_ingestion

A client-defined connection input containing the ingestion data.

TYPE: DBIngestionInput

RETURNS DESCRIPTION
dict[str, Any]

A status and message response, for example:

{
    "status": "success",
    "message": "Ingested <n> documents and <m> chunks into the database."
}

Source code in docs/microservices/rag/src/rag/rag_registry.py
async def index_database_rag(
    self, db_ingestion: DBIngestionInput
) -> dict[str, Any]:
    """Run the database indexing pipeline on the provided files.

    Args:
        db_ingestion (DBIngestionInput): A client-defined connection input containing the ingestion data.

    Returns:
        A status and message response, for example:

        {
            "status": "success",
            "message": "Ingested <n> documents and <m> chunks into the database."
        }
    """
    logger.info("Dokumente werden hochgeladen.")

    docs = [Document(**doc.model_dump()) for doc in db_ingestion.documents]

    n_docs = 0
    n_chunks = 0
    failed_batches = []
    batches = split_list_into_batches(docs, db_ingestion.batch_size)
    for batch in batches:
        try:
            n_docs_batch, n_batch_chunks = await self.database_rag.indexing(
                files=batch, double_doc_handling=db_ingestion.double_doc_handling
            )
            n_docs += n_docs_batch
            n_chunks += n_batch_chunks
            if n_docs_batch == 0:
                failed_batches.append(
                    [batch_item.to_dict() for batch_item in batch]
                )
        except Exception as e:
            logger.exception(
                f"Error during indexing of the current batch. Error: {e}"
            )
            failed_batches.append([batch_item.to_dict() for batch_item in batch])

    result = {
        "status": "success",
        "message": f"Ingested {n_docs} documents and {n_chunks} chunks into the database.",
    }

    if failed_batches:
        result["status"] = "failed"
        result["failed_batches"] = str(failed_batches)

    return result
run_database_rag async
run_database_rag(rag_input)

Run the database-rag pipeline to generate an answer based on a query.

This method processes the input query through a Retrieval-Augmented Generation (RAG) pipeline, which retrieves relevant documents and generates an answer based on that context.

PARAMETER DESCRIPTION
rag_input

A dataclass containing the list of questions.

TYPE: RAGInput

RETURNS DESCRIPTION
list[RAGOutput]

A list of dictionaries, each containing a 'question', 'answer', and 'sources'.

Source code in docs/microservices/rag/src/rag/rag_registry.py
async def run_database_rag(self, rag_input: RAGInput) -> list[RAGOutput]:
    """Run the database-rag pipeline to generate an answer based on a query.

    This method processes the input query through a Retrieval-Augmented Generation (RAG) pipeline,
    which retrieves relevant documents and generates an answer based on that context.

    Args:
        rag_input (RAGInput): A dataclass containing the list of questions.

    Returns:
        A list of dictionaries, each containing a `'question'`, `'answer'`, and `'sources'`.
    """
    if rag_input.language_model not in settings.active_llms.rag:
        logger.error(f"Invalid language model selected: {rag_input.language_model}")
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=(
                f"Es wurde ein ungültiges Sprachmodell ausgewählt ({rag_input.language_model})."
                " Bitte versuchen Sie es mit einem anderen Modell."
            ),
        )

    question = rag_input.question
    try:
        logger.info("Dokumente werden gesucht und eine Antwort wird generiert.")
        meta_data_filters = rag_input.meta_data_filters
        answer, sources = await self.database_rag.generate_answer(
            query=question,
            language_model=rag_input.language_model,
            meta_data_filters=meta_data_filters,
            max_chunks_to_use=rag_input.max_chunks_to_use,
        )

        if len(sources) == 0:
            logger.warning("No matching documents retrieved.")

    except Exception as e:
        logger.error(f"Answer generation failed: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Die Beantwortung der Frage ist fehlgeschlagen.",
        )

    result = apply_output_format(question, answer, sources)
    logger.debug(f"Answer of the Database-RAG: {result}")

    return [result]
run_file_rag async
run_file_rag(rag_input, files)

Run the file-rag pipeline to generate an answer based on a query.

This method processes the input query through a Retrieval-Augmented Generation (RAG) pipeline, which retrieves relevant documents and generates an answer based on that context.

PARAMETER DESCRIPTION
rag_input

A dataclass containing the list of questions.

TYPE: RAGInput

files

A list of fastapi.UploadFile objects.

TYPE: list[UploadFile]

RETURNS DESCRIPTION
list[RAGOutput]

A list of dictionaries, each containing a 'question', 'answer', and 'sources'.

Source code in docs/microservices/rag/src/rag/rag_registry.py
async def run_file_rag(
    self, rag_input: RAGInput, files: list[UploadFile]
) -> list[RAGOutput]:
    """Run the file-rag pipeline to generate an answer based on a query.

    This method processes the input query through a Retrieval-Augmented Generation (RAG) pipeline,
    which retrieves relevant documents and generates an answer based on that context.

    Args:
        rag_input (RAGInput): A dataclass containing the list of questions.
        files (list[UploadFile]): A list of fastapi.UploadFile objects.

    Returns:
        A list of dictionaries, each containing a `'question'`, `'answer'`, and `'sources'`.
    """
    if rag_input.language_model not in settings.active_llms.rag:
        logger.error(f"Invalid language model selected: {rag_input.language_model}")
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=(
                f"Es wurde ein ungültiges Sprachmodell ausgewählt ({rag_input.language_model})."
                " Bitte versuchen Sie es mit einem anderen Modell."
            ),
        )

    source_ids = await self._index_file_rag(files_byte_stream=files)

    return [await self._answer_file_rag(rag_input, source_ids=source_ids)]
utils

This module contains helper functions for RAG pipelines.

FUNCTION DESCRIPTION
apply_output_format

Apply the output format to a question, its answer, and the associated sources with metadata.

convert_uploadfile_to_byte_stream

Convert an uploaded file object into a ByteStream instance.

format_content_text

Format the content text for display in the UI.

get_dict_value

Extract a nested value from a dictionary.

save_pipe_to

Serialize the pipe configuration to a YAML file and optionally generate a PNG image representing the pipe.

save_to_yaml

Save data to a YAML file.

split_list_into_batches

Split a list into batches of n elements.

apply_output_format
apply_output_format(question, answer, sources_raw, db2ui_map=None)

Apply the output format to a question, its answer, and the associated sources with metadata.

PARAMETER DESCRIPTION
question

The question to be formatted.

TYPE: str

answer

The answer to the question.

TYPE: str

sources_raw

A list of source objects with associated metadata.

TYPE: list[Any]

db2ui_map

Mapping from database metadata_keys to the displayed strings in the UI. Default mapping is defined in the rag_pipeline_config

TYPE: dict[str, str] | None DEFAULT: None

RETURNS DESCRIPTION
RAGOutput

The formatted question, answer, and sources.

Source code in docs/microservices/rag/src/rag/utils.py
def apply_output_format(
    question: str, answer: str, sources_raw: list, db2ui_map: dict | None = None
) -> RAGOutput:
    """Apply the output format to a question, its answer, and the associated sources with metadata.

    Args:
        question (str): The question to be formatted.
        answer (str): The answer to the question.
        sources_raw (list[Any]): A list of source objects with associated metadata.
        db2ui_map (dict[str,str] | None): Mapping from database metadata_keys to the displayed strings in the UI.
                                   Default mapping is defined in the rag_pipeline_config

    Returns:
        The formatted question, answer, and sources.
    """
    if db2ui_map is None:
        db2ui_map = rag_pipeline_config.db2ui_map.model_dump()
    sources = []
    for source in sources_raw:
        # extract metadata listed in db2ui-map and apply human-readable name
        metadata = {db2ui_map[k]: v for k, v in source.meta.items() if k in db2ui_map}

        # extract url if set
        source_url = source.meta.get("url", None)

        # apply content-formatting
        formatted_content = format_content_text(source.content)
        sources.append(
            {"content": formatted_content, "meta": metadata, "url": source_url}
        )

    out = RAGOutput(question=question, answer=answer, sources=sources)

    return out
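A small illustration (not part of the module source), assuming a Haystack Document as source and a hypothetical db2ui_map:

from haystack import Document

source = Document(
    content="Residence permits are issued by the local registration office.",
    meta={"title": "Residence permit", "url": "https://example.org/permit", "page": 3},
)
db2ui_map = {"title": "Title", "page": "Page"}  # hypothetical mapping

out = apply_output_format(
    question="Who issues residence permits?",
    answer="The local registration office.",
    sources_raw=[source],
    db2ui_map=db2ui_map,
)
# out.sources[0] == {
#     "content": "Residence permits are issued by the local registration office.",
#     "meta": {"Title": "Residence permit", "Page": 3},
#     "url": "https://example.org/permit",
# }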
convert_uploadfile_to_byte_stream
convert_uploadfile_to_byte_stream(upload_file)

Convert an uploaded file object into a ByteStream instance.

Reads the file content and wraps it in a ByteStream along with metadata such as the file name and MIME type.

PARAMETER DESCRIPTION
upload_file

The uploaded file object (e.g., an instance of FastAPI's UploadFile).

TYPE: UploadFile

RETURNS DESCRIPTION
ByteStream

An object containing the file's bytes, file name, and MIME type.

Source code in docs/microservices/rag/src/rag/utils.py
def convert_uploadfile_to_byte_stream(upload_file: UploadFile) -> ByteStream:
    """Convert an uploaded file object into a `ByteStream` instance.

    Reads the file content and wraps it in a `ByteStream` along with metadata
    such as the file name and MIME type.

    Args:
        upload_file: The uploaded file object (e.g., an instance of FastAPI's `UploadFile`).

    Returns:
        An object containing the file's bytes, file name, and MIME type.
    """
    byte_stream = ByteStream(
        data=upload_file.file.read(),
        meta={"file_name": upload_file.filename},
        mime_type=upload_file.content_type,
    )
    return byte_stream
format_content_text
format_content_text(content_text)

Format the content text for display in the UI.

PARAMETER DESCRIPTION
content_text

The content text to be formatted.

TYPE: str

RETURNS DESCRIPTION
str

The formatted content text.

Source code in docs/microservices/rag/src/rag/utils.py
def format_content_text(content_text: str) -> str:
    """Format the content text for display in the UI.

    Args:
        content_text (str): The content text to be formatted.

    Returns:
        The formatted content text.
    """
    # if content is None return empty string
    if not content_text:
        return ""

    # remove single linebreaks
    text = content_text.replace("\n\n", "<|LINEBREAK|>")
    text = text.replace("-\n", "")
    text = text.replace("\n", " ")
    text = text.replace("<|LINEBREAK|>", "\n")

    # remove leading metadata
    text = text.split(rag_pipeline_config.pipeline.metadata_title_separator)[-1]

    text = text.strip()

    return text
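An illustrative input/output pair (not part of the module source); the leading metadata/title prefix is cut at rag_pipeline_config.pipeline.metadata_title_separator, assumed to be "###" here:

raw = "My Title###Applica-\ntions are processed\nwithin two weeks.\n\nPlease bring your ID."

formatted = format_content_text(raw)
# Hyphenated line breaks are joined, single newlines become spaces, double
# newlines are kept as paragraph breaks and the title prefix is stripped:
# "Applications are processed within two weeks.\nPlease bring your ID."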
get_dict_value
get_dict_value(dd, keys)

Extract a nested value from a dictionary.

PARAMETER DESCRIPTION
dd

The dictionary from which the value will be extracted.

TYPE: dict

keys

A list representing the path of keys to the desired value.

TYPE: list

RETURNS DESCRIPTION
dict

The extracted value from the dictionary.

Source code in docs/microservices/rag/src/rag/utils.py
def get_dict_value(dd: dict, keys: list[str]) -> dict:
    """Extract a nested value from a dictionary.

    Args:
        dd (dict): The dictionary from which the value will be extracted.
        keys (list): A list representing the path of keys to the desired value.

    Returns:
        The extracted value from the dictionary.
    """
    d = deepcopy(dd)
    for k in keys:
        d = d[k]
    return d
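For example:

cfg = {"pipeline": {"retrieval": {"top_k": 10}}}
get_dict_value(cfg, ["pipeline", "retrieval", "top_k"])  # -> 10
# The dictionary is deep-copied first, so the caller's data is left untouched.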
save_pipe_to
save_pipe_to(file_path, pipe, png=None)

Serialize the pipe configuration to a YAML file and optionally generate a PNG image representing the pipe.

PARAMETER DESCRIPTION
file_path

The path where the pipe configuration will be saved (as a YAML file).

TYPE: str

pipe

The pipe to serialize and optionally generate an image for.

TYPE: AsyncPipeline

png

The type of PNG image to generate. Can be "mermaid", "local", or None.

TYPE: str or None DEFAULT: None

RAISES DESCRIPTION
ImportError

If local PNG export fails due to a missing or faulty dependency.

Source code in docs/microservices/rag/src/rag/utils.py
def save_pipe_to(file_path: str, pipe: AsyncPipeline, png: str | None = None) -> None:
    """Serialize the pipe configuration to a YAML file and optionally generate a PNG image representing the pipe.

    Args:
        file_path (str): The path where the pipe configuration will be saved (as a YAML file).
        pipe (AsyncPipeline): The pipe to serialize and optionally generate an image for.
        png (str or None): The type of PNG image to generate. Can be "mermaid", "local", or `None`.

    Raises:
        ImportError: If local PNG export fails due to a missing or faulty dependency.
    """
    pipe_config = pipe.to_dict()
    save_to_yaml(file_path, pipe_config)
    png_path = Path(file_path).with_suffix(".png")
    if png == "mermaid":
        try:
            pipe.draw(png_path)
        except Exception as e:
            logger.warning(
                f"There was an issues with mermaid.ink to draw the pipeline: {e}"
            )
    elif png == "local":
        spec = importlib.util.find_spec("pygraphviz")
        if spec is not None:
            draw_pipeline(graph=pipe.graph, path=png_path)
        else:
            logger.warning(
                "Pipeline PNG-Export failed. Could not import `pygraphviz`. Please install via: \n"
                "'pip install pygraphviz'\n"
                "(You might need to run this first: 'apt install libgraphviz-dev graphviz')"
            )
    elif png is None:
        pass
    else:
        logger.warning(f"no png generated. Option {png=} does not exist")
save_to_yaml
save_to_yaml(file_path, data_to_write)

Save data to a YAML file.

PARAMETER DESCRIPTION
file_path

Path to the YAML file.

TYPE: str

data_to_write

Data to write to the YAML file.

TYPE: dict

Source code in docs/microservices/rag/src/rag/utils.py
def save_to_yaml(file_path: str, data_to_write: dict) -> None:
    """Save data to a YAML file.

    Args:
        file_path (str): Path to the YAML file.
        data_to_write (dict): Data to write to the YAML file.
    """
    with open(file_path, "w") as outfile:
        yaml.dump(data_to_write, outfile, default_flow_style=False, allow_unicode=True)
split_list_into_batches
split_list_into_batches(lst, n)

Split a list into batches of n elements.

PARAMETER DESCRIPTION
lst

The list to be split.

TYPE: list

n

The size of each batch.

TYPE: int

RETURNS DESCRIPTION
list

A list of batches, where each batch is a sublist of length n (except possibly the last one).

Source code in docs/microservices/rag/src/rag/utils.py
def split_list_into_batches(lst: list, n: int) -> list:
    """Split a list into batches of `n` elements.

    Args:
        lst (list): The list to be split.
        n (int): The size of each batch.

    Returns:
        A list of batches, where each batch is a sublist of length `n` (except possibly the last one).
    """
    return [lst[i : i + n] for i in range(0, len(lst), n)]
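For example:

split_list_into_batches([1, 2, 3, 4, 5, 6, 7], 3)
# -> [[1, 2, 3], [4, 5, 6], [7]]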

settings

Load all settings from a central place, not hidden in utils.

utils

Utils functions for logging, LLM availability check and configuration processing.

MODULE DESCRIPTION
base_logger

Set up the root logger for the entire application. This logger will log messages to the console and a file.

check_model_api_availability

This module provides functions to check LLM-APIs for availability.

delete_elasticsearch_tempfiles

This module contains a function to delete documents in the tempfile index in the Elasticsearch database.

download_reranker_model

Download the re-ranker model defined in rag_pipeline_config.yml from Hugging Face.

process_configs

Methods to load configs and start checks of config integrity.

base_logger

Set up the root logger for the entire application. This logger will log messages to the console and a file.

FUNCTION DESCRIPTION
setup_logger

Initialize the logger with the desired log level and add handlers.

setup_logger
setup_logger()

Initialize the logger with the desired log level and add handlers.

Sets up the root logger, which all other loggers inherit from. Adds file and console handlers to the logger and sets the format.

Source code in docs/microservices/rag/src/utils/base_logger.py
def setup_logger() -> None:
    """Initialize the logger with the desired log level and add handlers.

    Sets up the root logger, which all other loggers inherit from.
    Adds file and console handlers to the logger and sets the format.
    """
    # root logger, all other loggers inherit from this
    logger = logging.getLogger()

    # create different handlers for log file and console
    file_handler = logging.handlers.RotatingFileHandler(
        filename=settings.log_file,
        maxBytes=settings.log_file_max_bytes,
        backupCount=settings.log_file_backup_count,
    )
    console_handler = logging.StreamHandler()

    # define log format and set for each handler
    formatter = logging.Formatter(
        fmt="%(asctime)s - %(levelname)8s - %(module)s - %(funcName)s: %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S%z",
    )
    file_handler.setFormatter(formatter)
    console_handler.setFormatter(formatter)

    # add handlers to the logger
    logger.addHandler(file_handler)
    logger.addHandler(console_handler)

    # Set level for 'haystack' logger
    logging.getLogger("haystack").setLevel(settings.haystack_log_level)

    # Set global logger level
    logger.setLevel(settings.log_level)
check_model_api_availability

This module provides functions to check LLM-APIs for availability.

To check a certain LLM, use await is_model_api_available(llm_api, llm_name). To get all LLMs that are activated in configs/general.yml, use await get_available_llms().

FUNCTION DESCRIPTION
get_available_llms

Returns a list of available LLMs.

is_model_api_available

Check if API is available using credentials.

get_available_llms async
get_available_llms()

Returns a list of available LLMs.

RETURNS DESCRIPTION
list[dict[str, str]]

List of available LLMs with selected infos.

Source code in docs/microservices/rag/src/utils/check_model_api_availability.py
async def get_available_llms() -> list[dict[str, str]]:
    """Returns a list of available LLMs.

    Returns:
        List of available LLMs with selected infos.
    """
    available_llms = {}

    # iterate over model_groups (services), i.e. chat, RAG, embedding, ...
    for model_group_key in llm_config:
        available_llms[model_group_key] = []
        logger.debug(f"Checking APIs for {model_group_key}-LLMs.")
        model_group = llm_config[model_group_key]

        for llm_name, llm in model_group.items():
            logger.debug(f"Checking availability of {llm_name}")
            if await is_model_api_available(llm_api=llm.api, llm_name=llm_name):
                llm_dict = llm.model_dump(
                    include=["label", "is_remote", "max_chunks_to_use"]
                )
                llm_dict["name"] = llm_name

                available_llms[model_group_key].append(llm_dict)

    if not available_llms["embedding"]:
        return []  # if no embedding model available, then RAG-service is unavailable

    return available_llms["rag"]
is_model_api_available async
is_model_api_available(llm_api, llm_name, timeout_in_s=10)

Check if API is available using credentials.

Availability is checked by sending a HEAD, GET, or POST request. If a health_check endpoint is provided, the request is sent to that endpoint; otherwise, it is sent to the main API URL.

PARAMETER DESCRIPTION
llm_api

the LLMAPI instance to check

TYPE: LLMAPI

llm_name

ID of the LLM as used in the config file as reference

TYPE: str

timeout_in_s

http timeout in seconds; defaults to 10

TYPE: int DEFAULT: 10

RETURNS DESCRIPTION
bool

True if the model API is available.

Source code in docs/microservices/rag/src/utils/check_model_api_availability.py
async def is_model_api_available(
    llm_api: LLMAPI,
    llm_name: str,
    timeout_in_s: int = 10,
) -> bool:
    """Check if API is available using credentials.

    Availability is checked by sending a HEAD, GET, or POST request. If a health_check endpoint is provided,
    the request is sent to that endpoint; otherwise, it is sent to the main API URL.

    Args:
        llm_api (LLMAPI): the LLMAPI instance to check
        llm_name (str): ID of the LLM as used in the config file as reference
        timeout_in_s (int): http timeout in seconds; defaults to 10

    Returns:
        True if the model API is available.
    """
    headers = {"Content-type": "application/json"}

    # Authorization is not always needed
    if llm_api.auth:
        headers["Authorization"] = llm_api.auth.get_auth_header()

    url = llm_api.get_health_check_url()

    # test health check endpoint with GET, HEAD and POST
    try:
        async with httpx.AsyncClient() as client:
            response = await client.get(
                url,
                headers=headers,
                timeout=timeout_in_s,
            )
        logger.debug(
            f"{url} health check via GET request: {response.status_code=}, LLM: '{llm_name}"
        )

        # test with HEAD
        if response.status_code != HTTPStatus.OK:
            async with httpx.AsyncClient() as client:
                response = await client.head(
                    url,
                    headers=headers,
                    timeout=timeout_in_s,
                )
            logger.debug(
                f"{url} health check via HEAD request: {response.status_code=}, LLM: '{llm_name}"
            )

        # test with POST
        if response.status_code != HTTPStatus.OK:
            async with httpx.AsyncClient() as client:
                response = await client.post(
                    url,
                    headers=headers,
                    timeout=timeout_in_s,
                )
            logger.debug(
                f"{url} health check via POST request: {response.status_code=}, LLM: '{llm_name}"
            )

    except Exception as e:
        logger.warning(
            f"Exception when trying to reach LLM API. Error: {e}, LLM: '{llm_name}"
        )
        return False

    if response.status_code != HTTPStatus.OK:
        logger.warning(
            f"LLM unavailable: Could not establish connection to LLM-API. LLM: '{llm_name}"
        )

    return response.status_code == HTTPStatus.OK
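An illustrative check for a single configured LLM (not part of the module source); the LLM key and the config access pattern are assumptions based on the llm_config usage in get_available_llms:

import asyncio

async def check_one() -> None:
    llm_name = "llama3"             # hypothetical LLM key
    llm = llm_config.rag[llm_name]  # assumed config layout
    ok = await is_model_api_available(llm_api=llm.api, llm_name=llm_name, timeout_in_s=5)
    print(f"{llm_name} available: {ok}")

# asyncio.run(check_one())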
delete_elasticsearch_tempfiles

This module contains a function to delete documents in the tempfile index in the Elasticsearch database.

FUNCTION DESCRIPTION
cleanup_elasticsearch_tempindex_task

Cleanup temp-file index in Elasticsearch database.

cleanup_elasticsearch_tempindex_task async
cleanup_elasticsearch_tempindex_task()

Cleanup temp-file index in Elasticsearch database.

RETURNS DESCRIPTION
bool

True if cleanup was successful.

Source code in docs/microservices/rag/src/utils/delete_elasticsearch_tempfiles.py
async def cleanup_elasticsearch_tempindex_task() -> bool:
    """Cleanup temp-file index in Elasticsearch database.

    Returns:
        True if cleanup was successful.
    """
    timestamp = datetime.now(tz=timezone("Europe/Berlin")) - timedelta(minutes=15)
    try:
        n_docs = await rag_registry.rag_registry.file_rag_tempfile_deletion(timestamp)
        logger.info(
            f"Cleaned uploaded documents in elasticsearch temporary index, "
            f"that were older than one hour. {n_docs} were deleted."
        )
        return True

    except Exception:
        logger.exception("Failed to cleanup elasticsearch temporary index")
        return False
download_reranker_model

Download the re-ranker model defined in rag_pipeline_config.yml from Hugging Face.

FUNCTION DESCRIPTION
download_reranker_model

Check if re-ranker model needs to be obtained from Huggingface and download it.

download_reranker_model
download_reranker_model()

Check if re-ranker model needs to be obtained from Huggingface and download it.

RETURNS DESCRIPTION
str | None

Local path of file or None if model could not be loaded.

Source code in docs/microservices/rag/src/utils/download_reranker_model.py
def download_reranker_model() -> str | None:
    """Check if re-ranker model needs to be obtained from Huggingface and download it.

    Returns:
        Local path of file or None if model could not be loaded.
    """
    config = rag_pipeline_config.pipeline.retrieval_config

    logger.info(
        f"Downloading re-ranker model. {config.ranker_model} and storing it in {config.ranker_model_path}"
    )
    try:
        reranker_model_path = snapshot_download(
            repo_id=config.ranker_model, local_dir=config.ranker_model_path
        )
        logger.info(f"Saved re-ranker model to {reranker_model_path}")

        return reranker_model_path
    except Exception:
        logger.exception("Failed to download re-ranker model")
        return None
process_configs

Methods to load configs and start checks of config integrity.

FUNCTION DESCRIPTION
load_all_configs

Load config settings from respective paths.

load_from_yml_in_pydantic_model

Load config from 'yaml_path' into the given pydantic model.

load_yaml

Load yaml.

merge_specific_cfgs_in_place

Copy Prompt-config to appropriate section in general llm_config. Edit in-place!

postprocess_configs

Post-Process loaded configs.

remove_inactive_models

Remove models from all use cases, if they are not in 'active_llms'. Edit in-place!

load_all_configs
load_all_configs(general_config_paths, path_to_llm_prompts, path_to_llm_model_configs, path_to_rag_config)

Load config settings from respective paths.

PARAMETER DESCRIPTION
general_config_paths

Path to config, matching 'Settings'

TYPE: Path

path_to_llm_prompts

Path to config, matching 'LLMPromptMaps'

TYPE: Path

path_to_llm_model_configs

Path to config, matching 'LLMConfig'

TYPE: Path

path_to_rag_config

Path to config, matching 'RAGPipelineConfig'

TYPE: Path

RETURNS DESCRIPTION
tuple[Settings, LLMConfig, RAGPipelineConfig]

Config loaded into their Pydantic Model.

Source code in docs/microservices/rag/src/utils/process_configs.py
def load_all_configs(
    general_config_paths: Path,
    path_to_llm_prompts: Path,
    path_to_llm_model_configs: Path,
    path_to_rag_config: Path,
) -> tuple[Settings, LLMConfig, RAGPipelineConfig]:
    """Load config settings from respective paths.

    Args:
        general_config_paths (Path): Path to config, matching 'Settings'
        path_to_llm_prompts (Path): Path to config, matching 'LLMPromptMaps'
        path_to_llm_model_configs (Path): Path to config, matching 'LLMConfig'
        path_to_rag_config (Path): Path to config, matching 'RAGPipelineConfig'

    Returns:
        Config loaded into their Pydantic Model.

    """
    settings = load_from_yml_in_pydantic_model(general_config_paths, Settings)
    llm_prompts = load_from_yml_in_pydantic_model(path_to_llm_prompts, LLMPromptMaps)
    llm_config = load_from_yml_in_pydantic_model(path_to_llm_model_configs, LLMConfig)
    rag_config = load_from_yml_in_pydantic_model(path_to_rag_config, RAGPipelineConfig)

    postprocess_configs(settings, llm_prompts, llm_config)

    return settings, llm_config, rag_config
load_from_yml_in_pydantic_model
load_from_yml_in_pydantic_model(yaml_path, pydantic_reference_model)

Load config from 'yaml_path' into the given pydantic model.

PARAMETER DESCRIPTION
yaml_path

Yaml to load

TYPE: Path

pydantic_reference_model

pydantic model to load yaml into

TYPE: BaseModel

RETURNS DESCRIPTION
BaseModel

BaseModel derived pydantic data class.

Source code in docs/microservices/rag/src/utils/process_configs.py
def load_from_yml_in_pydantic_model(
    yaml_path: Path, pydantic_reference_model: BaseModel
) -> BaseModel:
    """Load config from 'list_of_yaml_paths' into given pydantic-Model.

    Args:
        yaml_path (Path): Yaml to load
        pydantic_reference_model (BaseModel): pydantic model to load yaml into

    Returns:
        BaseModel derived pydantic data class.

    """
    data = load_yaml(yaml_path)

    try:
        pydantic_class = pydantic_reference_model(**data)
        logger.info(f"Config loaded from: '{yaml_path}'")
        return pydantic_class

    except ValidationError as e:
        logger.critical(f"Error loading config: '{e}'")
        raise e
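An illustrative round trip (not part of the module source) with a minimal stand-in model, not one of the project's config classes:

from pathlib import Path
from pydantic import BaseModel

class DemoSettings(BaseModel):
    log_level: str
    check_llm_api_interval_in_s: int

# demo.yml:
#   log_level: INFO
#   check_llm_api_interval_in_s: 300
settings = load_from_yml_in_pydantic_model(Path("demo.yml"), DemoSettings)
print(settings.log_level)  # "INFO"
# A missing file raises FileNotFoundError (from load_yaml); invalid values raise
# a pydantic ValidationError, which is logged and re-raised.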
load_yaml
load_yaml(yaml_path)

Load yaml.

PARAMETER DESCRIPTION
yaml_path

Path to yaml

TYPE: Path

RETURNS DESCRIPTION
dict[str, Any]

Content of loaded yaml.

Source code in docs/microservices/rag/src/utils/process_configs.py
def load_yaml(yaml_path: Path) -> dict[str, Any]:
    """Load yaml.

    Args:
        yaml_path (Path): Path to the yaml file

    Returns:
        Content of loaded yaml.

    """
    if not yaml_path.exists():
        logger.error(f"Invalid path: '{yaml_path}'")
        raise FileNotFoundError

    with open(yaml_path) as file:
        return yaml.safe_load(file)
merge_specific_cfgs_in_place
merge_specific_cfgs_in_place(llm_config, llm_prompts)

Copy Prompt-config to appropriate section in general llm_config. Edit in-place!

A model is merged only if its 'prompt_map' entry in LLMConfig can be found in LLMPromptMaps, i.e. the function generalizes an assignment like this:

cfg["test_model:local"].prompt_config = prompt[cfg["test_model:local"].prompt_map]

PARAMETER DESCRIPTION
llm_config

Target for merge of Prompt parameter

TYPE: LLMConfig

llm_prompts

Source to merge Prompt parameter from

TYPE: LLMPromptMaps

RETURNS DESCRIPTION
bool

True if no problems occurred.

Source code in docs/microservices/rag/src/utils/process_configs.py
def merge_specific_cfgs_in_place(
    llm_config: LLMConfig, llm_prompts: LLMPromptMaps
) -> bool:
    """Copy Prompt-config to appropriate section in general llm_config. Edit in-place!

    A model is merged only if its 'prompt_map' entry in LLMConfig can be found in LLMPromptMaps,
    i.e. the function generalizes an assignment like this:

    cfg["test_model:local"].prompt_config = prompt[cfg["test_model:local"].prompt_map]

    Args:
        llm_config (LLMConfig): Target for merge of Prompt parameter
        llm_prompts (LLMPromptMaps): Source to merge Prompt parameter from

    Returns:
        True if no problems occurred.

    """
    no_issues_occurred = True
    for usecase in llm_config:
        if usecase == "embedding":
            continue
        # load identical use-cases, i.e. chat, RAG
        try:
            cfg = getattr(llm_config, usecase)
            prompt = getattr(llm_prompts, usecase)
        except AttributeError as e:
            logger.exception(e)
            logger.warning(
                f"Usecase '{usecase}' not matching between prompt- and general llm config. \
                    Skipping cfg-merge for '{usecase}' .."
            )
            no_issues_occurred = False
            continue

        # copy prompt config to its use-case- and model-counterpart
        for model in cfg:
            prompt_map_to_use = cfg[model].prompt_map
            if prompt_map_to_use in prompt:
                cfg[model].prompt_config = prompt[prompt_map_to_use]
            else:
                logger.warning(
                    f"'prompt_map: {prompt_map_to_use}' from LLM-config not in prompt-config for '{usecase}'. \
                        Skipping .."
                )
                no_issues_occurred = False
                continue

    return no_issues_occurred
postprocess_configs
postprocess_configs(settings, llm_prompts, llm_config)

Post-Process loaded configs.

Remove unused models (from settings.active_llms), merge LLMPromptMaps into LLMConfig.

PARAMETER DESCRIPTION
settings

Config matching pydantic 'Settings'.

TYPE: Settings

llm_prompts

Config matching pydantic 'LLMPromptMaps'.

TYPE: LLMPromptMaps

llm_config

Config matching pydantic 'LLMConfig'.

TYPE: LLMConfig

RETURNS DESCRIPTION
LLMConfig

Merged and filtered LLM configuration.

Source code in docs/microservices/rag/src/utils/process_configs.py
def postprocess_configs(
    settings: Settings, llm_prompts: LLMPromptMaps, llm_config: LLMConfig
) -> LLMConfig:
    """Post-Process loaded configs.

    Remove unused models (from settings.active_llms), merge LLMPromptMaps into LLMConfig.

    Args:
        settings (Settings): Config matching pydantic 'Settings'.
        llm_prompts (LLMPromptMaps): Config matching pydantic 'LLMPromptMaps'.
        llm_config (LLMConfig): Config matching pydantic 'LLMConfig'.

    Returns:
        Merged and filtered LLM configuration.
    """
    remove_inactive_models(llm_config, settings.active_llms)
    merge_specific_cfgs_in_place(llm_config, llm_prompts)

    return llm_config
remove_inactive_models
remove_inactive_models(input_config, active_llms)

Remove models from all use cases, if they are not in 'active_llms'. Edit in-place!

PARAMETER DESCRIPTION
input_config

Config to change

TYPE: LLMConfig

active_llms

Models to keep - remove other

TYPE: list[str]

Source code in docs/microservices/rag/src/utils/process_configs.py
def remove_inactive_models(input_config: LLMConfig, active_llms: list[str]) -> None:
    """Remove models from all use cases, if they are not in 'active_llms'. Edit in-place!

    Args:
        input_config (LLMConfig): Config to change
        active_llms (list[str]): Models to keep - remove other

    """
    for usecase in input_config:
        if usecase == "embedding":
            continue
        cfg = getattr(input_config, usecase)
        active_llms_for_usecase = getattr(active_llms, usecase)
        for model in list(cfg):
            if model not in active_llms_for_usecase:
                cfg.pop(model)