Chat
chat
| MODULE | DESCRIPTION |
|---|---|
| main | Main module of the application. |
| src | Source code of the chat containing core components and utilities. |
main
Main module of the application.
This module serves as the entry point for the program. It imports necessary modules, sets up any initial configuration or data structures, and possibly defines main functions or classes that are used throughout the application.
src
Source code of the chat containing core components and utilities.
| MODULE | DESCRIPTION |
|---|---|
| app | Initialize the app. |
| chat | Implementation of the core logic and interaction flow of the chat. |
| endpoints | Endpoints of the chat microservice. |
| models | Data model classes for loading and validating API and configuration parameters. |
| openai_custom_auth | Customized httpx authentication client. |
| settings | Load all settings from a central place, not hidden in utils. |
| utils | Utility functions for logging, LLM availability checks, and configuration processing. |
app
Initialize the app.
| FUNCTION | DESCRIPTION |
|---|---|
| lifespan | Sets up a scheduler and updates the available LLMs. |
lifespan
async
Sets up a scheduler and updates the available LLMs.
This lifespan function runs on FastAPI startup. The first part, up to `yield`, is executed on startup and initializes a scheduler that regularly checks the LLM APIs. The second part is executed on shutdown and cleans up the scheduler.
The available LLMs - i.e. the LLMs whose API checks passed - are cached in the FastAPI state object as `app.state.available_llms`.
Source code in docs/microservices/chat/src/app.py
@asynccontextmanager
async def lifespan(_app: FastAPI) -> AsyncGenerator[None]:
"""Sets up a scheduler and updates available llms.
This lifespan function is started on startup of FastAPI. The first part
- till `yield` is executed on startup and initializes a scheduler to regulary
check the LLM-API. The second part is executed on shutdown and is used to
clean up the scheduler.
The available LLMs - i.e. the LLMs where API-checks passed - are cached in
FastAPI state object as `app.state.available_llms`.
"""
async def update_llm_state() -> None:
_app.state.available_llms = await get_available_llms()
# store available LLMs in FastAPI app state
_app.state.available_llms = await get_available_llms()
# setup a scheduler
scheduler = AsyncIOScheduler()
scheduler.add_job(
update_llm_state,
"interval",
seconds=settings.check_llm_api_interval_in_s,
)
scheduler.start()
yield
# cleanup
scheduler.shutdown()
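For orientation, the sketch below shows how a lifespan function of this shape is typically attached to a FastAPI app and how the cached state is read back. It is a minimal stand-in, not part of the documented source; `get_available_llms()` is replaced by a placeholder and the demo route is hypothetical.

```python
# Minimal wiring sketch (assumption: simplified stand-in, not the project's own app setup).
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager

from fastapi import FastAPI, Request


@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
    # startup: placeholder instead of `await get_available_llms()`
    app.state.available_llms = []
    yield
    # shutdown: nothing to clean up in this sketch


app = FastAPI(lifespan=lifespan)


@app.get("/demo/llms")
async def demo_llms(request: Request) -> list[dict]:
    # endpoints read the cached value from the shared app state
    return request.app.state.available_llms
```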
chat
Implementation of the core logic and interaction flow of the chat.
| MODULE | DESCRIPTION |
|---|---|
| chat_completion | Chat completion model running on an OpenAI-compatible API. |
| chat_registry | Chat registry class for storing and accessing chat providers (OpenAIChatCompletion). |
chat_completion
Chat completion model running on an OpenAI-compatible API.
| CLASS | DESCRIPTION |
|---|---|
| OpenAIChatCompletion | Chat completion model running on an OpenAI-compatible API. |
OpenAIChatCompletion
Chat completion model running on an OpenAI-compatible API.
| ATTRIBUTE | TYPE | DESCRIPTION |
|---|---|---|
| llm | LLM | Object describing the LLM. |
| auth_client | CustomAuthClient | Authentication client for various APIs. |
| llm_client | AsyncOpenAI | LLM client using the AsyncOpenAI API. |

| METHOD | DESCRIPTION |
|---|---|
| run_chat_completion | Continues a chat history by generating the next assistant message. |
| run_chat_completion_stream | Continues a chat history by generating the next assistant message. |
Source code in docs/microservices/chat/src/chat/chat_completion.py
class OpenAIChatCompletion:
"""Chat completion model running on an OpenAI-conform API.
Attributes:
llm (LLM): Object describing the LLM.
auth_client (CustomAuthClient): Authentication client for various APIs.
llm_client (AsyncOpenAI): LLM client using AsnycOpenAI API.
"""
def __init__(self, llm: LLM) -> None:
"""Initializes the model with the LLM and the credentials."""
self.llm: LLM = llm
self.auth_client: CustomAuthClient = self._setup_auth_client()
self.llm_client: AsyncOpenAI = self._setup_llm_client()
async def run_chat_completion(self, chat_input: ChatInput) -> ChatOutput:
"""Continues a chat history by generating the next assistant message.
Args:
chat_input (ChatInput): Chat containing new message and chat history.
Returns:
Generated chat message output.
"""
messages = self._preprocess_chat_history(chat_input.as_list)
chat_output_raw = await self._generate(messages, response_format="text")
chat_output = self._postprocess_result(chat_output=chat_output_raw)
return chat_output
async def run_chat_completion_stream(
self, chat_input: ChatInput
) -> AsyncGenerator[str]:
"""Continues a chat history by generating the next assistant message.
Args:
chat_input (ChatInput): Chat containing new message and chat history.
Returns:
Generated chat output as stream.
"""
messages = self._preprocess_chat_history(chat_input.as_list)
raw_stream = self._generate_stream(
messages=messages,
response_format="text",
)
async for processed_chunk in self._postprocess_stream(raw_stream):
yield ChatStreamOutput(**processed_chunk).model_dump_json() + "\n"
def _preprocess_chat_history(
self,
chat_list: list[dict[str, str]],
) -> list[dict[str, str]]:
"""Ensures system prompt exists and trims the entire chat history.
Ensures system prompt exists and adds it if necessary.
Trims the chat history to fit within the model's maximum context length.
The oldest messages are removed first if the token limit is exceeded.
The system message is always preserved at the beginning of the history.
Args:
chat_list (list[dict[str, str]]): The chat history with roles and content.
Returns:
list[dict[str, str]]: The trimmed chat history.
"""
max_new_tokens = self.llm.inference.max_new_tokens or 0
max_total_tokens = self.llm.max_context_tokens + max_new_tokens
system_prompt = {
"role": "system",
"content": self.llm.prompt_config.system.generate,
}
if not chat_list or chat_list[0].get("role") != "system":
chat_list = [system_prompt] + chat_list
def estimate_tokens(text: str) -> int:
return int(len(text) / self.llm.character_to_token)
total_tokens = estimate_tokens(chat_list[0]["content"])
trimmed_history = [chat_list[0]]
for message in reversed(chat_list[1:]):
message_tokens = estimate_tokens(message.get("content", ""))
if total_tokens + message_tokens > max_total_tokens:
break
trimmed_history.insert(1, message)
total_tokens += message_tokens
if len(trimmed_history) < len(chat_list):
removed_count = len(chat_list) - len(trimmed_history)
logger.debug("Chat history was trimmed: %d messages removed", removed_count)
user_messages = [msg for msg in trimmed_history if msg.get("role") != "system"]
if not user_messages:
logger.warning(
"Chat history is too short after trimming. "
"Consider reducing the length of input messages or adjusting the LLM parameters: "
"'max_context_tokens' and 'max_new_tokens'."
)
return trimmed_history
async def _generate(
self,
messages: list[dict[str, str]],
response_format: str = "text",
) -> ChatOutput:
"""General generation function that generates an output given an input text.
Args:
messages (list[dict]): A list of dictionaries where each dictionary contains the "role" and the "content".
response_format (str): Format of the response.
Returns:
Generated chat message output.
"""
try:
response = await self.llm_client.chat.completions.create(
model=self.llm.model,
messages=messages,
response_format={"type": response_format},
max_completion_tokens=self.llm.inference.max_new_tokens,
temperature=self.llm.inference.temperature,
top_p=self.llm.inference.top_p,
stream=False,
)
logger.debug(f"Response from LLM client: {response}")
except BadRequestError as e:
logger.error(f"Invalid request to OpenAI API: {e}")
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=(
"Fehler beim Verarbeiten Ihrer Anfrage durch das Sprachmodell. "
"Bitte kürzen Sie Ihren Eingabetext oder versuchen Sie es mit einem anderen Sprachmodell."
),
)
except (APITimeoutError, httpx.TimeoutException) as e:
logger.error(f"{self.llm.label} API call timed out: {e}")
raise HTTPException(
status_code=status.HTTP_504_GATEWAY_TIMEOUT,
detail=(
"Das verwendete Sprachmodell ist momentan nicht erreichbar "
"oder benötigt aufgrund hoher Auslastung länger als üblich, "
"um eine Antwort zu generieren. Bitte versuchen Sie es in wenigen Momenten erneut."
),
)
except Exception as e:
logger.error(
f"{self.llm.label} API call of Chat-Completion to LLM failed: {e}"
)
raise HTTPException(
status_code=status.HTTP_502_BAD_GATEWAY,
detail="Interner Fehler beim Aufruf des Sprachmodells. Bitte versuchen Sie es später erneut.",
)
try:
content: str = response.choices[0].message.content
except Exception as e:
logger.error(f"{self.llm.label} chat content not available: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Fehler beim Verarbeiten der Antwort des Sprachmodells.",
)
return ChatOutput(response=content)
async def _generate_stream(
self,
messages: list[dict[str, str]],
response_format: str = "text",
) -> Stream[dict]:
"""Take a list of messages as input and return the stream of a model-generated message as output.
Args:
messages (list[dict[str, str]]): Messages as input to the model.
response_format (str): Format of the response.
Returns:
Stream of model-generated messages.
"""
try:
response = await self.llm_client.chat.completions.create(
model=self.llm.model,
messages=messages,
response_format={"type": response_format},
max_completion_tokens=self.llm.inference.max_new_tokens,
temperature=self.llm.inference.temperature,
top_p=self.llm.inference.top_p,
stream=True,
)
async for chunk in response:
yield chunk
except BadRequestError as e:
logger.error(f"Invalid request to OpenAI API: {e}")
yield {
"type": "error",
"error": (
"Fehler beim Verarbeiten Ihrer Anfrage durch das Sprachmodell. "
"Bitte kürzen Sie Ihren Eingabetext oder versuchen Sie es mit einem anderen Sprachmodell."
),
}
except (APITimeoutError, httpx.TimeoutException) as e:
logger.error(f"{self.llm.label} API call timed out: {e}")
yield {
"type": "error",
"error": (
"Das verwendete Sprachmodell ist momentan nicht erreichbar "
"oder benötigt aufgrund hoher Auslastung länger als üblich, "
"um eine Antwort zu generieren. Bitte versuchen Sie es in wenigen Momenten erneut."
),
}
except Exception as e:
logger.error(f"Error during streaming: {e}")
yield {
"type": "error",
"error": "Es ist ein Fehler aufgetreten. Bitte versuchen Sie es später erneut.",
}
def _postprocess_result(self, chat_output: ChatOutput) -> ChatOutput:
"""Postprocess reasoning block of a chat output.
Args:
chat_output (ChatOutput): Generated chat output in raw format.
Returns:
Post-processed chat output with reasoning excluded,
reasoning text stored separately in `chat_output.reason`.
"""
response = chat_output.response
if self.llm.reasoning_config.is_reasoning_model:
reasoning_regex = (
rf"{self.llm.reasoning_config.reasoning_start_marker}"
r"(.*?)"
rf"{self.llm.reasoning_config.reasoning_end_marker}"
)
match = re.search(reasoning_regex, response, flags=re.DOTALL)
if match:
chat_output.reason = match.group(1).strip()
chat_output.response = re.sub(
reasoning_regex, "", response, flags=re.DOTALL
).lstrip()
else:
chat_output.reason = ""
return chat_output
async def _postprocess_stream(
self,
stream: Stream[dict],
) -> AsyncGenerator[dict]:
"""Postprocesses the raw chat completion stream.
Splits the model output into separate "reason" and "response" outputs,
handling multi-token start/end markers and incremental token streaming.
Args:
stream (Stream[dict]): Async generator of raw chat chunks.
Yields:
Structured stream outputs with keys:
- 'type': 'reason' or 'response'
- 'content': partial text content
- 'finish_reason': optional finish reason from the model
"""
state = StreamState()
async for chunk in stream:
if self._is_error_chunk(chunk):
yield chunk
break
# Extract text and finish_reason from current chunk
text, finish_reason = self._extract_text_and_finish_reason(chunk)
logger.debug(f"Text: {text}, Finish Reason: {finish_reason}")
# finish_reason="length": generation exceeded max completion tokens or max context length.
if finish_reason == "length":
yield {
"type": "error",
"error": (
"Ihre Anfrage ist zu lang und überschreitet das Kontextfenster des Modells. "
"Bitte kürzen Sie Ihren Eingabetext oder reduzieren Sie die Anzahl der Nachrichten."
),
}
break
# Continue with next chunk if text empty (besides "Stop-Chunk")
if not text and not finish_reason:
continue
elif not text:
text = ""
# Stream non-reasoning model:
if not self.llm.reasoning_config.is_reasoning_model:
yield self._format_output(
output_type="response", content=text, finish_reason=finish_reason
)
continue
# Stream reasoning model:
async for output in self._handle_reasoning_stream(
state, text, finish_reason
):
yield output
def _is_error_chunk(self, chunk: dict) -> bool:
"""Checks if the given stream chunk represents an error.
Args:
chunk (dict): A single stream chunk from the model.
Returns:
bool: True if the chunk is an error (has type 'error'), False otherwise.
"""
return isinstance(chunk, dict) and chunk.get("type") == "error"
def _extract_text_and_finish_reason(
self, chunk: ChatCompletionChunk
) -> tuple[str, str | None]:
"""Extracts content text and finish_reason from a stream chunk.
Args:
chunk (ChatCompletionChunk): Raw stream chunk.
Returns:
The token text and optional finish_reason.
"""
if not chunk.choices:
return None, None
text = chunk.choices[0].delta.content
finish_reason = getattr(chunk.choices[0], "finish_reason", None)
return text, finish_reason
def _format_output(
self, output_type: str, content: str, finish_reason: str | None
) -> dict:
"""Builds a structured output dictionary.
Args:
output_type (str): Either "reason" or "response".
content (str): Text content for the output.
finish_reason (str | None): Finish reason if provided by the model.
Returns:
Output with keys {"type", "content", "finish_reason"}.
"""
return {
"type": output_type,
"content": content,
"finish_reason": finish_reason,
}
async def _handle_reasoning_stream(
self,
state: StreamState,
text: str,
finish_reason: str | None,
) -> AsyncGenerator[dict]:
"""Handles reasoning-mode streaming for reasoning-enabled models.
Buffers incoming text, detects reasoning start/end markers,
and processes content according to the current state mode
('idle', 'reason', or 'response').
Args:
state (StreamState): The current streaming state, including buffer,
mode, and accumulated outputs.
text (str): The incremental text chunk from the model.
finish_reason (str | None): The optional finish reason if
provided by the model.
Yields:
Structured stream outputs with keys:
- 'type': 'reason' or 'response'
- 'content': partial text content
- 'finish_reason': optional finish reason from the model
"""
# Always append text to buffer first
state.buffer += text
# Detect start of reasoning
if state.mode == "idle":
self._check_reasoning_start(
state=state,
start_marker=self.llm.reasoning_config.reasoning_start_marker,
)
if state.mode == "idle":
return
# Process reasoning or response depending on mode
if state.mode == "reason":
self._process_reasoning_buffer(
state=state,
finish_reason=finish_reason,
reasoning_end_marker=self.llm.reasoning_config.reasoning_end_marker,
end_marker_len=len(self.llm.reasoning_config.reasoning_end_marker),
)
elif state.mode == "response":
self._process_response_buffer(
state=state,
text=text,
finish_reason=finish_reason,
)
# Stream outputs and remove them from state
while state.outputs:
yield state.outputs.pop(0)
def _check_reasoning_start(
self,
state: StreamState,
start_marker: str,
) -> None:
"""Checks and updates the streaming state based on the reasoning start marker.
Args:
state (StreamState): The current streaming state.
start_marker (str): Marker that signals the beginning of reasoning.
Returns:
The function modifies `state` in place.
- `state.mode` will be set to "reason", "response", or remain "idle".
- `state.outputs` may receive a response chunk if the buffer
does not start with (a subset of) the marker.
"""
marker_len = len(start_marker)
buffer = state.buffer.lstrip("\n\r ")
if buffer.startswith(start_marker):
state.buffer = buffer[marker_len:] # remove marker
state.mode = "reason"
return
for i in range(marker_len - 1, 0, -1):
if start_marker.startswith(buffer[:i]):
return
state.mode = "response"
def _process_reasoning_buffer(
self,
state: StreamState,
finish_reason: str | None,
reasoning_end_marker: str,
end_marker_len: int,
) -> None:
"""Processes reasoning text until the end marker is found.
Streams reasoning chunks incrementally. If the end marker is detected,
switches the mode to 'response' and emits any remaining buffer content.
Args:
state (StreamState): Current stream state.
finish_reason (str | None): Optional finish reason from the model.
reasoning_end_marker (str): End marker for reasoning.
end_marker_len (int): Length of the reasoning end marker.
Returns:
The function modifies `state` in place.
"""
if reasoning_end_marker in state.buffer:
reason_part, rest = state.buffer.split(reasoning_end_marker, 1)
if reason_part:
state.outputs.append(
self._format_output("reason", reason_part, finish_reason)
)
state.mode = "response"
state.buffer = rest
if state.buffer:
state.outputs.append(
self._format_output(
"response", state.buffer.lstrip("\n\r "), finish_reason
)
)
state.first_response_sent = True
state.buffer = ""
elif len(state.buffer) > end_marker_len:
reason_chunk = state.buffer[:-end_marker_len]
state.outputs.append(
self._format_output("reason", reason_chunk, finish_reason)
)
state.buffer = state.buffer[-end_marker_len:]
def _process_response_buffer(
self,
state: StreamState,
text: str,
finish_reason: str | None,
) -> None:
"""Processes response tokens after reasoning has finished.
Emits response outputs, trims leading newlines if it's the first response,
and clears the buffer.
Args:
state (StreamState): Current stream state.
text (str): New token text to add.
finish_reason (str | None): Optional finish reason from the model.
Returns:
The function modifies `state` in place.
"""
if not state.first_response_sent:
text = text.lstrip("\n\r ")
state.first_response_sent = True
state.outputs.append(self._format_output("response", text, finish_reason))
state.buffer = ""
def _setup_auth_client(self) -> CustomAuthClient:
"""Set up authentication client for various APIs.
Sets up an authentication client using either a token, credentials or no authentication method.
Returns:
Authentication client.
"""
if self.llm.api.auth:
auth_client = CustomAuthClient(
secret=self.llm.api.auth.secret.get_secret_value(),
auth_type=self.llm.api.auth.type,
timeout=self.llm.api.timeout,
)
else:
auth_client = CustomAuthClient(timeout=self.llm.api.timeout)
return auth_client
def _setup_llm_client(self) -> AsyncOpenAI:
"""Initializing the LLM client using AsnycOpenAI API.
Returns:
Asynchronous OpenAI client.
"""
llm_client = AsyncOpenAI(
api_key=" ",
http_client=self.auth_client,
base_url=str(self.llm.api.url),
)
return llm_client
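To make the character-based token budget in `_preprocess_chat_history` concrete, here is a standalone sketch of the same trimming policy. It is a simplified re-implementation for illustration, not the method itself: the system prompt is always kept, then messages are added newest-first until the estimated budget is exhausted.

```python
# Simplified re-implementation of the trimming policy, for illustration only.
def trim_history(
    chat: list[dict[str, str]],
    max_total_tokens: int,
    character_to_token: float = 4.0,  # default conversion factor from the LLM config
) -> list[dict[str, str]]:
    def estimate_tokens(text: str) -> int:
        return int(len(text) / character_to_token)

    total = estimate_tokens(chat[0]["content"])  # system prompt is always kept
    trimmed = [chat[0]]
    for message in reversed(chat[1:]):  # newest messages first
        tokens = estimate_tokens(message.get("content", ""))
        if total + tokens > max_total_tokens:
            break
        trimmed.insert(1, message)
        total += tokens
    return trimmed


history = [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "x" * 4000},  # ~1000 estimated tokens
    {"role": "user", "content": "y" * 400},   # ~100 estimated tokens
]
print([(m["role"], len(m["content"])) for m in trim_history(history, 200)])
# [('system', 28), ('user', 400)]  -> the oldest user message was dropped
```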
run_chat_completion
async
Continues a chat history by generating the next assistant message.
| PARAMETER | TYPE | DESCRIPTION |
|---|---|---|
| chat_input | ChatInput | Chat containing the new message and chat history. |

| RETURNS | DESCRIPTION |
|---|---|
| ChatOutput | Generated chat message output. |
Source code in docs/microservices/chat/src/chat/chat_completion.py
async def run_chat_completion(self, chat_input: ChatInput) -> ChatOutput:
"""Continues a chat history by generating the next assistant message.
Args:
chat_input (ChatInput): Chat containing new message and chat history.
Returns:
Generated chat message output.
"""
messages = self._preprocess_chat_history(chat_input.as_list)
chat_output_raw = await self._generate(messages, response_format="text")
chat_output = self._postprocess_result(chat_output=chat_output_raw)
return chat_output
run_chat_completion_stream
async
Continues a chat history by generating the next assistant message.
| PARAMETER | TYPE | DESCRIPTION |
|---|---|---|
| chat_input | ChatInput | Chat containing the new message and chat history. |

| RETURNS | DESCRIPTION |
|---|---|
| AsyncGenerator[str] | Generated chat output as a stream. |
Source code in docs/microservices/chat/src/chat/chat_completion.py
async def run_chat_completion_stream(
self, chat_input: ChatInput
) -> AsyncGenerator[str]:
"""Continues a chat history by generating the next assistant message.
Args:
chat_input (ChatInput): Chat containing new message and chat history.
Returns:
Generated chat output as stream.
"""
messages = self._preprocess_chat_history(chat_input.as_list)
raw_stream = self._generate_stream(
messages=messages,
response_format="text",
)
async for processed_chunk in self._postprocess_stream(raw_stream):
yield ChatStreamOutput(**processed_chunk).model_dump_json() + "\n"
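Each item yielded by `run_chat_completion_stream` is one NDJSON line produced by `ChatStreamOutput.model_dump_json()`. A hedged consumption sketch, assuming `completion` is an `OpenAIChatCompletion` instance and `chat_input` a valid `ChatInput`:

```python
import json


async def print_stream(completion, chat_input) -> None:
    # `completion` and `chat_input` are assumed to exist; see the classes above.
    async for line in completion.run_chat_completion_stream(chat_input):
        chunk = json.loads(line)
        if chunk["type"] == "response":
            print(chunk["content"] or "", end="", flush=True)
        elif chunk["type"] == "reason":
            pass  # reasoning tokens could be rendered separately
        elif chunk["type"] == "error":
            print(f"\n[error] {chunk['error']}")
```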
chat_registry
Chat-Registry class for storing and accessing Chat-Providers (OpenAIChatCompletion).
| CLASS | DESCRIPTION |
|---|---|
ChatRegistry |
Manages and stores Chat-Providers (OpenAIChatCompletion) and makes access possible. |
ChatRegistry
Manages and stores Chat-Providers (OpenAIChatCompletion) and makes access possible.
| ATTRIBUTE | TYPE | DESCRIPTION |
|---|---|---|
| chat_models | dict[str, OpenAIChatCompletion] | Chat models. |
| llm_config | LLMConfig | Model configuration for chat initialization. |

| METHOD | DESCRIPTION |
|---|---|
| run_chat_completion | Starts the chat completion of the selected language model. |
| run_chat_completion_json_stream | Runs the chat completion process in JSON format using the selected language model. |
| run_chat_completion_text_stream | Runs the chat completion process in text format using the selected language model. |
Source code in docs/microservices/chat/src/chat/chat_registry.py
class ChatRegistry:
"""Manages and stores Chat-Providers (OpenAIChatCompletion) and makes access possible.
Attributes:
chat_models (dict[str, OpenAIChatCompletion]): Chat models.
llm_config (LLMConfig): Model configuration for chat initialization.
"""
def __init__(self, llm_config: LLMConfig) -> None:
"""Initializes the list of chat models."""
self.llm_config: LLMConfig = llm_config
self.chat_models: dict[str, OpenAIChatCompletion] = self._initialize_models()
def _initialize_models(self) -> dict[str, OpenAIChatCompletion]:
"""Load all available chat models based on custom configuration.
Returns:
All model objects with custom configuration.
"""
models = {}
for model_name, llm in self.llm_config.chat.items():
models[model_name] = OpenAIChatCompletion(llm=llm)
logger.debug(f"Initialized {len(models)} chat models")
return models
async def run_chat_completion(
self, model: OpenAIChatCompletion, chat_input: ChatInput
) -> ChatOutput:
"""Starts the chat completion of selected language model.
Args:
chat_input (ChatInput): Defines the input to the chat endpoint including the chat message.
model (OpenAIChatCompletion): Language model to use for chat completion.
Returns:
Chat output containing the generated chat message.
"""
chat_result = await model.run_chat_completion(chat_input)
logger.info(
f"Chat completion successfully completed with model: {chat_input.language_model}"
)
return chat_result
async def run_chat_completion_text_stream(
self,
model: OpenAIChatCompletion,
chat_input: ChatInput,
) -> AsyncGenerator[str]:
"""Runs the chat completion process in text format using the selected language model.
Args:
chat_input (ChatInput): Input to the chat model, including chat history and selected language model.
model (OpenAIChatCompletion): Language model to use for chat completion.
Yields:
The next chunk of generated text and metadata.
"""
try:
async for chunk in model.run_chat_completion_stream(chat_input):
chunk_dict = json.loads(chunk)
logger.debug(chunk_dict)
if chunk_dict.get("type") == "response" and chunk_dict.get("content"):
yield chunk_dict["content"]
elif chunk_dict.get("type") == "error" and chunk_dict.get("error"):
yield "\nSYSTEM-WARNUNG: " + chunk_dict.get("error")
logger.info(
f"Streaming chat completed with model: {chat_input.language_model}"
)
except Exception as e:
logger.error(f"Error while processing chat input with model: {model}: {e}")
yield "Es ist ein Fehler aufgetreten. Bitte versuchen Sie es später erneut."
async def run_chat_completion_json_stream(
self,
model: OpenAIChatCompletion,
chat_input: ChatInput,
) -> AsyncGenerator[str]:
"""Runs the chat completion process in json format using the selected language model.
Args:
chat_input (ChatInput): Input to the chat model, including chat history and selected language model.
model (OpenAIChatCompletion): Language model to use for chat completion.
Yields:
The next chunk of generated text and metadata.
"""
try:
async for chunk in model.run_chat_completion_stream(chat_input):
if chunk:
yield chunk
logger.info(
f"Streaming chat completed with model: {chat_input.language_model}"
)
except Exception as e:
logger.error(f"Error while processing chat input with model: {model}: {e}")
yield ChatStreamOutput(
type="error",
error="Es ist ein Fehler aufgetreten. Bitte versuchen Sie es später erneut.",
).model_dump_json()
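The endpoints look a model up via `chat_models` and then dispatch to the registry. A minimal sketch of that pattern, assuming `llm_config` is an already-loaded `LLMConfig` (the helper name is illustrative):

```python
async def answer(llm_config: LLMConfig, chat_input: ChatInput) -> str:
    registry = ChatRegistry(llm_config=llm_config)
    model = registry.chat_models.get(chat_input.language_model)
    if model is None:  # the endpoints translate this case into HTTP 400
        raise ValueError(f"Unknown model: {chat_input.language_model}")
    result = await registry.run_chat_completion(model, chat_input)
    return result.response
```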
run_chat_completion
async
Starts the chat completion of the selected language model.
| PARAMETER | TYPE | DESCRIPTION |
|---|---|---|
| chat_input | ChatInput | Defines the input to the chat endpoint, including the chat message. |
| model | OpenAIChatCompletion | Language model to use for chat completion. |

| RETURNS | DESCRIPTION |
|---|---|
| ChatOutput | Chat output containing the generated chat message. |
Source code in docs/microservices/chat/src/chat/chat_registry.py
async def run_chat_completion(
self, model: OpenAIChatCompletion, chat_input: ChatInput
) -> ChatOutput:
"""Starts the chat completion of selected language model.
Args:
chat_input (ChatInput): Defines the input to the chat endpoint including the chat message.
model (OpenAIChatCompletion): Language model to use for chat completion.
Returns:
Chat output containing the generated chat message.
"""
chat_result = await model.run_chat_completion(chat_input)
logger.info(
f"Chat completion successfully completed with model: {chat_input.language_model}"
)
return chat_result
run_chat_completion_json_stream
async
Runs the chat completion process in JSON format using the selected language model.
| PARAMETER | TYPE | DESCRIPTION |
|---|---|---|
| chat_input | ChatInput | Input to the chat model, including the chat history and selected language model. |
| model | OpenAIChatCompletion | Language model to use for chat completion. |

| YIELDS | DESCRIPTION |
|---|---|
| AsyncGenerator[str] | The next chunk of generated text and metadata. |
Source code in docs/microservices/chat/src/chat/chat_registry.py
async def run_chat_completion_json_stream(
self,
model: OpenAIChatCompletion,
chat_input: ChatInput,
) -> AsyncGenerator[str]:
"""Runs the chat completion process in json format using the selected language model.
Args:
chat_input (ChatInput): Input to the chat model, including chat history and selected language model.
model (OpenAIChatCompletion): Language model to use for chat completion.
Yields:
The next chunk of generated text and metadata.
"""
try:
async for chunk in model.run_chat_completion_stream(chat_input):
if chunk:
yield chunk
logger.info(
f"Streaming chat completed with model: {chat_input.language_model}"
)
except Exception as e:
logger.error(f"Error while processing chat input with model: {model}: {e}")
yield ChatStreamOutput(
type="error",
error="Es ist ein Fehler aufgetreten. Bitte versuchen Sie es später erneut.",
).model_dump_json()
run_chat_completion_text_stream
async
Runs the chat completion process in text format using the selected language model.
| PARAMETER | TYPE | DESCRIPTION |
|---|---|---|
| chat_input | ChatInput | Input to the chat model, including the chat history and selected language model. |
| model | OpenAIChatCompletion | Language model to use for chat completion. |

| YIELDS | DESCRIPTION |
|---|---|
| AsyncGenerator[str] | The next chunk of generated text and metadata. |
Source code in docs/microservices/chat/src/chat/chat_registry.py
async def run_chat_completion_text_stream(
self,
model: OpenAIChatCompletion,
chat_input: ChatInput,
) -> AsyncGenerator[str]:
"""Runs the chat completion process in text format using the selected language model.
Args:
chat_input (ChatInput): Input to the chat model, including chat history and selected language model.
model (OpenAIChatCompletion): Language model to use for chat completion.
Yields:
The next chunk of generated text and metadata.
"""
try:
async for chunk in model.run_chat_completion_stream(chat_input):
chunk_dict = json.loads(chunk)
logger.debug(chunk_dict)
if chunk_dict.get("type") == "response" and chunk_dict.get("content"):
yield chunk_dict["content"]
elif chunk_dict.get("type") == "error" and chunk_dict.get("error"):
yield "\nSYSTEM-WARNUNG: " + chunk_dict.get("error")
logger.info(
f"Streaming chat completed with model: {chat_input.language_model}"
)
except Exception as e:
logger.error(f"Error while processing chat input with model: {model}: {e}")
yield "Es ist ein Fehler aufgetreten. Bitte versuchen Sie es später erneut."
endpoints
Endpoints of the chat microservice.
| FUNCTION | DESCRIPTION |
|---|---|
| fetch_chat_response | Chat completion endpoint. |
| fetch_chat_response_json_stream | Chat completion endpoint with json-stream. |
| fetch_chat_response_text_stream | Chat completion endpoint with text-stream. |
| get_llms | Return model information of available LLMs. |
| health | Return a health check message. |
fetch_chat_response
async
Chat completion endpoint.
| PARAMETER | TYPE | DESCRIPTION |
|---|---|---|
| chat_input | ChatInput | Input containing the chat message. |

| RETURNS | DESCRIPTION |
|---|---|
| ChatOutput | Output of the chat response. |
Source code in docs/microservices/chat/src/endpoints.py
@router.post(
"/completion",
response_model=ChatOutput,
summary="Chat completion endpoint.",
description=(
"Performs response for chat completions.\n\n"
"The endpoint returns a single JSON response containing the chat output.\n\n"
),
openapi_extra={
"requestBody": {
"content": {
"application/json": {
"examples": ChatInput.model_config["json_schema_extra"][
"openapi_examples"
],
}
},
}
},
responses={
200: {
"description": "Successful chat response.",
"content": {
"application/json": {
"examples": ChatOutput.model_config["json_schema_extra"][
"openapi_examples"
],
},
},
},
400: {
"description": (
"Invalid LLM API request, such as using an unsupported model or exceeding the context window."
)
},
500: {"description": "Error processing answer of LLM client."},
502: {"description": "API call of Chat-Completion to LLM failed."},
504: {
"description": "API call of Chat-Completion to LLM failed due to timeout."
},
},
)
async def fetch_chat_response(chat_input: ChatInput) -> ChatOutput:
"""Chat completion endpoint.
Args:
chat_input (ChatInput): Input containing the chat message.
Returns:
Output of chat response
"""
model = chat_registry.chat_models.get(chat_input.language_model)
if model is None:
logger.error(f"Invalid language model selected: {chat_input.language_model}")
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=(
f"Es wurde ein ungültiges Sprachmodell ausgewählt ({chat_input.language_model})."
" Bitte versuchen Sie es mit einem anderen Modell."
),
)
logger.info(f"Chat completion started with model: {chat_input.language_model}")
return await chat_registry.run_chat_completion(model, chat_input)
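A client-side sketch for this endpoint, using the request body from the OpenAPI example above. The base URL is an assumption: the service address and any router prefix depend on the deployment.

```python
# Client-side sketch (assumptions: the service listens on http://localhost:8000 and
# the router is mounted without an extra prefix; adjust to your deployment).
import httpx

payload = {
    "new_message": {"role": "user", "content": "What's the weather like today?"},
    "chat_history": [
        {"role": "user", "content": "Hi"},
        {"role": "assistant", "content": "Hello! How can I help you today?"},
    ],
    "language_model": "test_model_mock",
}

response = httpx.post("http://localhost:8000/completion", json=payload, timeout=60)
response.raise_for_status()
print(response.json())  # e.g. {"response": "...", "reason": None}
```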
fetch_chat_response_json_stream
async
Chat completion endpoint with json-stream.
| PARAMETER | TYPE | DESCRIPTION |
|---|---|---|
| chat_input | ChatInput | Input containing the chat message. |

| RETURNS | DESCRIPTION |
|---|---|
| StreamingResponse | Output of chat response. |
Source code in docs/microservices/chat/src/endpoints.py
@router.post(
"/v2/completion/stream",
response_class=StreamingResponse,
summary="Chat completion endpoint with x-ndjson-stream.",
description=(
"Starts a streaming response for chat completions.\n\n"
"The endpoint streams messages as NDJSON (`application/x-ndjson`) "
"with different types: `response`, `reason`, and `error`."
),
openapi_extra={
"requestBody": {
"content": {
"application/json": {
"examples": ChatInput.model_config["json_schema_extra"][
"openapi_examples"
],
}
},
}
},
responses={
200: {
"description": "Streaming started successfully.",
"content": {
"application/x-ndjson": {
"examples": ChatStreamOutput.model_config["json_schema_extra"][
"openapi_examples"
],
},
},
},
400: {"description": "Invalid language model."},
},
)
async def fetch_chat_response_json_stream(chat_input: ChatInput) -> StreamingResponse:
"""Chat completion endpoint with json-stream.
Args:
chat_input (ChatInput): Input containing the chat message.
Returns:
Output of chat response.
"""
model = chat_registry.chat_models.get(chat_input.language_model)
if model is None:
logger.error(f"Invalid language model selected: {chat_input.language_model}")
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=(
f"Es wurde ein ungültiges Sprachmodell ausgewählt ({chat_input.language_model})."
" Bitte versuchen Sie es mit einem anderen Modell."
),
)
logger.info(
f"Streaming chat completion started with model: {chat_input.language_model}"
)
return StreamingResponse(
chat_registry.run_chat_completion_json_stream(model, chat_input),
media_type="application/x-ndjson",
)
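A streaming client sketch for the NDJSON endpoint, reusing the `payload` from the previous example and the same base-URL assumptions. Each received line is one serialized `ChatStreamOutput`.

```python
# Streaming client sketch for the NDJSON endpoint (assumed base URL and prefix).
import json

import httpx

with httpx.stream(
    "POST", "http://localhost:8000/v2/completion/stream", json=payload, timeout=None
) as response:
    for line in response.iter_lines():
        if not line:
            continue
        chunk = json.loads(line)
        if chunk["type"] == "response" and chunk["content"]:
            print(chunk["content"], end="", flush=True)
        elif chunk["type"] == "error":
            print("\n[error]", chunk["error"])
```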
fetch_chat_response_text_stream
async
Chat completion endpoint with text-stream.
| PARAMETER | TYPE | DESCRIPTION |
|---|---|---|
| chat_input | ChatInput | Input containing the chat message. |

| RETURNS | DESCRIPTION |
|---|---|
| StreamingResponse | Output of the chat response. |
Source code in docs/microservices/chat/src/endpoints.py
@router.post(
"/completion/stream",
response_class=StreamingResponse,
summary="Chat completion endpoint with text-stream.",
description=(
"Starts a streaming response for chat completions.\n\n"
"The endpoint streams messages as text (`text/event-stream`)."
),
openapi_extra={
"requestBody": {
"content": {
"application/json": {
"examples": ChatInput.model_config["json_schema_extra"][
"openapi_examples"
],
}
},
}
},
responses={
200: {
"description": "Streaming started successfully.",
"content": {
"text/event-stream": {
"schema": {
"type": "string",
"example": "Hello, how can I help you today?\n\n",
},
"examples": {
"response": {
"summary": "Chat response",
"description": (
"This is the standard output ",
"returned by the chat model for a normal request.",
),
"value": "Hello, how can I help you today?\n\n",
},
"context_length_exceeded_during_stream": {
"summary": "Context length exceeded during stream",
"description": (
"This example shows the output when the input exceeds the model's context window."
),
"value": (
"Ihre Anfrage ist zu lang und überschreitet das Kontextfenster des Modells. "
"Bitte kürzen Sie Ihren Eingabetext oder reduzieren Sie die Anzahl der Nachrichten."
),
},
"bad_request": {
"summary": "Invalid request",
"description": (
"This example shows the output produced when the input exceeds the context window "
"or when the request is malformed or lacks required parameters."
),
"value": (
"Fehler beim Verarbeiten Ihrer Anfrage durch das Sprachmodell. Bitte kürzen Sie "
"Ihren Eingabetext oder versuchen Sie es mit einem anderen Sprachmodell."
),
},
"internal_error": {
"summary": "Internal error",
"description": (
"This example shows the output when an unexpected error occurs during "
"streaming.",
),
"value": "Es ist ein Fehler aufgetreten. Bitte versuchen Sie es später erneut.",
},
},
}
},
},
400: {"description": "Invalid language model."},
},
)
async def fetch_chat_response_text_stream(chat_input: ChatInput) -> StreamingResponse:
"""Chat completion endpoint with text-stream.
Args:
chat_input (ChatInput): Input containing the chat message.
Returns:
Output of the chat response.
"""
model = chat_registry.chat_models.get(chat_input.language_model)
if model is None:
logger.error(f"Invalid language model selected: {chat_input.language_model}")
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=(
f"Es wurde ein ungültiges Sprachmodell ausgewählt ({chat_input.language_model})."
" Bitte versuchen Sie es mit einem anderen Modell."
),
)
logger.info(
f"Streaming chat completion started with model: {chat_input.language_model}"
)
return StreamingResponse(
chat_registry.run_chat_completion_text_stream(model, chat_input),
media_type="text/event-stream",
)
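A text-stream client sketch, under the same assumptions as the previous examples: this endpoint emits plain text chunks, so they can be printed as they arrive.

```python
# Text-stream client sketch (assumed base URL and prefix; `payload` as defined above).
import httpx

with httpx.stream(
    "POST", "http://localhost:8000/completion/stream", json=payload, timeout=None
) as response:
    for text in response.iter_text():
        print(text, end="", flush=True)
```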
get_llms
async
Return model information of available LLMs.
| PARAMETER | TYPE | DESCRIPTION |
|---|---|---|
| request | Request | Request data. |

| RETURNS | DESCRIPTION |
|---|---|
| list[dict] | The list of available models. |
Source code in docs/microservices/chat/src/endpoints.py
@router.get(
"/llms",
summary="List available language models.",
description=("Returns a list of available language models (LLMs).\n\n"),
responses={
200: {
"description": "List of available LLMs.",
"content": {
"application/json": {
"example": [
{
"label": "test_model:mock",
"is_remote": False,
"name": "test_model_mock",
},
]
}
},
},
500: {"description": "Internal server error accessing microservice"},
},
)
async def get_llms(request: Request) -> list[dict]:
"""Return model information of available LLMs.
Args:
request (Request): Request-Data.
Returns:
The list of available models.
"""
app = request.app # indirectly access the FastAPI app object
return app.state.available_llms
health
async
Return a health check message.
| RETURNS | DESCRIPTION |
|---|---|
| dict[str, str] | The health check message as a dictionary. |
Source code in docs/microservices/chat/src/endpoints.py
@router.get(
"/",
summary="Health check endpoint",
description=(
"Returns a simple message indicating that the chat service is running.\n\n"
"Use this endpoint to verify that the service is alive and responsive."
),
responses={
200: {
"description": "Health check successful",
"content": {"application/json": {"example": {"status": "Chat is running"}}},
},
500: {"description": "Internal server error"},
},
)
@router.get(
"/health",
summary="Health check endpoint",
description=(
"Returns a simple message indicating that the chat service is running.\n\n"
"Use this endpoint to verify that the service is alive and responsive."
),
responses={
200: {
"description": "Health check successful",
"content": {"application/json": {"example": {"status": "Chat is running"}}},
},
500: {"description": "Internal server error"},
},
)
async def health() -> dict[str, str]:
"""Return a health check message.
Returns:
The health check message as a dictionary.
"""
return {"message": f"{settings.service_name} is running"}
models
Data model classes for loading and validating API and configuration parameters.
| MODULE | DESCRIPTION |
|---|---|
| api_input | Pydantic models for API input parameters. |
| api_output | Pydantic models for API output parameters. |
| chat_state | Pydantic models for chat completion parameters. |
| general | Load and check settings from YAML. |
| llms | Pydantic model for the LLM config. |
api_input
Pydantic models for API input parameters.
| CLASS | DESCRIPTION |
|---|---|
| ChatInput | Model defining the input of a valid chat request. |
| ChatMessage | Message input model used to store the content of chat messages. |
ChatInput
Bases: BaseModel
Model defining the input of a valid chat request.
| ATTRIBUTE | TYPE | DESCRIPTION |
|---|---|---|
| new_message | ChatMessage | The new user message to be processed. |
| chat_history | list[ChatMessage] | List of previous chat messages forming the conversation context. |
| language_model | str | The identifier of the language model to use. |
| request_timestamp | int \| None | Timestamp of the request. |
Source code in docs/microservices/chat/src/models/api_input.py
class ChatInput(BaseModel):
"""Model defining the input of a valid chat request.
Attributes:
new_message (ChatMessage): The new user message to be processed.
chat_history (list[ChatMessage]): List of previous chat messages forming the conversation context.
language_model (str): The identifier of the language model to use.
request_timestamp (int | None): Timestamp of the request.
"""
new_message: ChatMessage
chat_history: list[ChatMessage] = []
language_model: str
request_timestamp: int | None = Field(
None,
description="Unix timestamp indicating when the request was made.",
deprecated=True,
)
@property
def as_list(self) -> list[dict[str, str]]:
"""Transforms the chat history plus the new message into a list of dictionaries containing the role and message.
Returns:
Each dictionary contains keys 'role' and 'content'.
"""
chat_history_list = [
{"role": message.role, "content": message.content}
for message in self.chat_history
]
chat_history_list.append(
{"role": self.new_message.role, "content": self.new_message.content}
)
return chat_history_list
model_config = ConfigDict(
json_schema_extra={
"openapi_examples": {
"standard": {
"summary": "Simple chat input",
"description": "Standard input with short chat history.",
"value": {
"new_message": {
"role": "user",
"content": "What's the weather like today?",
},
"chat_history": [
{"role": "user", "content": "Hi"},
{
"role": "assistant",
"content": "Hello! How can I help you today?",
},
],
"language_model": "test_model_mock",
},
}
}
}
)
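A short worked example of the `as_list` transformation, using the values from the OpenAPI example above:

```python
chat_input = ChatInput(
    new_message=ChatMessage(content="What's the weather like today?"),
    chat_history=[
        ChatMessage(role="user", content="Hi"),
        ChatMessage(role="assistant", content="Hello! How can I help you today?"),
    ],
    language_model="test_model_mock",
)
print(chat_input.as_list)
# [{'role': 'user', 'content': 'Hi'},
#  {'role': 'assistant', 'content': 'Hello! How can I help you today?'},
#  {'role': 'user', 'content': "What's the weather like today?"}]
```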
ChatMessage
Bases: BaseModel
Message input model used to store the content of chat messages.
| ATTRIBUTE | TYPE | DESCRIPTION |
|---|---|---|
| content | str | The textual content of the message. |
| role | str | The role of the message sender. Must be one of "system", "user", "assistant". Defaults to "user". |
Source code in docs/microservices/chat/src/models/api_input.py
class ChatMessage(BaseModel):
"""Message input model used to store the content of chat messages.
Attributes:
content (str): The textual content of the message.
role (str): The role of the message sender. Must be one of "system", "user", "assistant".
Defaults to "user".
"""
content: str
role: Literal["user", "system", "assistant"] = "user"
api_output
Pydantic models for API output parameters.
| CLASS | DESCRIPTION |
|---|---|
| ChatOutput | Chat response model of chat output. |
| ChatStreamOutput | Chat stream response model of chat output. |
ChatOutput
Bases: BaseModel
Chat response model of chat output.
| ATTRIBUTE | TYPE | DESCRIPTION |
|---|---|---|
| response | str | The generated chat response. |
| reason | str \| None | Optional reasoning or explanation for the response. |
Source code in docs/microservices/chat/src/models/api_output.py
class ChatOutput(BaseModel):
"""Chat response model of chat output.
Attributes:
response (str): The generated chat response.
reason (str | None): Optional reasoning or explanation for the response.
"""
response: str
reason: str | None = None
model_config = ConfigDict(
json_schema_extra={
"openapi_examples": {
"simple": {
"summary": "Simple chat response",
"description": "Used for models that produce a simple response without reasoning.",
"value": {
"response": "The weather is nice today.",
},
},
"reasoning": {
"summary": "Response with reasoning",
"description": "Used for reasoning-enabled models, showing both the answer and its explanation.",
"value": {
"response": "The weather is nice today.",
"reason": "It is sunny outside.",
},
},
}
}
)
ChatStreamOutput
Bases: BaseModel
Chat stream response model of chat output.
| ATTRIBUTE | TYPE | DESCRIPTION |
|---|---|---|
| type | str | The kind of output. One of 'reason', 'response', or 'error'. |
| content | str \| None | Partial text content of the stream if type != error. |
| finish_reason | str \| None | Optional finish reason from the model. |
| error | str \| None | Error message if type == 'error'. |
Source code in docs/microservices/chat/src/models/api_output.py
class ChatStreamOutput(BaseModel):
"""Chat stream response model of chat output.
Attributes:
type (str): The kind of output. One of 'reason', 'response', or 'error'.
content (str | None): Partial text content of the stream if type != error.
finish_reason (str | None): Optional finish reason from the model.
error (str | None): Error message if type == 'error'.
"""
type: Literal["reason", "response", "error"]
content: str | None = None
finish_reason: str | None = None
error: str | None = None
model_config = ConfigDict(
json_schema_extra={
"openapi_examples": {
"response": {
"summary": "Chat response",
"description": "Standard response returned by the chat model when a normal message is processed.",
"value": {
"type": "response",
"content": "Hello, how can I help you today?",
"finish_reason": None,
"error": None,
},
},
"reason": {
"summary": "Reason output",
"description": "Response including the reasoning or explanation of the model's output.",
"value": {
"type": "reason",
"content": "User said hello. I will answer politely.",
"finish_reason": None,
"error": None,
},
},
"context_length_exceeded_during_stream": {
"summary": "Context length exceeded during streaming",
"description": "Returned when the prompt exceeds the model's context window.",
"value": {
"type": "error",
"content": None,
"finish_reason": None,
"error": (
"Ihre Anfrage ist zu lang und überschreitet das Kontextfenster des Modells. "
"Bitte kürzen Sie Ihren Eingabetext oder reduzieren Sie die Anzahl der Nachrichten."
),
},
},
"bad_request": {
"summary": "Invalid request",
"description": (
"Returned when the input exceeds the context window or the request is malformed "
"or has invalid parameters."
),
"value": {
"type": "error",
"content": None,
"finish_reason": None,
"error": (
"Fehler beim Verarbeiten Ihrer Anfrage durch das Sprachmodell. "
"Bitte kürzen Sie Ihren Eingabetext oder versuchen Sie es mit einem anderen Sprachmodell."
),
},
},
"internal_error": {
"summary": "Internal error",
"description": "Returned when an unexpected error occurs during streaming.",
"value": {
"type": "error",
"content": None,
"finish_reason": None,
"error": "Es ist ein Fehler aufgetreten. Bitte versuchen Sie es später erneut.",
},
},
}
}
)
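A small round-trip sketch: this is the shape serialized by `run_chat_completion_stream`, one JSON object per NDJSON line.

```python
chunk = ChatStreamOutput(type="response", content="Hello", finish_reason=None)
line = chunk.model_dump_json() + "\n"
print(line)  # {"type":"response","content":"Hello","finish_reason":null,"error":null}

parsed = ChatStreamOutput.model_validate_json(line)
assert parsed.type == "response" and parsed.content == "Hello"
```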
chat_state
Pydantic models for chat completion parameters.
| CLASS | DESCRIPTION |
|---|---|
| StreamState | Holds the state of a reasoning/response stream. |
StreamState
Bases: BaseModel
Holds the state of a reasoning/response stream.
| ATTRIBUTE | TYPE | DESCRIPTION |
|---|---|---|
| buffer | str | Temporary storage for incoming text chunks. |
| mode | Literal["idle", "reason", "response"] | Current mode of the stream: "idle" means waiting for reasoning to start, "reason" means reasoning tokens are being streamed, "response" means response tokens are being streamed. |
| first_response_sent | bool | Whether the first response output has been sent. |
| outputs | list[dict] | Accumulated structured events from the stream. Each dict contains keys like "type" (reason/response), "content" and "finish_reason". |
Source code in docs/microservices/chat/src/models/chat_state.py
class StreamState(BaseModel):
"""Holds the state of a reasoning/response stream.
Attributes:
buffer (str): Temporary storage for incoming text chunks.
mode (Literal["idle", "reason", "response"]): Current mode of the stream.
- "idle": Waiting for reasoning to start.
- "reason": Currently streaming reasoning tokens.
- "response": Currently streaming response tokens.
first_response_sent (bool): Whether first response output has been sent.
outputs (list[dict]): Accumulated structured events from the stream.
Each dict contains keys like "type" (reason/response), "content" and "finish_reason".
"""
buffer: str = ""
mode: Literal["idle", "reason", "response"] = "idle"
first_response_sent: bool = False
outputs: list[dict] = []
model_config = ConfigDict(arbitrary_types_allowed=True)
general
Load and check settings from YAML.
| CLASS | DESCRIPTION |
|---|---|
| ActiveLLMs | Selection of available models for respective use cases. |
| LogLevel | Enum class specifying possible log levels. |
| Settings | General Settings for the service. |
ActiveLLMs
Bases: BaseModel
Selection of available models for respective use cases.
| ATTRIBUTE | TYPE | DESCRIPTION |
|---|---|---|
| model_config | ConfigDict | Used to ignore other services, which are defined in the config. |
| chat | list[str] | List containing available models for chat. It may contain only a subset of all models in llm_models.yml. |
Source code in docs/microservices/chat/src/models/general.py
class ActiveLLMs(BaseModel):
"""Selection of available models for respective use cases.
Attributes:
model_config (ConfigDict): Used to ignore other services, which are defined in the config.
chat (list[str]): List containing available models for chat.
It may contain only a subset of all models in llm_models.yml.
"""
# if there are more services defined in the config: just ignore them
model_config = ConfigDict(extra="ignore")
chat: list[str]
LogLevel
Bases: StrEnum
Enum class specifying possible log levels.
Source code in docs/microservices/chat/src/models/general.py
class LogLevel(StrEnum):
"""Enum class specifying possible log levels."""
CRITICAL = "CRITICAL"
ERROR = "ERROR"
WARNING = "WARNING"
INFO = "INFO"
DEBUG = "DEBUG"
@classmethod
def _missing_(cls, value: object) -> None:
"""Convert strings to uppercase and recheck for existance."""
if isinstance(value, str):
value = value.upper()
for level in cls:
if level == value:
return level
return None
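The case-insensitive lookup provided by `_missing_` in action:

```python
assert LogLevel("debug") is LogLevel.DEBUG
assert LogLevel("Info") is LogLevel.INFO
assert LogLevel("WARNING") is LogLevel.WARNING  # exact match, _missing_ not needed
```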
Settings
Bases: BaseModel
General Settings for the service.
| ATTRIBUTE | TYPE | DESCRIPTION |
|---|---|---|
| model_config | ConfigDict | Used to ignore other services, which are defined in the config. |
| service_name | str | Name of the service, e.g. 'chat'. |
| active_llms | ActiveLLMs | Selection of available models for respective use cases. |
| log_level | LogLevel | Minimal level of logging output given. |
| log_file_max_bytes | PositiveInt | Max file size for the logfile. |
| log_file_backup_count | PositiveInt | Number of log files to loop over. |
| log_file | FilePath | Path the logfile is written to. |
| check_llm_api_interval_in_s | PositiveInt | Interval for checking all LLM APIs (seconds). |
| n_uvicorn_workers | PositiveInt | Number of parallel uvicorn instances. |

| METHOD | DESCRIPTION |
|---|---|
| ensure_log_dir | Create the log directory after validation. |
Source code in docs/microservices/chat/src/models/general.py
class Settings(BaseModel):
"""General Settings for the service.
Attributes:
model_config (ConfigDict): Used to ignore other services, which are defined in the config.
service_name (str): Name of service, i.e. 'chat'
active_llms (ActiveLLMs): Selection of available models for respective use cases.
log_level (LogLevel): Minimal level of logging output given.
log_file_max_bytes: (PositiveInt): Max file size for logfile
log_file_backup_count (PositiveInt): Number of log-files to loop over
log_file (FilePath): Write logfile there.
check_llm_api_interval_in_s (PositiveInt): Interval for checking all LLM APIs (seconds)
n_uvicorn_workers (PositiveInt): Number of parallel uvicorn instances.
"""
model_config = ConfigDict(extra="ignore")
service_name: str = "Chat"
service_descripton: str = "Generation of chat completions using various LLMs."
n_uvicorn_workers: PositiveInt = 1
active_llms: ActiveLLMs
log_level: LogLevel = LogLevel.INFO
log_file_max_bytes: PositiveInt = 1 * 1024 * 1024
log_file_backup_count: PositiveInt = 3
log_file: FilePath = Path("/chat/logs/log")
# interval for checking all LLM APIs (seconds)
check_llm_api_interval_in_s: PositiveInt = 120
@model_validator(mode="after")
def ensure_log_dir(self) -> "Settings":
"""Create the log directory after validation."""
self.log_file.parent.mkdir(parents=True, exist_ok=True)
return self
ensure_log_dir
llms
Pydantic model for the LLM config.
| CLASS | DESCRIPTION |
|---|---|
| APIAuth | Defines authentication settings for an LLM. |
| LLM | This pydantic class defines the basic structure of an LLM config. |
| LLMAPI | Defines the API connection to an LLM. |
| LLMConfig | Base class as loaded from model_configs.yml. |
| LLMInference | Defines inference parameters. |
| LLMPromptConfig | Defines the structure of an LLM prompt configuration. |
| LLMPromptMaps | Defines the complete LLM prompt config. |
| LLMPrompts | Defines the selectable LLM prompts. |
| ReasoningConfig | Configuration for reasoning-capable models. |
APIAuth
Bases: BaseModel
Defines authentication settings for an LLM.
| ATTRIBUTE | TYPE | DESCRIPTION |
|---|---|---|
| type | Literal["token", "basic_auth"] | Either 'token' or 'basic_auth'. |
| secret_path | FilePath | File path where the API token or credentials are stored. |

| METHOD | DESCRIPTION |
|---|---|
| get_auth_header | Generate the auth part of the header for an HTTP request. |
Source code in docs/microservices/chat/src/models/llms.py
class APIAuth(BaseModel):
"""Defines Authentification settings for LLM.
Attributes:
type (Literal): Either 'token' or 'basic_auth'.
secret_path (FilePath): File path where the api token or credentials are stored.
"""
type: Literal["token", "basic_auth"]
secret_path: FilePath
@property
def secret(self) -> SecretStr:
"""Load secret variable as 'secret'."""
with open(self.secret_path) as file:
return SecretStr(file.read().strip())
def get_auth_header(self) -> str:
"""Generate auth part of header for http request.
Returns:
The auth header.
"""
auth_header = ""
if self.type == "basic_auth":
auth_header = f"Basic {base64.b64encode(self.secret.get_secret_value().encode()).decode()}"
elif self.type == "token":
auth_header = f"Bearer {self.secret.get_secret_value()}"
return auth_header
get_auth_header
Generate auth part of header for http request.
| RETURNS | DESCRIPTION |
|---|---|
| str | The auth header. |
Source code in docs/microservices/chat/src/models/llms.py
def get_auth_header(self) -> str:
"""Generate auth part of header for http request.
Returns:
The auth header.
"""
auth_header = ""
if self.type == "basic_auth":
auth_header = f"Basic {base64.b64encode(self.secret.get_secret_value().encode()).decode()}"
elif self.type == "token":
auth_header = f"Bearer {self.secret.get_secret_value()}"
return auth_header
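A usage sketch for token-based auth. The temporary secret file exists only for this example; in deployments `secret_path` usually points at a mounted secret.

```python
import tempfile

with tempfile.NamedTemporaryFile("w", suffix=".token", delete=False) as f:
    f.write("my-api-token\n")
    secret_file = f.name  # stand-in for a mounted secret file

auth = APIAuth(type="token", secret_path=secret_file)
print(auth.get_auth_header())  # Bearer my-api-token
```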
LLM
Bases: BaseModel
This pydantic class defines the basic structure of an LLM config.
| ATTRIBUTE | TYPE | DESCRIPTION |
|---|---|---|
| label | str | Human-readable model name that can be presented to users. |
| model | str | Model name which is used in the API call, e.g. the ollama tag. |
| prompt_map | str | Prompt map name to load LLMPromptMaps from. |
| is_remote | bool | Is this LLM hosted at an external API? |
| max_context_tokens | int | Total chat-history length for chat completion. |
| character_to_token | float | Factor to convert character count into approximate token count. |
| api | LLMAPI | API information. |
| inference | LLMInference | Inference parameters. |
| prompt_config | LLMPromptConfig | Prompts. |
| reasoning_config | ReasoningConfig | Reasoning configuration. |
Source code in docs/microservices/chat/src/models/llms.py
class LLM(BaseModel):
"""This pydantic class defines the basic structure of a LLM config.
Attributes:
label (str): Human-readable model name that can be presented to users.
model (str): Model name which is used in API call, e.g. ollama tag.
prompt_map (str): Prompt map name to load LLMPromptMaps from.
is_remote (bool): Is this LLM hosted at an external API?
max_context_tokens (int): Total chat-history length for chat completion.
character_to_token (float): Factor to convert character count into approximate token count.
api (LLMAPI): API information.
inference (LLMInference): Inference parameters.
prompt_config (LLMPromptConfig): Prompts.
reasoning_config (ReasoningConfig): Reasoning configuration.
"""
label: str
model: str
prompt_map: str
is_remote: bool
max_context_tokens: int = 6144
character_to_token: float = 4
api: LLMAPI
inference: LLMInference
prompt_config: LLMPromptConfig = None
reasoning_config: ReasoningConfig = ReasoningConfig()
LLMAPI
Bases: BaseModel
Defines the API connection to an LLM.
| ATTRIBUTE | TYPE | DESCRIPTION |
|---|---|---|
| url | AnyHttpUrl | URL to the model. |
| timeout | float | Timeout of the httpx authentication client. |
| health_check | str \| None | Relative path to the health check, e.g. '/models'. |
| auth | APIAuth \| None | Authentication settings for the LLM. |

| METHOD | DESCRIPTION |
|---|---|
| get_health_check_url | Get the URL to check if the API is available. |
Source code in docs/microservices/chat/src/models/llms.py
class LLMAPI(BaseModel):
"""Defines API-Connection to LLM.
Attributes:
url (AnyHttpUrl): URL to model.
timeout (float): Timout of Httpx Authentication Client.
health_check (str | None): Relative path to health check, i.e. '/models'
auth (APIAuth | None): Authentification settings for LLM
"""
url: AnyHttpUrl
timeout: float = 10
health_check: str | None = None
auth: APIAuth | None = None
def get_health_check_url(self) -> str:
"""Get the URL to check if API is available."""
if self.health_check:
# make sure to remove trailing and leading slashes to not override path
return urljoin(
str(self.url).rstrip("/") + "/",
self.health_check.lstrip("/"),
)
return str(self.url)
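A worked example of `get_health_check_url`; the URL and path values are illustrative only.

```python
api = LLMAPI(url="http://llm-host:11434/v1", health_check="/models")
print(api.get_health_check_url())   # e.g. http://llm-host:11434/v1/models

api_without_check = LLMAPI(url="http://llm-host:11434/v1")
print(api_without_check.get_health_check_url())  # falls back to the base URL
```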
get_health_check_url
Get the URL to check if API is available.
Source code in docs/microservices/chat/src/models/llms.py
LLMConfig
Bases: BaseModel
Base class as loaded from model_configs.yml.
| ATTRIBUTE | TYPE | DESCRIPTION |
|---|---|---|
| model_config | ConfigDict | Used to ignore other services, which are defined in the config. |
| chat | dict[str, LLM] | Dictionary containing the name and definition of the LLMs available for chat. |
Source code in docs/microservices/chat/src/models/llms.py
class LLMConfig(BaseModel):
"""Base class as loaded from model_configs.yml.
Attributes:
model_config (ConfigDict): Used to ignore other services, which are defined in the config.
chat (dict[str, LLM]): Dictionary containing the name and definition of the LLMs available for chat.
"""
# if there are more services defined in the config: just ignore them
model_config = ConfigDict(extra="ignore")
chat: dict[str, LLM]
def __iter__(self) -> Iterator[str]:
"""Get 'keys' for automatic merge with i.e. LLMPromptConfig."""
return iter(self.__dict__.keys())
def __getitem__(self, service: str) -> dict[str, LLM]:
"""Get all LLMs for a given service (e.g. "chat", "rag").
Args:
service (str): The service name (e.g., "chat", "rag").
Returns:
All configured LLMs for the given service.
"""
return self.__getattribute__(service)
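A hedged sketch of how __iter__ and __getitem__ behave in practice; the import path and all config values are assumptions for illustration.

```python
# Hedged sketch of LLMConfig iteration and item access (import path and values assumed).
from src.models.llms import LLMConfig

raw = {
    "chat": {
        "phi3-mini": {
            "label": "Phi-3 Mini",
            "model": "phi3:mini",
            "prompt_map": "default",
            "is_remote": False,
            "api": {"url": "http://localhost:11434/v1"},
            "inference": {},
        }
    },
    "embedding": {},  # other services in model_configs.yml are ignored (extra="ignore")
}
config = LLMConfig(**raw)
for service in config:                     # __iter__ yields the field names, here "chat"
    print(service, list(config[service]))  # __getitem__ returns the dict of LLMs
```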
LLMInference
Bases: BaseModel
Defines Inference parameters.
| ATTRIBUTE | DESCRIPTION |
|---|---|
temperature |
Randomness / variation of the output. Higher values indicate more creativity.
TYPE: PositiveFloat | None
|
max_new_tokens |
Maximum number of tokens of the generated response.
TYPE: PositiveInt | None
|
top_p |
Threshold for sampling only from the most likely tokens.
TYPE: PositiveFloat | None
|
Source code in docs/microservices/chat/src/models/llms.py
class LLMInference(BaseModel):
"""Defines Inference parameters.
Attributes:
temperature (PositiveFloat | None): Randomness / variation of the output. Higher values indicate more creativity.
max_new_tokens (PositiveInt | None): Maximum number of tokens of the generated response.
top_p (PositiveFloat | None): Threshold for sampling only from the most likely tokens.
"""
temperature: PositiveFloat | None = 0.7
max_new_tokens: PositiveInt | None = 2048
top_p: PositiveFloat | None = 0.9
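The field names are close to, but not identical with, the OpenAI request parameters. The sketch below shows one plausible mapping; the wiring is an assumption and is not copied from run_chat_completion.

```python
# Sketch of mapping LLMInference fields onto OpenAI-style request parameters
# (mapping assumed, not taken from run_chat_completion; import path assumed).
from src.models.llms import LLMInference

inference = LLMInference(temperature=0.2, max_new_tokens=512, top_p=0.95)
request_kwargs = {
    "temperature": inference.temperature,
    "max_tokens": inference.max_new_tokens,  # the OpenAI API calls this max_tokens
    "top_p": inference.top_p,
}
print(request_kwargs)
```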
LLMPromptConfig
Bases: BaseModel
Defines the structure of a LLM prompt configuration.
| ATTRIBUTE | DESCRIPTION |
|---|---|
model_config |
Used to ignore additional prompt types that are defined in the config but not used by this service.
TYPE: ConfigDict
|
system |
System prompt.
TYPE: LLMPrompts
|
user |
User prompt.
TYPE: LLMPrompts | None
|
assistant |
Assistant prompt.
TYPE: LLMPrompts | None
|
Source code in docs/microservices/chat/src/models/llms.py
class LLMPromptConfig(BaseModel):
"""Defines the structure of a LLM prompt configuration.
Attributes:
model_config (ConfigDict): Used to ignore additional prompt types that are defined in the config but not used by this service.
system (LLMPrompts): System prompt.
user (LLMPrompts | None): User prompt.
assistant (LLMPrompts | None): Assistant prompt.
"""
# if there are more prompt types defined that are not used in this service: just ignore them
model_config = ConfigDict(extra="ignore")
system: LLMPrompts
user: LLMPrompts | None = None
assistant: LLMPrompts | None = None
LLMPromptMaps
Bases: BaseModel
Defines complete LLM prompt config.
| ATTRIBUTE | DESCRIPTION |
|---|---|
model_config |
Used to ignore other services, which are defined in the config.
TYPE: ConfigDict
|
chat |
Dictionary mapping a name to the prompts of each LLM available for chat.
TYPE: dict[str, LLMPromptConfig]
|
Source code in docs/microservices/chat/src/models/llms.py
class LLMPromptMaps(BaseModel):
"""Defines complete LLM prompt config.
Attributes:
model_config (ConfigDict): Used to ignore other services, which are defined in the config.
chat (dict[str, LLMPromptConfig]): Dictionary mapping a name to the prompts of each LLM available for chat.
"""
model_config = ConfigDict(extra="ignore")
chat: dict[str, LLMPromptConfig]
def __iter__(self) -> Iterator[str]:
"""Get 'keys' for automatic merge with i.e. LLMConfig."""
return iter(self.__dict__.keys())
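A hedged sketch of a prompt map being validated into this model; the import path and the prompt texts are assumptions, not the service's real prompts.

```python
# Hedged sketch of loading a prompt map (import path and content assumed).
from src.models.llms import LLMPromptMaps

raw = {
    "chat": {
        "default": {
            "system": {"generate": "You are a helpful assistant."},
            "user": {"generate": "{question}"},
        }
    },
    "rag": {},  # prompt maps of other services are ignored (extra="ignore")
}
prompt_maps = LLMPromptMaps(**raw)
print(prompt_maps.chat["default"].system.generate)
```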
LLMPrompts
Bases: BaseModel
Defines the selectable LLM Prompts.
| ATTRIBUTE | DESCRIPTION |
|---|---|
model_config |
Used to ignore additional prompts that are defined in the config but not used by this service.
TYPE: ConfigDict
|
generate |
Prompt for the model.
TYPE: str
|
Source code in docs/microservices/chat/src/models/llms.py
class LLMPrompts(BaseModel):
"""Defines the selectable LLM Prompts.
Attributes:
model_config (ConfigDict): Used to ignore additional prompts that are defined in the config but not used by this service.
generate (str): Prompt for the model.
"""
# if there are more prompts defined that are not used in this service: just ignore them
model_config = ConfigDict(extra="ignore")
generate: str = ""
ReasoningConfig
Bases: BaseModel
Configuration for reasoning-capable models.
| ATTRIBUTE | DESCRIPTION |
|---|---|
is_reasoning_model |
Whether this model supports reasoning streams.
TYPE: bool
|
reasoning_start_marker |
Start marker for the reasoning section.
TYPE: str | None
|
reasoning_end_marker |
End marker for the reasoning section.
TYPE: str | None
|
Source code in docs/microservices/chat/src/models/llms.py
class ReasoningConfig(BaseModel):
"""Configuration for reasoning-capable models.
Attributes:
is_reasoning_model (bool): Whether this model supports reasoning streams.
reasoning_start_marker (str | None): Start marker for the reasoning section.
reasoning_end_marker (str | None): End marker for the reasoning section.
"""
is_reasoning_model: bool = False
reasoning_start_marker: str | None = None
reasoning_end_marker: str | None = None
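As an illustration only, one way the markers could be used to strip a reasoning section from a complete response is sketched below; this is not the service's actual stream handling, and the import path, markers, and text are assumptions.

```python
# Illustration only: separating reasoning from the final answer using the markers.
from src.models.llms import ReasoningConfig  # assumed import path

cfg = ReasoningConfig(
    is_reasoning_model=True,
    reasoning_start_marker="<think>",
    reasoning_end_marker="</think>",
)
raw = "<think>The user greets me, so greet back.</think>Hello! How can I help?"
if cfg.is_reasoning_model and cfg.reasoning_end_marker and cfg.reasoning_end_marker in raw:
    reasoning, answer = raw.split(cfg.reasoning_end_marker, 1)
    reasoning = reasoning.removeprefix(cfg.reasoning_start_marker or "")
else:
    reasoning, answer = "", raw
print(answer)  # -> "Hello! How can I help?"
```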
openai_custom_auth
Customized Httpx Authentication Client.
| CLASS | DESCRIPTION |
|---|---|
CustomAuthClient |
Custom HTTP transport for OpenAI client. |
CustomAuthClient
Bases: AsyncClient
Custom HTTP transport for OpenAI client.
This class supports both Bearer Token Authentication and Basic Authentication.
If auth_type is 'token', the secret is expected to be the API key.
If auth_type is 'basic_auth', the secret is expected to be a 'username:password' string, which the client base64-encodes.
| ATTRIBUTE | DESCRIPTION |
|---|---|
auth_header |
Authentication header for the httpx client.
TYPE: str
|
| METHOD | DESCRIPTION |
|---|---|
a_send |
Asynchronous method for sending HTTP requests. |
send |
Synchronous method for sending HTTP requests. |
Source code in docs/microservices/chat/src/openai_custom_auth.py
class CustomAuthClient(httpx.AsyncClient):
"""Custom HTTP transport for OpenAI client.
This class supports both Bearer Token Authentication and Basic Authentication.
If `auth_type` is 'token', the `secret` is expected to be the API key.
If `auth_type` is 'basic_auth', the `secret` is expected to be a 'username:password' string, which the client base64-encodes.
Attributes:
auth_header (str): Authentication header for the httpx client.
Methods:
a_send(request, *args, **kwargs): Asynchronous method for sending HTTP requests.
send(request, *args, **kwargs): Synchronous method for sending HTTP requests.
"""
def __init__(
self,
secret: str | None = None,
auth_type: Literal["token", "basic_auth"] | None = None,
*args: object,
**kwargs: object,
) -> None:
"""Initialize the custom HTTP transport for OpenAI client.
Args:
secret (str, optional): OpenAI API Key or Basic Auth credentials (username:password).
This is required depending on the `auth_type`. If `auth_type`
is 'token', the `secret` should be the API key. If
`auth_type` is 'basic_auth', the `secret` should be a
'username:password' string, which is then base64-encoded by the client.
auth_type (str, optional): The type of authentication to use. It can be 'token' or 'basic_auth'.
*args: Variable length argument list.
**kwargs: Arbitrary keyword arguments.
Raises:
ValueError: If `auth_type` is provided but `secret` is not provided.
"""
super().__init__(*args, **kwargs)
self.auth_header = ""
if auth_type and not secret:
raise ValueError("API credentials are required but missing.")
if auth_type == "token":
self.auth_header = f"Bearer {secret}"
elif auth_type == "basic_auth":
encoded_credentials = base64.b64encode(secret.encode()).decode()
self.auth_header = f"Basic {encoded_credentials}"
async def a_send(
self,
request: httpx.Request,
*args: object,
**kwargs: object,
) -> httpx.Response:
"""Asynchronous version of the send method to handle requests asynchronously."""
if "Authorization" in request.headers:
del request.headers["Authorization"]
if self.auth_header:
request.headers["Authorization"] = self.auth_header
return await super().a_send(request, *args, **kwargs)
def send(
self,
request: httpx.Request,
*args: object,
**kwargs: object,
) -> httpx.Response:
"""Version of the send method to handle requests asynchronously."""
if "Authorization" in request.headers:
del request.headers["Authorization"]
if self.auth_header:
request.headers["Authorization"] = self.auth_header
return super().send(request, *args, **kwargs)
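A sketch of wiring the client into the OpenAI SDK: the base URL and secret are placeholders, and the import path is an assumption; http_client is a standard AsyncOpenAI parameter in openai>=1.x.

```python
# Hedged wiring sketch: pass the custom client to the OpenAI SDK.
from openai import AsyncOpenAI
from src.openai_custom_auth import CustomAuthClient  # assumed import path

http_client = CustomAuthClient(secret="my-api-key", auth_type="token")
client = AsyncOpenAI(
    base_url="https://llm.example.com/v1",
    api_key="unused",  # the custom client injects its own Authorization header
    http_client=http_client,
)
```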
a_send
async
Asynchronous version of the send method to handle requests asynchronously.
Source code in docs/microservices/chat/src/openai_custom_auth.py
async def a_send(
self,
request: httpx.Request,
*args: object,
**kwargs: object,
) -> httpx.Response:
"""Asynchronous version of the send method to handle requests asynchronously."""
if "Authorization" in request.headers:
del request.headers["Authorization"]
if self.auth_header:
request.headers["Authorization"] = self.auth_header
return await super().a_send(request, *args, **kwargs)
send
Synchronous version of the send method; injects the auth header before delegating to the parent client.
Source code in docs/microservices/chat/src/openai_custom_auth.py
def send(
self,
request: httpx.Request,
*args: object,
**kwargs: object,
) -> httpx.Response:
"""Version of the send method to handle requests asynchronously."""
if "Authorization" in request.headers:
del request.headers["Authorization"]
if self.auth_header:
request.headers["Authorization"] = self.auth_header
return super().send(request, *args, **kwargs)
settings
Load all settings from a central place, not hidden in utils.
utils
Utils functions for logging, LLM availability check and configuration processing.
| MODULE | DESCRIPTION |
|---|---|
base_logger |
Set up the root logger for the entire application. This logger will log messages to the console and a file. |
check_model_api_availability |
This module provides functions to check LLM-APIs for availability. |
process_configs |
Methods to load configs and run config integrity checks. |
base_logger
Set up the root logger for the entire application. This logger will log messages to the console and a file.
| FUNCTION | DESCRIPTION |
|---|---|
setup_logger |
Initialize the logger with the desired log level and add handlers. |
setup_logger
Initialize the logger with the desired log level and add handlers.
Sets up the root logger, which all other loggers inherit from. Adds file, console and exit handlers to the logger and sets the format.
Source code in docs/microservices/chat/src/utils/base_logger.py
def setup_logger() -> None:
"""Initialize the logger with the desired log level and add handlers.
Sets up the root logger, which all other loggers inherit from.
Adds file, console and exit handlers to the logger and sets the format.
"""
# root logger, all other loggers inherit from this
logger = logging.getLogger()
# create different handlers for log file and console
file_handler = logging.handlers.RotatingFileHandler(
filename=settings.log_file,
maxBytes=settings.log_file_max_bytes,
backupCount=settings.log_file_backup_count,
)
console_handler = logging.StreamHandler()
# define log format and set for each handler
formatter = logging.Formatter(
fmt="%(asctime)s - %(levelname)8s - %(module)s - %(funcName)s: %(message)s",
datefmt="%Y-%m-%d %H:%M:%S%z",
)
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)
# add handlers to the logger
logger.addHandler(file_handler)
logger.addHandler(console_handler)
logger.setLevel(settings.log_level)
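A typical call site looks like the sketch below: configure the root logger once at startup, then use module-level loggers everywhere else. The import path is an assumption.

```python
# Hedged usage sketch for setup_logger (import path assumed).
import logging
from src.utils.base_logger import setup_logger

setup_logger()
logger = logging.getLogger(__name__)  # inherits the root handlers and format
logger.info("Chat service starting up")
```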
check_model_api_availability
This module provides functions to check LLM-APIs for availability.
To check a certain LLM, use await is_model_api_available(llm.api, llm_name).
To get all LLMs that are activated in configs/general.yml, use await get_available_llms().
| FUNCTION | DESCRIPTION |
|---|---|
get_available_llms |
Returns a list of available LLMs. |
is_model_api_available |
Check if API is available using credentials. |
get_available_llms
async
Returns a list of available LLMs.
| RETURNS | DESCRIPTION |
|---|---|
list[dict[str, str]]
|
List of available LLMs with selected info. |
Source code in docs/microservices/chat/src/utils/check_model_api_availability.py
async def get_available_llms() -> list[dict[str, str]]:
"""Returns a list of available LLMs.
Returns:
List of available LLMs with selected info.
"""
available_llms = []
# iterate over model_groups (services), i.e. chat, RAG, embedding, ...
for model_group_key in llm_config:
logger.debug(f"Checking APIs for {model_group_key}-LLMs.")
model_group = llm_config[model_group_key]
for llm_name, llm in model_group.items():
logger.debug(f"Checking availability of {llm_name}")
if await is_model_api_available(llm.api, llm_name):
llm_dict = llm.model_dump(include=["label", "is_remote"])
llm_dict["name"] = llm_name
available_llms.append(llm_dict)
return available_llms
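The dumped dictionaries keep only the fields selected via model_dump plus the config key, so the cached list looks roughly as follows (the values are illustrative, not real config entries).

```python
# Illustrative shape of app.state.available_llms (values are made up):
available_llms = [
    {"label": "Phi-3 Mini", "is_remote": False, "name": "phi3-mini"},
    {"label": "GPT-4o", "is_remote": True, "name": "gpt-4o"},
]
```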
is_model_api_available
async
Check if API is available using credentials.
Availability is checked by sending a GET, HEAD, or POST request. If a health_check endpoint is provided, the request is sent to that endpoint; otherwise, it is sent to the main API URL.
| PARAMETER | DESCRIPTION |
|---|---|
llm_api
|
The LLMAPI instance to check.
TYPE: LLMAPI
|
llm_name
|
ID of the LLM as used in the config file, used as a reference in log messages.
TYPE: str
|
timeout_in_s
|
HTTP timeout in seconds; defaults to 10.
TYPE: int
|
| RETURNS | DESCRIPTION |
|---|---|
bool
|
True if the model API is available, False otherwise. |
Source code in docs/microservices/chat/src/utils/check_model_api_availability.py
async def is_model_api_available(
llm_api: LLMAPI,
llm_name: str,
timeout_in_s: int = 10,
) -> bool:
"""Check if API is available using credentials.
Availability is checked by sending a GET, HEAD, or POST request. If a health_check endpoint is provided,
the request is sent to that endpoint; otherwise, it is sent to the main API URL.
Args:
llm_api (LLMAPI): The LLMAPI instance to check.
llm_name (str): ID of the LLM as used in the config file, used as a reference in log messages.
timeout_in_s (int): HTTP timeout in seconds; defaults to 10.
Returns:
True if the model API is available, False otherwise.
"""
headers = {"Content-type": "application/json"}
# Authorization is not always needed
if llm_api.auth:
headers["Authorization"] = llm_api.auth.get_auth_header()
url = llm_api.get_health_check_url()
# test health check endpoint with GET, HEAD and POST
try:
async with httpx.AsyncClient() as client:
response = await client.get(
url,
headers=headers,
timeout=timeout_in_s,
)
logger.debug(
f"{url} health check via GET request: {response.status_code=}, LLM: '{llm_name}"
)
# test with HEAD
if response.status_code != HTTPStatus.OK:
async with httpx.AsyncClient() as client:
response = await client.head(
url,
headers=headers,
timeout=timeout_in_s,
)
logger.debug(
f"{url} health check via HEAD request: {response.status_code=}, LLM: '{llm_name}"
)
# test with POST
if response.status_code != HTTPStatus.OK:
async with httpx.AsyncClient() as client:
response = await client.post(
url,
headers=headers,
timeout=timeout_in_s,
)
logger.debug(
f"{url} health check via POST request: {response.status_code=}, LLM: '{llm_name}"
)
except Exception as e:
logger.warning(
f"Exception when trying to reach LLM API. Error: {e}, LLM: '{llm_name}"
)
return False
if response.status_code != HTTPStatus.OK:
logger.warning(
f"LLM unavailable: Could not establish connection to LLM-API. LLM: '{llm_name}"
)
return response.status_code == HTTPStatus.OK
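A hedged usage sketch for checking a single LLM API outside the scheduler; the import paths and the URL are assumptions.

```python
# Hedged usage sketch (import paths and URL assumed).
import asyncio
from src.models.llms import LLMAPI
from src.utils.check_model_api_availability import is_model_api_available

api = LLMAPI(url="http://localhost:11434/v1", health_check="/models")
print(asyncio.run(is_model_api_available(api, "phi3-mini", timeout_in_s=5)))
```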
process_configs
Methods to load configs and run config integrity checks.
| FUNCTION | DESCRIPTION |
|---|---|
load_all_configs |
Load config settings from respective paths. |
load_from_yml_in_pydantic_model |
Load config from 'yaml_path' into the given pydantic model. |
load_yaml |
Load yaml. |
merge_specific_cfgs_in_place |
Copy Prompt-config to the appropriate section in the general llm_config. Edit in-place! |
postprocess_configs |
Post-Process loaded configs. |
remove_unactive_models |
Remove models from all usecases if they are not in 'active_models'. Edit in-place! |
load_all_configs
Load config settings from respective paths.
| PARAMETER | DESCRIPTION |
|---|---|
general_config_paths
|
Path to config, matching 'Settings'.
TYPE: Path
|
path_to_llm_prompts
|
Path to config, matching 'LLMPromptMaps'.
TYPE: Path
|
path_to_llm_model_configs
|
Path to config, matching 'LLMConfig'.
TYPE: Path
|
| RETURNS | DESCRIPTION |
|---|---|
tuple[Settings, LLMConfig]
|
Configs loaded into their respective pydantic models. |
Source code in docs/microservices/chat/src/utils/process_configs.py
def load_all_configs(
general_config_paths: Path,
path_to_llm_prompts: Path,
path_to_llm_model_configs: Path,
) -> tuple[Settings, LLMConfig]:
"""Load config settings from respective paths.
Args:
general_config_paths (Path): Path to config, matching 'Settings'
path_to_llm_prompts (Path): Path to config, matching 'LLMPromptMaps'
path_to_llm_model_configs (Path): Path to config, matching 'LLMConfig'
Returns:
Configs loaded into their respective pydantic models.
"""
settings = load_from_yml_in_pydantic_model(general_config_paths, Settings)
llm_prompts = load_from_yml_in_pydantic_model(path_to_llm_prompts, LLMPromptMaps)
llm_config = load_from_yml_in_pydantic_model(path_to_llm_model_configs, LLMConfig)
postprocess_configs(settings, llm_prompts, llm_config)
return settings, llm_config
load_from_yml_in_pydantic_model
Load config from 'yaml_path' into the given pydantic model.
| PARAMETER | DESCRIPTION |
|---|---|
yaml_path
|
Yaml file to load.
TYPE: Path
|
pydantic_reference_model
|
Pydantic model class to load the yaml into.
TYPE: type[BaseModel]
|
| RETURNS | DESCRIPTION |
|---|---|
BaseModel
|
BaseModel derived pydantic data class. |
Source code in docs/microservices/chat/src/utils/process_configs.py
def load_from_yml_in_pydantic_model(
yaml_path: Path, pydantic_reference_model: type[BaseModel]
) -> BaseModel:
"""Load config from 'list_of_yaml_paths' into given pydantic-Model.
Args:
yaml_path (Path): Yaml file to load.
pydantic_reference_model (type[BaseModel]): Pydantic model class to load the yaml into.
Returns:
BaseModel derived pydantic data class.
"""
data = load_yaml(yaml_path)
try:
pydantic_class = pydantic_reference_model(**data)
logger.info(f"Config loaded from: '{yaml_path}'")
return pydantic_class
except ValidationError as e:
logger.critical(f"Error loading config: '{e}'")
raise e
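A hedged usage sketch with a throwaway model; the import path, the yaml path, and the field are made up for illustration.

```python
# Hedged usage sketch for load_from_yml_in_pydantic_model (paths and fields assumed).
from pathlib import Path
from pydantic import BaseModel
from src.utils.process_configs import load_from_yml_in_pydantic_model  # assumed path

class DemoSettings(BaseModel):
    log_level: str = "INFO"

demo = load_from_yml_in_pydantic_model(Path("configs/demo.yml"), DemoSettings)
print(demo.log_level)
```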
load_yaml
Load yaml.
| PARAMETER | DESCRIPTION |
|---|---|
yaml_path
|
Path to the yaml file.
TYPE: Path
|
| RETURNS | DESCRIPTION |
|---|---|
dict[str, Any]
|
Content of the loaded yaml. |
Source code in docs/microservices/chat/src/utils/process_configs.py
def load_yaml(yaml_path: Path) -> dict[str, Any]:
"""Load yaml.
Args:
yaml_path (Path): Path to the yaml file.
Returns:
Content of the loaded yaml.
"""
if not yaml_path.exists():
logger.error(f"Invalid path: '{yaml_path}'")
raise FileNotFoundError
with open(yaml_path) as file:
return yaml.safe_load(file)
merge_specific_cfgs_in_place
Copy Prompt-config to the appropriate section in the general llm_config. Edit in-place!
A prompt map is merged only if the 'prompt_map' referenced in LLMConfig can be found in LLMPromptMaps, i.e. it generalizes something like this:
cfg["phi3:mini"].prompts = prompt[cfg["phi3:mini"].prompt_map]
| PARAMETER | DESCRIPTION |
|---|---|
llm_config
|
Target for the merge of the prompt parameters.
TYPE: LLMConfig
|
llm_prompts
|
Source to merge the prompt parameters from.
TYPE: LLMPromptMaps
|
| RETURNS | DESCRIPTION |
|---|---|
bool
|
True if no problems occurred. |
Source code in docs/microservices/chat/src/utils/process_configs.py
def merge_specific_cfgs_in_place(
llm_config: LLMConfig, llm_prompts: LLMPromptMaps
) -> bool:
"""Copy Prompt-config to apropriate section in general llm_config. Edit in-place!
Only if 'prompt_map' in LLMConfig can be found in LLMPromptMaps, it will be merged.
i.e. try to generalize sth. like this:
cfg["phi3:mini"].prompts = prompt[cfg["phi3:mini"].prompt_map]
Args:
llm_config (LLMConfig): Target for the merge of the prompt parameters.
llm_prompts (LLMPromptMaps): Source to merge the prompt parameters from.
Returns:
True if no problems occurred.
"""
no_issues_occurred = True
for usecase in llm_config:
# load identical usecases, i.e. chat, RAG
try:
cfg = getattr(llm_config, usecase)
prompt = getattr(llm_prompts, usecase)
except AttributeError:
logger.warning(
f"Usecase '{usecase}' not matching between prompt- and general llm config. \
Skipping cfg-merge for '{usecase}' .."
)
no_issues_occurred = False
continue
# copy prompt config to its usecase- and model-counterpart
for model in cfg:
prompt_map_to_use = cfg[model].prompt_map
if prompt_map_to_use in prompt:
cfg[model].prompt_config = prompt[prompt_map_to_use]
else:
logger.warning(
f"'prompt_map: {prompt_map_to_use}' from LLM-config not in prompt-config for '{usecase}'. \
Skipping .."
)
no_issues_occurred = False
continue
return no_issues_occurred
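A hedged end-to-end sketch of the in-place merge; the import paths and all config values are assumptions for illustration, not real entries.

```python
# Hedged sketch: merging a prompt map into the LLM config (paths and values assumed).
from src.models.llms import LLMConfig, LLMPromptMaps
from src.utils.process_configs import merge_specific_cfgs_in_place

llm_config = LLMConfig(chat={
    "phi3-mini": {
        "label": "Phi-3 Mini", "model": "phi3:mini", "prompt_map": "default",
        "is_remote": False, "api": {"url": "http://localhost:11434/v1"}, "inference": {},
    }
})
llm_prompts = LLMPromptMaps(chat={
    "default": {"system": {"generate": "You are a helpful assistant."}}
})
merge_specific_cfgs_in_place(llm_config, llm_prompts)
print(llm_config.chat["phi3-mini"].prompt_config.system.generate)
```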
postprocess_configs
Post-Process loaded configs.
Remove unused models (from settings.active_llms), merge LLMPromptMaps into LLMConfig.
| PARAMETER | DESCRIPTION |
|---|---|
settings
|
Config matching pydantic 'Settings'.
TYPE: Settings
|
llm_prompts
|
Config matching pydantic 'LLMPromptMaps'.
TYPE: LLMPromptMaps
|
llm_config
|
Config matching pydantic 'LLMConfig'.
TYPE: LLMConfig
|
| RETURNS | DESCRIPTION |
|---|---|
LLMConfig
|
Merged and filtered LLM configuration. |
Source code in docs/microservices/chat/src/utils/process_configs.py
def postprocess_configs(
settings: Settings, llm_prompts: LLMPromptMaps, llm_config: LLMConfig
) -> LLMConfig:
"""Post-Process loaded configs.
Remove unused models (from settings.active_llms), merge LLMPromptMaps into LLMConfig.
Args:
settings (Settings): Config matching pydantic 'Settings'.
llm_prompts (LLMPromptMaps): Config matching pydantic 'LLMPromptMaps'.
llm_config (LLMConfig): Config matching pydantic 'LLMConfig'.
Returns:
Merged and filtered LLM configuration.
"""
remove_unactive_models(llm_config, settings.active_llms)
merge_specific_cfgs_in_place(llm_config, llm_prompts)
return llm_config
remove_unactive_models
Remove models from all usecases if they are not in 'active_models'. Edit in-place!
| PARAMETER | DESCRIPTION |
|---|---|
input_config
|
Config to change.
TYPE: LLMConfig
|
active_models
|
Models to keep; all others are removed.
TYPE: list[str]
|
Source code in docs/microservices/chat/src/utils/process_configs.py
def remove_unactive_models(input_config: LLMConfig, active_models: list[str]) -> None:
"""Remove models from all useacases, if they are not in 'active_models'. Edit in-place!
Args:
input_config (LLMConfig): Config to change
active_models (list[str]): Models to keep; all others are removed.
"""
for usecase in input_config:
cfg = getattr(input_config, usecase)
active_models_for_usecase = getattr(active_models, usecase)
for model in list(cfg):
if model not in active_models_for_usecase:
cfg.pop(model)