
chat

MODULE DESCRIPTION
main

Main module of the application.

src

Source code of the chat containing core components and utilities.

main

Main module of the application.

This module serves as the entry point of the application. It imports the required modules, sets up the initial configuration and data structures, and defines the main functions and classes used throughout the application.

src

Source code of the chat containing core components and utilities.

MODULE DESCRIPTION
app

Initialize the app.

chat

Implementation of the core logic and interaction flow of the chat.

endpoints

Endpoints of the chat microservice.

models

Data model classes for loading and validating API and configuration parameters.

openai_custom_auth

Customized httpx authentication client.

settings

Load all settings from a central place, not hidden in utils.

utils

Utility functions for logging, LLM availability checks, and configuration processing.

app

Initialize the app.

FUNCTION DESCRIPTION
lifespan

Sets up a scheduler and updates the available LLMs.

lifespan async
lifespan(_app)

Sets up a scheduler and updates the available LLMs.

This lifespan function is started on startup of FastAPI. The first part, up to the yield statement, is executed on startup and initializes a scheduler that regularly checks the LLM API. The second part is executed on shutdown and cleans up the scheduler.

The available LLMs, i.e. the LLMs for which the API checks passed, are cached in the FastAPI state object as app.state.available_llms.

Source code in docs/microservices/chat/src/app.py
@asynccontextmanager
async def lifespan(_app: FastAPI) -> AsyncGenerator[None]:
    """Sets up a scheduler and updates available llms.

    This lifespan function is started on startup of FastAPI. The first part
    - till `yield` is executed on startup and initializes a scheduler to regulary
    check the LLM-API. The second part is executed on shutdown and is used to
    clean up the scheduler.

    The available LLMs - i.e. the LLMs where API-checks passed - are cached in
    FastAPI state object as `app.state.available_llms`.
    """

    async def update_llm_state() -> None:
        _app.state.available_llms = await get_available_llms()

    # store available LLMs in FastAPI app state
    _app.state.available_llms = await get_available_llms()

    # setup a scheduler
    scheduler = AsyncIOScheduler()
    scheduler.add_job(
        update_llm_state,
        "interval",
        seconds=settings.check_llm_api_interval_in_s,
    )
    scheduler.start()

    yield

    # cleanup
    scheduler.shutdown()
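
For context, here is a minimal sketch of how this lifespan could be wired into the application. The import paths src.app and src.endpoints are assumptions for illustration and may differ from the actual module layout.

from fastapi import FastAPI

from src.app import lifespan        # assumed import path
from src.endpoints import router    # assumed import path

# Pass the lifespan so the LLM check scheduler starts and stops with the app.
app = FastAPI(lifespan=lifespan)
app.include_router(router)

# After startup, request handlers can read the cached list via
# request.app.state.available_llms.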

chat

Implementation of the core logic and interaction flow of the chat.

MODULE DESCRIPTION
chat_completion

Chat completion model running on an OpenAI-compatible API.

chat_registry

Chat registry class for storing and accessing chat providers (OpenAIChatCompletion).

chat_completion

Chat completion model running on an OpenAI-compatible API.

CLASS DESCRIPTION
OpenAIChatCompletion

Chat completion model running on an OpenAI-compatible API.

OpenAIChatCompletion

Chat completion model running on an OpenAI-compatible API.

ATTRIBUTE DESCRIPTION
llm

Object describing the LLM.

TYPE: LLM

auth_client

Authentication client for various APIs.

TYPE: CustomAuthClient

llm_client

LLM client using the AsyncOpenAI API.

TYPE: AsyncOpenAI

METHOD DESCRIPTION
run_chat_completion

Continues a chat history by generating the next assistant message.

run_chat_completion_stream

Continues a chat history by generating the next assistant message.

Source code in docs/microservices/chat/src/chat/chat_completion.py
class OpenAIChatCompletion:
    """Chat completion model running on an OpenAI-conform API.

    Attributes:
        llm (LLM): Object describing the LLM.
        auth_client (CustomAuthClient): Authentication client for various APIs.
        llm_client (AsyncOpenAI): LLM client using AsnycOpenAI API.
    """

    def __init__(self, llm: LLM) -> None:
        """Initializes the model with the LLM and the credentials."""
        self.llm: LLM = llm
        self.auth_client: CustomAuthClient = self._setup_auth_client()
        self.llm_client: AsyncOpenAI = self._setup_llm_client()

    async def run_chat_completion(self, chat_input: ChatInput) -> ChatOutput:
        """Continues a chat history by generating the next assistant message.

        Args:
            chat_input (ChatInput): Chat containing new message and chat history.

        Returns:
            Generated chat message output.
        """
        messages = self._preprocess_chat_history(chat_input.as_list)
        chat_output_raw = await self._generate(messages, response_format="text")
        chat_output = self._postprocess_result(chat_output=chat_output_raw)

        return chat_output

    async def run_chat_completion_stream(
        self, chat_input: ChatInput
    ) -> AsyncGenerator[str]:
        """Continues a chat history by generating the next assistant message.

        Args:
            chat_input (ChatInput): Chat containing new message and chat history.

        Returns:
            Generated chat output as stream.
        """
        messages = self._preprocess_chat_history(chat_input.as_list)
        raw_stream = self._generate_stream(
            messages=messages,
            response_format="text",
        )
        async for processed_chunk in self._postprocess_stream(raw_stream):
            yield ChatStreamOutput(**processed_chunk).model_dump_json() + "\n"

    def _preprocess_chat_history(
        self,
        chat_list: list[dict[str, str]],
    ) -> list[dict[str, str]]:
        """Ensures system prompt exists and trims the entire chat history.

        Ensures system prompt exists and adds it if necessary.
        Trims the chat history to fit within the model's maximum context length.
        The oldest messages are removed first if the token limit is exceeded.
        The system message is always preserved at the beginning of the history.

        Args:
            chat_list (list[dict[str, str]]): The chat history with roles and content.

        Returns:
            list[dict[str, str]]: The trimmed chat history.
        """
        max_new_tokens = self.llm.inference.max_new_tokens or 0
        max_total_tokens = self.llm.max_context_tokens + max_new_tokens

        system_prompt = {
            "role": "system",
            "content": self.llm.prompt_config.system.generate,
        }

        if not chat_list or chat_list[0].get("role") != "system":
            chat_list = [system_prompt] + chat_list

        def estimate_tokens(text: str) -> int:
            return int(len(text) / self.llm.character_to_token)

        total_tokens = estimate_tokens(chat_list[0]["content"])
        trimmed_history = [chat_list[0]]

        for message in reversed(chat_list[1:]):
            message_tokens = estimate_tokens(message.get("content", ""))
            if total_tokens + message_tokens > max_total_tokens:
                break
            trimmed_history.insert(1, message)
            total_tokens += message_tokens

        if len(trimmed_history) < len(chat_list):
            removed_count = len(chat_list) - len(trimmed_history)
            logger.debug("Chat history was trimmed: %d messages removed", removed_count)

        user_messages = [msg for msg in trimmed_history if msg.get("role") != "system"]
        if not user_messages:
            logger.warning(
                "Chat history is too short after trimming. "
                "Consider reducing the length of input messages or adjusting the LLM parameters: "
                "'max_context_tokens' and 'max_new_tokens'."
            )

        return trimmed_history

    async def _generate(
        self,
        messages: list[dict[str, str]],
        response_format: str = "text",
    ) -> ChatOutput:
        """General generation function that generates an output given an input text.

        Args:
            messages (list[dict]): A list of dictionaries where each dictionary contains the "role" and the "content".
            response_format (str): Format of the response.

        Returns:
            Generated chat message output.
        """
        try:
            response = await self.llm_client.chat.completions.create(
                model=self.llm.model,
                messages=messages,
                response_format={"type": response_format},
                max_completion_tokens=self.llm.inference.max_new_tokens,
                temperature=self.llm.inference.temperature,
                top_p=self.llm.inference.top_p,
                stream=False,
            )
            logger.debug(f"Response from LLM client: {response}")

        except BadRequestError as e:
            logger.error(f"Invalid request to OpenAI API: {e}")
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=(
                    "Fehler beim Verarbeiten Ihrer Anfrage durch das Sprachmodell. "
                    "Bitte kürzen Sie Ihren Eingabetext oder versuchen Sie es mit einem anderen Sprachmodell."
                ),
            )

        except (APITimeoutError, httpx.TimeoutException) as e:
            logger.error(f"{self.llm.label} API call timed out: {e}")
            raise HTTPException(
                status_code=status.HTTP_504_GATEWAY_TIMEOUT,
                detail=(
                    "Das verwendete Sprachmodell ist momentan nicht erreichbar "
                    "oder benötigt aufgrund hoher Auslastung länger als üblich, "
                    "um eine Antwort zu generieren. Bitte versuchen Sie es in wenigen Momenten erneut."
                ),
            )

        except Exception as e:
            logger.error(
                f"{self.llm.label} API call of Chat-Completion to LLM failed: {e}"
            )
            raise HTTPException(
                status_code=status.HTTP_502_BAD_GATEWAY,
                detail="Interner Fehler beim Aufruf des Sprachmodells. Bitte versuchen Sie es später erneut.",
            )

        try:
            content: str = response.choices[0].message.content
        except Exception as e:
            logger.error(f"{self.llm.label} chat content not available: {e}")
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="Fehler beim Verarbeiten der Antwort des Sprachmodells.",
            )

        return ChatOutput(response=content)

    async def _generate_stream(
        self,
        messages: list[dict[str, str]],
        response_format: str = "text",
    ) -> Stream[dict]:
        """Take a list of messages as input and return the stream of a model-generated message as output.

        Args:
            messages (list[dict[str, str]]): Messages as input to the model.
            response_format (str): Format of the response.

        Returns:
            Stream of model-generated messages.
        """
        try:
            response = await self.llm_client.chat.completions.create(
                model=self.llm.model,
                messages=messages,
                response_format={"type": response_format},
                max_completion_tokens=self.llm.inference.max_new_tokens,
                temperature=self.llm.inference.temperature,
                top_p=self.llm.inference.top_p,
                stream=True,
            )

            async for chunk in response:
                yield chunk

        except BadRequestError as e:
            logger.error(f"Invalid request to OpenAI API: {e}")
            yield {
                "type": "error",
                "error": (
                    "Fehler beim Verarbeiten Ihrer Anfrage durch das Sprachmodell. "
                    "Bitte kürzen Sie Ihren Eingabetext oder versuchen Sie es mit einem anderen Sprachmodell."
                ),
            }

        except (APITimeoutError, httpx.TimeoutException) as e:
            logger.error(f"{self.llm.label} API call timed out: {e}")
            yield {
                "type": "error",
                "error": (
                    "Das verwendete Sprachmodell ist momentan nicht erreichbar "
                    "oder benötigt aufgrund hoher Auslastung länger als üblich, "
                    "um eine Antwort zu generieren. Bitte versuchen Sie es in wenigen Momenten erneut."
                ),
            }

        except Exception as e:
            logger.error(f"Error during streaming: {e}")
            yield {
                "type": "error",
                "error": "Es ist ein Fehler aufgetreten. Bitte versuchen Sie es später erneut.",
            }

    def _postprocess_result(self, chat_output: ChatOutput) -> ChatOutput:
        """Postprocess reasoning block of a chat output.

        Args:
            chat_output (ChatOutput): Generated chat output in raw format.

        Returns:
            Post-processed chat output with reasoning excluded,
            reasoning text stored separately in `chat_output.reason`.
        """
        response = chat_output.response

        if self.llm.reasoning_config.is_reasoning_model:
            reasoning_regex = (
                rf"{self.llm.reasoning_config.reasoning_start_marker}"
                r"(.*?)"
                rf"{self.llm.reasoning_config.reasoning_end_marker}"
            )
            match = re.search(reasoning_regex, response, flags=re.DOTALL)
            if match:
                chat_output.reason = match.group(1).strip()
                chat_output.response = re.sub(
                    reasoning_regex, "", response, flags=re.DOTALL
                ).lstrip()
            else:
                chat_output.reason = ""

        return chat_output

    async def _postprocess_stream(
        self,
        stream: Stream[dict],
    ) -> AsyncGenerator[dict]:
        """Postprocesses the raw chat completion stream.

        Splits the model output into separate "reason" and "response" outputs,
        handling multi-token start/end markers and incremental token streaming.

        Args:
            stream (Stream[dict]): Async generator of raw chat chunks.

        Yields:
            Structured stream outputs with keys:
            - 'type': 'reason' or 'response'
            - 'content': partial text content
            - 'finish_reason': optional finish reason from the model
        """
        state = StreamState()

        async for chunk in stream:
            if self._is_error_chunk(chunk):
                yield chunk
                break

            # Extract text and finish_reason from current chunk
            text, finish_reason = self._extract_text_and_finish_reason(chunk)
            logger.debug(f"Text: {text}, Finish Reason: {finish_reason}")

            # finish_reason="length": generation exceeded max completion tokens or max context length.
            if finish_reason == "length":
                yield {
                    "type": "error",
                    "error": (
                        "Ihre Anfrage ist zu lang und überschreitet das Kontextfenster des Modells. "
                        "Bitte kürzen Sie Ihren Eingabetext oder reduzieren Sie die Anzahl der Nachrichten."
                    ),
                }
                break

            # Continue with next chunk if text empty (besides "Stop-Chunk")
            if not text and not finish_reason:
                continue
            elif not text:
                text = ""

            # Stream non-reasoning model:
            if not self.llm.reasoning_config.is_reasoning_model:
                yield self._format_output(
                    output_type="response", content=text, finish_reason=finish_reason
                )
                continue

            # Stream reasoning model:
            async for output in self._handle_reasoning_stream(
                state, text, finish_reason
            ):
                yield output

    def _is_error_chunk(self, chunk: dict) -> bool:
        """Checks if the given stream chunk represents an error.

        Args:
            chunk (dict): A single stream chunk from the model.

        Returns:
            bool: True if the chunk is an error (has type 'error'), False otherwise.
        """
        return isinstance(chunk, dict) and chunk.get("type") == "error"

    def _extract_text_and_finish_reason(
        self, chunk: ChatCompletionChunk
    ) -> tuple[str | None, str | None]:
        """Extracts content text and finish_reason from a stream chunk.

        Args:
            chunk (ChatCompletionChunk): Raw stream chunk.

        Returns:
            The token text and optional finish_reason.
        """
        if not chunk.choices:
            return None, None

        text = chunk.choices[0].delta.content
        finish_reason = getattr(chunk.choices[0], "finish_reason", None)
        return text, finish_reason

    def _format_output(
        self, output_type: str, content: str, finish_reason: str | None
    ) -> dict:
        """Builds a structured output dictionary.

        Args:
            output_type (str): Either "reason" or "response".
            content (str): Text content for the output.
            finish_reason (str | None): Finish reason if provided by the model.

        Returns:
            Output with keys {"type", "content", "finish_reason"}.
        """
        return {
            "type": output_type,
            "content": content,
            "finish_reason": finish_reason,
        }

    async def _handle_reasoning_stream(
        self,
        state: StreamState,
        text: str,
        finish_reason: str | None,
    ) -> AsyncGenerator[dict]:
        """Handles reasoning-mode streaming for reasoning-enabled models.

        Buffers incoming text, detects reasoning start/end markers,
        and processes content according to the current state mode
        ('idle', 'reason', or 'response').

        Args:
            state (StreamState): The current streaming state, including buffer,
                mode, and accumulated outputs.
            text (str): The incremental text chunk from the model.
            finish_reason (str | None): The optional finish reason if
                provided by the model.

        Yields:
            Structured stream outputs with keys:
            - 'type': 'reason' or 'response'
            - 'content': partial text content
            - 'finish_reason': optional finish reason from the model
        """
        # Always append text to buffer first
        state.buffer += text

        # Detect start of reasoning
        if state.mode == "idle":
            self._check_reasoning_start(
                state=state,
                start_marker=self.llm.reasoning_config.reasoning_start_marker,
            )
            if state.mode == "idle":
                return

        # Process reasoning or response depending on mode
        if state.mode == "reason":
            self._process_reasoning_buffer(
                state=state,
                finish_reason=finish_reason,
                reasoning_end_marker=self.llm.reasoning_config.reasoning_end_marker,
                end_marker_len=len(self.llm.reasoning_config.reasoning_end_marker),
            )
        elif state.mode == "response":
            self._process_response_buffer(
                state=state,
                text=text,
                finish_reason=finish_reason,
            )

        # Stream outputs and remove them from state
        while state.outputs:
            yield state.outputs.pop(0)

    def _check_reasoning_start(
        self,
        state: StreamState,
        start_marker: str,
    ) -> None:
        """Checks and updates the streaming state based on the reasoning start marker.

        Args:
            state (StreamState): The current streaming state.
            start_marker (str): Marker that signals the beginning of reasoning.

        Returns:
            The function modifies `state` in place.
            - `state.mode` will be set to "reason", "response", or remain "idle".
            - `state.outputs` may receive a response chunk if the buffer
            does not start with (a subset of) the marker.
        """
        marker_len = len(start_marker)
        buffer = state.buffer.lstrip("\n\r ")

        if buffer.startswith(start_marker):
            state.buffer = buffer[marker_len:]  # remove marker
            state.mode = "reason"
            return

        for i in range(marker_len - 1, 0, -1):
            if start_marker.startswith(buffer[:i]):
                return

        state.mode = "response"

    def _process_reasoning_buffer(
        self,
        state: StreamState,
        finish_reason: str | None,
        reasoning_end_marker: str,
        end_marker_len: int,
    ) -> None:
        """Processes reasoning text until the end marker is found.

        Streams reasoning chunks incrementally. If the end marker is detected,
        switches the mode to 'response' and emits any remaining buffer content.

        Args:
            state (StreamState): Current stream state.
            finish_reason (str | None): Optional finish reason from the model.
            reasoning_end_marker (str): End marker for reasoning.
            end_marker_len (int): Length of the reasoning end marker.

        Returns:
            The function modifies `state` in place.
        """
        if reasoning_end_marker in state.buffer:
            reason_part, rest = state.buffer.split(reasoning_end_marker, 1)
            if reason_part:
                state.outputs.append(
                    self._format_output("reason", reason_part, finish_reason)
                )
            state.mode = "response"
            state.buffer = rest
            if state.buffer:
                state.outputs.append(
                    self._format_output(
                        "response", state.buffer.lstrip("\n\r "), finish_reason
                    )
                )
                state.first_response_sent = True
            state.buffer = ""
        elif len(state.buffer) > end_marker_len:
            reason_chunk = state.buffer[:-end_marker_len]
            state.outputs.append(
                self._format_output("reason", reason_chunk, finish_reason)
            )
            state.buffer = state.buffer[-end_marker_len:]

    def _process_response_buffer(
        self,
        state: StreamState,
        text: str,
        finish_reason: str | None,
    ) -> None:
        """Processes response tokens after reasoning has finished.

        Emits response outputs, trims leading newlines if it's the first response,
        and clears the buffer.

        Args:
            state (StreamState): Current stream state.
            text (str): New token text to add.
            finish_reason (str | None): Optional finish reason from the model.

        Returns:
            The function modifies `state` in place.
        """
        if not state.first_response_sent:
            text = text.lstrip("\n\r ")
            state.first_response_sent = True
        state.outputs.append(self._format_output("response", text, finish_reason))
        state.buffer = ""

    def _setup_auth_client(self) -> CustomAuthClient:
        """Set up authentication client for various APIs.

        Sets up an authentication client using either a token, credentials or no authentication method.

        Returns:
            Authentication client.
        """
        if self.llm.api.auth:
            auth_client = CustomAuthClient(
                secret=self.llm.api.auth.secret.get_secret_value(),
                auth_type=self.llm.api.auth.type,
                timeout=self.llm.api.timeout,
            )
        else:
            auth_client = CustomAuthClient(timeout=self.llm.api.timeout)

        return auth_client

    def _setup_llm_client(self) -> AsyncOpenAI:
        """Initializing the LLM client using AsnycOpenAI API.

        Returns:
            Asynchronous OpenAI client.
        """
        llm_client = AsyncOpenAI(
            api_key=" ",
            http_client=self.auth_client,
            base_url=str(self.llm.api.url),
        )

        return llm_client
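
A minimal usage sketch (not part of the documented source). The import paths and the way the LLM object is obtained from an LLMConfig are assumptions for illustration.

import asyncio

from src.chat.chat_completion import OpenAIChatCompletion   # assumed import path
from src.models.api_input import ChatInput, ChatMessage     # assumed import path


async def demo(llm_config) -> None:
    # Pick any configured chat LLM (assumes llm_config.chat maps names to LLM objects).
    name, llm = next(iter(llm_config.chat.items()))
    model = OpenAIChatCompletion(llm=llm)

    chat_input = ChatInput(
        new_message=ChatMessage(role="user", content="Hello!"),
        chat_history=[],
        language_model=name,
    )

    # Non-streaming completion: returns a ChatOutput with response (and reason, if any).
    output = await model.run_chat_completion(chat_input)
    print(output.response, output.reason)

# asyncio.run(demo(llm_config))  # llm_config: a loaded LLMConfig instance
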
run_chat_completion async
run_chat_completion(chat_input)

Continues a chat history by generating the next assistant message.

PARAMETER DESCRIPTION
chat_input

Chat containing new message and chat history.

TYPE: ChatInput

RETURNS DESCRIPTION
ChatOutput

Generated chat message output.

Source code in docs/microservices/chat/src/chat/chat_completion.py
async def run_chat_completion(self, chat_input: ChatInput) -> ChatOutput:
    """Continues a chat history by generating the next assistant message.

    Args:
        chat_input (ChatInput): Chat containing new message and chat history.

    Returns:
        Generated chat message output.
    """
    messages = self._preprocess_chat_history(chat_input.as_list)
    chat_output_raw = await self._generate(messages, response_format="text")
    chat_output = self._postprocess_result(chat_output=chat_output_raw)

    return chat_output
run_chat_completion_stream async
run_chat_completion_stream(chat_input)

Continues a chat history by generating the next assistant message.

PARAMETER DESCRIPTION
chat_input

Chat containing new message and chat history.

TYPE: ChatInput

RETURNS DESCRIPTION
AsyncGenerator[str]

Generated chat output as stream.

Source code in docs/microservices/chat/src/chat/chat_completion.py
async def run_chat_completion_stream(
    self, chat_input: ChatInput
) -> AsyncGenerator[str]:
    """Continues a chat history by generating the next assistant message.

    Args:
        chat_input (ChatInput): Chat containing new message and chat history.

    Returns:
        Generated chat output as stream.
    """
    messages = self._preprocess_chat_history(chat_input.as_list)
    raw_stream = self._generate_stream(
        messages=messages,
        response_format="text",
    )
    async for processed_chunk in self._postprocess_stream(raw_stream):
        yield ChatStreamOutput(**processed_chunk).model_dump_json() + "\n"
chat_registry

Chat registry class for storing and accessing chat providers (OpenAIChatCompletion).

CLASS DESCRIPTION
ChatRegistry

Manages and stores chat providers (OpenAIChatCompletion) and provides access to them.

ChatRegistry

Manages and stores chat providers (OpenAIChatCompletion) and provides access to them.

ATTRIBUTE DESCRIPTION
chat_models

Chat models.

TYPE: dict[str, OpenAIChatCompletion]

llm_config

Model configuration for chat initialization.

TYPE: LLMConfig

METHOD DESCRIPTION
run_chat_completion

Starts the chat completion of the selected language model.

run_chat_completion_json_stream

Runs the chat completion process in json format using the selected language model.

run_chat_completion_text_stream

Runs the chat completion process in text format using the selected language model.

Source code in docs/microservices/chat/src/chat/chat_registry.py
class ChatRegistry:
    """Manages and stores Chat-Providers (OpenAIChatCompletion) and makes access possible.

    Attributes:
        chat_models (dict[str, OpenAIChatCompletion]): Chat models.
        llm_config (LLMConfig): Model configuration for chat initialzation.
    """

    def __init__(self, llm_config: LLMConfig) -> None:
        """Initializes the list of chat models."""
        self.llm_config: LLMConfig = llm_config
        self.chat_models: dict[str, OpenAIChatCompletion] = self._initialize_models()

    def _initialize_models(self) -> dict[str, OpenAIChatCompletion]:
        """Load all available chat models based on custom configuration.

        Returns:
            All model objects with custom configuration.
        """
        models = {}

        for model_name, llm in self.llm_config.chat.items():
            models[model_name] = OpenAIChatCompletion(llm=llm)

        logger.debug(f"Initialized {len(models)} chat models")
        return models

    async def run_chat_completion(
        self, model: OpenAIChatCompletion, chat_input: ChatInput
    ) -> ChatOutput:
        """Starts the chat completion of selected language model.

        Args:
            chat_input (ChatInput): Defines the input to the chat endpoint including the chat message.
            model (OpenAIChatCompletion): Language model to use for chat completion.


        Returns:
            Chat output containing the generated chat message.
        """
        chat_result = await model.run_chat_completion(chat_input)

        logger.info(
            f"Chat completion successfully completed with model: {chat_input.language_model}"
        )

        return chat_result

    async def run_chat_completion_text_stream(
        self,
        model: OpenAIChatCompletion,
        chat_input: ChatInput,
    ) -> AsyncGenerator[str]:
        """Runs the chat completion process in text format using the selected language model.

        Args:
            chat_input (ChatInput): Input to the chat model, including chat history and selected language model.
            model (OpenAIChatCompletion): Language model to use for chat completion.

        Yields:
            The next chunk of generated text and metadata.
        """
        try:
            async for chunk in model.run_chat_completion_stream(chat_input):
                chunk_dict = json.loads(chunk)
                logger.debug(chunk_dict)
                if chunk_dict.get("type") == "response" and chunk_dict.get("content"):
                    yield chunk_dict["content"]
                elif chunk_dict.get("type") == "error" and chunk_dict.get("error"):
                    yield "\nSYSTEM-WARNUNG: " + chunk_dict.get("error")
            logger.info(
                f"Streaming chat completed with model: {chat_input.language_model}"
            )
        except Exception as e:
            logger.error(f"Error while processing chat input with model: {model}: {e}")
            yield "Es ist ein Fehler aufgetreten. Bitte versuchen Sie es später erneut."

    async def run_chat_completion_json_stream(
        self,
        model: OpenAIChatCompletion,
        chat_input: ChatInput,
    ) -> AsyncGenerator[str]:
        """Runs the chat completion process in json format using the selected language model.

        Args:
            chat_input (ChatInput): Input to the chat model, including chat history and selected language model.
            model (OpenAIChatCompletion): Language model to use for chat completion.

        Yields:
            The next chunk of generated text and metadata.
        """
        try:
            async for chunk in model.run_chat_completion_stream(chat_input):
                if chunk:
                    yield chunk
            logger.info(
                f"Streaming chat completed with model: {chat_input.language_model}"
            )
        except Exception as e:
            logger.error(f"Error while processing chat input with model: {model}: {e}")
            yield ChatStreamOutput(
                type="error",
                error="Es ist ein Fehler aufgetreten. Bitte versuchen Sie es später erneut.",
            ).model_dump_json()
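
A usage sketch for the registry (illustrative only; the import paths and the model name test_model_mock are assumptions).

from src.chat.chat_registry import ChatRegistry             # assumed import path
from src.models.api_input import ChatInput, ChatMessage     # assumed import path


async def stream_demo(llm_config) -> None:
    registry = ChatRegistry(llm_config=llm_config)
    model = registry.chat_models.get("test_model_mock")     # assumed model name
    if model is None:
        raise ValueError("Unknown language model")

    chat_input = ChatInput(
        new_message=ChatMessage(role="user", content="Hello!"),
        language_model="test_model_mock",
    )

    # Text stream: yields plain text chunks; stream errors arrive prefixed with "SYSTEM-WARNUNG".
    async for chunk in registry.run_chat_completion_text_stream(model, chat_input):
        print(chunk, end="")
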
run_chat_completion async
run_chat_completion(model, chat_input)

Starts the chat completion of the selected language model.

PARAMETER DESCRIPTION
chat_input

Defines the input to the chat endpoint including the chat message.

TYPE: ChatInput

model

Language model to use for chat completion.

TYPE: OpenAIChatCompletion

RETURNS DESCRIPTION
ChatOutput

Chat output containing the generated chat message.

Source code in docs/microservices/chat/src/chat/chat_registry.py
async def run_chat_completion(
    self, model: OpenAIChatCompletion, chat_input: ChatInput
) -> ChatOutput:
    """Starts the chat completion of selected language model.

    Args:
        chat_input (ChatInput): Defines the input to the chat endpoint including the chat message.
        model (OpenAIChatCompletion): Language model to use for chat completion.


    Returns:
        Chat output containing the generated chat message.
    """
    chat_result = await model.run_chat_completion(chat_input)

    logger.info(
        f"Chat completion successfully completed with model: {chat_input.language_model}"
    )

    return chat_result
run_chat_completion_json_stream async
run_chat_completion_json_stream(model, chat_input)

Runs the chat completion process in json format using the selected language model.

PARAMETER DESCRIPTION
chat_input

Input to the chat model, including chat history and selected language model.

TYPE: ChatInput

model

Language model to use for chat completion.

TYPE: OpenAIChatCompletion

YIELDS DESCRIPTION
AsyncGenerator[str]

The next chunk of generated text and metadata.

Source code in docs/microservices/chat/src/chat/chat_registry.py
async def run_chat_completion_json_stream(
    self,
    model: OpenAIChatCompletion,
    chat_input: ChatInput,
) -> AsyncGenerator[str]:
    """Runs the chat completion process in json format using the selected language model.

    Args:
        chat_input (ChatInput): Input to the chat model, including chat history and selected language model.
        model (OpenAIChatCompletion): Language model to use for chat completion.

    Yields:
        The next chunk of generated text and metadata.
    """
    try:
        async for chunk in model.run_chat_completion_stream(chat_input):
            if chunk:
                yield chunk
        logger.info(
            f"Streaming chat completed with model: {chat_input.language_model}"
        )
    except Exception as e:
        logger.error(f"Error while processing chat input with model: {model}: {e}")
        yield ChatStreamOutput(
            type="error",
            error="Es ist ein Fehler aufgetreten. Bitte versuchen Sie es später erneut.",
        ).model_dump_json()
run_chat_completion_text_stream async
run_chat_completion_text_stream(model, chat_input)

Runs the chat completion process in text format using the selected language model.

PARAMETER DESCRIPTION
chat_input

Input to the chat model, including chat history and selected language model.

TYPE: ChatInput

model

Language model to use for chat completion.

TYPE: OpenAIChatCompletion

YIELDS DESCRIPTION
AsyncGenerator[str]

The next chunk of generated text and metadata.

Source code in docs/microservices/chat/src/chat/chat_registry.py
async def run_chat_completion_text_stream(
    self,
    model: OpenAIChatCompletion,
    chat_input: ChatInput,
) -> AsyncGenerator[str]:
    """Runs the chat completion process in text format using the selected language model.

    Args:
        chat_input (ChatInput): Input to the chat model, including chat history and selected language model.
        model (OpenAIChatCompletion): Language model to use for chat completion.

    Yields:
        The next chunk of generated text and metadata.
    """
    try:
        async for chunk in model.run_chat_completion_stream(chat_input):
            chunk_dict = json.loads(chunk)
            logger.debug(chunk_dict)
            if chunk_dict.get("type") == "response" and chunk_dict.get("content"):
                yield chunk_dict["content"]
            elif chunk_dict.get("type") == "error" and chunk_dict.get("error"):
                yield "\nSYSTEM-WARNUNG: " + chunk_dict.get("error")
        logger.info(
            f"Streaming chat completed with model: {chat_input.language_model}"
        )
    except Exception as e:
        logger.error(f"Error while processing chat input with model: {model}: {e}")
        yield "Es ist ein Fehler aufgetreten. Bitte versuchen Sie es später erneut."

endpoints

Endpoints of the chat microservice.

FUNCTION DESCRIPTION
fetch_chat_response

Chat completion endpoint.

fetch_chat_response_json_stream

Chat completion endpoint with json-stream.

fetch_chat_response_text_stream

Chat completion endpoint with text-stream.

get_llms

Return model information of available LLMs.

health

Return a health check message.

fetch_chat_response async
fetch_chat_response(chat_input)

Chat completion endpoint.

PARAMETER DESCRIPTION
chat_input

Input containing the chat message.

TYPE: ChatInput

RETURNS DESCRIPTION
ChatOutput

Output of the chat response.

Source code in docs/microservices/chat/src/endpoints.py
@router.post(
    "/completion",
    response_model=ChatOutput,
    summary="Chat completion endpoint.",
    description=(
        "Performs response for chat completions.\n\n"
        "The endpoint returns a single JSON response containing the chat output.\n\n"
    ),
    openapi_extra={
        "requestBody": {
            "content": {
                "application/json": {
                    "examples": ChatInput.model_config["json_schema_extra"][
                        "openapi_examples"
                    ],
                }
            },
        }
    },
    responses={
        200: {
            "description": "Successful chat response.",
            "content": {
                "application/json": {
                    "examples": ChatOutput.model_config["json_schema_extra"][
                        "openapi_examples"
                    ],
                },
            },
        },
        400: {
            "description": (
                "Invalid LLM API request, such as using an unsupported model or exceeding the context window."
            )
        },
        500: {"description": "Error processing answer of LLM client."},
        502: {"description": "API call of Chat-Completion to LLM failed."},
        504: {
            "description": "API call of Chat-Completion to LLM failed due to timeout."
        },
    },
)
async def fetch_chat_response(chat_input: ChatInput) -> ChatOutput:
    """Chat completion endpoint.

    Args:
        chat_input (ChatInput): Input containing the chat message.

    Returns:
        Output of the chat response.
    """
    model = chat_registry.chat_models.get(chat_input.language_model)
    if model is None:
        logger.error(f"Invalid language model selected: {chat_input.language_model}")
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=(
                f"Es wurde ein ungültiges Sprachmodell ausgewählt ({chat_input.language_model})."
                " Bitte versuchen Sie es mit einem anderen Modell."
            ),
        )

    logger.info(f"Chat completion started with model: {chat_input.language_model}")

    return await chat_registry.run_chat_completion(model, chat_input)
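
An illustrative client call against this endpoint. The base URL http://localhost:8000, the absence of a route prefix, and the model name are assumptions.

import httpx

payload = {
    "new_message": {"role": "user", "content": "What's the weather like today?"},
    "chat_history": [],
    "language_model": "test_model_mock",   # assumed model name
}

response = httpx.post("http://localhost:8000/completion", json=payload, timeout=60.0)
response.raise_for_status()
print(response.json())   # e.g. {"response": "...", "reason": None}
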
fetch_chat_response_json_stream async
fetch_chat_response_json_stream(chat_input)

Chat completion endpoint with json-stream.

PARAMETER DESCRIPTION
chat_input

Input containing the chat message.

TYPE: ChatInput

RETURNS DESCRIPTION
StreamingResponse

Output of chat response.

Source code in docs/microservices/chat/src/endpoints.py
@router.post(
    "/v2/completion/stream",
    response_class=StreamingResponse,
    summary="Chat completion endpoint with x-ndjson-stream.",
    description=(
        "Starts a streaming response for chat completions.\n\n"
        "The endpoint streams messages as NDJSON (`application/x-ndjson`) "
        "with different types: `response`, `reason`, and `error`."
    ),
    openapi_extra={
        "requestBody": {
            "content": {
                "application/json": {
                    "examples": ChatInput.model_config["json_schema_extra"][
                        "openapi_examples"
                    ],
                }
            },
        }
    },
    responses={
        200: {
            "description": "Streaming started successfully.",
            "content": {
                "application/x-ndjson": {
                    "examples": ChatStreamOutput.model_config["json_schema_extra"][
                        "openapi_examples"
                    ],
                },
            },
        },
        400: {"description": "Invalid language model."},
    },
)
async def fetch_chat_response_json_stream(chat_input: ChatInput) -> StreamingResponse:
    """Chat completion endpoint with json-stream.

    Args:
        chat_input (ChatInput): Input containing the chat message.

    Returns:
        Output of chat response.
    """
    model = chat_registry.chat_models.get(chat_input.language_model)
    if model is None:
        logger.error(f"Invalid language model selected: {chat_input.language_model}")
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=(
                f"Es wurde ein ungültiges Sprachmodell ausgewählt ({chat_input.language_model})."
                " Bitte versuchen Sie es mit einem anderen Modell."
            ),
        )

    logger.info(
        f"Streaming chat completion started with model: {chat_input.language_model}"
    )

    return StreamingResponse(
        chat_registry.run_chat_completion_json_stream(model, chat_input),
        media_type="application/x-ndjson",
    )
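
An illustrative consumer of the NDJSON stream. Base URL and payload are assumptions; each streamed line is one ChatStreamOutput object.

import json

import httpx

payload = {
    "new_message": {"role": "user", "content": "Hello!"},
    "chat_history": [],
    "language_model": "test_model_mock",   # assumed model name
}

with httpx.stream(
    "POST", "http://localhost:8000/v2/completion/stream", json=payload, timeout=None
) as response:
    for line in response.iter_lines():
        if not line:
            continue
        event = json.loads(line)
        if event["type"] == "reason":
            pass  # reasoning tokens, e.g. shown in a collapsible UI element
        elif event["type"] == "response":
            print(event["content"], end="")
        elif event["type"] == "error":
            print("\nError:", event["error"])
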
fetch_chat_response_text_stream async
fetch_chat_response_text_stream(chat_input)

Chat completion endpoint with text-stream.

PARAMETER DESCRIPTION
chat_input

Input containing the chat message.

TYPE: ChatInput

RETURNS DESCRIPTION
StreamingResponse

Output of the chat response.

Source code in docs/microservices/chat/src/endpoints.py
@router.post(
    "/completion/stream",
    response_class=StreamingResponse,
    summary="Chat completion endpoint with text-stream.",
    description=(
        "Starts a streaming response for chat completions.\n\n"
        "The endpoint streams messages as text (`text/event-stream`)."
    ),
    openapi_extra={
        "requestBody": {
            "content": {
                "application/json": {
                    "examples": ChatInput.model_config["json_schema_extra"][
                        "openapi_examples"
                    ],
                }
            },
        }
    },
    responses={
        200: {
            "description": "Streaming started successfully.",
            "content": {
                "text/event-stream": {
                    "schema": {
                        "type": "string",
                        "example": "Hello, how can I help you today?\n\n",
                    },
                    "examples": {
                        "response": {
                            "summary": "Chat response",
                            "description": (
                                "This is the standard output ",
                                "returned by the chat model for a normal request.",
                            ),
                            "value": "Hello, how can I help you today?\n\n",
                        },
                        "context_length_exceeded_during_stream": {
                            "summary": "Context length exceeded during stream",
                            "description": (
                                "This example shows the output when the input exceeds the model's context window."
                            ),
                            "value": (
                                "Ihre Anfrage ist zu lang und überschreitet das Kontextfenster des Modells. "
                                "Bitte kürzen Sie Ihren Eingabetext oder reduzieren Sie die Anzahl der Nachrichten."
                            ),
                        },
                        "bad_request": {
                            "summary": "Invalid request",
                            "description": (
                                "This example shows the output produced when the input exceeds the context window "
                                "or when the request is malformed or lacks required parameters."
                            ),
                            "value": (
                                "Fehler beim Verarbeiten Ihrer Anfrage durch das Sprachmodell. Bitte kürzen Sie "
                                "Ihren Eingabetext oder versuchen Sie es mit einem anderen Sprachmodell."
                            ),
                        },
                        "internal_error": {
                            "summary": "Internal error",
                            "description": (
                                "This example shows the output when an unexpected error occurs during "
                                "streaming.",
                            ),
                            "value": "Es ist ein Fehler aufgetreten. Bitte versuchen Sie es später erneut.",
                        },
                    },
                }
            },
        },
        400: {"description": "Invalid language model."},
    },
)
async def fetch_chat_response_text_stream(chat_input: ChatInput) -> StreamingResponse:
    """Chat completion endpoint with text-stream.

    Args:
        chat_input (ChatInput): Input containing the chat message.

    Returns:
        Output of the chat response.
    """
    model = chat_registry.chat_models.get(chat_input.language_model)
    if model is None:
        logger.error(f"Invalid language model selected: {chat_input.language_model}")
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=(
                f"Es wurde ein ungültiges Sprachmodell ausgewählt ({chat_input.language_model})."
                " Bitte versuchen Sie es mit einem anderen Modell."
            ),
        )

    logger.info(
        f"Streaming chat completion started with model: {chat_input.language_model}"
    )

    return StreamingResponse(
        chat_registry.run_chat_completion_text_stream(model, chat_input),
        media_type="text/event-stream",
    )
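
An illustrative consumer of the plain text stream; base URL and payload are assumptions.

import httpx

payload = {
    "new_message": {"role": "user", "content": "Hello!"},
    "chat_history": [],
    "language_model": "test_model_mock",   # assumed model name
}

with httpx.stream(
    "POST", "http://localhost:8000/completion/stream", json=payload, timeout=None
) as response:
    for chunk in response.iter_text():
        # Raw text; stream errors arrive inline as "SYSTEM-WARNUNG: ..." lines.
        print(chunk, end="")
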
get_llms async
get_llms(request)

Return model information of available LLMs.

PARAMETER DESCRIPTION
request

Request data.

TYPE: Request

RETURNS DESCRIPTION
list[dict]

The list of available models.

Source code in docs/microservices/chat/src/endpoints.py
@router.get(
    "/llms",
    summary="List available language models.",
    description=("Returns a list of available language models (LLMs).\n\n"),
    responses={
        200: {
            "description": "List of available LLMs.",
            "content": {
                "application/json": {
                    "example": [
                        {
                            "label": "test_model:mock",
                            "is_remote": False,
                            "name": "test_model_mock",
                        },
                    ]
                }
            },
        },
        500: {"description": "Internal server error accessing microservice"},
    },
)
async def get_llms(request: Request) -> list[dict]:
    """Return model information of available LLMs.

    Args:
        request (Request): Request data.

    Returns:
        The list of available models.
    """
    app = request.app  # indirectly access the FastAPI app object
    return app.state.available_llms
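
An illustrative request against this endpoint (base URL assumed).

import httpx

llms = httpx.get("http://localhost:8000/llms", timeout=10.0).json()
print([llm["label"] for llm in llms])   # e.g. ["test_model:mock"]
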
health async
health()

Return a health check message.

RETURNS DESCRIPTION
dict[str, str]

The health check message as a dictionary.

Source code in docs/microservices/chat/src/endpoints.py
@router.get(
    "/",
    summary="Health check endpoint",
    description=(
        "Returns a simple message indicating that the chat service is running.\n\n"
        "Use this endpoint to verify that the service is alive and responsive."
    ),
    responses={
        200: {
            "description": "Health check successful",
            "content": {"application/json": {"example": {"status": "Chat is running"}}},
        },
        500: {"description": "Internal server error"},
    },
)
@router.get(
    "/health",
    summary="Health check endpoint",
    description=(
        "Returns a simple message indicating that the chat service is running.\n\n"
        "Use this endpoint to verify that the service is alive and responsive."
    ),
    responses={
        200: {
            "description": "Health check successful",
            "content": {"application/json": {"example": {"status": "Chat is running"}}},
        },
        500: {"description": "Internal server error"},
    },
)
async def health() -> dict[str, str]:
    """Return a health check message.

    Returns:
        The health check message as a dictionary.
    """
    return {"message": f"{settings.service_name} is running"}

models

Data model classes for loading and validating API and configuration parameters.

MODULE DESCRIPTION
api_input

Pydantic models for API input parameters.

api_output

Pydantic models for API output parameters.

chat_state

Pydantic models for chat completion parameters.

general

Load and check settings from YAML.

llms

Pydantic model for the LLM config.

api_input

Pydantic models for API input parameters.

CLASS DESCRIPTION
ChatInput

Model defining the input of a valid chat request.

ChatMessage

Message input model used to store the content of chat messages.

ChatInput

Bases: BaseModel

Model defining the input of a valid chat request.

ATTRIBUTE DESCRIPTION
new_message

The new user message to be processed.

TYPE: ChatMessage

chat_history

List of previous chat messages forming the conversation context.

TYPE: list[ChatMessage]

language_model

The identifier of the language model to use.

TYPE: str

request_timestamp

Timestamp of the request.

TYPE: int | None

Source code in docs/microservices/chat/src/models/api_input.py
class ChatInput(BaseModel):
    """Model defining the input of a valid chat request.

    Attributes:
        new_message (ChatMessage): The new user message to be processed.
        chat_history (list[ChatMessage]): List of previous chat messages forming the conversation context.
        language_model (str): The identifier of the language model to use.
        request_timestamp (int | None): Timestamp of the request.
    """

    new_message: ChatMessage
    chat_history: list[ChatMessage] = []
    language_model: str
    request_timestamp: int | None = Field(
        None,
        description="Unix timestamp indicating when the request was made.",
        deprecated=True,
    )

    @property
    def as_list(self) -> list[dict[str, str]]:
        """Transforms the chat history plus the new message into a list of dictionaries containing the role and message.

        Returns:
            Each dictionary contains keys 'role' and 'content'.
        """
        chat_history_list = [
            {"role": message.role, "content": message.content}
            for message in self.chat_history
        ]
        chat_history_list.append(
            {"role": self.new_message.role, "content": self.new_message.content}
        )
        return chat_history_list

    model_config = ConfigDict(
        json_schema_extra={
            "openapi_examples": {
                "standard": {
                    "summary": "Simple chat input",
                    "description": "Standard input with short chat history.",
                    "value": {
                        "new_message": {
                            "role": "user",
                            "content": "What's the weather like today?",
                        },
                        "chat_history": [
                            {"role": "user", "content": "Hi"},
                            {
                                "role": "assistant",
                                "content": "Hello! How can I help you today?",
                            },
                        ],
                        "language_model": "test_model_mock",
                    },
                }
            }
        }
    )
as_list property
as_list

Transforms the chat history plus the new message into a list of dictionaries containing the role and message.

RETURNS DESCRIPTION
list[dict[str, str]]

Each dictionary contains keys 'role' and 'content'.
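
An illustrative use of as_list (the import path and model name are assumptions).

from src.models.api_input import ChatInput, ChatMessage   # assumed import path

chat_input = ChatInput(
    new_message=ChatMessage(role="user", content="What's the weather like today?"),
    chat_history=[ChatMessage(role="assistant", content="Hello! How can I help you today?")],
    language_model="test_model_mock",                      # assumed model name
)

print(chat_input.as_list)
# [{'role': 'assistant', 'content': 'Hello! How can I help you today?'},
#  {'role': 'user', 'content': "What's the weather like today?"}]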

ChatMessage

Bases: BaseModel

Message input model used to store the content of chat messages.

ATTRIBUTE DESCRIPTION
content

The textual content of the message.

TYPE: str

role

The role of the message sender. Must be one of "system", "user", "assistant". Defaults to "user".

TYPE: str

Source code in docs/microservices/chat/src/models/api_input.py
class ChatMessage(BaseModel):
    """Message input model used to store the content of chat messages.

    Attributes:
        content (str): The textual content of the message.
        role (str): The role of the message sender. Must be one of "system", "user", "assistant".
            Defaults to "user".
    """

    content: str
    role: Literal["user", "system", "assistant"] = "user"
api_output

Pydantic models for API output parameters.

CLASS DESCRIPTION
ChatOutput

Chat response model of chat output.

ChatStreamOutput

Chat stream response model of chat output.

ChatOutput

Bases: BaseModel

Chat response model of chat output.

ATTRIBUTE DESCRIPTION
response

The generated chat response.

TYPE: str

reason

Optional reasoning or explanation for the response.

TYPE: str | None

Source code in docs/microservices/chat/src/models/api_output.py
class ChatOutput(BaseModel):
    """Chat response model of chat output.

    Attributes:
        response (str): The generated chat response.
        reason (str | None): Optional reasoning or explanation for the response.
    """

    response: str
    reason: str | None = None

    model_config = ConfigDict(
        json_schema_extra={
            "openapi_examples": {
                "simple": {
                    "summary": "Simple chat response",
                    "description": "Used for models that produce a simple response without reasoning.",
                    "value": {
                        "response": "The weather is nice today.",
                    },
                },
                "reasoning": {
                    "summary": "Response with reasoning",
                    "description": "Used for reasoning-enabled models, showing both the answer and its explanation.",
                    "value": {
                        "response": "The weather is nice today.",
                        "reason": "It is sunny outside.",
                    },
                },
            }
        }
    )
ChatStreamOutput

Bases: BaseModel

Chat stream response model of chat output.

ATTRIBUTE DESCRIPTION
type

The kind of output. One of 'reason', 'response', or 'error'.

TYPE: str

content

Partial text content of the stream if type != error.

TYPE: str | None

finish_reason

Optional finish reason from the model.

TYPE: str | None

error

Error message if type == 'error'.

TYPE: str | None

Source code in docs/microservices/chat/src/models/api_output.py
class ChatStreamOutput(BaseModel):
    """Chat stream response model of chat output.

    Attributes:
        type (str): The kind of output. One of 'reason', 'response', or 'error'.
        content (str | None): Partial text content of the stream if type != error.
        finish_reason (str | None): Optional finish reason from the model.
        error (str | None): Error message if type == 'error'.
    """

    type: Literal["reason", "response", "error"]
    content: str | None = None
    finish_reason: str | None = None
    error: str | None = None

    model_config = ConfigDict(
        json_schema_extra={
            "openapi_examples": {
                "response": {
                    "summary": "Chat response",
                    "description": "Standard response returned by the chat model when a normal message is processed.",
                    "value": {
                        "type": "response",
                        "content": "Hello, how can I help you today?",
                        "finish_reason": None,
                        "error": None,
                    },
                },
                "reason": {
                    "summary": "Reason output",
                    "description": "Response including the reasoning or explanation of the model's output.",
                    "value": {
                        "type": "reason",
                        "content": "User said hello. I will answer politely.",
                        "finish_reason": None,
                        "error": None,
                    },
                },
                "context_length_exceeded_during_stream": {
                    "summary": "Context length exceeded during streaming",
                    "description": "Returned when the prompt exceeds the model's context window.",
                    "value": {
                        "type": "error",
                        "content": None,
                        "finish_reason": None,
                        "error": (
                            "Ihre Anfrage ist zu lang und überschreitet das Kontextfenster des Modells. "
                            "Bitte kürzen Sie Ihren Eingabetext oder reduzieren Sie die Anzahl der Nachrichten."
                        ),
                    },
                },
                "bad_request": {
                    "summary": "Invalid request",
                    "description": (
                        "Returned when the input exceeds the context window or the request is malformed "
                        "or has invalid parameters."
                    ),
                    "value": {
                        "type": "error",
                        "content": None,
                        "finish_reason": None,
                        "error": (
                            "Fehler beim Verarbeiten Ihrer Anfrage durch das Sprachmodell. "
                            "Bitte kürzen Sie Ihren Eingabetext oder versuchen Sie es mit einem anderen Sprachmodell."
                        ),
                    },
                },
                "internal_error": {
                    "summary": "Internal error",
                    "description": "Returned when an unexpected error occurs during streaming.",
                    "value": {
                        "type": "error",
                        "content": None,
                        "finish_reason": None,
                        "error": "Es ist ein Fehler aufgetreten. Bitte versuchen Sie es später erneut.",
                    },
                },
            }
        }
    )
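For illustration, a minimal sketch of how a client might parse streamed events back into this model. It assumes each chunk arrives as a JSON-serialized ChatStreamOutput and that the model is importable from src.models.api_output; both the wire format and the import path are assumptions, not taken from the service code.

from src.models.api_output import ChatStreamOutput  # import path assumed

raw_chunks = [
    '{"type": "reason", "content": "User said hello.", "finish_reason": null, "error": null}',
    '{"type": "response", "content": "Hello!", "finish_reason": "stop", "error": null}',
]

for raw in raw_chunks:
    event = ChatStreamOutput.model_validate_json(raw)
    if event.type == "error":
        print("stream failed:", event.error)
    else:
        print(f"{event.type}: {event.content}")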
chat_state

pydantic Models for chat completion parameters.

CLASS DESCRIPTION
StreamState

Holds the state of a reasoning/response stream.

StreamState

Bases: BaseModel

Holds the state of a reasoning/response stream.

ATTRIBUTE DESCRIPTION
buffer

Temporary storage for incoming text chunks.

TYPE: str

mode

Current mode of the stream.

- "idle": Waiting for reasoning to start.
- "reason": Currently streaming reasoning tokens.
- "response": Currently streaming response tokens.

TYPE: Literal['idle', 'reason', 'response']

first_response_sent

Whether first response output has been sent.

TYPE: bool

outputs

Accumulated structured events from the stream. Each dict contains keys like "type" (reason/response), "content" and "finish_reason".

TYPE: list[dict]

Source code in docs/microservices/chat/src/models/chat_state.py
class StreamState(BaseModel):
    """Holds the state of a reasoning/response stream.

    Attributes:
        buffer (str): Temporary storage for incoming text chunks.
        mode (Literal["idle", "reason", "response"]): Current mode of the stream.
            - "idle": Waiting for reasoning to start.
            - "reason": Currently streaming reasoning tokens.
            - "response": Currently streaming response tokens.
        first_response_sent (bool): Whether first response output has been sent.
        outputs (list[dict]): Accumulated structured events from the stream.
            Each dict contains keys like "type" (reason/response), "content" and "finish_reason".
    """

    buffer: str = ""
    mode: Literal["idle", "reason", "response"] = "idle"
    first_response_sent: bool = False
    outputs: list[dict] = []

    model_config = ConfigDict(arbitrary_types_allowed=True)
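A toy sketch of how a stream handler might drive this state object. The marker strings and the helper function are illustrative assumptions, not the service's actual stream parser; the import path is assumed from the source layout.

from src.models.chat_state import StreamState  # import path assumed

def handle_chunk(state: StreamState, chunk: str) -> None:
    """Toy handler: switch modes on (hypothetical) reasoning markers, else record output."""
    if chunk == "<think>":        # hypothetical reasoning start marker
        state.mode = "reason"
    elif chunk == "</think>":     # hypothetical reasoning end marker
        state.mode = "response"
    else:
        if state.mode == "idle":
            state.mode = "response"
        state.outputs.append(
            {"type": state.mode, "content": chunk, "finish_reason": None}
        )

state = StreamState()
for chunk in ["<think>", "User greeted me.", "</think>", "Hello!"]:
    handle_chunk(state, chunk)

print([o["type"] for o in state.outputs])  # ['reason', 'response']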
general

Load and check Settings from yml.

CLASS DESCRIPTION
ActiveLLMs

Selection of available models for respective use cases.

LogLevel

Enum class specifying possible log levels.

Settings

General Settings for the service.

ActiveLLMs

Bases: BaseModel

Selection of available models for respective use cases.

ATTRIBUTE DESCRIPTION
model_config

Used to ignore other services, which are defined in the config.

TYPE: ConfigDict

chat

List containing available models for chat. It may contain only a subset of all models in llm_models.yml.

TYPE: list[str]

Source code in docs/microservices/chat/src/models/general.py
class ActiveLLMs(BaseModel):
    """Selection of available models for respective use cases.

    Attributes:
        model_config (ConfigDict): Used to ignore other services, which are defined in the config.
        chat (list[str]): List containing available models for chat.
            It may contain only a subset of all models in llm_models.yml.
    """

    # if there are more services defined in the config: just ignore them
    model_config = ConfigDict(extra="ignore")

    chat: list[str]
LogLevel

Bases: StrEnum

Enum class specifying possible log levels.

Source code in docs/microservices/chat/src/models/general.py
class LogLevel(StrEnum):
    """Enum class specifying possible log levels."""

    CRITICAL = "CRITICAL"
    ERROR = "ERROR"
    WARNING = "WARNING"
    INFO = "INFO"
    DEBUG = "DEBUG"

    @classmethod
    def _missing_(cls, value: object) -> "LogLevel | None":
        """Convert strings to uppercase and recheck for existence."""
        if isinstance(value, str):
            value = value.upper()
            for level in cls:
                if level == value:
                    return level
        return None
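The _missing_ hook makes construction case-insensitive, so lowercase values coming from a YAML file still resolve. A short illustration (import path assumed):

from src.models.general import LogLevel  # import path assumed

assert LogLevel("debug") is LogLevel.DEBUG   # lowercase input is normalized
assert LogLevel("INFO") is LogLevel.INFO     # exact match works as usual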
Settings

Bases: BaseModel

General Settings for the service.

ATTRIBUTE DESCRIPTION
model_config

Used to ignore other services, which are defined in the config.

TYPE: ConfigDict

service_name

Name of service, i.e. 'chat'

TYPE: str

active_llms

Selection of available models for respective use cases.

TYPE: ActiveLLMs

log_level

Minimal level of logging output given.

TYPE: LogLevel

log_file_max_bytes

Maximum size of the log file in bytes.

TYPE: PositiveInt

log_file_backup_count

Number of rotated log files to keep.

TYPE: PositiveInt

log_file

Path of the log file.

TYPE: FilePath

check_llm_api_interval_in_s

Interval for checking all LLM APIs (seconds)

TYPE: PositiveInt

n_uvicorn_workers

Number of parallel uvicorn instances.

TYPE: PositiveInt

METHOD DESCRIPTION
ensure_log_dir

Create the log directory after validation.

Source code in docs/microservices/chat/src/models/general.py
class Settings(BaseModel):
    """General Settings for the service.

    Attributes:
        model_config (ConfigDict): Used to ignore other services, which are defined in the config.
        service_name (str): Name of service, i.e. 'chat'
        active_llms (ActiveLLMs): Selection of available models for respective use cases.
        log_level (LogLevel): Minimal level of logging output given.
        log_file_max_bytes (PositiveInt): Maximum size of the log file in bytes.
        log_file_backup_count (PositiveInt): Number of rotated log files to keep.
        log_file (FilePath): Path of the log file.
        check_llm_api_interval_in_s (PositiveInt): Interval for checking all LLM APIs (seconds)
        n_uvicorn_workers (PositiveInt): Number of parallel uvicorn instances.
    """

    model_config = ConfigDict(extra="ignore")

    service_name: str = "Chat"
    service_descripton: str = "Generation of chat completions using various LLMs."

    n_uvicorn_workers: PositiveInt = 1

    active_llms: ActiveLLMs

    log_level: LogLevel = LogLevel.INFO
    log_file_max_bytes: PositiveInt = 1 * 1024 * 1024
    log_file_backup_count: PositiveInt = 3
    log_file: FilePath = Path("/chat/logs/log")

    # interval for checking all LLM APIs (seconds)
    check_llm_api_interval_in_s: PositiveInt = 120

    @model_validator(mode="after")
    def ensure_log_dir(self) -> "Settings":
        """Create the log directory after validation."""
        self.log_file.parent.mkdir(parents=True, exist_ok=True)
        return self
ensure_log_dir
ensure_log_dir()

Create the log directory after validation.

Source code in docs/microservices/chat/src/models/general.py
@model_validator(mode="after")
def ensure_log_dir(self) -> "Settings":
    """Create the log directory after validation."""
    self.log_file.parent.mkdir(parents=True, exist_ok=True)
    return self
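A minimal sketch of building Settings from a dict, as it might be parsed from general.yml. The model name and log path are placeholders; because log_file is a FilePath, the file is created here purely so that validation passes. The import path is assumed.

import tempfile
from pathlib import Path

from src.models.general import Settings  # import path assumed

log_file = Path(tempfile.mkdtemp()) / "log"
log_file.touch()  # FilePath validation requires an existing file

settings = Settings(
    active_llms={"chat": ["example-model"]},  # placeholder model name
    log_level="debug",                        # normalized to LogLevel.DEBUG
    log_file=log_file,
)
print(settings.check_llm_api_interval_in_s)  # 120 (default)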
llms

pydantic model for LLM config.

CLASS DESCRIPTION
APIAuth

Defines authentication settings for the LLM.

LLM

This pydantic class defines the basic structure of a LLM config.

LLMAPI

Defines API-Connection to LLM.

LLMConfig

Base class as loaded from model_configs.yml.

LLMInference

Defines Inference parameters.

LLMPromptConfig

Defines the structure of a LLM prompt configuration.

LLMPromptMaps

Defines complete LLM prompt config.

LLMPrompts

Defines the selectable LLM Prompts.

ReasoningConfig

Configuration for reasoning-capable models.

APIAuth

Bases: BaseModel

Defines authentication settings for the LLM.

ATTRIBUTE DESCRIPTION
type

Either 'token' or 'basic_auth'.

TYPE: Literal

secret_path

File path where the api token or credentials are stored.

TYPE: FilePath

METHOD DESCRIPTION
get_auth_header

Generate auth part of header for http request.

Source code in docs/microservices/chat/src/models/llms.py
class APIAuth(BaseModel):
    """Defines Authentification settings for LLM.

    Attributes:
        type (Literal): Either 'token' or 'basic_auth'.
        secret_path (FilePath): File path where the api token or credentials are stored.
    """

    type: Literal["token", "basic_auth"]
    secret_path: FilePath

    @property
    def secret(self) -> SecretStr:
        """Load secret variable as 'secret'."""
        with open(self.secret_path) as file:
            return SecretStr(file.read().strip())

    def get_auth_header(self) -> str:
        """Generate auth part of header for http request.

        Returns:
            The auth header.
        """
        auth_header = ""

        if self.type == "basic_auth":
            auth_header = f"Basic {base64.b64encode(self.secret.get_secret_value().encode()).decode()}"
        elif self.type == "token":
            auth_header = f"Bearer {self.secret.get_secret_value()}"

        return auth_header
secret property
secret

Load the secret from 'secret_path'.

get_auth_header
get_auth_header()

Generate auth part of header for http request.

RETURNS DESCRIPTION
str

The auth header.

Source code in docs/microservices/chat/src/models/llms.py
def get_auth_header(self) -> str:
    """Generate auth part of header for http request.

    Returns:
        The auth header.
    """
    auth_header = ""

    if self.type == "basic_auth":
        auth_header = f"Basic {base64.b64encode(self.secret.get_secret_value().encode()).decode()}"
    elif self.type == "token":
        auth_header = f"Bearer {self.secret.get_secret_value()}"

    return auth_header
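A sketch of deriving the header from a token file; the file and its content are placeholders created only for illustration, and the import path is assumed.

import tempfile
from pathlib import Path

from src.models.llms import APIAuth  # import path assumed

secret_path = Path(tempfile.mkdtemp()) / "api_token"
secret_path.write_text("placeholder-token\n")  # not a real credential

auth = APIAuth(type="token", secret_path=secret_path)
print(auth.get_auth_header())  # Bearer placeholder-token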
LLM

Bases: BaseModel

This pydantic class defines the basic structure of a LLM config.

ATTRIBUTE DESCRIPTION
label

Human-readable model name that can be presented to users.

TYPE: str

model

Model name which is used in API call, e.g. ollama tag.

TYPE: str

prompt_map

Prompt map name to load LLMPromptMaps from.

TYPE: str

is_remote

Is this LLM hosted at an external API?

TYPE: bool

max_context_tokens

Total chat-history length for chat completion.

TYPE: int

character_to_token

Factor to convert character count into approximate token count.

TYPE: float

api

API information.

TYPE: LLMAPI

inference

Inference parameters.

TYPE: LLMInference

prompt_config

Prompts.

TYPE: LLMPromptConfig | None

reasoning_config

Reasoning configuration.

TYPE: ReasoningConfig

Source code in docs/microservices/chat/src/models/llms.py
class LLM(BaseModel):
    """This pydantic class defines the basic structure of a LLM config.

    Attributes:
        label (str): Human-readable model name that can be presented to users.
        model (str): Model name which is used in API call, e.g. ollama tag.
        prompt_map (str): Prompt map name to load LLMPromptMaps from.
        is_remote (bool): Is this LLM hosted at an external API?
        max_context_tokens (int): Total chat-history length for chat completion.
        character_to_token (float): Factor to convert character count into approximate token count.
        api (LLMAPI): API information.
        inference (LLMInference): Inference parameters.
        prompt_config (LLMPromptConfig | None): Prompts.
        reasoning_config (ReasoningConfig): Reasoning configuration.
    """

    label: str
    model: str
    prompt_map: str
    is_remote: bool
    max_context_tokens: int = 6144
    character_to_token: float = 4
    api: LLMAPI
    inference: LLMInference
    prompt_config: LLMPromptConfig | None = None
    reasoning_config: ReasoningConfig = ReasoningConfig()
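A sketch of a minimal LLM entry, written as a Python dict rather than YAML; the label, model tag and URL are placeholders, and the import path is assumed.

from src.models.llms import LLM  # import path assumed

llm = LLM(
    label="Example model",                     # placeholder
    model="example:latest",                    # placeholder model tag
    prompt_map="default",
    is_remote=False,
    api={"url": "http://localhost:11434/v1"},  # placeholder OpenAI-compatible endpoint
    inference={},                              # falls back to the LLMInference defaults
)
print(llm.max_context_tokens, llm.inference.temperature)  # 6144 0.7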
LLMAPI

Bases: BaseModel

Defines API-Connection to LLM.

ATTRIBUTE DESCRIPTION
url

URL to model.

TYPE: AnyHttpUrl

timeout

Timeout of the httpx authentication client, in seconds.

TYPE: float

health_check

Relative path to the health check endpoint, e.g. '/models'

TYPE: str | None

auth

Authentication settings for the LLM

TYPE: APIAuth | None

METHOD DESCRIPTION
get_health_check_url

Get the URL to check if API is available.

Source code in docs/microservices/chat/src/models/llms.py
class LLMAPI(BaseModel):
    """Defines API-Connection to LLM.

    Attributes:
        url (AnyHttpUrl): URL to model.
        timeout (float): Timeout of the httpx authentication client, in seconds.
        health_check (str | None): Relative path to the health check endpoint, e.g. '/models'
        auth (APIAuth | None): Authentication settings for the LLM
    """

    url: AnyHttpUrl
    timeout: float = 10
    health_check: str | None = None
    auth: APIAuth | None = None

    def get_health_check_url(self) -> str:
        """Get the URL to check if API is available."""
        if self.health_check:
            # make sure to remove trailing and leading slashes to not override path
            return urljoin(
                str(self.url).rstrip("/") + "/",
                self.health_check.lstrip("/"),
            )
        return str(self.url)
get_health_check_url
get_health_check_url()

Get the URL to check if API is available.

Source code in docs/microservices/chat/src/models/llms.py
def get_health_check_url(self) -> str:
    """Get the URL to check if API is available."""
    if self.health_check:
        # make sure to remove trailing and leading slashes to not override path
        return urljoin(
            str(self.url).rstrip("/") + "/",
            self.health_check.lstrip("/"),
        )
    return str(self.url)
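The joining behaviour in short; URL and path are placeholders, and the import path is assumed.

from src.models.llms import LLMAPI  # import path assumed

api = LLMAPI(url="http://localhost:11434/v1", health_check="/models")
print(api.get_health_check_url())  # http://localhost:11434/v1/models

api_without_check = LLMAPI(url="http://localhost:11434/v1")
print(api_without_check.get_health_check_url())  # falls back to the plain API URL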
LLMConfig

Bases: BaseModel

Base class as loaded from model_configs.yml.

ATTRIBUTE DESCRIPTION
model_config

Used to ignore other services, which are defined in the config.

TYPE: ConfigDict

chat

Dictionary mapping a name to the definition of each LLM available for chat.

TYPE: dict[str, LLM]

Source code in docs/microservices/chat/src/models/llms.py
class LLMConfig(BaseModel):
    """Base class as loaded from model_configs.yml.

    Attributes:
        model_config (ConfigDict): Used to ignore other services, which are defined in the config.
        chat (dict[str, LLM]): Dictionary mapping a name to the definition of each LLM available for chat.
    """

    # if there are more services defined in the config: just ignore them
    model_config = ConfigDict(extra="ignore")

    chat: dict[str, LLM]

    def __iter__(self) -> Iterator[str]:
        """Get 'keys' for automatic merge with i.e. LLMPromptConfig."""
        return iter(self.__dict__.keys())

    def __getitem__(self, service: str) -> dict[str, LLM]:
        """Get all LLMs for a given service (e.g. "chat", "rag").

        Args:
            service (str): The service name (e.g., "chat", "rag").

        Returns:
            All configured LLMs for the given service.
        """
        return self.__getattribute__(service)
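A sketch of the mapping-style helpers; the single entry uses placeholder values, and the import path is assumed.

from src.models.llms import LLMConfig  # import path assumed

llm_config = LLMConfig(
    chat={
        "example-model": {                        # placeholder name and values
            "label": "Example model",
            "model": "example:latest",
            "prompt_map": "default",
            "is_remote": False,
            "api": {"url": "http://localhost:11434/v1"},
            "inference": {},
        }
    }
)

for service in llm_config:                         # __iter__ yields the field names, e.g. "chat"
    for name, llm in llm_config[service].items():  # __getitem__ returns the dict of LLMs
        print(service, name, llm.label)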
LLMInference

Bases: BaseModel

Defines Inference parameters.

ATTRIBUTE DESCRIPTION
temperature

Randomness / variation of the output. High values indicate more creativity.

TYPE: PositiveFloat | None

max_new_tokens

Maximum number of tokens of the generated response.

TYPE: PositiveInt | None

top_p

Threshold for sampling only from the most likely tokens.

TYPE: PositiveFloat | None

Source code in docs/microservices/chat/src/models/llms.py
class LLMInference(BaseModel):
    """Defines Inference parameters.

    Attributes:
        temperature (PositiveFloat | None): Randomness / variation of the output. High values indicate more creativity.
        max_new_tokens (PositiveInt | None): Maximum number of tokens of the generated response.
        top_p (PositiveFloat | None): Threshold for sampling only from the most likely tokens.
    """

    temperature: PositiveFloat | None = 0.7
    max_new_tokens: PositiveInt | None = 2048
    top_p: PositiveFloat | None = 0.9
LLMPromptConfig

Bases: BaseModel

Defines the structure of a LLM prompt configuration.

ATTRIBUTE DESCRIPTION
model_config

Used to ignore other services, which are defined in the config.

TYPE: ConfigDict

system

System prompt.

TYPE: LLMPrompts

user

User prompt.

TYPE: LLMPrompts | None

assistant

Assistant prompt.

TYPE: LLMPrompts | None

Source code in docs/microservices/chat/src/models/llms.py
class LLMPromptConfig(BaseModel):
    """Defines the structure of a LLM prompt configuration.

    Attributes:
        model_config (ConfigDict): Used to ignore other services, which are defined in the config.
        system (LLMPrompts): System prompt.
        user (LLMPrompts | None): User prompt.
        assistant (LLMPrompts | None): Assistant prompt.
    """

    # if there are more prompt types defined that are not used in this service: just ignore them
    model_config = ConfigDict(extra="ignore")

    system: LLMPrompts
    user: LLMPrompts | None = None
    assistant: LLMPrompts | None = None
LLMPromptMaps

Bases: BaseModel

Defines complete LLM prompt config.

ATTRIBUTE DESCRIPTION
model_config

Used to ignore other services, which are defined in the config.

TYPE: ConfigDict

chat

Dictionary mapping a name to the prompts of each LLM available for chat.

TYPE: dict[str, LLMPromptConfig]

Source code in docs/microservices/chat/src/models/llms.py
class LLMPromptMaps(BaseModel):
    """Defines complete LLM prompt config.

    Attributes:
        model_config (ConfigDict): Used to ignore other services, which are defined in the config.
        chat (dict[str, LLMPromptConfig]): Dictionary mapping a name to the prompts of each LLM available for chat.

    """

    model_config = ConfigDict(extra="ignore")

    chat: dict[str, LLMPromptConfig]

    def __iter__(self) -> Iterator[str]:
        """Get 'keys' for automatic merge with i.e. LLMConfig."""
        return iter(self.__dict__.keys())
LLMPrompts

Bases: BaseModel

Defines the selectable LLM Prompts.

ATTRIBUTE DESCRIPTION
model_config

Used to ignore other services, which are defined in the config.

TYPE: ConfigDict

generate

Prompt for model.

TYPE: str

Source code in docs/microservices/chat/src/models/llms.py
class LLMPrompts(BaseModel):
    """Defines the selectable LLM Prompts.

    Attributes:
        model_config (ConfigDict): Used to ignore other services, which are defined in the config.
        generate (str): Prompt for model.
    """

    # if there are more prompts defined that are not used in this service: just ignore them
    model_config = ConfigDict(extra="ignore")

    generate: str = ""
ReasoningConfig

Bases: BaseModel

Configuration for reasoning-capable models.

ATTRIBUTE DESCRIPTION
is_reasoning_model

Whether this model supports reasoning streams.

TYPE: bool

reasoning_start_marker

Start marker for the reasoning section.

TYPE: str | None

reasoning_end_marker

End marker for the reasoning section.

TYPE: str | None

Source code in docs/microservices/chat/src/models/llms.py
class ReasoningConfig(BaseModel):
    """Configuration for reasoning-capable models.

    Attributes:
        is_reasoning_model (bool): Whether this model supports reasoning streams.
        reasoning_start_marker (str | None): Start marker for the reasoning section.
        reasoning_end_marker (str | None): End marker for the reasoning section.
    """

    is_reasoning_model: bool = False
    reasoning_start_marker: None | str = None
    reasoning_end_marker: None | str = None
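A sketch of a reasoning-enabled entry; the "<think>" markers are an assumption and depend on the concrete model's output format, and the import path is assumed.

from src.models.llms import ReasoningConfig  # import path assumed

reasoning = ReasoningConfig(
    is_reasoning_model=True,
    reasoning_start_marker="<think>",   # hypothetical marker
    reasoning_end_marker="</think>",    # hypothetical marker
)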

openai_custom_auth

Customized Httpx Authentication Client.

CLASS DESCRIPTION
CustomAuthClient

Custom HTTP transport for OpenAI client.

CustomAuthClient

Bases: AsyncClient

Custom HTTP transport for OpenAI client.

This class supports both Bearer Token Authentication and Basic Authentication. If auth_type is 'token', the secret is expected to be the API key. If auth_type is 'basic_auth', the secret is expected to be a plain 'username:password' string, which the client base64-encodes.

ATTRIBUTE DESCRIPTION
auth_header

Authentication header for the httpx client.

TYPE: str

METHOD DESCRIPTION
a_send

Asynchronous method for sending HTTP requests.

send

Synchronous method for sending HTTP requests.

Source code in docs/microservices/chat/src/openai_custom_auth.py
class CustomAuthClient(httpx.AsyncClient):
    """Custom HTTP transport for OpenAI client.

    This class supports both Bearer Token Authentication and Basic Authentication.
    If `auth_type` is 'token', the `secret` is expected to be the API key.
    If `auth_type` is 'basic_auth', the `secret` is expected to be a plain 'username:password' string, which the client base64-encodes.

    Attributes:
        auth_header (str): Authentication header for the httpx client.

    Methods:
        a_send(request, *args, **kwargs): Asynchronous method for sending HTTP requests.
        send(request, *args, **kwargs): Synchronous method for sending HTTP requests.
    """

    def __init__(
        self,
        secret: str | None = None,
        auth_type: Literal["token", "basic_auth"] | None = None,
        *args: object,
        **kwargs: object,
    ) -> None:
        """Initialize the custom HTTP transport for OpenAI client.

        Args:
            secret (str, optional): OpenAI API Key or Basic Auth credentials (username:password).
                                     This is required depending on the `auth_type`. If `auth_type`
                                     is 'token', the `secret` should be the API key. If
                                     `auth_type` is 'basic_auth', the `secret` should be a plain
                                     'username:password' string, which the client base64-encodes.
            auth_type (str, optional): The type of authentication to use. It can be 'token' or 'basic_auth'.
            *args: Variable length argument list.
            **kwargs: Arbitrary keyword arguments.

        Raises:
            ValueError: If `auth_type` is provided but `secret` is not provided.
        """
        super().__init__(*args, **kwargs)

        self.auth_header = ""

        if auth_type and not secret:
            raise ValueError("API credentials are required but missing.")

        if auth_type == "token":
            self.auth_header = f"Bearer {secret}"

        elif auth_type == "basic_auth":
            encoded_credentials = base64.b64encode(secret.encode()).decode()
            self.auth_header = f"Basic {encoded_credentials}"

    async def a_send(
        self,
        request: httpx.Request,
        *args: object,
        **kwargs: object,
    ) -> httpx.Response:
        """Asynchronous version of the send method to handle requests asynchronously."""
        if "Authorization" in request.headers:
            del request.headers["Authorization"]
        if self.auth_header:
            request.headers["Authorization"] = self.auth_header
        return await super().a_send(request, *args, **kwargs)

    def send(
        self,
        request: httpx.Request,
        *args: object,
        **kwargs: object,
    ) -> httpx.Response:
        """Version of the send method to handle requests asynchronously."""
        if "Authorization" in request.headers:
            del request.headers["Authorization"]
        if self.auth_header:
            request.headers["Authorization"] = self.auth_header
        return super().send(request, *args, **kwargs)
a_send async
a_send(request, *args, **kwargs)

Asynchronous version of the send method to handle requests asynchronously.

Source code in docs/microservices/chat/src/openai_custom_auth.py
async def a_send(
    self,
    request: httpx.Request,
    *args: object,
    **kwargs: object,
) -> httpx.Response:
    """Asynchronous version of the send method to handle requests asynchronously."""
    if "Authorization" in request.headers:
        del request.headers["Authorization"]
    if self.auth_header:
        request.headers["Authorization"] = self.auth_header
    return await super().a_send(request, *args, **kwargs)
send
send(request, *args, **kwargs)

Synchronous version of the send method.

Source code in docs/microservices/chat/src/openai_custom_auth.py
def send(
    self,
    request: httpx.Request,
    *args: object,
    **kwargs: object,
) -> httpx.Response:
    """Version of the send method to handle requests asynchronously."""
    if "Authorization" in request.headers:
        del request.headers["Authorization"]
    if self.auth_header:
        request.headers["Authorization"] = self.auth_header
    return super().send(request, *args, **kwargs)
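A sketch of wiring the client into the OpenAI SDK via its http_client parameter; the base URL and token are placeholders, and api_key is set to a dummy value because the Authorization header is injected by CustomAuthClient. The import path for the client is assumed.

from openai import AsyncOpenAI

from src.openai_custom_auth import CustomAuthClient  # import path assumed

http_client = CustomAuthClient(secret="placeholder-token", auth_type="token")
llm_client = AsyncOpenAI(
    base_url="http://localhost:11434/v1",  # placeholder OpenAI-compatible endpoint
    api_key="unused",                      # real auth comes from CustomAuthClient
    http_client=http_client,
)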

settings

Load all settings from a central place, not hidden in utils.

utils

Utils functions for logging, LLM availability check and configuration processing.

MODULE DESCRIPTION
base_logger

Set up the root logger for the entire application. This logger will log messages to the console and a file.

check_model_api_availability

This module provides functions to check LLM-APIs for availability.

process_configs

Methods to load and config and start checks of config integrity.

base_logger

Set up the root logger for the entire application. This logger will log messages to the console and a file.

FUNCTION DESCRIPTION
setup_logger

Initialize the logger with the desired log level and add handlers.

setup_logger
setup_logger()

Initialize the logger with the desired log level and add handlers.

Sets up the root logger, which all other loggers inherit from. Adds file and console handlers to the logger and sets the format.

Source code in docs/microservices/chat/src/utils/base_logger.py
def setup_logger() -> None:
    """Initialize the logger with the desired log level and add handlers.

    Sets up the root logger, which all other loggers inherit from.
    Adds file and console handlers to the logger and sets the format.
    """
    # root logger, all other loggers inherit from this
    logger = logging.getLogger()

    # create different handlers for log file and console
    file_handler = logging.handlers.RotatingFileHandler(
        filename=settings.log_file,
        maxBytes=settings.log_file_max_bytes,
        backupCount=settings.log_file_backup_count,
    )
    console_handler = logging.StreamHandler()

    # define log format and set for each handler
    formatter = logging.Formatter(
        fmt="%(asctime)s - %(levelname)8s - %(module)s - %(funcName)s: %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S%z",
    )
    file_handler.setFormatter(formatter)
    console_handler.setFormatter(formatter)

    # add handlers to the logger
    logger.addHandler(file_handler)
    logger.addHandler(console_handler)

    logger.setLevel(settings.log_level)
check_model_api_availability

This module provides functions to check LLM-APIs for availability.

To check a single LLM, use await is_model_api_available(llm.api, llm_name). To get all LLMs that are activated in configs/general.yml, use await get_available_llms().

FUNCTION DESCRIPTION
get_available_llms

Returns a list of available LLMs.

is_model_api_available

Check if API is available using credentials.

get_available_llms async
get_available_llms()

Returns a list of available LLMs.

RETURNS DESCRIPTION
list[dict[str, str]]

List of available LLMs with selected info (name, label, is_remote).

Source code in docs/microservices/chat/src/utils/check_model_api_availability.py
async def get_available_llms() -> list[dict[str, str]]:
    """Returns a list of available LLMs.

    Returns:
        List of available LLMs with selected info (name, label, is_remote).
    """
    available_llms = []

    # iterate over model_groups (services), i.e. chat, RAG, embedding, ...
    for model_group_key in llm_config:
        logger.debug(f"Checking APIs for {model_group_key}-LLMs.")
        model_group = llm_config[model_group_key]

        for llm_name, llm in model_group.items():
            logger.debug(f"Checking availability of {llm_name}")
            if await is_model_api_available(llm.api, llm_name):
                llm_dict = llm.model_dump(include=["label", "is_remote"])
                llm_dict["name"] = llm_name

                available_llms.append(llm_dict)

    return available_llms
is_model_api_available async
is_model_api_available(llm_api, llm_name, timeout_in_s=10)

Check if API is available using credentials.

Availability is checked by sending a GET, HEAD, or POST request. If a health_check endpoint is provided, the request is sent to that endpoint; otherwise, it is sent to the main API URL.

PARAMETER DESCRIPTION
llm_api

the LLMAPI instance to check

TYPE: LLMAPI

llm_name

ID of the LLM as used in the config file; used for reference in log messages

TYPE: str

timeout_in_s

http timeout in seconds; defaults to 10

TYPE: int DEFAULT: 10

RETURNS DESCRIPTION
bool

True if the API is available, False otherwise.

Source code in docs/microservices/chat/src/utils/check_model_api_availability.py
async def is_model_api_available(
    llm_api: LLMAPI,
    llm_name: str,
    timeout_in_s: int = 10,
) -> bool:
    """Check if API is available using credentials.

    Availability is checked by sending a GET, HEAD, or POST request. If a health_check endpoint is provided,
    the request is sent to that endpoint; otherwise, it is sent to the main API URL.

    Args:
        llm_api (LLMAPI): the LLMAPI instance to check
        llm_name (str): ID of the LLM as used in the config file; used for reference in log messages
        timeout_in_s (int): http timeout in seconds; defaults to 10

    Returns:
        True if the API is available, False otherwise.
    """
    headers = {"Content-type": "application/json"}

    # Authorization is not always needed
    if llm_api.auth:
        headers["Authorization"] = llm_api.auth.get_auth_header()

    url = llm_api.get_health_check_url()

    # test health check endpoint with GET, HEAD and POST
    try:
        async with httpx.AsyncClient() as client:
            response = await client.get(
                url,
                headers=headers,
                timeout=timeout_in_s,
            )
        logger.debug(
            f"{url} health check via GET request: {response.status_code=}, LLM: '{llm_name}"
        )

        # test with HEAD
        if response.status_code != HTTPStatus.OK:
            async with httpx.AsyncClient() as client:
                response = await client.head(
                    url,
                    headers=headers,
                    timeout=timeout_in_s,
                )
            logger.debug(
                f"{url} health check via HEAD request: {response.status_code=}, LLM: '{llm_name}"
            )

        # test with POST
        if response.status_code != HTTPStatus.OK:
            async with httpx.AsyncClient() as client:
                response = await client.post(
                    url,
                    headers=headers,
                    timeout=timeout_in_s,
                )
            logger.debug(
                f"{url} health check via POST request: {response.status_code=}, LLM: '{llm_name}"
            )

    except Exception as e:
        logger.warning(
            f"Exception when trying to reach LLM API. Error: {e}, LLM: '{llm_name}"
        )
        return False

    if response.status_code != HTTPStatus.OK:
        logger.warning(
            f"LLM unavailable: Could not establish connection to LLM-API. LLM: '{llm_name}"
        )

    return response.status_code == HTTPStatus.OK
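A sketch of a one-off availability check outside the scheduler; URL and model name are placeholders, and the import paths are assumed.

import asyncio

from src.models.llms import LLMAPI                                          # import path assumed
from src.utils.check_model_api_availability import is_model_api_available   # import path assumed

async def main() -> None:
    api = LLMAPI(url="http://localhost:11434/v1", health_check="/models")   # placeholder endpoint
    ok = await is_model_api_available(api, "example-model", timeout_in_s=5)
    print("available:", ok)

asyncio.run(main())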
process_configs

Methods to load configs and run checks of config integrity.

FUNCTION DESCRIPTION
load_all_configs

Load config settings from respective paths.

load_from_yml_in_pydantic_model

Load config from 'yaml_path' into the given pydantic model.

load_yaml

Load yaml.

merge_specific_cfgs_in_place

Copy Prompt-config to the appropriate section in the general llm_config. Edit in-place!

postprocess_configs

Post-Process loaded configs.

remove_unactive_models

Remove models from all use cases if they are not in 'active_models'. Edit in-place!

load_all_configs
load_all_configs(general_config_paths, path_to_llm_prompts, path_to_llm_model_configs)

Load config settings from respective paths.

PARAMETER DESCRIPTION
general_config_paths

Path to config, matching 'Settings'

TYPE: Path

path_to_llm_prompts

Path to config, matching 'LLMPromptMaps'

TYPE: Path

path_to_llm_model_configs

Path to config, matching 'LLMConfig'

TYPE: Path

RETURNS DESCRIPTION
tuple[Settings, LLMConfig]

Configs loaded into their respective pydantic models.

Source code in docs/microservices/chat/src/utils/process_configs.py
def load_all_configs(
    general_config_paths: Path,
    path_to_llm_prompts: Path,
    path_to_llm_model_configs: Path,
) -> tuple[Settings, LLMConfig]:
    """Load config settings from respective paths.

    Args:
        general_config_paths (Path): Path to config, matching 'Settings'
        path_to_llm_prompts (Path): Path to config, matching 'LLMPromptMaps'
        path_to_llm_model_configs (Path): Path to config, matching 'LLMConfig'

    Returns:
        Configs loaded into their respective pydantic models.

    """
    settings = load_from_yml_in_pydantic_model(general_config_paths, Settings)
    llm_prompts = load_from_yml_in_pydantic_model(path_to_llm_prompts, LLMPromptMaps)
    llm_config = load_from_yml_in_pydantic_model(path_to_llm_model_configs, LLMConfig)

    postprocess_configs(settings, llm_prompts, llm_config)

    return settings, llm_config
load_from_yml_in_pydantic_model
load_from_yml_in_pydantic_model(yaml_path, pydantic_reference_model)

Load config from 'yaml_path' into the given pydantic model.

PARAMETER DESCRIPTION
yaml_path

Yaml to load

TYPE: Path

pydantic_reference_model

pydantic model to load yaml into

TYPE: BaseModel

RETURNS DESCRIPTION
BaseModel

BaseModel derived pydantic data class.

Source code in docs/microservices/chat/src/utils/process_configs.py
def load_from_yml_in_pydantic_model(
    yaml_path: Path, pydantic_reference_model: BaseModel
) -> BaseModel:
    """Load config from 'list_of_yaml_paths' into given pydantic-Model.

    Args:
        yaml_path (Path): Yaml to load
        pydantic_reference_model (BaseModel): pydantic model to load yaml into

    Returns:
        BaseModel derived pydantic data class.

    """
    data = load_yaml(yaml_path)

    try:
        pydantic_class = pydantic_reference_model(**data)
        logger.info(f"Config loaded from: '{yaml_path}'")
        return pydantic_class

    except ValidationError as e:
        logger.critical(f"Error loading config: '{e}'")
        raise e
load_yaml
load_yaml(yaml_path)

Load yaml.

PARAMETER DESCRIPTION
yaml_path

Path to yaml

TYPE: Path

RETURNS DESCRIPTION
dict[str, Any]

Content of the loaded yaml.

Source code in docs/microservices/chat/src/utils/process_configs.py
def load_yaml(yaml_path: Path) -> dict[str, Any]:
    """Load yaml.

    Args:
        yaml_path (Path): Path to yaml

    Returns:
        Content of the loaded yaml.
    """
    if not yaml_path.exists():
        logger.error(f"Invalid path: '{yaml_path}'")
        raise FileNotFoundError

    with open(yaml_path) as file:
        return yaml.safe_load(file)
merge_specific_cfgs_in_place
merge_specific_cfgs_in_place(llm_config, llm_prompts)

Copy Prompt-config to the appropriate section in the general llm_config. Edit in-place!

The prompt config is merged only if the 'prompt_map' referenced in the LLMConfig entry exists in LLMPromptMaps, i.e. this generalizes something like:

cfg["phi3:mini"].prompt_config = prompt[cfg["phi3:mini"].prompt_map]

PARAMETER DESCRIPTION
llm_config

Target for merge of Prompt parameter

TYPE: LLMConfig

llm_prompts

Source to merge Prompt parameter from

TYPE: LLMPromptMaps

RETURNS DESCRIPTION
bool

True if no problems occurred.

Source code in docs/microservices/chat/src/utils/process_configs.py
def merge_specific_cfgs_in_place(
    llm_config: LLMConfig, llm_prompts: LLMPromptMaps
) -> bool:
    """Copy Prompt-config to apropriate section in general llm_config. Edit in-place!

    The prompt config is merged only if the 'prompt_map' referenced in the LLMConfig entry
    exists in LLMPromptMaps, i.e. this generalizes something like:

    cfg["phi3:mini"].prompt_config = prompt[cfg["phi3:mini"].prompt_map]

    Args:
        llm_config (LLMConfig): Target for merge of Prompt parameter
        llm_prompts (LLMPromptMaps): Source to merge Prompt parameter from

    Returns:
        True if no problems occurred.
    """
    no_issues_occurred = True
    for usecase in llm_config:
        # load identical usecases, i.e. chat, RAG
        try:
            cfg = getattr(llm_config, usecase)
            prompt = getattr(llm_prompts, usecase)
        except AttributeError:
            logger.warning(
                f"Usecase '{usecase}' not matching between prompt- and general llm config. \
                    Skipping cfg-merge for '{usecase}' .."
            )
            no_issues_occurred = False
            continue

        # copy prompt config to its usecase- and model-counterpart
        for model in cfg:
            prompt_map_to_use = cfg[model].prompt_map
            if prompt_map_to_use in prompt:
                cfg[model].prompt_config = prompt[prompt_map_to_use]
            else:
                logger.warning(
                    f"'prompt_map: {prompt_map_to_use}' from LLM-config not in prompt-config for '{usecase}'. \
                        Skipping .."
                )
                no_issues_occurred = False
                continue

    return no_issues_occurred
postprocess_configs
postprocess_configs(settings, llm_prompts, llm_config)

Post-Process loaded configs.

Remove unused models (from settings.active_llms), merge LLMPromptMaps into LLMConfig.

PARAMETER DESCRIPTION
settings

Config matching pydantic 'Settings'.

TYPE: Settings

llm_prompts

Config matching pydantic 'LLMPromptMaps'.

TYPE: LLMPromptMaps

llm_config

Config matching pydantic 'LLMConfig'.

TYPE: LLMConfig

RETURNS DESCRIPTION
LLMConfig

Merged and filtered LLM configuration.

Source code in docs/microservices/chat/src/utils/process_configs.py
def postprocess_configs(
    settings: Settings, llm_prompts: LLMPromptMaps, llm_config: LLMConfig
) -> LLMConfig:
    """Post-Process loaded configs.

    Remove unused models (from settings.active_llms), merge LLMPromptMaps into LLMConfig.

    Args:
        settings (Settings): Config matching pydantic 'Settings'.
        llm_prompts (LLMPromptMaps): Config matching pydantic 'LLMPromptMaps'.
        llm_config (LLMConfig): Config matching pydantic 'LLMConfig'.

    Returns:
        Merged and filtered LLM configuration.
    """
    remove_unactive_models(llm_config, settings.active_llms)
    merge_specific_cfgs_in_place(llm_config, llm_prompts)

    return llm_config
remove_unactive_models
remove_unactive_models(input_config, active_models)

Remove models from all use cases if they are not in 'active_models'. Edit in-place!

PARAMETER DESCRIPTION
input_config

Config to change

TYPE: LLMConfig

active_models

Models to keep per use case; all others are removed.

TYPE: ActiveLLMs

Source code in docs/microservices/chat/src/utils/process_configs.py
def remove_unactive_models(input_config: LLMConfig, active_models: ActiveLLMs) -> None:
    """Remove models from all use cases if they are not in 'active_models'. Edit in-place!

    Args:
        input_config (LLMConfig): Config to change
        active_models (ActiveLLMs): Models to keep per use case; all others are removed
    """
    for usecase in input_config:
        cfg = getattr(input_config, usecase)
        active_models_for_usecase = getattr(active_models, usecase)
        for model in list(cfg):
            if model not in active_models_for_usecase:
                cfg.pop(model)
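A sketch of the in-place filtering; model names and values are placeholders, and the import paths are assumed.

from src.models.general import ActiveLLMs                      # import path assumed
from src.models.llms import LLMConfig                          # import path assumed
from src.utils.process_configs import remove_unactive_models   # import path assumed

entry = {                                                       # placeholder LLM definition
    "label": "Example model",
    "model": "example:latest",
    "prompt_map": "default",
    "is_remote": False,
    "api": {"url": "http://localhost:11434/v1"},
    "inference": {},
}
cfg = LLMConfig(chat={"keep-me": entry, "drop-me": entry})

remove_unactive_models(cfg, ActiveLLMs(chat=["keep-me"]))
print(list(cfg.chat))  # ['keep-me']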