Core
core
| MODULE | DESCRIPTION |
|---|---|
| `main` | Define a default main entry point of the application. |
| `src` | Source code of the core containing endpoints of the microservices and utilities. |
main
Define a default main entry point of the application.
| FUNCTION | DESCRIPTION |
|---|---|
| `main` | The main entry point of the application. |
main
The main entry point of the application.
Source code in docs/microservices/core/main.py
src
Source code of the core containing endpoints of the microservices and utilities.
| MODULE | DESCRIPTION |
|---|---|
| `WebsocketManager` | WebSocketManager class keeping track of all websocket connections. |
| `app` | Initialize the app. |
| `authentication` | Expose the API for authentication in F13. |
| `endpoints` | Modules for sending HTTP requests to endpoints of internal microservices. |
| `feedback` | Models and functions to store feedback for chat, RAG and summary. |
| `models` | Data model classes for loading and validating API and configuration parameters. |
| `settings` | Load all settings from a central place, not hidden in utils. |
| `utils` | Utility functions for logging, LLM availability checks and configuration processing. |
WebsocketManager
WebSocketManager class keeping track of all websocket connections.
| CLASS | DESCRIPTION |
|---|---|
| `WebSocketManager` | WebSocketManager keeps track of all websocket connections and dispatches messages to the appropriate connection. |
WebSocketManager
WebSocketManager keeps track of all websocket connections and dispatches messages to the appropriate connection.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| `open_websocket_handler` | A dictionary mapping connection identifiers to their corresponding WebSocket handlers. TYPE: `dict` |
| `logger` | Logger instance for logging events and errors; can be None if not set. TYPE: `Optional[logging.Logger]` |
| `message_queue` | A queue that holds messages to be sent to connected clients. TYPE: `queue.Queue` |
| `sender_thread` | Background thread responsible for processing the message queue and sending messages. TYPE: `threading.Thread` |
| METHOD | DESCRIPTION |
|---|---|
| `add_websocket` | Add a websocket connection to the dictionary of open websocket connections. |
| `check_connection_id_is_unique` | Check whether a given connection ID is already in use. |
| `critical` | Send a logging message with CRITICAL level. |
| `debug` | Send a logging message with DEBUG level. |
| `error` | Send a logging message with ERROR level. |
| `info` | Send a logging message with INFO level. |
| `remove_websocket` | Remove a WebSocket connection from the active connections. |
| `send_message` | Put a message into the message queue. |
| `set_logger` | Set the Python logger to use for logging. |
| `warning` | Send a logging message with WARNING level. |
Source code in docs/microservices/core/src/WebsocketManager.py
class WebSocketManager:
"""WebSocketManager keeps track of all websocket connections and dispatches messages to the appropriate connection.
Attributes:
open_websocket_handler (dict): A dictionary mapping connection identifiers to their corresponding WebSocket
handlers.
logger (Optional[logging.Logger]): Logger instance for logging events and errors; can be None if not set.
message_queue (queue.Queue): A queue that holds messages to be sent to connected clients.
sender_thread (threading.Thread): Background thread responsible for processing the message queue and sending
messages.
"""
def __init__(self) -> None:
"""Initialize the WebSocket message dispatcher.
This constructor sets up internal structures required for handling
WebSocket communication and message dispatching in a background thread.
Attributes:
open_websocket_handler (dict): A mapping of active WebSocket handlers.
logger (Logger or None): Optional logger instance for internal logging.
message_queue (queue.Queue): Queue for storing outgoing messages to be sent.
sender_thread (threading.Thread): Background thread that runs `_run_loop`
to process the message queue.
"""
self.open_websocket_handler = dict()
self.logger = None
self.message_queue = queue.Queue()
self.sender_thread = threading.Thread(target=self._run_loop)
self.sender_thread.daemon = True
self.sender_thread.start()
def _run_loop(self) -> None:
"""Start the message sender in a second thread."""
asyncio.run(self._send_messages())
def check_connection_id_is_unique(self, connection_id: str) -> bool:
"""Check whether a given connection ID is already in use.
Args:
connection_id (str): The ID of the WebSocket connection.
Returns:
True if the connection ID is unique.
"""
return connection_id not in self.open_websocket_handler
def add_websocket(self, websocket: WebSocket, connection_id: str) -> None:
"""Add a websocket connection to the dictionary of open websocket connections.
Usually this method is invoked manually when a websocket connection is being established.
Args:
websocket (WebSocket): The websocket object to be added.
connection_id (str): The ID of the websocket connection.
"""
self.open_websocket_handler[str(connection_id)] = websocket
if self.logger:
self.logger.debug(f"WebSocket connection added: {connection_id}")
def remove_websocket(self, connection_id: str) -> None:
"""Remove a WebSocket connection from the active connections.
Typically called when a WebSocket is closed.
Args:
connection_id (str): Identifier of the WebSocket connection.
"""
if connection_id in self.open_websocket_handler:
del self.open_websocket_handler[connection_id]
if self.logger:
self.logger.debug(f"WebSocket connection removed: {connection_id}")
def set_logger(self, logger: logging.Logger) -> None:
"""Set the Python logger to use for logging.
Args:
logger (logging.Logger): The logger object to use.
"""
self.logger = logger
async def _send_messages(self) -> NoReturn:
"""Pull messages from message queue and sends it to the websocket of a specific user."""
while True:
if not self.message_queue.empty():
message = self.message_queue.get()
try:
connection_id = message["connection_id"]
msg = message["message"]
ws = self.open_websocket_handler[connection_id]
await ws.send_text(msg)
except Exception:
if self.logger:
self.logger.info(f"Websocket for {connection_id} not found")
await asyncio.sleep(0.1)
def send_message(self, message: str, connection_id: str) -> None:
"""Put a message into the message queue.
Args:
message (str): Text message.
connection_id (str): Connection ID of the addressed WebSocket.
"""
# ensure connection_id is a string
connection_id = str(connection_id)
# try to find the destination websocket
try:
if self.logger:
self.logger.debug(
f"Sending Message via websocket connection {connection_id}: {message}"
)
self.message_queue.put({"message": message, "connection_id": connection_id})
except Exception:
if self.logger:
self.logger.info(f"Websocket for {connection_id} not found")
def debug(self, message: str, connection_id: str) -> None:
"""Send a logging message with DEBUG level.
Args:
message (str): The log message to send.
connection_id (str): The ID of the connection associated with this log entry.
"""
self._send_log_message("DEBUG", message, connection_id)
def info(self, message: str, connection_id: str) -> None:
"""Send a logging message with INFO level.
Args:
message (str): The log message to send.
connection_id (str): The ID of the connection associated with this log entry.
"""
self._send_log_message("INFO", message, connection_id)
def warning(self, message: str, connection_id: str) -> None:
"""Send a logging message with WARNING level.
Args:
message (str): The log message to send.
connection_id (str): The ID of the connection associated with this log entry.
"""
self._send_log_message("WARNING", message, connection_id)
def error(self, message: str, connection_id: str) -> None:
"""Send a logging message with ERROR level.
Args:
message (str): The log message to send.
connection_id (str): The ID of the connection associated with this log entry.
"""
self._send_log_message("ERROR", message, connection_id)
def critical(self, message: str, connection_id: str) -> None:
"""Send a logging message with CRITICAL level.
Args:
message (str): The log message to send.
connection_id (str): The ID of the connection associated with this log entry.
"""
self._send_log_message("CRITICAL", message, connection_id)
def _send_log_message(
self, msg_type: str, message: str, connection_id: str
) -> None:
"""Format to json and send as log message."""
self.send_message(
message=json.dumps(dict(type=msg_type, message=message)),
connection_id=connection_id,
)
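A minimal usage sketch (the import path is an assumption based on the layout above; in this service the manager is shared with the websocket endpoint shown further below):

```python
import json
import logging

from src.WebsocketManager import WebSocketManager  # import path is an assumption

manager = WebSocketManager()  # starts the daemon sender thread immediately
manager.set_logger(logging.getLogger("core"))

# The level helpers (debug/info/warning/error/critical) wrap the payload as
# JSON before queueing it, so a client receives frames shaped like this:
print(json.dumps(dict(type="INFO", message="Processing started")))
# -> {"type": "INFO", "message": "Processing started"}

# Messages addressed to unknown connection IDs are dropped inside the sender
# loop (logged at INFO level); nothing is raised to the caller.
manager.send_message("hello", connection_id="client-42")
```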
add_websocket
Add a websocket connection to the dictionary of open websocket connections.
Usually this method is invoked manually when a websocket connection is being established.
| PARAMETER | DESCRIPTION |
|---|---|
| `websocket` | The websocket object to be added. TYPE: `WebSocket` |
| `connection_id` | The ID of the websocket connection. TYPE: `str` |
Source code in docs/microservices/core/src/WebsocketManager.py
def add_websocket(self, websocket: WebSocket, connection_id: str) -> None:
"""Add a websocket connection to the dictionary of open websocket connections.
Usually this method is invoked manually when a websocket connection is being established.
Args:
websocket (WebSocket): The websocket object to be added.
connection_id (str): The ID of the websocket connection.
"""
self.open_websocket_handler[str(connection_id)] = websocket
if self.logger:
self.logger.debug(f"WebSocket connection added: {connection_id}")
check_connection_id_is_unique
Check whether a given connection ID is already in use.
| PARAMETER | DESCRIPTION |
|---|---|
| `connection_id` | The ID of the WebSocket connection. TYPE: `str` |

| RETURNS | DESCRIPTION |
|---|---|
| `bool` | True if the connection ID is unique. |
Source code in docs/microservices/core/src/WebsocketManager.py
critical
Send a logging message with CRITICAL level.
| PARAMETER | DESCRIPTION |
|---|---|
| `message` | The log message to send. TYPE: `str` |
| `connection_id` | The ID of the connection associated with this log entry. TYPE: `str` |
Source code in docs/microservices/core/src/WebsocketManager.py
debug
Send a logging message with DEBUG level.
| PARAMETER | DESCRIPTION |
|---|---|
| `message` | The log message to send. TYPE: `str` |
| `connection_id` | The ID of the connection associated with this log entry. TYPE: `str` |
Source code in docs/microservices/core/src/WebsocketManager.py
error
Send a logging message with ERROR level.
| PARAMETER | DESCRIPTION |
|---|---|
| `message` | The log message to send. TYPE: `str` |
| `connection_id` | The ID of the connection associated with this log entry. TYPE: `str` |
Source code in docs/microservices/core/src/WebsocketManager.py
info
Send a logging message with INFO level.
| PARAMETER | DESCRIPTION |
|---|---|
| `message` | The log message to send. TYPE: `str` |
| `connection_id` | The ID of the connection associated with this log entry. TYPE: `str` |
Source code in docs/microservices/core/src/WebsocketManager.py
remove_websocket
Remove a WebSocket connection from the active connections.
Typically called when a WebSocket is closed.
| PARAMETER | DESCRIPTION |
|---|---|
| `connection_id` | Identifier of the WebSocket connection. TYPE: `str` |
Source code in docs/microservices/core/src/WebsocketManager.py
def remove_websocket(self, connection_id: str) -> None:
"""Remove a WebSocket connection from the active connections.
Typically called when a WebSocket is closed.
Args:
connection_id (str): Identifier of the WebSocket connection.
"""
if connection_id in self.open_websocket_handler:
del self.open_websocket_handler[connection_id]
if self.logger:
self.logger.debug(f"WebSocket connection removed: {connection_id}")
send_message
Put a message into the message queue.
| PARAMETER | DESCRIPTION |
|---|---|
| `message` | Text message. TYPE: `str` |
| `connection_id` | Connection ID of the addressed WebSocket. TYPE: `str` |
Source code in docs/microservices/core/src/WebsocketManager.py
def send_message(self, message: str, connection_id: str) -> None:
"""Put a message into the message queue.
Args:
message (str): Text message.
connection_id (str): Connection ID of the addressed WebSocket.
"""
# ensure connection_id is a string
connection_id = str(connection_id)
# try to find the destination websocket
try:
if self.logger:
self.logger.debug(
f"Sending Message via websocket connection {connection_id}: {message}"
)
self.message_queue.put({"message": message, "connection_id": connection_id})
except Exception:
if self.logger:
self.logger.info(f"Websocket for {connection_id} not found")
set_logger
Set the Python logger to use for logging.
| PARAMETER | DESCRIPTION |
|---|---|
| `logger` | The logger object to use. TYPE: `logging.Logger` |
warning
Send a logging message with WARNING level.
| PARAMETER | DESCRIPTION |
|---|---|
| `message` | The log message to send. TYPE: `str` |
| `connection_id` | The ID of the connection associated with this log entry. TYPE: `str` |
Source code in docs/microservices/core/src/WebsocketManager.py
app
Initialize the app.
| CLASS | DESCRIPTION |
|---|---|
| `State` | The lifespan state of the FastAPI application. |

| FUNCTION | DESCRIPTION |
|---|---|
| `construct_app` | Construct the FastAPI application. |
| `get_jwt_params_from_openid_config` | Query the openid-configuration and extract all needed parameters for jwt. |
| `lifespan` | Hook to provide lifespan state to our FastAPI application. |
| `register_routers` | Register routers on the FastAPI app based on the configuration. |
State
Bases: TypedDict
The lifespan state of the FastAPI application.
Source code in docs/microservices/core/src/app.py
construct_app
Construct the FastAPI application.
Source code in docs/microservices/core/src/app.py
def construct_app(
lifespan: Callable[[FastAPI], AsyncIterator[State]] = lifespan,
settings: None | Settings = None,
) -> FastAPI:
"""Construct the FastAPI application."""
app = FastAPI(lifespan=lifespan)
if settings is None:
settings = get_general_settings()
app.title = settings.service_name
app.description = settings.service_descripton
app.version = "2.0.0"
app.state.settings = settings
if settings.enabled_routes.feedback:
app.state.db_session = create_db_session(settings)
else:
app.state.db_session = None
app.add_middleware(
CORSMiddleware,
allow_origins=[str(origin).rstrip("/") for origin in settings.allow_origins],
allow_methods=["*"],
allow_headers=["*"],
)
register_routers(app, settings)
return app
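For local development the factory can be served directly; a minimal sketch, where the module path and port are assumptions:

```python
import uvicorn

from src.app import construct_app  # module path is an assumption

# Falls back to get_general_settings() when no settings object is passed.
app = construct_app()

if __name__ == "__main__":
    uvicorn.run(app, host="127.0.0.1", port=8000)
```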
get_jwt_params_from_openid_config
Query the openid-configuration and extract all needed parameters for jwt.
| PARAMETER | DESCRIPTION |
|---|---|
| `settings` | The application settings. TYPE: `Settings` |

| RETURNS | DESCRIPTION |
|---|---|
| `tuple[PyJWKClient, Sequence[str]]` | A tuple of the JWKs client and the supported signing algorithms. |
Source code in docs/microservices/core/src/app.py
def get_jwt_params_from_openid_config(
settings: Settings,
) -> tuple[jwt.PyJWKClient, Sequence[str]]:
"""Query the openid-configuration and extract all needed parameters for jwt.
Args:
settings: The application settings.
Returns:
A tuple of the JWKs client and the supported signing algorithms.
"""
logger.debug(
"Querying openid-configuration from %s",
settings.authentication.openid_configuration_url,
)
resp_oid_config = requests.get(
settings.authentication.openid_configuration_url, timeout=5
)
oid_config = resp_oid_config.json()
logger.debug(oid_config)
if "error" in oid_config:
logger.error(
"Could not obtain OpenID configuration from %s: %s",
settings.authentication.openid_configuration_url,
oid_config["error"],
)
raise Exception(oid_config["error"])
logger.debug("Decoded oidc config: %r", oid_config)
jwks_client = jwt.PyJWKClient(oid_config["jwks_uri"])
jwt_signign_algorithms = oid_config["id_token_signing_alg_values_supported"]
return jwks_client, jwt_signign_algorithms
lifespan
async
Hook to provide lifespan state to our FastAPI application.
Source code in docs/microservices/core/src/app.py
@contextlib.asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[State]:
"""Hook to provide lifespan state to our FastAPI application."""
# 'app.state.settings' is set in 'construct_app'.
settings = app.state.settings
logger.debug("Running with settings:\n%s", settings)
if settings.authentication.guest_mode:
jwks_client = None
jwt_signign_algorithms = None
else:
jwks_client, jwt_signign_algorithms = get_jwt_params_from_openid_config(
settings
)
if settings.enabled_routes.feedback:
db_session = app.state.db_session
scheduler = BackgroundScheduler()
scheduler.add_job(
functools.partial(cleanup_userfeedback, db_session), "interval", minutes=60
)
scheduler.start()
else:
db_session = None
scheduler = None
yield {
"settings": settings,
"jwks_client": jwks_client,
"jwt_signign_algorithms": jwt_signign_algorithms,
"db_session": db_session,
"permissions_cache": InMemoryPermissionsCache(),
}
if scheduler:
scheduler.shutdown()
register_routers
Register routers on the FastAPI app based on the configuration.
This function conditionally includes route handlers for various features (chat, RAG, summary, feedback, and transcription) into the FastAPI application based on their enabled status in the settings configuration. The core router is always included. Each router inclusion or exclusion is logged for monitoring.
| PARAMETER | DESCRIPTION |
|---|---|
| `app` | The FastAPI application instance to register routers on. TYPE: `FastAPI` |
| `settings` | Configuration object containing enabled_routes settings that determine which routers should be registered. TYPE: `Settings` |
Source code in docs/microservices/core/src/app.py
def register_routers(app: FastAPI, settings: Settings) -> None:
"""Register routers on the FastAPI app based on the configuration.
This function conditionally includes route handlers for various features
(chat, RAG, summary, feedback, and transcription) into the FastAPI application
based on their enabled status in the settings configuration. The core router
is always included. Each router inclusion or exclusion is logged for monitoring.
Args:
app (FastAPI): The FastAPI application instance to register routers on.
settings (Settings): Configuration object containing enabled_routes settings
that determine which routers should be registered.
"""
app.include_router(core.router)
logger.info("Core route enabled")
if settings.enabled_routes.chat:
app.include_router(chat.router)
logger.info("Chat route enabled")
else:
logger.warning("Chat route disabled")
if settings.enabled_routes.rag:
app.include_router(rag.router)
logger.info("RAG route enabled")
else:
logger.warning("RAG route disabled")
if settings.enabled_routes.summary:
app.include_router(summary.router)
logger.info("Summary route enabled")
else:
logger.warning("Summary route disabled")
if settings.enabled_routes.feedback:
app.include_router(feedback.router)
logger.info("Feedback route enabled")
else:
logger.warning("Feedback route disabled")
if settings.enabled_routes.transcription:
app.include_router(transcription.router)
logger.info("Transcription route enabled")
else:
logger.warning("Transcription route disabled")
authentication
Expose the API for authentication in F13.
| MODULE | DESCRIPTION |
|---|---|
| `authentication` | Primitives for user authentication. |
| `authorization` | Primitives for user authorization. |
| `common` | Define common data types and utility functions used in several auth-related modules. |
| `permissions_cache` | Primitives for user authentication. |

| FUNCTION | DESCRIPTION |
|---|---|
| `add_auth_status_codes_to_responses` | Add documentation for authorization related status codes. |
add_auth_status_codes_to_responses
Add documentation for authorization related status codes.
Source code in docs/microservices/core/src/authentication/__init__.py
def add_auth_status_codes_to_responses(
responses: dict[int | str, dict[str, Any]],
) -> dict[int | str, dict[str, Any]]:
"""Add documentation for authorization related status codes."""
responses[401] = {
"description": "Missing authorization, invalid or expired token",
}
responses[403] = {
"description": "Insufficient permissions to access the endpoint.",
}
return responses
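The helper mutates and returns the passed mapping, so it composes directly with a route's `responses` argument, as the endpoint modules below do. A small sketch (assuming the function is imported from this package):

```python
# Augment a route's `responses` mapping with the auth status codes.
responses = add_auth_status_codes_to_responses(
    {200: {"description": "Successful response."}}
)
assert set(responses) == {200, 401, 403}
print(responses[401]["description"])  # Missing authorization, invalid or expired token
```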
authentication
Primitives for user authentication.
To authenticate a user we perform a User-Managed Access (UMA) request in Keycloak. Keycloak returns a Requesting Party token (RPT) which contains the permissions of the user.
| CLASS | DESCRIPTION |
|---|---|
| `Permission` | Enumeration of known permissions to F13. |

| FUNCTION | DESCRIPTION |
|---|---|
| `needs_permissions` | Create a Security dependency that requires specific permissions. |
Permission
Bases: StrEnum
Enumeration of known permissions to F13.
Source code in docs/microservices/core/src/authentication/authentication.py
needs_permissions
Create a Security dependency that requires specific permissions.
| PARAMETER | DESCRIPTION |
|---|---|
| `perms` | List of required permission names. TYPE: `Sequence[Permission]` |

| RETURNS | DESCRIPTION |
|---|---|
| `Security` | Security dependency configured to check the specified permissions. |
Source code in docs/microservices/core/src/authentication/authentication.py
def needs_permissions(perms: Sequence[Permission]) -> Security:
"""Create a Security dependency that requires specific permissions.
Args:
perms: List of required permission names.
Returns:
Security dependency configured to check the specified permissions.
"""
return Security(_check_permissions, scopes=[perm.value for perm in perms])
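The result is used as an annotated route dependency, mirroring the usage in endpoints/rag.py below (router name and path here are illustrative; `needs_permissions` and `Permission` are assumed imported from this module):

```python
from typing import Annotated

from fastapi import APIRouter

router = APIRouter()

@router.post("/database")
async def database_rag(
    _auth: Annotated[None, needs_permissions([Permission.RAG_DATABASE])],
) -> dict:
    # Reached only if the caller holds the RAG_DATABASE permission.
    return {"ok": True}
```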
authorization
Primitives for user authorization.
This will extract user information from a JSON Web Token (JWT) provided in the request.
| CLASS | DESCRIPTION |
|---|---|
| `User` | Model of a frontend enduser of F13. |

| FUNCTION | DESCRIPTION |
|---|---|
| `get_current_user` | Construct a 'User' object from an access token. |
User
Bases: BaseModel
Model of a frontend enduser of F13.
The relevant data can all be extracted from a Keycloak access_token.
| METHOD | DESCRIPTION |
|---|---|
| `from_token` | Construct a 'User' from a Keycloak token. |
Source code in docs/microservices/core/src/authentication/authorization.py
class User(BaseModel):
"""Model of a frontend enduser of F13.
The relevant data can all be extracted from a Keycloak `access_token`.
"""
user_id: str
session_id: str
@classmethod
def from_token(cls, decoded_token: AccessTokenContent) -> User:
"""Construct a 'User' from a Keycloak token."""
logger.debug("Decoded token: %r", decoded_token)
sub = decoded_token.get("sub")
if not sub:
raise ValueError("Token does not contain the 'sub' claim")
session_id = decoded_token.get("sid")
if not session_id:
raise ValueError("Token does not contain the 'sid' claim")
return cls(user_id=sub, session_id=session_id)
from_token
classmethod
Construct a 'User' from a Keycloak token.
Source code in docs/microservices/core/src/authentication/authorization.py
@classmethod
def from_token(cls, decoded_token: AccessTokenContent) -> User:
"""Construct a 'User' from a Keycloak token."""
logger.debug("Decoded token: %r", decoded_token)
sub = decoded_token.get("sub")
if not sub:
raise ValueError("Token does not contain the 'sub' claim")
session_id = decoded_token.get("sid")
if not session_id:
raise ValueError("Token does not contain the 'sid' claim")
return cls(user_id=sub, session_id=session_id)
get_current_user
async
Construct a 'User' object from an access token.
The token is supposed to be passed in as "Bearer" in the "Authorization" header.
Source code in docs/microservices/core/src/authentication/authorization.py
async def get_current_user(
request: Request,
token: Annotated[None | str, Depends(oauth2_scheme)],
) -> None | User:
"""Construct a 'User' object from an access token.
The token is supposed to be passed in as "Bearer" in the "Authorization" header.
"""
if request.state.settings.authentication.guest_mode:
logger.debug(
"Application is configured for guest-mode. Skipping authorization."
)
return None
elif token is None:
logger.debug("No token provided!")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Not authenticated",
headers={"WWW-Authenticate": "Bearer"},
)
else:
logger.debug("Extract user data from token: %s", token)
jwks_client = cast(jwt.PyJWKClient, request.state.jwks_client)
signing_algos = request.state.jwt_signign_algorithms
try:
decoded = await decode_access_token(
token,
jwks_client,
signing_algos,
request.state.settings.authentication.audience,
)
except (jwt.DecodeError, jwt.PyJWKClientError) as exc:
logger.debug("Error while decoding token %s", token, exc_info=exc)
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token"
)
except jwt.ExpiredSignatureError:
logger.debug("Presented token has expired")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, detail="Expired signature"
)
return User.from_token(decoded)
common
Define common data types and utility functions used in several auth-related modules.
| CLASS | DESCRIPTION |
|---|---|
| `AccessTokenContent` | Encountered fields in a user's access token. |
| `RequestingPartyToken` | Decoded Requesting Party Token (RPT) from Keycloak. |

| FUNCTION | DESCRIPTION |
|---|---|
| `decode_access_token` | Decode the given 'access_token'. |
AccessTokenContent
RequestingPartyToken
Bases: TypedDict
Decoded Requesting Party Token (RPT) from Keycloak.
This and related data types only serve a documentary purpose. No validation or parsing happens to ensure that this is indeed the structure returned by Keycloak. The functions working with these types have to carry out the checks themselves.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| `exp` | Token expiration timestamp (Unix epoch). TYPE: `int` |
| `sid` | Session identifier. TYPE: `str` |
| `authorization` | UMA authorization data containing permissions. TYPE: `_AuthorizationBlock` |
Source code in docs/microservices/core/src/authentication/common.py
class RequestingPartyToken(TypedDict):
"""Decoded Requesting Party Token (RPT) from Keycloak.
This and related data types only serve a documentary purpose. No validation or parsing happens
to ensure that this is indeed the structure returned by Keycloak. The functions working with
these types have to carry out checks on their behalf.
Attributes:
exp: Token expiration timestamp (Unix epoch).
sid: Session identifier.
authorization: UMA authorization data containing permissions.
"""
exp: int
sid: str
authorization: _AuthorizationBlock
decode_access_token
async
Decode the given 'access_token'.
If 'audience' is set to 'None' then audience verification is disabled entirely.
Source code in docs/microservices/core/src/authentication/common.py
async def decode_access_token(
access_token: str,
jwks_client: jwt.PyJWKClient,
signing_algos: Sequence[str],
audience: None | str,
) -> AccessTokenContent:
"""Decode the given 'access_token'.
If 'audience' is set to 'None' then audience verification is disabled entirely.
"""
# 'get_signing_key_from_jwt' eventually performs a request to Keycloak
signing_key = await asyncio.to_thread(
jwks_client.get_signing_key_from_jwt, access_token
)
options = {}
if audience is None:
options = {"verify_aud": False}
decoded = jwt.decode(
access_token,
key=signing_key.key,
algorithms=signing_algos,
audience=audience,
options=options,
)
return decoded
permissions_cache
Primitives for user authentication.
Authentication happens via OAuth2 from Keycloak.
| CLASS | DESCRIPTION |
|---|---|
| `AuthCacheVal` | Cached authorization data for a user session. |
| `InMemoryPermissionsCache` | In-memory cache for UMA RPT authorization data. |
| `PermissionsCacheABC` | Abstract cache interface for UMA RPT authorization data. |
AuthCacheVal
dataclass
Cached authorization data for a user session.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| `expires` | When this cache entry expires. TYPE: `datetime.datetime` |
| `session_id` | The user's session identifier. TYPE: `SessionId` |
| `rpt` | The decoded Requesting Party Token. TYPE: `RequestingPartyToken` |
| `permissions` | List of permission names extracted from the RPT. TYPE: `list[str]` |
| `raw_rpt` | The raw JWT token string. TYPE: `str` |

| METHOD | DESCRIPTION |
|---|---|
| `construct_from_rpt` | Construct an AuthCacheVal from a decoded RPT. |
Source code in docs/microservices/core/src/authentication/permissions_cache.py
@dataclasses.dataclass(frozen=True)
class AuthCacheVal:
"""Cached authorization data for a user session.
Attributes:
expires: When this cache entry expires.
session_id: The user's session identifier.
rpt: The decoded Requesting Party Token.
permissions: List of permission names extracted from the RPT.
raw_rpt: The raw JWT token string.
"""
expires: datetime.datetime
session_id: SessionId
rpt: RequestingPartyToken
permissions: list[str]
raw_rpt: str
@classmethod
def construct_from_rpt(
cls, rpt: RequestingPartyToken, raw_rpt: str
) -> AuthCacheVal:
"""Construct an AuthCacheVal from a decoded RPT.
Args:
rpt: The decoded Requesting Party Token.
raw_rpt: The raw JWT token string.
Returns:
A new AuthCacheVal instance with extracted permissions.
Raises:
HTTPException: If required claims are missing from the RPT.
"""
if "authorization" not in rpt:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Missing 'authorization' claim in RPT.",
)
if "permissions" not in rpt["authorization"]:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Missing 'permissions' field in authorization claim in RPT.",
)
permissions = [perm["rsname"] for perm in rpt["authorization"]["permissions"]]
expires = datetime.datetime.fromtimestamp(rpt["exp"])
session_id = rpt["sid"]
return cls(expires, session_id, rpt, permissions, raw_rpt)
construct_from_rpt
classmethod
Construct an AuthCacheVal from a decoded RPT.
| PARAMETER | DESCRIPTION |
|---|---|
| `rpt` | The decoded Requesting Party Token. TYPE: `RequestingPartyToken` |
| `raw_rpt` | The raw JWT token string. TYPE: `str` |

| RETURNS | DESCRIPTION |
|---|---|
| `AuthCacheVal` | A new AuthCacheVal instance with extracted permissions. |

| RAISES | DESCRIPTION |
|---|---|
| `HTTPException` | If required claims are missing from the RPT. |
Source code in docs/microservices/core/src/authentication/permissions_cache.py
@classmethod
def construct_from_rpt(
cls, rpt: RequestingPartyToken, raw_rpt: str
) -> AuthCacheVal:
"""Construct an AuthCacheVal from a decoded RPT.
Args:
rpt: The decoded Requesting Party Token.
raw_rpt: The raw JWT token string.
Returns:
A new AuthCacheVal instance with extracted permissions.
Raises:
HTTPException: If required claims are missing from the RPT.
"""
if "authorization" not in rpt:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Missing 'authorization' claim in RPT.",
)
if "permissions" not in rpt["authorization"]:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Missing 'permissions' field in authorization claim in RPT.",
)
permissions = [perm["rsname"] for perm in rpt["authorization"]["permissions"]]
expires = datetime.datetime.fromtimestamp(rpt["exp"])
session_id = rpt["sid"]
return cls(expires, session_id, rpt, permissions, raw_rpt)
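A sketch of the claims `construct_from_rpt` consumes; the resource names and timestamp are illustrative, not real Keycloak output, and `AuthCacheVal` is assumed imported from this module:

```python
decoded_rpt = {
    "exp": 1731252767,  # Unix timestamp; a past value yields an already-expired entry
    "sid": "session-123",
    "authorization": {
        "permissions": [{"rsname": "rag_database"}, {"rsname": "chat"}],
    },
}
val = AuthCacheVal.construct_from_rpt(decoded_rpt, raw_rpt="<jwt-string>")
print(val.permissions)  # ['rag_database', 'chat']
```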
InMemoryPermissionsCache
dataclass
Bases: PermissionsCacheABC
In-memory cache for UMA RPT authorization data.
Stores authorization cache entries indexed by session ID with automatic expiration handling. This is a simple in-memory implementation suitable for smaller deployments.
Cleanup is triggered in the add method and is time based. It is executed every
'_cleanup_interval' when no other request holds the '_lock'.
This cache should perform nicely for up to 100,000 users.
| METHOD | DESCRIPTION |
|---|---|
| `add` | Insert or replace a cached authorization entry. |
| `get_by_session_id` | Return the cached AuthCacheVal for this session_id, or None. |
Source code in docs/microservices/core/src/authentication/permissions_cache.py
@dataclasses.dataclass
class InMemoryPermissionsCache(PermissionsCacheABC):
"""In-memory cache for UMA RPT authorization data.
Stores authorization cache entries indexed by session ID with automatic expiration handling.
This is a simple in-memory implementation suitable for smaller deployments.
Cleanup is triggered in the `add` method and is time based. It is executed every
'_cleanup_interval' when no other request is owning the '_lock'.
This cache should perform nicely for up to 100,000 users.
"""
_cache: dict[SessionId, AuthCacheVal] = dataclasses.field(default_factory=dict)
_last_cleanup: datetime.datetime = dataclasses.field(
default_factory=datetime.datetime.now
)
_cleanup_interval: datetime.timedelta = dataclasses.field(
default_factory=lambda: datetime.timedelta(minutes=5)
)
_lock: asyncio.Lock = dataclasses.field(default_factory=asyncio.Lock)
async def add(self, val: AuthCacheVal) -> None:
"""Insert or replace a cached authorization entry.
Also run the cleanup routine to remove expired entries.
Args:
val: The authorization cache value to store.
"""
logger.debug("Added to permissions cache: %r", val.session_id)
async with self._lock:
self._cache[val.session_id] = val
logger.debug("Current cache size: %s", len(self._cache))
await self._check_cleanup()
async def get_by_session_id(self, session_id: SessionId) -> None | AuthCacheVal:
"""Return the cached AuthCacheVal for this session_id, or None.
Args:
session_id: The session identifier to lookup.
Returns:
The cached authorization value if found and not expired, None otherwise.
"""
async with self._lock:
entry = self._cache.get(session_id)
if entry is None:
return None
if entry.expires <= datetime.datetime.now():
self._cache.pop(session_id)
return None
return entry
async def _check_cleanup(self) -> None:
"""Remove all expired cache entries.
This is time based and runs after '_cleanup_interval' elapsed. To avoid blocking other
requests we try to acquire the lock non-blocking.
"""
now = datetime.datetime.now()
if now - self._last_cleanup < self._cleanup_interval:
return
# Postpone cleanup if the lock is already held by another coroutine
if self._lock.locked():
logger.info("Could not acquire lock. Postponing cleanup")
return
async with self._lock:
# Recheck to side-step race condition with 'self._lock.locked()'
if now - self._last_cleanup < self._cleanup_interval:
return
logger.info("Cleaning up permissions cache.")
expired_keys = [
sid for sid, val in self._cache.items() if val.expires <= now
]
for key in expired_keys:
entry = self._cache.pop(key)
logger.debug("Removed %r", entry.session_id)
self._last_cleanup = now
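A round-trip sketch of the cache; the RPT dict is illustrative, with an expiry one hour out so the entry survives the expiration check in `get_by_session_id` (class names assumed imported from this module):

```python
import asyncio
import time

rpt = {
    "exp": int(time.time()) + 3600,
    "sid": "session-123",
    "authorization": {"permissions": [{"rsname": "rag_database"}]},
}

async def demo() -> None:
    cache = InMemoryPermissionsCache()
    await cache.add(AuthCacheVal.construct_from_rpt(rpt, raw_rpt="<jwt>"))
    entry = await cache.get_by_session_id("session-123")
    print(entry.permissions if entry else "expired or unknown session")

asyncio.run(demo())
```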
add
async
Insert or replace a cached authorization entry.
Also run the cleanup routine to remove expired entries.
| PARAMETER | DESCRIPTION |
|---|---|
| `val` | The authorization cache value to store. TYPE: `AuthCacheVal` |
Source code in docs/microservices/core/src/authentication/permissions_cache.py
async def add(self, val: AuthCacheVal) -> None:
"""Insert or replace a cached authorization entry.
Also run the cleanup routine to remove expired entries.
Args:
val: The authorization cache value to store.
"""
logger.debug("Added to permissions cache: %r", val.session_id)
async with self._lock:
self._cache[val.session_id] = val
logger.debug("Current cache size: %s", len(self._cache))
await self._check_cleanup()
get_by_session_id
async
Return the cached AuthCacheVal for this session_id, or None.
| PARAMETER | DESCRIPTION |
|---|---|
| `session_id` | The session identifier to lookup. TYPE: `SessionId` |

| RETURNS | DESCRIPTION |
|---|---|
| `None \| AuthCacheVal` | The cached authorization value if found and not expired, None otherwise. |
Source code in docs/microservices/core/src/authentication/permissions_cache.py
async def get_by_session_id(self, session_id: SessionId) -> None | AuthCacheVal:
"""Return the cached AuthCacheVal for this session_id, or None.
Args:
session_id: The session identifier to lookup.
Returns:
The cached authorization value if found and not expired, None otherwise.
"""
async with self._lock:
entry = self._cache.get(session_id)
if entry is None:
return None
if entry.expires <= datetime.datetime.now():
self._cache.pop(session_id)
return None
return entry
PermissionsCacheABC
Bases: ABC
Abstract cache interface for UMA RPT authorization data.
We want to keep this interface compatible with Redis. Note that Redis invalidates expired tokens automatically, so any non-Redis implementation has to take care of cleanup itself and must not rely on FastAPI background tasks or similar.
| METHOD | DESCRIPTION |
|---|---|
| `add` | Insert or replace a cached authorization entry. |
| `get_by_session_id` | Return the cached AuthCacheVal for this session_id, or None. |
Source code in docs/microservices/core/src/authentication/permissions_cache.py
class PermissionsCacheABC(abc.ABC):
"""Abstract cache interface for UMA RPT authorization data.
We want to keep this compatible with Redis. Note that Redis will invalidate expired tokens
automatically. Thus, any implementation has to take care of cleanups themselves and not to rely
on FastAPI background tasks or similar.
"""
@abc.abstractmethod
async def add(self, val: AuthCacheVal) -> None:
"""Insert or replace a cached authorization entry."""
raise NotImplementedError
@abc.abstractmethod
async def get_by_session_id(self, session_id: SessionId) -> None | AuthCacheVal:
"""Return the cached AuthCacheVal for this session_id, or None."""
raise NotImplementedError
add
abstractmethod
async
Insert or replace a cached authorization entry.
get_by_session_id
abstractmethod
async
Return the cached AuthCacheVal for this session_id, or None.
endpoints
Modules for sending HTTP requests to endpoints of internal microservices.
| MODULE | DESCRIPTION |
|---|---|
| `chat` | Endpoints of the chat-microservice. |
| `core` | General endpoints of f13 core. |
| `feedback` | Endpoints for Feedback (general, chat, rag and summary). |
| `rag` | Endpoints of the RAG-microservice. |
| `summary` | Endpoints of the summary-microservice. |
| `transcription` | Endpoints of the transcription-microservice including: upload, retrieval, word and upsertion of special terms. |
chat
Endpoints of the chat-microservice.
| FUNCTION | DESCRIPTION |
|---|---|
| `chat_completion` | Chat completion endpoint. |
| `chat_completion_json_stream` | Chat completion endpoint with json-stream. |
| `chat_completion_stream` | Chat completion endpoint with text-stream. |
| `get_chat_endpoint` | Extract the chat endpoint from the 'settings'. |
| `get_llms` | Return model information of available LLMs. |
chat_completion
async
Chat completion endpoint.
| PARAMETER | DESCRIPTION |
|---|---|
| `request` | The currently performed request. TYPE: `Request` |
| `chat_input` | Input containing the chat message. TYPE: `ChatInput` |

| RETURNS | DESCRIPTION |
|---|---|
| `ChatOutput` | Output of the chat response. |
Source code in docs/microservices/core/src/endpoints/chat.py
@router.post(
"/completion",
response_model=ChatOutput,
summary="Chat completion endpoint.",
description=(
"Performs response for chat completions.\n\n"
"The endpoint returns a single JSON response containing the chat output.\n\n"
),
openapi_extra={
"requestBody": {
"content": {
"application/json": {
"examples": ChatInput.model_config["json_schema_extra"][
"openapi_examples"
],
}
},
}
},
responses=add_auth_status_codes_to_responses(
{
200: {
"description": "Successful chat response.",
"content": {
"application/json": {
"examples": ChatOutput.model_config["json_schema_extra"][
"openapi_examples"
],
},
},
},
400: {"description": "Invalid language model."},
408: {"description": "Request timeout of a dependency."},
502: {"description": "Error accessing microservice or LLM client."},
500: {"description": "Error processing answer of LLM client."},
}
),
)
async def chat_completion(
request: Request,
chat_input: ChatInput,
) -> ChatOutput:
"""Chat completion endpoint.
Args:
request (Request): the currently performed request
chat_input (ChatInput): Input containing the chat message.
Returns:
Output of the chat response.
"""
settings = request.state.settings
chat_endpoint = get_chat_endpoint(settings)
url = f"{chat_endpoint}/completion"
json = chat_input.model_dump()
headers = {"Content-Type": "application/json", "Accept": "application/json"}
post_config = settings.inter_service_communication.chat
request_options = {
"json": json,
"headers": headers,
}
result = await async_post(
url=url,
response_model=ChatOutput,
config=post_config,
request_options=request_options,
service_name="Chat",
)
return result
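A client-side sketch; the base URL, route prefix, and the ChatInput payload fields are assumptions, so consult the generated OpenAPI examples for the real schema:

```python
import httpx

payload = {"language_model": "test_model_mock"}  # ChatInput fields are assumptions
resp = httpx.post(
    "http://localhost:8000/completion",  # base URL and prefix are assumptions
    json=payload,
    headers={"Content-Type": "application/json", "Accept": "application/json"},
    timeout=30,
)
resp.raise_for_status()
print(resp.json())
```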
chat_completion_json_stream
async
Chat completion endpoint with json-stream.
| PARAMETER | DESCRIPTION |
|---|---|
| `request` | The currently performed request. TYPE: `Request` |
| `chat_input` | Input containing the chat message. TYPE: `ChatInput` |

| RETURNS | DESCRIPTION |
|---|---|
| `StreamingResponse` | Output of the chat response. |
Source code in docs/microservices/core/src/endpoints/chat.py
@router.post(
"/v2/completion/stream",
response_class=StreamingResponse,
summary="Chat completion endpoint with x-ndjson-stream.",
description=(
"Starts a streaming response for chat completions.\n\n"
"The endpoint streams messages as NDJSON (`application/x-ndjson`) "
"with different types: `response`, `reason`, and `error`."
),
openapi_extra={
"requestBody": {
"content": {
"application/json": {
"examples": ChatInput.model_config["json_schema_extra"][
"openapi_examples"
],
}
},
}
},
responses=add_auth_status_codes_to_responses(
{
200: {
"description": "Streaming started successfully.",
"content": {
"application/x-ndjson": {
"examples": ChatStreamOutput.model_config["json_schema_extra"][
"openapi_examples"
],
},
},
},
400: {"description": "Invalid language model."},
}
),
)
async def chat_completion_json_stream(
request: Request,
chat_input: ChatInput,
) -> StreamingResponse:
"""Chat completion endpoint with json-stream.
Args:
request (Request): the currently performed request
chat_input (ChatInput): Input containing the chat message.
Returns:
Output of the chat response.
"""
settings = request.state.settings
chat_endpoint = get_chat_endpoint(settings)
url = f"{chat_endpoint}/v2/completion/stream"
json = chat_input.model_dump()
headers = {"Content-Type": "application/json", "Accept": "application/json"}
post_config = settings.inter_service_communication.chat
return StreamingResponse(
async_post_stream(
url=url,
request_options={
"json": json,
"headers": headers,
},
config=post_config,
service_name="Chat",
),
media_type="application/x-ndjson",
headers={
"Connection": "keep-alive",
},
)
chat_completion_stream
async
Chat completion endpoint with text-stream.
| PARAMETER | DESCRIPTION |
|---|---|
| `request` | The currently performed request. TYPE: `Request` |
| `chat_input` | Input containing the chat message. TYPE: `ChatInput` |

| RETURNS | DESCRIPTION |
|---|---|
| `StreamingResponse` | Output of the chat response. |
Source code in docs/microservices/core/src/endpoints/chat.py
@router.post(
"/completion/stream",
response_class=StreamingResponse,
summary="Chat completion endpoint with text-stream.",
description=(
"Starts a streaming response for chat completions.\n\n"
"The endpoint streams messages as text (`text/event-stream`)."
),
openapi_extra={
"requestBody": {
"content": {
"application/json": {
"examples": ChatInput.model_config["json_schema_extra"][
"openapi_examples"
],
}
},
}
},
responses=add_auth_status_codes_to_responses(
{
200: {
"description": "Streaming started successfully.",
"content": {
"text/event-stream": {
"schema": {
"type": "string",
"example": "Hello, how can I help you today?\n\n",
},
"examples": {
"response": {
"summary": "Chat response",
"description": (
"This is the standard output ",
"returned by the chat model for a normal request.",
),
"value": "Hello, how can I help you today?\n\n",
},
"error": {
"summary": "Error output",
"description": "This example shows the output returned when an error occurs.",
"value": "Es ist ein Fehler aufgetreten. Bitte versuchen Sie es später erneut.",
},
},
}
},
},
400: {"description": "Invalid language model."},
}
),
)
async def chat_completion_stream(
request: Request,
chat_input: ChatInput,
) -> StreamingResponse:
"""Chat completion endpoint with text-stream.
Args:
request (Request): the currently performed request
chat_input (ChatInput): Input containing the chat message.
Returns:
Output of the chat response.
"""
settings = request.state.settings
chat_endpoint = get_chat_endpoint(settings)
url = f"{chat_endpoint}/completion/stream"
json = chat_input.model_dump()
headers = {"Content-Type": "application/json", "Accept": "application/json"}
post_config = settings.inter_service_communication.chat
return StreamingResponse(
async_post_stream(
url=url,
request_options={
"json": json,
"headers": headers,
},
config=post_config,
service_name="Chat",
),
media_type="text/event-stream",
headers={
"Connection": "keep-alive",
},
)
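Consuming the text stream from a client could look like the sketch below; the base URL and payload fields are assumptions:

```python
import httpx

payload = {"language_model": "test_model_mock"}  # ChatInput fields are assumptions
with httpx.stream(
    "POST", "http://localhost:8000/completion/stream", json=payload, timeout=None
) as resp:
    resp.raise_for_status()
    for chunk in resp.iter_text():
        print(chunk, end="", flush=True)
```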
get_chat_endpoint
Extract the chat endpoint from the 'settings'.
get_llms
async
Return model information of available LLMs.
| PARAMETER | DESCRIPTION |
|---|---|
| `request` | The currently performed request. TYPE: `Request` |

| RETURNS | DESCRIPTION |
|---|---|
| `list[dict]` | The list of available models. |
Source code in docs/microservices/core/src/endpoints/chat.py
@router.get(
"/llms",
summary="List available language models.",
description=("Returns a list of available language models (LLMs).\n\n"),
responses=add_auth_status_codes_to_responses(
{
200: {
"description": "List of available LLMs.",
"content": {
"application/json": {
"example": [
{
"label": "test_model:mock",
"is_remote": False,
"name": "test_model_mock",
},
]
}
},
},
500: {"description": "Internal server error accessing microservice"},
},
),
)
async def get_llms(request: Request) -> list[dict]:
"""Return model information of available LLMs.
Args:
request (Request): the currently performed request
Returns:
The list of available models.
"""
chat_endpoint = get_chat_endpoint(request.state.settings)
async with httpx.AsyncClient(timeout=30) as client:
response = await client.get(f"{chat_endpoint}/llms")
result = response.json()
return result
core
General endpoints of f13 core.
| FUNCTION | DESCRIPTION |
|---|---|
| `health` | Return a health check message. |
| `websocket_endpoint` | WebSocket endpoint to establish a bi-directional connection between clients and server. |
health
async
Return a health check message.
| RETURNS | DESCRIPTION |
|---|---|
| `dict[str, str]` | The health check message as a dictionary. |
Source code in docs/microservices/core/src/endpoints/core.py
@router.get(
"/",
summary="Health check endpoint",
description=(
"Returns a simple message indicating that the F13-Core service is running.\n\n"
"Use this endpoint to verify that the service is alive and responsive."
),
responses={
200: {
"description": "Health check successful",
"content": {
"application/json": {"example": {"status": "F13-Core is running"}}
},
},
500: {"description": "Internal server error"},
},
)
@router.get(
"/health",
summary="Health check endpoint",
description=(
"Returns a simple message indicating that the F13-Core service is running.\n\n"
"Use this endpoint to verify that the service is alive and responsive."
),
responses={
200: {
"description": "Health check successful",
"content": {
"application/json": {"example": {"status": "F13-Core is running"}}
},
},
500: {"description": "Internal server error"},
},
)
async def health() -> dict[str, str]:
"""Return a health check message.
Returns:
The health check message as a dictionary.
"""
return {"status": "F13-Core is running"}
websocket_endpoint
async
WebSocket endpoint to establish a bi-directional connection between clients and server.
| PARAMETER | DESCRIPTION |
|---|---|
| `websocket` | The WebSocket connection to be established. TYPE: `WebSocket` |
| `connection_id` | A client-defined connection ID. TYPE: `str` |
| `token` | A JWT access token. If the application runs in guest_mode, the token is ignored. TYPE: `None \| str` |
Source code in docs/microservices/core/src/endpoints/core.py
@router.websocket("/ws/{connection_id}")
async def websocket_endpoint(
websocket: WebSocket,
connection_id: str,
token: None | str = None,
) -> None:
"""WebSocket endpoint to establish a bi-directional connection between clients and server.
Args:
websocket (WebSocket): The WebSocket connection to be established.
connection_id (str): A client-defined connection ID.
token (None | str): A JWT access token. If the application runs in guest_mode the token is ignored.
"""
current_user = await get_current_user(websocket, token)
logger.debug("WS initiated via a token from %s", current_user)
# If a connection with the given connection_id already exists: raise Exception
if not websocket_manager.check_connection_id_is_unique(connection_id):
raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION)
# Accept Connection and update websocketHandler
await websocket.accept()
websocket_manager.add_websocket(websocket, connection_id)
websocket_manager.debug(f"Welcome {connection_id}!", connection_id=connection_id)
try:
while True:
message = await websocket.receive_text()
websocket_manager.debug(
f"{connection_id} says: {message}", connection_id=connection_id
)
except WebSocketDisconnect:
websocket_manager.remove_websocket(connection_id)
except Exception as e:
logger.exception(e)
finally:
# Remove Handler, if connection closed
websocket_manager.remove_websocket(connection_id)
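A client sketch using the third-party websockets library (an assumption; the base URL is illustrative). Outside guest mode, append `?token=<jwt>` to the URL:

```python
import asyncio

import websockets  # third-party client library; an assumption for this sketch

async def main() -> None:
    async with websockets.connect("ws://localhost:8000/ws/client-42") as ws:
        print(await ws.recv())  # JSON debug frame: "Welcome client-42!"
        await ws.send("ping")
        print(await ws.recv())  # echoed back as a debug frame

asyncio.run(main())
```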
feedback
Endpoints for Feedback (general, chat, rag and summary).
| FUNCTION | DESCRIPTION |
|---|---|
| `chat_feedback` | Store chat-specific feedback in feedback-database. |
| `general_feedback` | Store general feedback in feedback-database. |
| `rag_feedback` | Store RAG-specific feedback in feedback-database. |
| `summary_feedback` | Store Summary-specific feedback in feedback-database. |
chat_feedback
async
Store chat-specific feedback in feedback-database.
| PARAMETER | DESCRIPTION |
|---|---|
| `session` | A database session object. TYPE: `Session` |
| `feedback` | Chat feedback. TYPE: `ChatFeedback` |

| RETURNS | DESCRIPTION |
|---|---|
| `bool` | True if feedback was successfully stored. |

| RAISES | DESCRIPTION |
|---|---|
| `HTTPException` | Error if the chat feedback was not stored. |
Source code in docs/microservices/core/src/endpoints/feedback.py
@router.post(
"/chat",
response_model=bool,
responses=add_auth_status_codes_to_responses(
{
400: {"description": "Invalid input."},
422: {"description": "Validation error in the request."},
500: {"description": "Error saving the feedback."},
}
),
)
async def chat_feedback(session: SessionDep, feedback: ChatFeedback) -> bool:
"""Store chat-specific feedback in feedback-database.
Args:
session (Session): A database session object.
feedback (ChatFeedback): Chat feedback.
Returns:
True if feedback was successfully stored.
Raises:
HTTPException: Error if the chat feedback was not stored.
"""
try:
return save_chat_feedback(session, feedback)
except Exception as e:
logger.error(f"Error while saving the Chat-feedback: {e}")
raise HTTPException(
status_code=500, detail="Fehler beim Speichern des Feedback."
)
general_feedback
async
Store general feedback in feedback-database.
| PARAMETER | DESCRIPTION |
|---|---|
| `session` | A database session object. TYPE: `Session` |
| `feedback` | General feedback. TYPE: `GeneralFeedback` |

| RETURNS | DESCRIPTION |
|---|---|
| `bool` | True if feedback was successfully stored. |

| RAISES | DESCRIPTION |
|---|---|
| `HTTPException` | Error if general feedback was not stored. |
Source code in docs/microservices/core/src/endpoints/feedback.py
@router.post(
"/general",
response_model=bool,
responses=add_auth_status_codes_to_responses(
{
400: {"description": "Invalid input."},
422: {"description": "Validation error in the request."},
500: {"description": "Error saving the feedback."},
}
),
)
async def general_feedback(session: SessionDep, feedback: GeneralFeedback) -> bool:
"""Store general feedback in feedback-database.
Args:
session (Session): A database session object.
feedback (GeneralFeedback): General feedback.
Returns:
True if feedback was successfully stored.
Raises:
HTTPException: Error if general feedback was not stored.
"""
try:
return save_general_feedback(session, feedback)
except Exception as e:
logger.error(f"Error while saving the general feedback: {e}")
raise HTTPException(
status_code=500, detail="Fehler beim Speichern des Feedback."
)
rag_feedback
async
Store RAG-specific feedback in feedback-database.
| PARAMETER | DESCRIPTION |
|---|---|
| `session` | A database session object. TYPE: `Session` |
| `feedback` | RAG feedback. TYPE: `RAGFeedback` |

| RETURNS | DESCRIPTION |
|---|---|
| `bool` | True if feedback was successfully stored. |

| RAISES | DESCRIPTION |
|---|---|
| `HTTPException` | Error if the RAG feedback was not stored. |
Source code in docs/microservices/core/src/endpoints/feedback.py
@router.post(
"/rag",
response_model=bool,
responses=add_auth_status_codes_to_responses(
{
400: {"description": "Invalid input."},
422: {"description": "Validation error in the request."},
500: {"description": "Error saving the feedback."},
}
),
)
async def rag_feedback(session: SessionDep, feedback: RAGFeedback) -> bool:
"""Store RAG-specific feedback in feedback-database.
Args:
session (Session): A database session object.
feedback (RAGFeedback): RAG feedback.
Returns:
True if feedback was successfully stored.
Raises:
HTTPException: Error if the RAG feedback was not stored.
"""
try:
return save_rag_feedback(session, feedback)
except Exception as e:
logger.error(f"Error while saving the RAG-feedback: {e}")
raise HTTPException(
status_code=500, detail="Fehler beim Speichern des Feedback."
)
summary_feedback
async
Store Summary-specific feedback in feedback-database.
| PARAMETER | DESCRIPTION |
|---|---|
| `session` | A database session object. TYPE: `Session` |
| `feedback` | Summary feedback. TYPE: `SummaryFeedback` |

| RETURNS | DESCRIPTION |
|---|---|
| `bool` | True if feedback was successfully stored. |

| RAISES | DESCRIPTION |
|---|---|
| `HTTPException` | Error if the summary feedback was not stored. |
Source code in docs/microservices/core/src/endpoints/feedback.py
@router.post(
"/summary",
response_model=bool,
responses=add_auth_status_codes_to_responses(
{
400: {"description": "Invalid input."},
422: {"description": "Validation error in the request."},
500: {"description": "Error saving the feedback."},
}
),
)
async def summary_feedback(session: SessionDep, feedback: SummaryFeedback) -> bool:
"""Store Summary-specific feedback in feedback-database.
Args:
session (Session): A database session object.
feedback (SummaryFeedback): Summary feedback.
Returns:
True if feedback was successfully stored.
Raises:
HTTPException: Error if the summary feedback was not stored.
"""
try:
return save_summary_feedback(session, feedback)
except Exception as e:
logger.error(f"Error while saving the Summary-feedback: {e}")
raise HTTPException(
status_code=500, detail="Fehler beim Speichern des Feedback."
)
rag
Endpoints of the RAG-microservice.
| FUNCTION | DESCRIPTION |
|---|---|
| `database_rag` | RAG Endpoint for knowledge DB. |
| `fetch_file_rag` | Endpoint for RAG on a given list of files. |
| `get_llms` | LLM-Information of available Models for RAG. |
| `get_rag_endpoint` | Extract the rag endpoint from the general settings. |
| `get_sources` | Available sources in the knowledge database. |
| `no_database_ingestion` | Ingestion Endpoint, returns forbidden Error message. Only for internal use. |
database_rag
async
RAG Endpoint for knowledge DB.
| PARAMETER | DESCRIPTION |
|---|---|
| `request` | The currently performed request. TYPE: `Request` |
| `_auth` | Authorization dependency that checks for required permissions. |
| `rag_input` | Input containing RAG request. TYPE: `RAGInput` |

| RETURNS | DESCRIPTION |
|---|---|
| `list[RAGOutput]` | RAG response. |
Source code in docs/microservices/core/src/endpoints/rag.py
@router.post(
"/database",
response_model=list[RAGOutput],
summary="RAG knowledge database endpoint.",
description=(
"Performs a RAG (Retrieval-Augmented Generation) query on the knowledge database.\n\n"
"The endpoint retrieves relevant documents based on the input filters and question, "
"then generates answers using the specified language model."
),
openapi_extra={
"requestBody": {
"content": {
"application/json": {
"examples": RAGInput.model_config["json_schema_extra"][
"openapi_examples"
],
}
}
},
},
responses=add_auth_status_codes_to_responses(
{
200: {
"description": "Successful RAG response with retrieved documents.",
"content": {
"application/json": {
"examples": RAGOutput.model_config["json_schema_extra"][
"openapi_examples"
],
}
},
},
400: {"description": "Invalid language model."},
408: {"description": "Request timeout of a dependency."},
500: {"description": "Internal server error."},
502: {"description": "Error accessing microservice."},
}
),
)
async def database_rag(
request: Request,
_auth: Annotated[None, needs_permissions([Permission.RAG_DATABASE])],
rag_input: RAGInput,
) -> list[RAGOutput]:
"""RAG Endpoint for knowledge DB.
Args:
request (Request): the currently performed request
_auth: Authorization dependency that checks for required permissions.
rag_input (RAGInput): Input containing RAG request.
Returns:
RAG response.
"""
settings = request.state.settings
post_config = settings.inter_service_communication.rag
request_options = {
"json": rag_input.model_dump(),
"headers": {
"Content-Type": "application/json",
"Accept": "application/json",
},
}
rag_endpoint = get_rag_endpoint(settings)
result = await async_post(
url=f"{rag_endpoint}/database",
response_model=RAGOutput,
config=post_config,
request_options=request_options,
service_name="RAG",
)
return result
fetch_file_rag
async
fetch_file_rag(request, _auth, request_timestamp=Form(..., description='Unix timestamp of the request.', examples=[1731252767]), question=Form(..., description="The user's input question to be answered.", examples=['What did the parties decide?']), language_model=Form(..., description='Identifier for the language model to use.', examples=['test_model_mock']), max_chunks_to_use=Form(10, description='Optional upper limit on the number of text chunks used for the response.', examples=[5]), files=File(..., description='One or more files (pdf, txt, docx).'))
Endpoint for RAG on a given list of files.
| PARAMETER | DESCRIPTION |
|---|---|
| `request` | The currently performed request. TYPE: `Request` |
| `_auth` | Authorization dependency that checks for required permissions. |
| `request_timestamp` | The timestamp of the request. TYPE: `int` |
| `question` | The question to be answered, encoded as FormData. TYPE: `str` |
| `language_model` | The selected language model. LLMs are defined in configs/llm_models.yml. TYPE: `str` |
| `max_chunks_to_use` | The number of chunks to use for answer generation. TYPE: `int` |
| `files` | List of binary-encoded uploaded files (e.g., PDF, DOCX/ODT, TXT), sent as FormData. TYPE: `list` |

| RETURNS | DESCRIPTION |
|---|---|
| `list[RAGOutput]` | The generated answer as an inference result. |
Source code in docs/microservices/core/src/endpoints/rag.py
@router.post(
"/file",
response_model=list[RAGOutput],
summary="RAG file endpoint with multiple uploads.",
description=(
"Performs a RAG (Retrieval-Augmented Generation) query based on uploaded files.\n\n"
"The endpoint parses and chunks the uploaded files, retrieves relevant documents, "
"and generates answers using the specified language model."
),
responses=add_auth_status_codes_to_responses(
{
200: {
"description": "Successful RAG response with retrieved documents.",
"content": {
"application/json": {
"examples": RAGOutput.model_config["json_schema_extra"][
"openapi_examples"
],
}
},
},
400: {"description": "Invalid language model or request."},
408: {"description": "Request timeout of a dependency."},
500: {"description": "Internal server error."},
502: {"description": "Error accessing microservice."},
}
),
)
async def fetch_file_rag(
request: Request,
_auth: Annotated[None, needs_permissions([Permission.RAG_FILE])],
request_timestamp: int = Form(
...,
description="Unix timestamp of the request.",
examples=[1731252767],
),
question: str = Form(
...,
description="The user's input question to be answered.",
examples=["What did the parties decide?"],
),
language_model: str = Form(
...,
description="Identifier for the language model to use.",
examples=["test_model_mock"],
),
max_chunks_to_use: int | None = Form(
10,
description="Optional upper limit on the number of text chunks used for the response.",
examples=[5],
),
files: list[UploadFile] = File(
...,
description="One or more files (pdf, txt, docx).",
),
) -> list[RAGOutput]:
"""Endpoint for RAG on a given list of files.
Args:
request (Request): the currently performed request
_auth: Authorization dependency that checks for required permissions.
request_timestamp (int): The timestamp of the request.
question (str): The question to be answered, encoded as FormData.
language_model (str): The selected language model. LLMs are defined in configs/llm_models.yml.
max_chunks_to_use (int): The number of chunks to use for answer generation.
files (list): List of binary-encoded uploaded files (e.g., PDF, DOCX/ODT, TXT), sent as FormData.
Returns:
The generated answer as an inference result.
"""
settings = request.state.settings
# validating input data
rag_input = RAGInput(
request_timestamp=request_timestamp,
question=question,
meta_data_filters=None,
max_chunks_to_use=max_chunks_to_use,
language_model=language_model,
)
# preparing uploaded files for sending to rag microservice
files = [("files", (f.filename, await f.read(), f.content_type)) for f in files]
post_config = settings.inter_service_communication.rag
request_options = {
"data": rag_input.model_dump(),
"files": files,
}
rag_endpoint = get_rag_endpoint(request.state.settings)
result = await async_post(
url=f"{rag_endpoint}/file",
response_model=RAGOutput,
config=post_config,
request_options=request_options,
service_name="RAG",
)
return result
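A client sketch for this multipart variant; base URL and prefix are again assumptions. Note that the scalar fields travel as form data rather than JSON, and the files field may be repeated:
import asyncio
import time

import httpx

CORE_URL = "http://localhost:8000/rag"  # hypothetical base URL and prefix


async def ask_files() -> None:
    data = {
        "request_timestamp": str(int(time.time())),
        "question": "What did the parties decide?",
        "language_model": "test_model_mock",
        "max_chunks_to_use": "5",
    }
    # Each tuple becomes one part of the multipart body.
    files = [
        ("files", ("contract.pdf", open("contract.pdf", "rb"), "application/pdf")),
        ("files", ("notes.txt", open("notes.txt", "rb"), "text/plain")),
    ]
    async with httpx.AsyncClient(timeout=120) as client:
        response = await client.post(f"{CORE_URL}/file", data=data, files=files)
    response.raise_for_status()
    print(response.json())  # list[RAGOutput]


asyncio.run(ask_files())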
get_llms
async
LLM-Information of available Models for RAG.
| PARAMETER | DESCRIPTION |
|---|---|
request
|
the currently performed request
TYPE:
|
_authenticated_user
|
The (authenticated) user performing the request. This is 'None' if the application is configured for guest mode.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
list[dict]
|
List of available RAG models. |
Source code in docs/microservices/core/src/endpoints/rag.py
@router.get(
"/llms",
summary="List available language models.",
description=("Returns a list of available language models (LLMs).\n\n"),
responses=add_auth_status_codes_to_responses(
{
200: {
"description": "List of available LLMs.",
"content": {
"application/json": {
"example": [
{
"label": "test_model:mock",
"is_remote": False,
"name": "test_model_mock",
},
]
}
},
},
500: {"description": "Internal server error accessing microservice"},
}
),
)
async def get_llms(request: Request, _authenticated_user: UserDep) -> list[dict]:
"""LLM-Information of available Models for RAG.
Args:
request (Request): the currently performed request
_authenticated_user (User): the (authenticated) user performing the request
This is 'None' if the application is configured for guest mode.
Returns:
List of available RAG models.
"""
rag_endpoint = get_rag_endpoint(request.state.settings)
async with httpx.AsyncClient(timeout=30) as client:
response = await client.get(f"{rag_endpoint}/llms")
result = response.json()
return result
get_rag_endpoint
Extract the rag endpoint from the general settings.
get_sources
async
Available sources in the knowledge database.
| PARAMETER | DESCRIPTION |
|---|---|
request
|
the currently performed request
TYPE:
|
_auth
|
Authorization dependency that checks for required permissions.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
list[RAGDatabaseSources]
|
List of available database sources (e.g. Koalitionsvertrag, Drucksachen). |
Source code in docs/microservices/core/src/endpoints/rag.py
@router.get(
"/sources",
response_model=list[RAGDatabaseSources],
summary="List available RAG database sources.",
description=(
"Returns a list of all available sources in the knowledge database.\n\n"
"Each source contains its name and whether date-based filtering is applied."
),
responses=add_auth_status_codes_to_responses(
{
200: {
"description": "List of available database sources.",
"content": {
"application/json": {
"examples": RAGDatabaseSources.model_config[
"json_schema_extra"
]["openapi_examples"],
}
},
},
500: {"description": "Internal server error accessing microservice."},
}
),
)
async def get_sources(
request: Request,
_auth: Annotated[None, needs_permissions([Permission.RAG_DATABASE])],
) -> list[RAGDatabaseSources]:
"""Available sources in the knowledge database.
Args:
request (Request): the currently performed request
_auth: Authorization dependency that checks for required permissions.
Returns:
List of available database sources (e.g. Koalitionsvertrag, Drucksachen).
"""
rag_endpoint = get_rag_endpoint(request.state.settings)
async with httpx.AsyncClient(timeout=30) as client:
response = await client.get(url=f"{rag_endpoint}/sources")
result = response.json()
return result
no_database_ingestion
async
Ingestion endpoint; returns a forbidden error message. For internal use only.
Source code in docs/microservices/core/src/endpoints/rag.py
@router.post(
"/database_ingestion",
responses={
403: {
"description": "Ingestion-enpoint only for internal use.",
}
},
)
async def no_database_ingestion() -> None:
"""Ingestion Endpoint, returns forbidden Error message. Only for internal use."""
raise HTTPException(
status_code=403,
detail="This endpoint is for internal use only.",
)
summary
Endpoints of the summary-microservice.
| FUNCTION | DESCRIPTION |
|---|---|
get_llms |
LLM-Information of available Models for Summary. |
get_summary_endpoint |
Extract the summary endpoint from the general settings. |
summarize_file |
Summary of a pdf, docx or txt file including its parsing, cleaning and chunking. |
summarize_text |
Summary of text input including basic text cleaning and chunking of text input. |
get_llms
async
LLM-Information of available Models for Summary.
| PARAMETER | DESCRIPTION |
|---|---|
request
|
the currently performed request
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
list[dict]
|
List of available summary models. |
Source code in docs/microservices/core/src/endpoints/summary.py
@router.get(
"/llms",
summary="List available language models.",
description=("Returns a list of available language models (LLMs).\n\n"),
responses=add_auth_status_codes_to_responses(
{
200: {
"description": "List of available LLMs.",
"content": {
"application/json": {
"example": [
{
"label": "test_model:mock",
"is_remote": False,
"name": "test_model_mock",
},
]
}
},
},
500: {"description": "Internal server error accessing microservice"},
}
),
)
async def get_llms(request: Request) -> list[dict]:
"""LLM-Information of available Models for Summary.
Args:
request (Request): the currently performed request
Returns:
List of available summary models.
"""
summary_endpoint = get_summary_endpoint(request.state.settings)
async with httpx.AsyncClient(timeout=30) as client:
response = await client.get(f"{summary_endpoint}/llms")
result = response.json()
return result
get_summary_endpoint
Extract the summary endpoint from the general settings.
summarize_file
async
summarize_file(request, file=File(..., description='Upload a PDF, DOCX, or TXT file.'), api_input=Depends(SummaryFileAPIInputParameters.as_form))
Summary of a pdf, docx or txt file including its parsing, cleaning and chunking.
| PARAMETER | DESCRIPTION |
|---|---|
request
|
the currently performed request
TYPE:
|
file
|
Input file either pdf, docx or txt.
TYPE:
|
api_input
|
Containing the name of the language model, desired length of summary output as number of DIN-A4 pages, topics on which the summary should be focussed.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
SummaryAPIOutput
|
The summary, a message to the user and the parsing output. |
Source code in docs/microservices/core/src/endpoints/summary.py
@router.post(
"/file",
response_model=SummaryAPIOutput,
summary="File summary endpoint.",
description=(
"Generates a summary for a PDF, DOCX, or TXT file.\n\n"
"The endpoint parses, cleans, chunks the input file and then summarizes the text "
"according to the requested output length, focusing the summary on the user-defined topics "
"and using the specified language model."
),
openapi_extra={
"requestBody": {
"content": {
"application/json": {
"examples": SummaryFileAPIInputParameters.model_config[
"json_schema_extra"
]["openapi_examples"],
}
}
}
},
responses=add_auth_status_codes_to_responses(
{
200: {
"description": "Successful summary generation.",
"content": {
"application/json": {
"examples": SummaryAPIOutput.model_config["json_schema_extra"][
"openapi_examples"
],
}
},
},
400: {"description": "Invalid request."},
424: {"description": "Failed dependency."},
408: {"description": "Request timeout of a dependency."},
502: {"description": "Error accessing microservice."},
}
),
)
async def summarize_file(
request: Request,
file: UploadFile = File(..., description="Upload a PDF, DOCX, or TXT file."),
api_input: SummaryFileAPIInputParameters = Depends(
SummaryFileAPIInputParameters.as_form
),
) -> SummaryAPIOutput:
"""Summary of a pdf, docx or txt file including its parsing, cleaning and chunking.
Args:
request (Request): the currently performed request
file (UploadFile): Input file either pdf, docx or txt.
api_input (SummaryFileAPIInputParameters): Containing the name of the language model, desired length of
summary output as number of DIN-A4 pages, topics on which the summary should be focussed.
Returns:
The summary, a message to the user and the parsing output.
"""
settings = request.state.settings
summary_endpoint = get_summary_endpoint(settings)
url = f"{summary_endpoint}/summary/file"
files = dict(file=(file.filename, file.file, file.content_type))
data = dict(
language_model=api_input.language_model,
output_length=api_input.output_length,
topics=api_input.topics,
)
post_config = settings.inter_service_communication.summary
request_options = {
"files": files,
"data": data,
}
result = await async_post(
url=url,
response_model=SummaryAPIOutput,
config=post_config,
request_options=request_options,
service_name="Summary",
)
return result
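A client sketch for this endpoint, with the form fields defined by SummaryFileAPIInputParameters.as_form; the base URL and the /summary prefix are assumptions:
import asyncio

import httpx

CORE_URL = "http://localhost:8000/summary"  # hypothetical base URL and prefix


async def summarize_pdf() -> None:
    data = {
        "language_model": "test_model_mock",
        "output_length": "2.5",  # DIN-A4 pages; 0 = no length restriction
        "topics": "Open Source, Community-Gedanke",
    }
    with open("report.pdf", "rb") as fh:
        files = {"file": ("report.pdf", fh, "application/pdf")}
        async with httpx.AsyncClient(timeout=300) as client:
            response = await client.post(f"{CORE_URL}/file", data=data, files=files)
    response.raise_for_status()
    print(response.json())  # SummaryAPIOutput


asyncio.run(summarize_pdf())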
summarize_text
async
Summary of text input including basic text cleaning and chunking of text input.
| PARAMETER | DESCRIPTION |
|---|---|
request
|
the currently performed request
TYPE:
|
api_input
|
Containing the input text, name of the language model, desired length of summary output as number of DIN-A4 pages, topics on which the summary should be focussed.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
SummaryAPIOutput
|
The summary, a message to the user and the parsing output. |
Source code in docs/microservices/core/src/endpoints/summary.py
@router.post(
"/text",
response_model=SummaryAPIOutput,
summary="Text summary endpoint.",
description=(
"Generates a summary from a plain text input.\n\n"
"The endpoint performs basic text cleaning, chunking, and then summarizes the text "
"according to the requested output length, focusing the summary on the user-defined topics "
"and using the specified language model."
),
openapi_extra={
"requestBody": {
"content": {
"application/json": {
"examples": SummaryTextAPIInput.model_config["json_schema_extra"][
"openapi_examples"
],
}
}
}
},
responses=add_auth_status_codes_to_responses(
{
200: {
"description": "Successful summary generation.",
"content": {
"application/json": {
"examples": SummaryAPIOutput.model_config["json_schema_extra"][
"openapi_examples"
],
}
},
},
400: {"description": "Invalid request."},
424: {"description": "Failed dependency."},
408: {"description": "Request timeout of a dependency."},
502: {"description": "Error accessing microservice."},
}
),
)
async def summarize_text(
request: Request,
api_input: SummaryTextAPIInput,
) -> SummaryAPIOutput:
"""Summary of text input including basic text cleaning and chunking of text input.
Args:
request (Request): the currently performed request
api_input (SummaryTextAPIInput): Containing the input text, name of the language model, desired length of
summary output as number of DIN-A4 pages, topics on which the summary should be focussed.
Returns:
The summary, a message to the user and the parsing output.
"""
settings = request.state.settings
summary_endpoint = get_summary_endpoint(settings)
url = f"{summary_endpoint}/summary/text"
json = dict(
text=api_input.text,
language_model=api_input.language_model,
output_length=float(api_input.output_length),
topics=api_input.topics,
)
headers = {"Content-Type": "application/json", "Accept": "application/json"}
post_config = settings.inter_service_communication.summary
request_options = {
"json": json,
"headers": headers,
}
result = await async_post(
url=url,
response_model=SummaryAPIOutput,
config=post_config,
request_options=request_options,
service_name="Summary",
)
return result
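The text variant takes plain JSON matching SummaryTextAPIInput; URL and prefix are assumptions as above:
import asyncio

import httpx

CORE_URL = "http://localhost:8000/summary"  # hypothetical base URL and prefix


async def summarize_plain_text() -> None:
    payload = {
        "text": "A long input text to be summarized ...",
        "language_model": "test_model_mock",
        "output_length": 1.0,  # DIN-A4 pages
        "topics": "Städte",
    }
    async with httpx.AsyncClient(timeout=300) as client:
        response = await client.post(f"{CORE_URL}/text", json=payload)
    response.raise_for_status()
    print(response.json())  # SummaryAPIOutput


asyncio.run(summarize_plain_text())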
transcription
Endpoints of the transcription-microservice, including upload, retrieval, translation, DOCX export and upsertion of special terms.
| FUNCTION | DESCRIPTION |
|---|---|
check_transcription_exists |
Verify if transcription exists for given job ID. |
get_transcription |
Retrieve transcription for given job ID. |
get_transcription_endpoint |
Extract the transcription endpoint from the 'settings'. |
post_transcription_docx_or_zip |
Retrieve stored transcription as DOCX document. |
translate |
Translate the translation_chunks to German. |
upload |
Upload file for transcription endpoint. |
upload_special_terms |
Upserts new special terms to database. |
check_transcription_exists
async
Verify if transcription exists for given job ID.
Can be used for periodic checks. No retrieval of transcription.
| PARAMETER | DESCRIPTION |
|---|---|
request
|
The currently performed request.
TYPE:
|
job_ids
|
List of transcription job IDs to check.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
TranscriptionStatusResponse
|
Contains job_id and status of the job.
TYPE:
|
| RAISES | DESCRIPTION |
|---|---|
HTTPException
|
If no job_id is provided in the request body. |
Source code in docs/microservices/core/src/endpoints/transcription.py
@router.post(
"/check_transcription_exists",
summary="Check if transcriptions exists and retrieve status",
description="Check whether transcriptions exist for the given job IDs. Returns booleans and status.",
openapi_extra={
"requestBody": {
"content": {
"application/json": {
"examples": CheckTranscriptionExistsRequest.model_config[
"json_schema_extra"
]["openapi_examples"],
}
},
}
},
response_model=TranscriptionStatusResponse,
responses=add_auth_status_codes_to_responses(
{
200: {
"description": "Successful status check.",
"content": {
"application/json": {
"examples": TranscriptionStatusResponse.model_config[
"json_schema_extra"
]["openapi_examples"]
}
},
},
400: {"description": "Payload must contain at least one job_id."},
500: {"description": "Internal Server Error"},
},
),
)
async def check_transcription_exists(
request: Request,
job_ids: CheckTranscriptionExistsRequest,
) -> TranscriptionStatusResponse:
"""Verify if transcription exists for given job ID.
Can be used for periodic checks. No retrieval of transcription.
Args:
request (Request): The currently performed request.
job_ids (CheckTranscriptionExistsRequest): List of transcription job IDs to check.
Returns:
TranscriptionStatusResponse: Contains job_id and status of the job.
Raises:
HTTPException: If no job_id is provided in the request body.
"""
settings = request.state.settings
transcription_endpoint = get_transcription_endpoint(settings)
url: str = f"{transcription_endpoint}/check_transcription_exists"
job_id_list = job_ids.root
if not job_id_list:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Payload must contain at least one job_id.",
)
request_options = {
"json": [str(job_id) for job_id in job_id_list],
}
post_config = settings.inter_service_communication.transcription
result = await async_post(
url=url,
response_model=TranscriptionStatusResponse,
request_options=request_options,
config=post_config,
service_name="Transcription",
)
if result is None:
return TranscriptionStatusResponse(
job_statuses=[
{
"job_id": str(job_id),
"exists": False,
"status": "unavailable",
}
for job_id in job_id_list
]
)
return result
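Since the request body is a bare JSON list of job IDs (CheckTranscriptionExistsRequest is a RootModel), a polling client is straightforward; URL and prefix are assumptions:
import asyncio

import httpx

CORE_URL = "http://localhost:8000/transcription"  # hypothetical base URL and prefix


async def poll_jobs() -> None:
    job_ids = [
        "123e4567-e89b-12d3-a456-426614174000",
        "550e8400-e29b-41d4-a716-446655440000",
    ]
    async with httpx.AsyncClient(timeout=30) as client:
        response = await client.post(
            f"{CORE_URL}/check_transcription_exists", json=job_ids
        )
    response.raise_for_status()
    print(response.json())  # TranscriptionStatusResponse


asyncio.run(poll_jobs())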
get_transcription
async
get_transcription(request, job_id=Path(..., example=TranscriptionBaseResponse.model_config['json_schema_extra']['openapi_examples']['standard']['value']['job_id']))
Retrieve transcription for given job ID.
| PARAMETER | DESCRIPTION |
|---|---|
request
|
The currently performed request.
TYPE:
|
job_id
|
Unique identifier for transcription job
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
TranscriptionReceiveResponse
|
Contains job_id, transcription text if found, None if not found, translation and matched special terms.
TYPE:
|
Source code in docs/microservices/core/src/endpoints/transcription.py
@router.get(
"/retrieve_transcription/{job_id}",
summary="Retrieve transcription",
description="""
Retrieve the transcription text for the given job ID.
The transcription remains stored until deleted by the recurring deletion process.""",
response_model=TranscriptionReceiveResponse,
responses=add_auth_status_codes_to_responses(
{
200: {
"description": "Transcription retrieved (possibly empty)",
"content": {
"application/json": {
"examples": TranscriptionReceiveResponse.model_config[
"json_schema_extra"
]["openapi_examples"]
}
},
},
404: {
"description": "No transcription found for the given job ID.",
"content": {
"application/json": {
"example": {"detail": "No transcription found for job_id=..."}
}
},
},
500: {"description": "Internal Server Error"},
},
),
)
async def get_transcription(
request: Request,
job_id: UUID = Path(
...,
example=TranscriptionBaseResponse.model_config["json_schema_extra"][
"openapi_examples"
]["standard"]["value"]["job_id"],
),
) -> TranscriptionReceiveResponse:
"""Retrieve transcription for given job ID.
Args:
request (Request): The currently performed request.
job_id (UUID): Unique identifier for transcription job
Returns:
TranscriptionReceiveResponse: Contains job_id, transcription text if found,
None if not found, translation and matched special terms.
"""
settings = request.state.settings
transcription_endpoint = get_transcription_endpoint(settings)
url = f"{transcription_endpoint}/retrieve_transcription/{job_id}"
async with httpx.AsyncClient(timeout=30) as client:
response = await client.get(url=url)
payload = response.json()
if response.status_code != status.HTTP_200_OK:
error_detail = (
payload.get("detail", response.text)
if isinstance(payload, dict)
else response.text
)
raise HTTPException(status_code=response.status_code, detail=error_detail)
return TranscriptionReceiveResponse(**payload)
get_transcription_endpoint
Extract the transcription endpoint from the 'settings'.
post_transcription_docx_or_zip
async
Retrieve stored transcription as DOCX document.
| PARAMETER | DESCRIPTION |
|---|---|
request
|
The currently performed request.
TYPE:
|
items
|
The requested transcripts with speaker rename mapping.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
StreamingResponse
|
DOCX/ZIP file as downloadable response.
TYPE:
|
| RAISES | DESCRIPTION |
|---|---|
HTTPException
|
If payload is empty |
Source code in docs/microservices/core/src/endpoints/transcription.py
@router.post(
"/download_docx",
summary="Download a single DOCX or a ZIP of DOCX transcriptions",
description="Send a list of items with uuid and speaker mapping. If list has one item, returns a single DOCX. \
If multiple, returns a ZIP.",
response_class=StreamingResponse,
openapi_extra={
"requestBody": {
"content": {
"application/json": {
"examples": CreateWordFilePayload.model_config["json_schema_extra"][
"openapi_examples"
]
}
}
}
},
responses=add_auth_status_codes_to_responses(
{
200: {
"description": "DOCX or ZIP file",
"content": {
"application/vnd.openxmlformats-officedocument.wordprocessingml.document": {
"schema": {"type": "string", "format": "binary"}
},
"application/zip": {
"schema": {"type": "string", "format": "binary"}
},
},
},
400: {"description": "Payload must contain at least one item."},
404: {"description": "No transcription found for the given job ID."},
500: {"description": "Internal Server Error"},
},
),
)
async def post_transcription_docx_or_zip(
request: Request,
items: list[CreateWordFilePayload],
) -> StreamingResponse:
"""Retrieve stored transcription as DOCX document.
Args:
request (Request): The currently performed request.
items (list[CreateWordFilePayload]): The requested transcripts with speaker rename mapping.
Returns:
StreamingResponse: DOCX/ZIP file as downloadable response.
Raises:
HTTPException: If payload is empty
"""
settings = request.state.settings
transcription_endpoint = get_transcription_endpoint(settings)
url = f"{transcription_endpoint}/download_docx"
if not items:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Payload must contain at least one item.",
)
# Open the upstream stream manually (instead of `async with`) so the
# response can outlive this handler and be closed later by the
# StreamingResponse helpers.
client = httpx.AsyncClient(timeout=PostConfig().timeout_in_s)
resp_context_manager = client.stream("POST", url, json=jsonable_encoder(items))
resp = await resp_context_manager.__aenter__()
if resp.status_code != status.HTTP_200_OK:
close_streaming_client_context_manager(resp, resp_context_manager, client)
headers = {
k: v
for k, v in resp.headers.items()
if k.lower()
in {
"content-type",
"content-disposition",
"content-length",
}
}
return StreamingResponse(
stream_bytes_from_streaming_response(resp, resp_context_manager, client),
headers=headers,
media_type=headers.get("content-type", "application/octet-stream"),
status_code=resp.status_code,
)
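On the client side, the binary response is best streamed to disk rather than buffered in memory; a sketch with assumed base URL and prefix:
import asyncio

import httpx

CORE_URL = "http://localhost:8000/transcription"  # hypothetical base URL and prefix


async def download_docx() -> None:
    items = [
        {
            "uuid": "123e4567-e89b-12d3-a456-426614174000",
            "speaker": [
                {"speaker-name-ID": "SPEAKER_00", "speaker-name-edited": "Redner 1"},
            ],
        }
    ]
    async with httpx.AsyncClient(timeout=120) as client:
        async with client.stream(
            "POST", f"{CORE_URL}/download_docx", json=items
        ) as response:
            response.raise_for_status()
            # One item -> DOCX; several items -> ZIP (see content-type header).
            with open("transcript.docx", "wb") as fh:
                async for chunk in response.aiter_bytes():
                    fh.write(chunk)


asyncio.run(download_docx())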
translate
async
Translate the translation_chunks to German.
| PARAMETER | DESCRIPTION |
|---|---|
request
|
The currently performed request.
TYPE:
|
translation_text
|
Text to translate
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
translation
|
The full combined translated text
TYPE:
|
Source code in docs/microservices/core/src/endpoints/transcription.py
@router.post(
"/translate",
summary="Translate text to German",
description=("Uses an LLM to translate a text to German."),
response_model=TranslateAPIOutput,
openapi_extra={
"requestBody": {
"content": {
"application/json": {
"examples": TranslateRequest.model_config["json_schema_extra"][
"openapi_examples"
]
}
}
}
},
responses=add_auth_status_codes_to_responses(
{
200: {
"description": "Successful translation response.",
"content": {
"application/json": {
"examples": {
"translation": TranslateAPIOutput.model_config[
"json_schema_extra"
]["openapi_examples"],
}
}
},
},
404: {"description": "Job not found or no chunks available"},
500: {"description": "Internal Server Error"},
},
),
)
async def translate(
request: Request,
translation_text: TranslateRequest,
) -> TranslateAPIOutput:
"""Translate the translation_chunks to German.
Args:
request (Request): The currently performed request.
translation_text (TranslateRequest): Text to translate
Returns:
translation (TranslateAPIOutput): The full combined translated text
"""
settings = request.state.settings
transcription_endpoint = get_transcription_endpoint(settings)
url = f"{transcription_endpoint}/translate"
post_config = settings.inter_service_communication.transcription
request_options = {
"json": translation_text.model_dump(),
}
result = await async_post(
url=url,
response_model=TranslateAPIOutput,
config=post_config,
request_options=request_options,
)
return result
upload
async
Upload file for transcription endpoint.
| PARAMETER | DESCRIPTION |
|---|---|
request
|
The currently performed request.
TYPE:
|
file
|
Uploaded file from client.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
TranscriptionBaseResponse
|
The unique job ID for tracking transcription status
TYPE:
|
Source code in docs/microservices/core/src/endpoints/transcription.py
@router.post(
"/upload",
summary="Upload file for transcription",
description="Upload an audio/video file. Returns a job ID to track transcription status.",
status_code=status.HTTP_201_CREATED,
response_model=TranscriptionBaseResponse,
responses=add_auth_status_codes_to_responses(
{
201: {
"description": "Successful upload response.",
"content": {
"application/json": {
"examples": TranscriptionBaseResponse.model_config[
"json_schema_extra"
]["openapi_examples"]
}
},
},
400: {"description": "There was an error parsing the body."},
413: {"description": "File too large."},
415: {
"description": "Uploaded file is invalid or in an unsupported state."
},
500: {"description": "Internal Server Error"},
},
),
)
async def upload(
request: Request,
file: UploadFile = File(
...,
description="Audio or video file to transcribe.",
),
) -> TranscriptionBaseResponse:
"""Upload file for transcription endpoint.
Args:
request (Request): The currently performed request.
file (File): Uploaded file from client.
Returns:
TranscriptionBaseResponse: The unique job ID for tracking transcription status
"""
settings = request.state.settings
transcription_endpoint = get_transcription_endpoint(settings)
url: str = f"{transcription_endpoint}/check_transcription_exists"
url = f"{transcription_endpoint}/upload"
files = [("file", (file.filename, file.file, file.content_type))]
request_options = {
"files": files,
}
post_config = settings.inter_service_communication.transcription
result = await async_post(
url=url,
response_model=TranscriptionBaseResponse,
config=post_config,
request_options=request_options,
service_name="Transcription",
)
return result
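A client sketch that uploads a recording and keeps the returned job ID for later polling; base URL and prefix are assumptions:
import asyncio

import httpx

CORE_URL = "http://localhost:8000/transcription"  # hypothetical base URL and prefix


async def start_transcription() -> str:
    with open("meeting.mp3", "rb") as fh:
        files = {"file": ("meeting.mp3", fh, "audio/mpeg")}
        async with httpx.AsyncClient(timeout=120) as client:
            response = await client.post(f"{CORE_URL}/upload", files=files)
    response.raise_for_status()
    return response.json()["job_id"]  # use with /check_transcription_exists


print(asyncio.run(start_transcription()))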
upload_special_terms
async
upload_special_terms(request, file=File(..., description='CSV/TXT with headers: Fachbegriff, Bedeutung, Themenbereich'))
Upserts new special terms to database.
| PARAMETER | DESCRIPTION |
|---|---|
request
|
The currently performed request.
TYPE:
|
file
|
Uploaded TXT/CSV file.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
UpsertTermsAPIOutput
|
Number of upsertions and details if there were problems
TYPE:
|
Source code in docs/microservices/core/src/endpoints/transcription.py
@router.post(
"/special_terms/upload",
summary="Upload/update special terms (CSV/TXT)",
description="""Accepts a file with columns (Fachbegriff, Bedeutung, Themenbereich) and
forwards it to the transcription microservice to upsert the special-terms DB.""",
response_model=UpsertTermsAPIOutput,
openapi_extra={
"requestBody": {
"content": {
"text/plain": {
"examples": {
"inline_example": {
"summary": "Simple CSV/TXT content",
"description": "Example file content with the required headers and a few entries.",
"value": (
"Fachbegriff, Bedeutung, Themenbereich\n"
"API, Schnittstelle, IT\n"
"und, Verbindungswort, Konjunktion"
),
}
}
}
}
}
},
responses=add_auth_status_codes_to_responses(
{
200: {
"description": "Upsert completed",
"content": {
"application/json": {
"examples": UpsertTermsAPIOutput.model_config[
"json_schema_extra"
]["openapi_examples"]
}
},
},
400: {"detail": "There was an error parsing the body."},
422: {"description": "Missing required columns."},
500: {"description": "Internal Server Error"},
},
),
)
async def upload_special_terms(
request: Request,
file: UploadFile = File(
..., description="CSV/TXT with headers: Fachbegriff, Bedeutung, Themenbereich"
),
) -> UpsertTermsAPIOutput:
"""Upserts new special terms to database.
Args:
request (Request): The currently performed request.
file (File): Uploaded TXT/CSV file.
Returns:
UpsertTermsAPIOutput: Number of upsertions and details if there were problems
"""
settings = request.state.settings
transcription_endpoint = get_transcription_endpoint(settings)
upload_url = f"{transcription_endpoint}/special_terms/upload"
files = [("file", (file.filename, file.file, file.content_type or "text/plain"))]
post_config = settings.inter_service_communication.transcription
request_options = {
"files": files,
}
result = await async_post(
url=upload_url,
response_model=UpsertTermsAPIOutput,
config=post_config,
request_options=request_options,
)
return result
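The expected file format is shown in the inline OpenAPI example above; a client sketch sending it from memory (base URL and prefix assumed):
import asyncio

import httpx

CORE_URL = "http://localhost:8000/transcription"  # hypothetical base URL and prefix

CSV_CONTENT = (
    "Fachbegriff, Bedeutung, Themenbereich\n"
    "API, Schnittstelle, IT\n"
)


async def upsert_terms() -> None:
    files = {"file": ("terms.csv", CSV_CONTENT.encode("utf-8"), "text/plain")}
    async with httpx.AsyncClient(timeout=60) as client:
        response = await client.post(f"{CORE_URL}/special_terms/upload", files=files)
    response.raise_for_status()
    print(response.json())  # UpsertTermsAPIOutput


asyncio.run(upsert_terms())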
feedback
Models and functions to store feedback for chat, RAG and summary.
| CLASS | DESCRIPTION |
|---|---|
TableChatFeedback |
Feedback for Chat. |
TableGeneralFeedback |
General Feedback. |
TableRAGFeedback |
Feedback for RAG. |
TableSummaryFeedback |
Feedback for Summary. |
| FUNCTION | DESCRIPTION |
|---|---|
cleanup_userfeedback |
Delete feedback entries older than given number of days. |
create_db_session |
Create the basic DB engine and session. |
get_db_session |
A SQLAlchemy database session for the feedback database bound in a context manager. |
save_chat_feedback |
Store a chat feedback in the database. |
save_general_feedback |
Store the general feedback in the database. |
save_rag_feedback |
Save RAG feedback in the database. |
save_summary_feedback |
Store a summary feedback in the database. |
TableChatFeedback
Bases: Base
Feedback for Chat.
Source code in docs/microservices/core/src/feedback.py
class TableChatFeedback(Base):
"""Feedback for Chat."""
__tablename__ = "feedback_chat"
id = Column(Integer, primary_key=True, autoincrement=True)
timestamp = Column(DateTime(timezone=True), default=func.now())
feedback = Column(Enum(FeedbackRating))
feedback_text = Column(Text)
user_prompt = Column(Text)
chat_answer = Column(Text)
llm_label = Column(Text)
llm_name = Column(Text)
llm_is_remote = Column(Boolean)
TableGeneralFeedback
Bases: Base
General Feedback.
Source code in docs/microservices/core/src/feedback.py
TableRAGFeedback
Bases: Base
Feedback for RAG.
Source code in docs/microservices/core/src/feedback.py
class TableRAGFeedback(Base):
"""Feedback for RAG."""
__tablename__ = "feedback_rag"
id = Column(Integer, primary_key=True, autoincrement=True)
timestamp = Column(DateTime(timezone=True), default=func.now())
feedback = Column(Enum(FeedbackRating))
feedback_text = Column(Text)
question = Column(Text)
source_type = Column(Text)
rag_answer = Column(Text)
source_chunks = Column(JSON)
llm_label = Column(Text)
llm_name = Column(Text)
llm_is_remote = Column(Boolean)
TableSummaryFeedback
Bases: Base
Feedback for Summary.
Source code in docs/microservices/core/src/feedback.py
class TableSummaryFeedback(Base):
"""Feedback for Summary."""
__tablename__ = "feedback_summary"
id = Column(Integer, primary_key=True, autoincrement=True)
timestamp = Column(DateTime(timezone=True), default=func.now())
feedback = Column(Enum(FeedbackRating))
feedback_text = Column(Text)
source_type = Column(Text)
text_length = Column(Integer)
desired_summary_length = Column(Integer)
summary_length = Column(Integer)
llm_label = Column(Text)
llm_name = Column(Text)
llm_is_remote = Column(Boolean)
cleanup_userfeedback
Delete feedback entries older than given number of days.
| PARAMETER | DESCRIPTION |
|---|---|
session
|
A database session
TYPE:
|
retention_days
|
Number of days to retain entries. Entries older than this will be deleted. Defaults to 90.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
bool
|
True if the commit was successful. |
| RAISES | DESCRIPTION |
|---|---|
SQLAlchemyError
|
Error if the commit fails. |
Source code in docs/microservices/core/src/feedback.py
def cleanup_userfeedback(
session: SessionDep,
retention_days: int = 90,
) -> bool:
"""Delete feedback entries older than given number of days.
Args:
session (Session): A database session
retention_days (int): Number of days to retain entries.
Entries older than this will be deleted. Defaults to 90.
Returns:
True if the commit was successful.
Raises:
SQLAlchemyError: Error if the commit fails.
"""
tables = [
TableGeneralFeedback,
TableChatFeedback,
TableRAGFeedback,
TableSummaryFeedback,
]
date_threshold = datetime.now(tz=UTC) - timedelta(days=retention_days)
for table in tables:
result = session.query(table).filter(table.timestamp < date_threshold).delete()
logger.info(
f"Cleaned up user feedback in Table {table.__tablename__}: {result} datapoints deleted."
)
# commit changes or rollback if something went wrong
try:
session.commit()
logger.debug("Old feedback entries successfully removed.")
return True
except SQLAlchemyError as e:
session.rollback()
logger.error(f"Failed to clean up feedback: {e}")
raise
finally:
session.close()
create_db_session
Create the basic DB engine and session.
| RETURNS | DESCRIPTION |
|---|---|
Session
|
A session factory for the feedback database |
| RAISES | DESCRIPTION |
|---|---|
SQLAlchemyError
|
If the connection to the database fails. |
Source code in docs/microservices/core/src/feedback.py
def create_db_session(settings: Settings) -> Session:
"""Create the basic DB engine and session.
Returns:
A session factory for the feedback database
Raises:
SQLAlchemyError: If the connection to the database fails.
"""
engine = create_engine(settings.feedback_db.url)
# Check connection
try:
with engine.connect() as __:
logger.debug("Connection to feedback-db OK")
except SQLAlchemyError as e:
logger.warning(f"Connection to feedback-db failed with error: {e}")
raise e
Base.metadata.create_all(engine)
return sessionmaker(bind=engine)
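A usage sketch tying these two helpers together; `settings` is assumed to be a loaded Settings instance whose feedback_db.url points at a reachable database:
# Build the session factory once at startup ...
session_factory = create_db_session(settings)

# ... then run the retention cleanup with a fresh session.
# cleanup_userfeedback() commits, rolls back on error and closes the
# session itself.
session = session_factory()
cleanup_userfeedback(session, retention_days=30)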
get_db_session
async
A SQLAlchemy database session for the feedback database bound in a context manager.
Returns: An active SQLAlchemy session connected to the feedback database.
Source code in docs/microservices/core/src/feedback.py
async def get_db_session(request: Request) -> Session:
"""A SQLAlchemy database session for the feedback database bound in a context manager.
Returns: An active SQLAlchemy session connected to the feedback database.
"""
session = request.state.db_session
with session.begin() as this_session:
yield this_session
save_chat_feedback
Store a chat feedback in the database.
This function creates a new record in the feedback_chat table. The timestamp field will be automatically set to the current timestamp by SQLAlchemy. The id field will be auto-incremented.
| PARAMETER | DESCRIPTION |
|---|---|
session
|
A database session
TYPE:
|
chat_feedback
|
The feedback to be stored.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
bool
|
True if the commit was successful. |
| RAISES | DESCRIPTION |
|---|---|
SQLAlchemyError
|
Error if the commit fails. |
Source code in docs/microservices/core/src/feedback.py
def save_chat_feedback(
session: SessionDep,
chat_feedback: ChatFeedback,
) -> bool:
"""Store a chat feedback in the database.
This function creates a new record in the feedback_chat table.
The timestamp field will be automatically set to the current timestamp by SQLAlchemy.
The id field will be auto-incremented.
Args:
session (Session): A database session
chat_feedback (ChatFeedback): The feedback to be stored.
Returns:
True if the commit was successful.
Raises:
SQLAlchemyError: Error if the commit fails.
"""
# Create a new record; the timestamp will be set automatically by SQLAlchemy to the current timestamp
# id will be auto-incremented
new_entry = TableChatFeedback(
feedback=chat_feedback.feedback,
feedback_text=chat_feedback.feedback_text,
user_prompt=chat_feedback.user_prompt,
chat_answer=chat_feedback.chat_answer,
llm_label=chat_feedback.llm.label,
llm_name=chat_feedback.llm.name,
llm_is_remote=chat_feedback.llm.is_remote,
)
# commit changes or rollback if something went wrong
try:
session.add(new_entry)
session.commit()
logger.debug("Feedback (chat) successfully saved in database")
return True
except SQLAlchemyError as e:
session.rollback()
logger.error(f"Failed to save feedback: {e}")
raise
finally:
session.close()
save_general_feedback
Store the general feedback in the database.
| PARAMETER | DESCRIPTION |
|---|---|
session
|
A database session
TYPE:
|
general_feedback
|
The general feedback to be saved.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
bool
|
True if the commit was successful. |
| RAISES | DESCRIPTION |
|---|---|
SQLAlchemyError
|
Error if the commit fails. |
Source code in docs/microservices/core/src/feedback.py
def save_general_feedback(
session: SessionDep,
general_feedback: GeneralFeedback,
) -> bool:
"""Store the general feedback in the database.
Args:
session (Session): A database session
general_feedback (GeneralFeedbackInput): The general feedback to be saved.
Returns:
True if the commit was successful.
Raises:
SQLAlchemyError: Error if the commit fails.
"""
new_entry = TableGeneralFeedback(
feedback=general_feedback.feedback,
feedback_text=general_feedback.feedback_text,
)
# commit changes or rollback if something went wrong
try:
session.add(new_entry)
session.commit()
logger.debug("Feedback (general) successfully saved in database")
return True
except SQLAlchemyError as e:
session.rollback()
logger.error(f"Failed to save feedback: {e}")
raise
finally:
session.close()
save_rag_feedback
Save RAG feedback in the database.
| PARAMETER | DESCRIPTION |
|---|---|
session
|
A database session
TYPE:
|
rag_feedback
|
The feedback to be stored.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
bool
|
True if the commit was successful. |
| RAISES | DESCRIPTION |
|---|---|
SQLAlchemyError
|
Error if the commit fails. |
Source code in docs/microservices/core/src/feedback.py
def save_rag_feedback(
session: SessionDep,
rag_feedback: RAGFeedback,
) -> bool:
"""Save RAG feedback in the database.
Args:
session (Session): A database session
rag_feedback (RAGFeedback): The feedback to be stored.
Returns:
True if the commit was successful.
Raises:
SQLAlchemyError: Error if the commit fails.
"""
new_entry = TableRAGFeedback(
feedback=rag_feedback.feedback,
feedback_text=rag_feedback.feedback_text,
question=rag_feedback.question,
rag_answer=rag_feedback.rag_answer,
source_type=rag_feedback.source_type,
source_chunks=json.dumps(rag_feedback.source_chunks),
llm_label=rag_feedback.llm.label,
llm_name=rag_feedback.llm.name,
llm_is_remote=rag_feedback.llm.is_remote,
)
# commit changes or rollback if something went wrong
try:
session.add(new_entry)
session.commit()
logger.debug("Feedback (rag) successfully saved in database")
return True
except SQLAlchemyError as e:
session.rollback()
logger.error(f"Failed to save feedback: {e}")
raise
finally:
session.close()
save_summary_feedback
Store a summary feedback in the database.
This function creates a new record in the feedback_summary table. The timestamp field will be automatically set to the current timestamp by SQLAlchemy. The id field will be auto-incremented.
| PARAMETER | DESCRIPTION |
|---|---|
session
|
A database session
TYPE:
|
summary_feedback
|
The feedback to be stored.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
bool
|
True if the commit was successful. |
| RAISES | DESCRIPTION |
|---|---|
SQLAlchemyError
|
Error if the commit fails. |
Source code in docs/microservices/core/src/feedback.py
def save_summary_feedback(
session: SessionDep,
summary_feedback: SummaryFeedback,
) -> bool:
"""Store a summary feedback in the database.
This function creates a new record in the feedback_summary table.
The timestamp field will be automatically set to the
current timestamp by SQLAlchemy.
The id field will be auto-incremented.
Args:
session (Session): A database session
summary_feedback (SummaryFeedback): The feedback to be stored.
Returns:
True if the commit was successful.
Raises:
SQLAlchemyError: Error if the commit fails.
"""
# Create a new record; the timestamp will be set automatically by SQLAlchemy to the current timestamp
# id will be auto-incremented
new_entry = TableSummaryFeedback(
feedback=summary_feedback.feedback,
feedback_text=summary_feedback.feedback_text,
source_type=summary_feedback.source_type,
text_length=summary_feedback.text_length,
desired_summary_length=summary_feedback.desired_summary_length,
summary_length=summary_feedback.summary_length,
llm_label=summary_feedback.llm.label,
llm_name=summary_feedback.llm.name,
llm_is_remote=summary_feedback.llm.is_remote,
)
# commit changes or rollback if something went wrong
try:
session.add(new_entry)
session.commit()
logger.debug("Feedback (summary) successfully saved in database")
return True
except SQLAlchemyError as e:
session.rollback()
logger.error(f"Failed to save feedback: {e}")
raise
finally:
session.close()
models
Data model classes for loading and validating API and configuration parameters.
| MODULE | DESCRIPTION |
|---|---|
api_input |
pydantic Models for API input parameters. |
api_output |
pydantic Models for API output parameters. |
general |
Load and check Settings from yml. |
api_input
pydantic Models for API input parameters.
| CLASS | DESCRIPTION |
|---|---|
ChatFeedback |
Model defining the input of the feedback for the chat. |
ChatInput |
Model defining the input of a valid chat request. |
ChatMessage |
Message input model used to store the content of chat messages. |
CheckTranscriptionExistsRequest |
Request body for checking transcription status by job IDs. |
CreateWordFilePayload |
Input payload describing how to build a Word transcript file. |
FeedbackLLM |
Model validating metadata for the LLM used in feedback. |
FeedbackRating |
Enum class specifying possible feedback ratings. |
GeneralFeedback |
Model defining the structure of feedback for all services. |
MetadataFilter |
Model defining the structure of a metadata filter. |
RAGFeedback |
Model defining the input of the feedback for the RAG. |
RAGInput |
Model defining the input of a valid RAG request. |
RestrictValuesForDBModel |
Model validating string fields before database insertion. |
SpeakerItem |
Speaker mapping used when generating word files. |
SummaryFeedback |
Model defining the input of the feedback for the summary. |
SummaryFileAPIInputParameters |
Model defining the input parameters as part of a valid Summary request for file endpoint. |
SummaryTextAPIInput |
Model defining the input of a valid Summary request for text endpoint. |
TranslateRequest |
Request body for translating free text. |
ChatFeedback
Bases: GeneralFeedback
Model defining the input of the feedback for the chat.
| ATTRIBUTE | DESCRIPTION |
|---|---|
user_prompt |
The original user prompt.
TYPE:
|
chat_answer |
The model’s answer being rated.
TYPE:
|
llm |
Information about the model used.
TYPE:
|
Source code in docs/microservices/core/src/models/api_input.py
class ChatFeedback(GeneralFeedback):
"""Model defining the input of the feedback for the chat.
Attributes:
user_prompt (str): The original user prompt.
chat_answer (str): The model’s answer being rated.
llm (FeedbackLLM): Information about the model used.
"""
user_prompt: str
chat_answer: str
llm: FeedbackLLM
ChatInput
Bases: BaseModel
Model defining the input of a valid chat request.
| ATTRIBUTE | DESCRIPTION |
|---|---|
new_message |
The new user message to be processed.
TYPE:
|
chat_history |
List of previous chat messages forming the conversation context.
TYPE:
|
language_model |
The identifier of the language model to use.
TYPE:
|
request_timestamp |
Timestamp of the request.
TYPE:
|
Source code in docs/microservices/core/src/models/api_input.py
class ChatInput(BaseModel):
"""Model defining the input of a valid chat request.
Attributes:
new_message (ChatMessage): The new user message to be processed.
chat_history (list[ChatMessage]): List of previous chat messages forming the conversation context.
language_model (str): The identifier of the language model to use.
request_timestamp (int): Timestamp of the request.
"""
new_message: ChatMessage
chat_history: list[ChatMessage] = []
language_model: str
request_timestamp: int
@property
def as_list(self) -> list[dict[str, str]]:
"""Transforms the chat history plus the new message into a list of dictionaries containing the role and message.
Returns:
Each dictionary contains keys 'role' and 'content'.
"""
chat_history_list = [
{"role": message.role, "content": message.content}
for message in self.chat_history
]
chat_history_list.append(
{"role": self.new_message.role, "content": self.new_message.content}
)
return chat_history_list
model_config = ConfigDict(
json_schema_extra={
"openapi_examples": {
"standard": {
"summary": "Simple chat input",
"description": "Standard input with short chat history.",
"value": {
"new_message": {
"role": "user",
"content": "What's the weather like today?",
},
"chat_history": [
{"role": "user", "content": "Hi"},
{
"role": "assistant",
"content": "Hello! How can I help you today?",
},
],
"language_model": "test_model_mock",
"request_timestamp": 1731252767,
},
}
}
}
)
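The as_list property yields the flat role/content list most chat LLM APIs expect, with the new message appended after the history; for example:
chat = ChatInput(
    new_message={"role": "user", "content": "What's the weather like today?"},
    chat_history=[
        {"role": "user", "content": "Hi"},
        {"role": "assistant", "content": "Hello! How can I help you today?"},
    ],
    language_model="test_model_mock",
    request_timestamp=1731252767,
)
assert chat.as_list == [
    {"role": "user", "content": "Hi"},
    {"role": "assistant", "content": "Hello! How can I help you today?"},
    {"role": "user", "content": "What's the weather like today?"},
]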
ChatMessage
Bases: BaseModel
Message input model used to store the content of chat messages.
| ATTRIBUTE | DESCRIPTION |
|---|---|
content |
The textual content of the message.
TYPE:
|
role |
The role of the message sender. Must be one of "system", "user", "assistant". Defaults to "user".
TYPE:
|
Source code in docs/microservices/core/src/models/api_input.py
class ChatMessage(BaseModel):
"""Message input model used to store the content of chat messages.
Attributes:
content (str): The textual content of the message.
role (str): The role of the message sender. Must be one of "system", "user", "assistant".
Defaults to "user".
"""
content: str
role: Literal["user", "system", "assistant"] = "user"
CheckTranscriptionExistsRequest
Bases: RootModel[list[UUID]]
Request body for checking transcription status by job IDs.
| ATTRIBUTE | DESCRIPTION |
|---|---|
root |
A list of transcription job identifiers.
TYPE:
|
Source code in docs/microservices/core/src/models/api_input.py
class CheckTranscriptionExistsRequest(RootModel[list[UUID]]):
"""Request body for checking transcription status by job IDs.
Attributes:
root (list[UUID]): A list of transcription job identifiers.
"""
model_config = ConfigDict(
json_schema_extra={
"openapi_examples": {
"multiple_job_ids": {
"summary": "Check two job IDs",
"description": "Request a status check for multiple transcription jobs.",
"value": [
"123e4567-e89b-12d3-a456-426614174000",
"550e8400-e29b-41d4-a716-446655440000",
],
}
}
}
)
CreateWordFilePayload
Bases: BaseModel
Input payload describing how to build a Word transcript file.
| ATTRIBUTE | DESCRIPTION |
|---|---|
uuid |
Unique identifier of the transcription job this Word export belongs to.
TYPE:
|
speaker |
Mapping of speaker IDs to their (possibly edited) names used in the document.
TYPE:
|
Source code in docs/microservices/core/src/models/api_input.py
class CreateWordFilePayload(BaseModel):
"""Input payload describing how to build a Word transcript file.
Attributes:
uuid (UUID): Unique identifier of the transcription job this Word export belongs to.
speaker (list[SpeakerItem]): Mapping of speaker IDs to their (possibly edited) names used in the document.
"""
uuid: UUID
speaker: list[SpeakerItem] = Field(default_factory=list)
model_config = ConfigDict(
json_schema_extra={
"openapi_examples": {
"single_docx": {
"summary": "Single DOCX export",
"description": "Request a DOCX export for one transcription job including speaker mapping.",
"value": [
{
"uuid": "123e4567-e89b-12d3-a456-426614174000",
"speaker": [
{
"speaker-name-ID": "SPEAKER_00",
"speaker-name-edited": "Redner 1",
},
{
"speaker-name-ID": "SPEAKER_01",
"speaker-name-edited": "Rednerin 2",
},
],
}
],
}
}
}
)
FeedbackLLM
Bases: RestrictValuesForDBModel
Model validating metadata for the LLM used in feedback.
| ATTRIBUTE | DESCRIPTION |
|---|---|
is_remote |
Indicates whether the model was hosted remotely.
TYPE:
|
name |
The internal name of the LLM (e.g. 'llama3_70b_cloud').
TYPE:
|
label |
The label shown to the user for the model.
TYPE:
|
Source code in docs/microservices/core/src/models/api_input.py
class FeedbackLLM(RestrictValuesForDBModel):
"""Model validating metadata for the LLM used in feedback.
Attributes:
is_remote (bool): Indicates whether the model was hosted remotely.
name (str): The internal name of the LLM (e.g. 'llama3_70b_cloud').
label (str): The label shown to the user for the model.
"""
is_remote: bool
name: str
label: str
FeedbackRating
Enum class specifying possible feedback ratings.
GeneralFeedback
Bases: RestrictValuesForDBModel
Model defining the structure of feedback for all services.
| ATTRIBUTE | DESCRIPTION |
|---|---|
feedback |
The general rating of the output.
TYPE:
|
feedback_text |
Optional free-text feedback from the user.
TYPE:
|
Source code in docs/microservices/core/src/models/api_input.py
MetadataFilter
Bases: BaseModel
Model defining the structure of a metadata filter.
| ATTRIBUTE | DESCRIPTION |
|---|---|
source |
The data source to filter on.
TYPE:
|
start_date |
The earliest date for filtering documents.
TYPE:
|
end_date |
The latest date for filtering documents.
TYPE:
|
| METHOD | DESCRIPTION |
|---|---|
serialize_date |
Ensure dates are serialized into ISO format. |
Source code in docs/microservices/core/src/models/api_input.py
class MetadataFilter(BaseModel):
"""Model defining the structure of a metadata filter.
Attributes:
source (str | None): The data source to filter on.
start_date (datetime.date | None): The earliest date for filtering documents.
end_date (datetime.date | None): The latest date for filtering documents.
"""
source: str | None = None
start_date: datetime.date | None = None
end_date: datetime.date | None = None
@field_serializer("start_date", "end_date")
def serialize_date(self, value: datetime.date) -> str | None:
"""Ensure dates are serialized into ISO format."""
return value.isoformat() if value else None
serialize_date
Ensure dates are serialized into ISO format.
RAGFeedback
Bases: GeneralFeedback
Model defining the input of the feedback for the RAG.
| ATTRIBUTE | DESCRIPTION |
|---|---|
question |
The question asked by the user.
TYPE:
|
source_type |
The data source type.
TYPE:
|
rag_answer |
The generated answer provided by the RAG system.
TYPE:
|
source_chunks |
The document chunks used to generate the answer.
TYPE:
|
llm |
Information about the model used.
TYPE:
|
Source code in docs/microservices/core/src/models/api_input.py
class RAGFeedback(GeneralFeedback):
"""Model defining the input of the feedback for the RAG.
Attributes:
question (str): The question asked by the user.
source_type (str): The data source type.
rag_answer (str): The generated answer provided by the RAG system.
source_chunks (list[dict]): The document chunks used to generate the answer.
llm (FeedbackLLM): Information about the model used.
"""
question: str
source_type: str
rag_answer: str
source_chunks: list[dict]
llm: FeedbackLLM
RAGInput
Bases: BaseModel
Model defining the input of a valid RAG request.
| ATTRIBUTE | DESCRIPTION |
|---|---|
question |
The user question to be answered using retrieved sources.
TYPE:
|
meta_data_filters |
List of metadata filters.
TYPE:
|
language_model |
The identifier of the language model to use.
TYPE:
|
request_timestamp |
Timestamp of the request.
TYPE:
|
max_chunks_to_use |
Optional limit for the number of retrieved chunks used in generation.
TYPE:
|
Source code in docs/microservices/core/src/models/api_input.py
class RAGInput(BaseModel):
"""Model defining the input of a valid RAG request.
Attributes:
question (str): The user question to be answered using retrieved sources.
meta_data_filters (list[MetadataFilter] | None): List of metadata filters.
language_model (str): The identifier of the language model to use.
request_timestamp (int): Timestamp of the request.
max_chunks_to_use (int | None): Optional limit for the number of retrieved chunks used in generation.
"""
question: str
#: The QA input contains a list of filters where each filter filters per source
#: Hence, a document needs to fulfill the requirements of at least one of those filters
#: In other words: The filters are connected with OR statements.
#: If no filters are given (empty list or None), then the whole database is used
meta_data_filters: list[MetadataFilter] | None = None
language_model: str
request_timestamp: int
max_chunks_to_use: int | None = None
model_config = ConfigDict(
json_schema_extra={
"openapi_examples": {
"simple": {
"summary": "Simple RAG input",
"description": "A minimal example of a RAG request with a question and default model, no filters.",
"value": {
"question": "What did the political parties decide regarding the pension system?",
"meta_data_filters": [],
"language_model": "test_model_mock",
"request_timestamp": 1731252767,
"max_chunks_to_use": None,
},
},
"with_filters": {
"summary": "RAG input with a single metadata filter",
"description": "Example of a RAG request that applies a metadata filter to restrict the sources.",
"value": {
"question": "Between which parties was the coalition agreement concluded?",
"meta_data_filters": [
{"source": "Coalition Agreement"},
],
"language_model": "test_model_mock",
"request_timestamp": 1731252767,
"max_chunks_to_use": 5,
},
},
}
}
)
RestrictValuesForDBModel
Bases: BaseModel
Model validating string fields before database insertion.
| METHOD | DESCRIPTION |
|---|---|
validate_values |
Checks input values for null bytes and strips whitespace. |
Source code in docs/microservices/core/src/models/api_input.py
class RestrictValuesForDBModel(BaseModel):
"""Model validating string fields before database insertion."""
@field_validator("*", mode="before")
def validate_values(cls, v: object) -> object: # noqa: N805
"""Checks input values for null bytes and strips whitespace."""
if isinstance(v, str):
if "\x00" in v:
raise ValueError("String contains null bytes, not allowed.")
v = v.strip()
return v
validate_values
Checks input values for null bytes and strips whitespace.
Source code in docs/microservices/core/src/models/api_input.py
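Subclasses such as FeedbackLLM inherit the wildcard validator, so every string field is stripped and null bytes are rejected before any database insertion; for example:
from pydantic import ValidationError

llm = FeedbackLLM(is_remote=False, name="  test_model_mock  ", label="Test")
assert llm.name == "test_model_mock"  # whitespace stripped

try:
    FeedbackLLM(is_remote=False, name="bad\x00name", label="Test")
except ValidationError as exc:
    print(exc)  # "String contains null bytes, not allowed."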
SpeakerItem
Bases: BaseModel
Speaker mapping used when generating word files.
| ATTRIBUTE | DESCRIPTION |
|---|---|
speaker_name_edited |
Human-readable speaker name that may be corrected/edited for output.
TYPE:
|
speaker_name_id |
Stable identifier used to map transcript segments to a speaker.
TYPE:
|
Source code in docs/microservices/core/src/models/api_input.py
class SpeakerItem(BaseModel):
"""Speaker mapping used when generating word files.
Attributes:
speaker_name_edited (str): Human-readable speaker name that may be corrected/edited for output.
speaker_name_id (str): Stable identifier used to map transcript segments to a speaker.
"""
speaker_name_edited: str = Field("", alias="speaker-name-edited")
speaker_name_id: str = Field(..., alias="speaker-name-ID")
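Client payloads use the hyphenated keys; the field aliases map them onto the snake_case attributes, e.g.:
item = SpeakerItem.model_validate(
    {"speaker-name-ID": "SPEAKER_00", "speaker-name-edited": "Redner 1"}
)
assert item.speaker_name_id == "SPEAKER_00"
assert item.speaker_name_edited == "Redner 1"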
SummaryFeedback
Bases: GeneralFeedback
Model defining the input of the feedback for the summary.
| ATTRIBUTE | DESCRIPTION |
|---|---|
source_type |
Indicates the origin of the summarized text.
TYPE:
|
text_length |
The number of characters in the parsed input text.
TYPE:
|
desired_summary_length |
Desired length of the summary in characters.
TYPE:
|
summary_length |
Actual number of characters in the generated summary.
TYPE:
|
llm |
Information about the model used.
TYPE:
|
Source code in docs/microservices/core/src/models/api_input.py
class SummaryFeedback(GeneralFeedback):
"""Model defining the input of the feedback for the summary.
Attributes:
source_type (str): Indicates the origin of the summarized text.
text_length (conint): The number of characters in the parsed input text.
desired_summary_length (confloat): Desired length of the summary in characters.
summary_length (conint): Actual number of characters in the generated summary.
llm (FeedbackLLM): Information about the model used.
"""
source_type: str
# len(parsing_output) from the summary output
text_length: conint(ge=0, le=MAX_DB_FLOAT)
# output_length from the summary input is given in pages, hence * 4000 to get the number of characters
desired_summary_length: confloat(ge=0, le=MAX_DB_FLOAT)
# len(summary_length) from the summary output
summary_length: conint(ge=0, le=MAX_DB_FLOAT)
llm: FeedbackLLM
SummaryFileAPIInputParameters
Bases: BaseModel
Model defining the input parameters as part of a valid Summary request for file endpoint.
Due to technical reasons, "file (UploadFile)" cannot be part of this pydantic model even though it is part of the file endpoint's input.
| ATTRIBUTE | DESCRIPTION |
|---|---|
language_model |
The name or identifier of the language model to use.
TYPE:
|
output_length |
Desired length of the summary output as a number of DIN-A4 pages. Default is 0, which applies no length restriction; this is also the fastest option, since the LLM decides which length is most suitable.
TYPE:
|
topics |
User-provided text stating the topics the summary should focus on. Default is None, which yields a general summary without focus topics.
TYPE:
|
| METHOD | DESCRIPTION |
|---|---|
as_form |
Creates an instance of `SummaryFileAPIInputParameters` from multipart form-data. |
Source code in docs/microservices/core/src/models/api_input.py
class SummaryFileAPIInputParameters(BaseModel):
"""Model defining the input parameters as part of a valid Summary request for file endpoint.
Note: For technical reasons, "file (UploadFile)" cannot be part of this pydantic model
even though it is part of the file endpoint's input.
Attributes:
language_model (str): The name or identifier of the language model to use.
output_length (NonNegativeFloat, optional): Desired length of the summary output as a number of DIN-A4
pages. Default is 0, which applies no length restriction; this is also the fastest option, since
the LLM decides which length is most suitable.
topics (str | None): User-provided text stating the topics the summary should focus on.
Default is None, which yields a general summary without focus topics.
"""
language_model: str
output_length: NonNegativeFloat = 0
topics: str | None = None
model_config = ConfigDict(
json_schema_extra={
"openapi_examples": {
"simple_summary": {
"summary": "Simple summary request",
"description": (
"Example input for a summary without predefined output length or focus topic settings."
),
"value": {
"file": "tests/data/txt-testfile.txt",
"language_model": "test_model_mock",
},
},
"fast_summary": {
"summary": "Fast summary request",
"description": (
"Example input for a fast summary. Using a txt file without predefined output length but "
"with focus topic setting."
),
"value": {
"file": "tests/data/txt-testfile.txt",
"language_model": "test_model_mock",
"topics": "Städte",
},
},
"output_length_summary": {
"summary": "Summary request with output length",
"description": "Example input for a summary with predefined output length.",
"value": {
"file": "tests/data/txt-testfile.txt",
"language_model": "test_model_mock",
"output_length": 2.5,
},
},
"focus_topic_summary": {
"summary": "Summary request with focus topic",
"description": "Example input for a summary with focus on a specific topic.",
"value": {
"file": "tests/data/pdf-testfile.pdf",
"language_model": "test_model_mock",
"topics": "Open Source, Community-Gedanke",
},
},
}
}
)
@classmethod
def as_form(
cls,
language_model: str = Form(
...,
description="The name or identifier of the language model to use.",
example="test_model_mock",
),
output_length: float = Form(
0,
description=(
"Desired summary length in DIN-A4 pages. "
"Default 0 = no restriction and high performance."
),
example=3.0,
ge=0,
),
topics: str | None = Form(
None,
description=(
"Comma-separated topics the summary should focus on. "
"Default None = general summary without any focus."
),
example="public administration, artificial intelligence, digitization",
),
) -> Self:
"""Creates an instance of `SummaryFileAPIInputParameters` from multipart form-data.
FastAPI does not automatically map Pydantic models from form-data in
multipart requests. This helper enables the model to be used together
with file uploads by defining how form parameters should be parsed.
Args:
language_model (str): Selected language model.
output_length (float): Desired summary length in pages.
topics (str | None): User-defined focus topics.
Returns:
A validated input parameter set.
"""
return cls(
language_model=language_model,
output_length=output_length,
topics=topics,
)
as_form
classmethod
as_form(language_model=Form(..., description='The name or identifier of the language model to use.', example='test_model_mock'), output_length=Form(0, description='Desired summary length in DIN-A4 pages. Default 0 = no restriction and high performance.', example=3.0, ge=0), topics=Form(None, description='Comma-separated topics the summary should focus on. Default None = general summary without any focus.', example='public administration, artificial intelligence, digitization'))
Creates an instance of SummaryFileAPIInputParameters from multipart form-data.
FastAPI does not automatically map Pydantic models from form-data in multipart requests. This helper enables the model to be used together with file uploads by defining how form parameters should be parsed.
| PARAMETER | DESCRIPTION |
|---|---|
language_model
|
Selected language model.
TYPE:
|
output_length
|
Desired summary length in pages.
TYPE:
|
topics
|
User-defined focus topics.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
Self
|
A validated input parameter set. |
Source code in docs/microservices/core/src/models/api_input.py
@classmethod
def as_form(
cls,
language_model: str = Form(
...,
description="The name or identifier of the language model to use.",
example="test_model_mock",
),
output_length: float = Form(
0,
description=(
"Desired summary length in DIN-A4 pages. "
"Default 0 = no restriction and high performance."
),
example=3.0,
ge=0,
),
topics: str | None = Form(
None,
description=(
"Comma-separated topics the summary should focus on. "
"Default None = general summary without any focus."
),
example="public administration, artificial intelligence, digitization",
),
) -> Self:
"""Creates an instance of `SummaryFileAPIInputParameters` from multipart form-data.
FastAPI does not automatically map Pydantic models from form-data in
multipart requests. This helper enables the model to be used together
with file uploads by defining how form parameters should be parsed.
Args:
language_model (str): Selected language model.
output_length (float): Desired summary length in pages.
topics (str | None): User-defined focus topics.
Returns:
A validated input parameter set.
"""
return cls(
language_model=language_model,
output_length=output_length,
topics=topics,
)
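To see how this helper is meant to be used, here is a hedged route sketch; the path, the app object, and the response shape are assumptions, not the service's actual endpoint:

from fastapi import Depends, FastAPI, UploadFile

app = FastAPI()

@app.post("/summary/file")
async def summarize_file(
    file: UploadFile,
    params: SummaryFileAPIInputParameters = Depends(
        SummaryFileAPIInputParameters.as_form
    ),
) -> dict:
    # The Form(...) defaults in as_form tell FastAPI how to parse the
    # multipart fields that arrive alongside the uploaded file.
    return {"model": params.language_model, "pages": params.output_length}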
SummaryTextAPIInput
Bases: BaseModel
Model defining the input of a valid Summary request for text endpoint.
| ATTRIBUTE | DESCRIPTION |
|---|---|
text |
The text content to be summarized.
TYPE:
|
language_model |
The name or identifier of the language model to use.
TYPE:
|
output_length |
Desired length of the summary output as a number of DIN-A4 pages. Default is 0, which applies no length restriction; this is also the fastest option, since the LLM decides which length is most suitable.
TYPE:
|
topics |
User-provided text stating the topics the summary should focus on. Default is None, which yields a general summary without focus topics.
TYPE:
|
Source code in docs/microservices/core/src/models/api_input.py
class SummaryTextAPIInput(BaseModel):
"""Model defining the input of a valid Summary request for text endpoint.
Attributes:
text (str): The text content to be summarized.
language_model (str): The name or identifier of the language model to use.
output_length (NonNegativeFloat, optional): Desired length of the summary output as a number of DIN-A4
pages. Default is 0, which applies no length restriction; this is also the fastest option, since
the LLM decides which length is most suitable.
topics (str | None): User-provided text stating the topics the summary should focus on.
Default is None, which yields a general summary without focus topics.
"""
text: str
language_model: str
output_length: NonNegativeFloat = 0
topics: str | None = None
model_config = ConfigDict(
json_schema_extra={
"openapi_examples": {
"simple_summary": {
"summary": "Simple summary request",
"description": (
"Example input for a summary without predefined output length or focus topic settings."
),
"value": {
"text": (
"F13 ist eine vollständig souveräne und Modell-agnostische KI-Assistenz, die auf eigener "
"Infrastruktur betrieben werden kann und von der Verwaltung für die Verwaltung entwickelt "
"wurde. F13 stellt eine Vielzahl modularer, flexibel einsetzbarer KI-Funktionen bereit – "
"zum Verarbeiten von Informationen, Erstellen von Texten, oder zum Recherchieren. Nutzende "
"können mit F13 gezielt, wirksam und sicher Generative Künstliche Intelligenz bei "
"täglichen Büroarbeiten einsetzen. Mit dem Übergang in eine Open-Source-Weiterentwicklung "
"wird F13 seit Juli 2025 als gemeinschaftliches Vorhaben geöffnet: Der Quellcode steht "
"frei zur Verfügung, die Nutzung ist offen. Interessierte Verwaltungen, Organisationen "
"sowie Entwicklerinnen und Entwickler sind eingela-den, das System für eigene Bedarfe "
"anzupassen, weiterzuentwickeln und diese Entwicklungen wiederum Open Source "
"bereitzustellen."
),
"language_model": "test_model_mock",
},
},
"fast_summary": {
"summary": "Fast summary request",
"description": (
"Example input for a fast summary without predefined output length but with focus topic "
"setting."
),
"value": {
"text": (
"F13 ist eine vollständig souveräne und Modell-agnostische KI-Assistenz, die auf eigener "
"Infrastruktur betrieben werden kann und von der Verwaltung für die Verwaltung entwickelt "
"wurde. F13 stellt eine Vielzahl modularer, flexibel einsetzbarer KI-Funktionen bereit – "
"zum Verarbeiten von Informationen, Erstellen von Texten, oder zum Recherchieren. Nutzende "
"können mit F13 gezielt, wirksam und sicher Generative Künstliche Intelligenz bei "
"täglichen Büroarbeiten einsetzen. Mit dem Übergang in eine Open-Source-Weiterentwicklung "
"wird F13 seit Juli 2025 als gemeinschaftliches Vorhaben geöffnet: Der Quellcode steht "
"frei zur Verfügung, die Nutzung ist offen. Interessierte Verwaltungen, Organisationen "
"sowie Entwicklerinnen und Entwickler sind eingela-den, das System für eigene Bedarfe "
"anzupassen, weiterzuentwickeln und diese Entwicklungen wiederum Open Source "
"bereitzustellen."
),
"language_model": "test_model_mock",
"topics": "KI, Papier",
},
},
"output_length_summary": {
"summary": "Summary request with output length",
"description": "Example input for a summary with predefined output length",
"value": {
"text": (
"F13 ist eine vollständig souveräne und Modell-agnostische KI-Assistenz, die auf eigener "
"Infrastruktur betrieben werden kann und von der Verwaltung für die Verwaltung entwickelt "
"wurde. F13 stellt eine Vielzahl modularer, flexibel einsetzbarer KI-Funktionen bereit – "
"zum Verarbeiten von Informationen, Erstellen von Texten, oder zum Recherchieren. Nutzende "
"können mit F13 gezielt, wirksam und sicher Generative Künstliche Intelligenz bei "
"täglichen Büroarbeiten einsetzen. Mit dem Übergang in eine Open-Source-Weiterentwicklung "
"wird F13 seit Juli 2025 als gemeinschaftliches Vorhaben geöffnet: Der Quellcode steht "
"frei zur Verfügung, die Nutzung ist offen. Interessierte Verwaltungen, Organisationen "
"sowie Entwicklerinnen und Entwickler sind eingela-den, das System für eigene Bedarfe "
"anzupassen, weiterzuentwickeln und diese Entwicklungen wiederum Open Source "
"bereitzustellen."
),
"output_length": 2.5,
"language_model": "test_model_mock",
},
},
"focus_topic_summary": {
"summary": "Summary request with focus topic",
"description": "Example input for a summary with focus on a specific topic.",
"value": {
"text": (
"F13 ist eine vollständig souveräne und Modell-agnostische KI-Assistenz, die auf eigener "
"Infrastruktur betrieben werden kann und von der Verwaltung für die Verwaltung entwickelt "
"wurde. F13 stellt eine Vielzahl modularer, flexibel einsetzbarer KI-Funktionen bereit – "
"zum Verarbeiten von Informationen, Erstellen von Texten, oder zum Recherchieren. Nutzende "
"können mit F13 gezielt, wirksam und sicher Generative Künstliche Intelligenz bei "
"täglichen Büroarbeiten einsetzen. Mit dem Übergang in eine Open-Source-Weiterentwicklung "
"wird F13 seit Juli 2025 als gemeinschaftliches Vorhaben geöffnet: Der Quellcode steht "
"frei zur Verfügung, die Nutzung ist offen. Interessierte Verwaltungen, Organisationen "
"sowie Entwicklerinnen und Entwickler sind eingeladen, das System für eigene Bedarfe "
"anzupassen, weiterzuentwickeln und diese Entwicklungen wiederum Open Source "
"bereitzustellen."
),
"topics": "Open Source, Community-Gedanke",
"language_model": "test_model_mock",
},
},
}
}
)
TranslateRequest
Bases: BaseModel
Request body for translating free text.
| ATTRIBUTE | DESCRIPTION |
|---|---|
text |
Raw text that should be translated to the configured target language.
TYPE:
|
Source code in docs/microservices/core/src/models/api_input.py
class TranslateRequest(BaseModel):
"""Request body for translating free text.
Attributes:
text (str): Raw text that should be translated to the configured target language.
"""
text: str
model_config = ConfigDict(
json_schema_extra={
"openapi_examples": {
"translate_to_german": {
"summary": "Translate a short text to German",
"description": "Minimal request translating free text to the target language (German).",
"value": {
"text": (
"Hello everyone, welcome to our weekly status meeting. "
"Please summarize the action items."
),
},
}
}
}
)
api_output
pydantic Models for API output parameters.
| CLASS | DESCRIPTION |
|---|---|
ChatOutput |
Chat response model of chat output. |
ChatStreamOutput |
Chat stream response model of chat output. |
InnerTranscriptionReceiveResponse |
Inner model for receiving transcription output. |
MatchedTermItem |
Matched term entry with meaning and occurrence count. |
RAGDatabaseSources |
Model defining Database-Sources. |
RAGOutput |
RAG response model defining RAG output. |
RAGSourceDocument |
Model defining source object of RAG output. |
SummaryAPIOutput |
Summary response output of summary generation. |
TranscriptionBaseResponse |
Base model for transcription responses containing a job ID. |
TranscriptionDialogItem |
Dialog item with speaker metadata, timestamp, and parts. |
TranscriptionDialogPart |
Single dialog part containing the spoken text and its score. |
TranscriptionReceiveResponse |
Response model for receiving transcription output. |
TranscriptionStatusResponse |
Response model for the transcription existence check endpoint. |
TranslateAPIOutput |
Output of translation API. |
UpsertTermsAPIOutput |
Output of upsert term API. |
ChatOutput
Bases: BaseModel
Chat response model of chat output.
| ATTRIBUTE | DESCRIPTION |
|---|---|
response |
The generated chat response.
TYPE:
|
reason |
Optional reasoning or explanation for the response.
TYPE:
|
Source code in docs/microservices/core/src/models/api_output.py
class ChatOutput(BaseModel):
"""Chat response model of chat output.
Attributes:
response (str): The generated chat response.
reason (str | None): Optional reasoning or explanation for the response.
"""
response: str
reason: str | None = None
model_config = ConfigDict(
json_schema_extra={
"openapi_examples": {
"simple": {
"summary": "Simple chat response",
"description": "Used for models that produce a simple response without reasoning.",
"value": {
"response": "The weather is nice today.",
},
},
"reasoning": {
"summary": "Response with reasoning",
"description": "Used for reasoning-enabled models, showing both the answer and its explanation.",
"value": {
"response": "The weather is nice today.",
"reason": "It is sunny outside.",
},
},
}
}
)
ChatStreamOutput
Bases: BaseModel
Chat stream response model of chat output.
| ATTRIBUTE | DESCRIPTION |
|---|---|
type |
The kind of output. One of 'reason', 'response', or 'error'.
TYPE:
|
content |
Partial text content of the stream if type != error.
TYPE:
|
finish_reason |
Optional finish reason from the model.
TYPE:
|
error |
Error message if type == 'error'.
TYPE:
|
Source code in docs/microservices/core/src/models/api_output.py
class ChatStreamOutput(BaseModel):
"""Chat stream response model of chat output.
Attributes:
type (str): The kind of output. One of 'reason', 'response', or 'error'.
content (str | None): Partial text content of the stream if type != error.
finish_reason (str | None): Optional finish reason from the model.
error (str | None): Error message if type == 'error'.
"""
type: Literal["reason", "response", "error"]
content: str | None = None
finish_reason: str | None = None
error: str | None = None
model_config = ConfigDict(
json_schema_extra={
"openapi_examples": {
"response": {
"summary": "Chat response",
"description": "Standard response returned by the chat model when a normal message is processed.",
"value": {
"type": "response",
"content": "Hello, how can I help you today?",
"finish_reason": None,
},
},
"reason": {
"summary": "Reason output",
"description": "Response including the reasoning or explanation of the model's output.",
"value": {
"type": "reason",
"content": "User said hello. I will answer politely.",
"finish_reason": None,
},
},
"error": {
"summary": "Error output",
"description": (
"Output returned when an error occurs, ",
"e.g., an internal server error during processing.",
),
"value": {
"type": "error",
"error": "Internal Error during streaming.",
},
},
}
}
)
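A brief sketch of how a stream chunk might be serialized for the wire; the exclude_none choice is an assumption about the transport, not taken from the source:

chunk = ChatStreamOutput(type="response", content="Hello, how can I help you today?")
print(chunk.model_dump_json(exclude_none=True))
# -> {"type":"response","content":"Hello, how can I help you today?"}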
InnerTranscriptionReceiveResponse
Bases: TranscriptionBaseResponse
Inner model for receiving transcription output.
| ATTRIBUTE | DESCRIPTION |
|---|---|
job_id |
Unique identifier for the transcription job (inherited)
TYPE:
|
dialog |
The transcription data with speakers, timestamps, and parts
TYPE:
|
translation |
The translation or None if not available
TYPE:
|
matched_terms |
The matched special terms
TYPE:
|
Source code in docs/microservices/core/src/models/api_output.py
class InnerTranscriptionReceiveResponse(TranscriptionBaseResponse):
"""Inner model for receiving transcription output.
Attributes:
job_id (UUID): Unique identifier for the transcription job (inherited)
dialog (list[TranscriptionDialogItem]): The transcription data with speakers, timestamps, and parts
translation (str | None): The translation or None if not available
matched_terms (dict[str, list[MatchedTermItem]] | None): The matched special terms
"""
dialog: list[TranscriptionDialogItem] = Field(
...,
description="The transcription data with speakers, timestamps, and parts",
)
translation: str | None = Field(
None, description="The translation or None if not available"
)
matched_terms: dict[str, list[MatchedTermItem]] | None = Field(
None, description="The matched special terms"
)
MatchedTermItem
Bases: BaseModel
Matched term entry with meaning and occurrence count.
| ATTRIBUTE | DESCRIPTION |
|---|---|
fachbegriff |
The special term.
TYPE:
|
bedeutung |
The meaning of the special term.
TYPE:
|
count |
The number of occurrences of the term.
TYPE:
|
Source code in docs/microservices/core/src/models/api_output.py
class MatchedTermItem(BaseModel):
"""Matched term entry with meaning and occurrence count.
Attributes:
fachbegriff (str): The special term.
bedeutung (str): The meaning of the special term.
count (int): The number of occurrences of the term.
"""
fachbegriff: str = Field(..., alias="Fachbegriff")
bedeutung: str = Field(..., alias="Bedeutung")
count: int
model_config = ConfigDict(populate_by_name=True)
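Since populate_by_name=True, the model accepts either the German aliases or the Python field names; a minimal sketch:

a = MatchedTermItem(Fachbegriff="API", Bedeutung="Schnittstelle", count=3)
b = MatchedTermItem(fachbegriff="API", bedeutung="Schnittstelle", count=3)
assert a == b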
RAGDatabaseSources
Bases: BaseModel
Model defining Database-Sources.
| ATTRIBUTE | DESCRIPTION |
|---|---|
name |
Name of the database source.
TYPE:
|
date_filter |
Indicates whether date-based filtering is applied.
TYPE:
|
Source code in docs/microservices/core/src/models/api_output.py
class RAGDatabaseSources(BaseModel):
"""Model defining Database-Sources.
Attributes:
name (str): Name of the database source.
date_filter (bool): Indicates whether date-based filtering is applied.
"""
name: str
date_filter: bool
model_config = ConfigDict(
json_schema_extra={
"openapi_examples": {
"standard": {
"summary": "Standard database source",
"description": "A typical database source with date-based filtering enabled.",
"value": {
"name": "Coalition Agreement",
"date_filter": False,
},
},
}
}
)
RAGOutput
Bases: BaseModel
RAG response model defining RAG output.
| ATTRIBUTE | DESCRIPTION |
|---|---|
question |
The user question that triggered the retrieval.
TYPE:
|
answer |
The generated answer based on the retrieved sources.
TYPE:
|
sources |
List of source documents used in the response.
TYPE:
|
Source code in docs/microservices/core/src/models/api_output.py
class RAGOutput(BaseModel):
"""RAG response model defining RAG output.
Attributes:
question (str): The user question that triggered the retrieval.
answer (str): The generated answer based on the retrieved sources.
sources (list[RAGSourceDocument]): List of source documents used in the response.
"""
question: str
answer: str
sources: list[RAGSourceDocument]
model_config = ConfigDict(
json_schema_extra={
"openapi_examples": {
"simple": {
"summary": "RAG output example",
"description": (
"Example of a RAG response including the user question, ",
"answer and retrieved sources.",
),
"value": {
"question": "What did the political parties decide regarding the pension system?",
"answer": "The parties decided to improve the pension system.",
"sources": [
{
"content": "We decided to improve the pension system.",
"meta": {
"source": "Coalition Agreement",
"date": "2025-11-07",
},
"url": "https://example.com/coalitionagreement.pdf",
}
],
},
}
}
}
)
RAGSourceDocument
Bases: BaseModel
Model defining source object of RAG output.
| ATTRIBUTE | DESCRIPTION |
|---|---|
content |
The text content of the retrieved document.
TYPE:
|
meta |
Metadata about the document (e.g. title, date, source type).
TYPE:
|
url |
Optional URL pointing to the original document.
TYPE:
|
Source code in docs/microservices/core/src/models/api_output.py
class RAGSourceDocument(BaseModel):
"""Model defining source object of RAG output.
Attributes:
content (str): The text content of the retrieved document.
meta (dict): Metadata about the document (e.g. title, date, source type).
url (str | None): Optional URL pointing to the original document.
"""
content: str
meta: dict
url: str | None = None
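Putting the two models together, a hedged construction sketch (all values are illustrative):

source = RAGSourceDocument(
    content="We decided to improve the pension system.",
    meta={"source": "Coalition Agreement", "date": "2025-11-07"},
)
output = RAGOutput(
    question="What did the parties decide regarding the pension system?",
    answer="The parties decided to improve the pension system.",
    sources=[source],
)
print(output.model_dump_json())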
SummaryAPIOutput
Bases: BaseModel
Summary response output of summary generation.
| ATTRIBUTE | DESCRIPTION |
|---|---|
summary |
The generated summary text.
TYPE:
|
parsed_text |
The parsed and preprocessed source text used for summarization.
TYPE:
|
warning_msg |
Optional warning message (e.g. about text length of the generated summary).
TYPE:
|
Source code in docs/microservices/core/src/models/api_output.py
class SummaryAPIOutput(BaseModel):
"""Summary response output of summary generation.
Attributes:
summary (str): The generated summary text.
parsed_text (str): The parsed and preprocessed source text used for summarization.
warning_msg (str): Optional warning message (e.g. about text length of the generated summary).
"""
summary: str
parsed_text: str
warning_msg: str
model_config = ConfigDict(
json_schema_extra={
"openapi_examples": {
"summary_output": {
"summary": "Summary output with warning",
"description": "Example showing a summary with a warning message.",
"value": {
"summary": "This is the generated summary of the document.",
"parsed_text": "Original source text preprocessed for summarization.",
"warning_msg": "This is a message to the user encompassing hints or warnings.",
},
},
}
}
)
TranscriptionBaseResponse
Bases: BaseModel
Base model for transcription responses containing a job ID.
| ATTRIBUTE | DESCRIPTION |
|---|---|
job_id |
Unique identifier for the job
TYPE:
|
Source code in docs/microservices/core/src/models/api_output.py
class TranscriptionBaseResponse(BaseModel):
"""Base model for transcription responses containing a job ID.
Attributes:
job_id (UUID): Unique identifier for the job
"""
job_id: UUID
model_config = ConfigDict(
json_schema_extra={
"openapi_examples": {
"standard": {
"summary": "Simple upload output.",
"description": "Example of a standard upload response.",
"value": {
"job_id": "123e4567-e89b-12d3-a456-426614174000",
},
},
}
}
)
TranscriptionDialogItem
Bases: BaseModel
Dialog item with speaker metadata, timestamp, and parts.
| ATTRIBUTE | DESCRIPTION |
|---|---|
speaker_name |
The name of the speaker.
TYPE:
|
time_stamp |
The timestamp of the dialog item.
TYPE:
|
part |
The parts of the dialog item.
TYPE:
|
Source code in docs/microservices/core/src/models/api_output.py
class TranscriptionDialogItem(BaseModel):
"""Dialog item with speaker metadata, timestamp, and parts.
Attributes:
speaker_name (str): The name of the speaker.
time_stamp (str): The timestamp of the dialog item.
part (list[TranscriptionDialogPart]): The parts of the dialog item.
"""
speaker_name: str = Field(..., alias="speaker-name")
time_stamp: str = Field(..., alias="time-stamp")
part: list[TranscriptionDialogPart]
model_config = ConfigDict(populate_by_name=True)
TranscriptionDialogPart
Bases: BaseModel
Single dialog part containing the spoken text and its score.
| ATTRIBUTE | DESCRIPTION |
|---|---|
text |
The spoken text of this part.
TYPE:
|
score |
The confidence score of the transcription for this part.
TYPE:
|
Source code in docs/microservices/core/src/models/api_output.py
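A minimal reconstruction from the documented attributes; the float type for score is an assumption (the example response further down this page shows an integer value):

class TranscriptionDialogPart(BaseModel):
    """Single dialog part containing the spoken text and its score.

    Attributes:
        text (str): The spoken text of this part.
        score (float): The confidence score of the transcription for this part.
    """

    text: str
    score: float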
TranscriptionReceiveResponse
Bases: BaseModel
Response model for receiving transcription output.
| ATTRIBUTE | DESCRIPTION |
|---|---|
transcript |
The transcription payload. |
Source code in docs/microservices/core/src/models/api_output.py
class TranscriptionReceiveResponse(BaseModel):
"""Response model for receiving transcription output.
Attributes:
transcript (InnerTranscriptionReceiveResponse): The transcription payload.
"""
transcript: InnerTranscriptionReceiveResponse
model_config = ConfigDict(
json_schema_extra={
"openapi_examples": {
"standard": {
"summary": "Simple receive transcription response.",
"description": "Example of a standard receive transcription response.",
"value": {
"transcript": {
"job_id": "Loremui32423",
"dialog": [
{
"speaker-name": "",
"time-stamp": "0:00-0:00",
"part": [
{
"text": "No audio found in this file",
"score": 100,
}
],
}
],
"translation": "Übersetztes Transkript",
"matched_terms": {
"IT": [
{
"Fachbegriff": "API",
"Bedeutung": "Schnittstelle",
"count": 3,
},
]
},
}
},
},
}
}
)
TranscriptionStatusResponse
Bases: BaseModel
Response model for the transcription existence check endpoint.
| ATTRIBUTE | DESCRIPTION |
|---|---|
response |
Mapping of job IDs to their status
TYPE:
|
Source code in docs/microservices/core/src/models/api_output.py
class TranscriptionStatusResponse(BaseModel):
"""Response model for the transcription existence check endpoint.
Attributes:
response (dict): Mapping of job IDs to their status
"""
response: dict[UUID, str]
model_config = ConfigDict(
json_schema_extra={
"openapi_examples": {
"standard": {
"summary": "Simple status response output.",
"description": "Example of a standard status response.",
"value": {
"response": {
"123e4567-e89b-12d3-a456-426614174000": "done",
"550e8400-e29b-41d4-a716-446655440000": "non_existent",
},
},
},
}
}
)
TranslateAPIOutput
Bases: BaseModel
Output of translation API.
| ATTRIBUTE | DESCRIPTION |
|---|---|
translation |
The translated text.
TYPE:
|
Source code in docs/microservices/core/src/models/api_output.py
class TranslateAPIOutput(BaseModel):
"""Output of translation API.
Attributes:
translation (str): The translated text.
"""
translation: str
model_config = ConfigDict(
json_schema_extra={
"openapi_examples": {
"standard": {
"summary": "Simple translation output.",
"description": "Example of a standard translation response.",
"value": {
"translation": "Übersetzter Gesamttext…",
},
},
}
}
)
UpsertTermsAPIOutput
Bases: BaseModel
Output of upsert term API.
| ATTRIBUTE | DESCRIPTION |
|---|---|
updated |
Number of terms that were updated.
TYPE:
|
detail |
Optional detail message, especially in case of errors.
TYPE:
|
Source code in docs/microservices/core/src/models/api_output.py
class UpsertTermsAPIOutput(BaseModel):
"""Output of upsert term API.
Attributes:
updated (int): Number of terms that were updated.
detail (str | None): Optional detail message, especially in case of errors.
"""
updated: int
detail: str | None = None
model_config = ConfigDict(
json_schema_extra={
"openapi_examples": {
"standard": {
"summary": "Simple upsert terms output.",
"description": "Example of a standard upsert terms response.",
"value": {
"updated": 3,
"detail": None,
},
},
}
}
)
general
Load and check Settings from yml.
| CLASS | DESCRIPTION |
|---|---|
Authentication |
Authentication settings group. |
EnabledRoutes |
Configuration for enabling/disabling optional routes. |
FeedbackDbSettings |
Settings for the Feedback-Database. |
InterServiceCommunication |
Configuration of all microservice communications. |
LogLevel |
Enum class specifying possible log levels. |
PostConfig |
Configuration for async_post request to other microservices. |
Settings |
General Settings for the service. |
Authentication
Bases: BaseModel
Authentication settings group.
| ATTRIBUTE | DESCRIPTION |
|---|---|
model_config |
Used to ignore entries for other services that are defined in the config.
TYPE:
|
guest_mode |
Disable authentication
TYPE:
|
keycloak_base_url |
Base URL of the Keycloak identity provider.
TYPE:
|
keycloak_realm |
Keycloak realm to use for authentication.
TYPE:
|
audience |
Expected JWT audience ("aud") claim; used to validate access tokens.
TYPE:
|
openid_configuration_url |
The endpoint to query the OpenID-Connect configuration. This is not needed when 'guest_mode' is set to 'True'.
TYPE:
|
Source code in docs/microservices/core/src/models/general.py
class Authentication(BaseModel):
"""Authentication settings group.
Attributes:
model_config (ConfigDict): Used to ignore entries for other services that are defined in the config.
guest_mode (bool): Disable authentication
keycloak_base_url (AnyHttpUrl): Base URL of the Keycloak identity provider.
keycloak_realm (str): Keycloak realm to use for authentication.
audience (str): Expected JWT audience ("aud") claim; used to validate access tokens.
openid_configuration_url (AnyHttpUrl | None): The endpoint to query the OpenID-Connect configuration.
This is not needed when 'guest_mode' is set to 'True'.
"""
model_config = ConfigDict(extra="ignore", frozen=True)
guest_mode: bool = True
keycloak_base_url: AnyHttpUrl = "http://keycloak:8080"
keycloak_realm: str = "f13"
audience: None | str = "f13-api"
keycloak_client: str = "f13-api"
@property
def openid_configuration_url(self) -> AnyHttpUrl | None:
"""Return the OpenID-Connect configuration endpoint.
Returns:
the URL where Keycloak serves its OpenID-Connect configuration or `None` when `guest_mode` is `True`.
"""
if self.guest_mode:
return None
return HttpUrl(
f"{self.keycloak_base_url}/realms/{self.keycloak_realm}/.well-known/openid-configuration"
)
@property
def openid_token_url(self) -> None | AnyHttpUrl:
"""Return the OpenID Connect token endpoint URL.
Returns:
The Keycloak token endpoint URL, or None if in guest mode.
"""
if self.guest_mode:
return None
return HttpUrl(
f"{self.keycloak_base_url}/realms/{self.keycloak_realm}/protocol/openid-connect/token"
)
openid_configuration_url
property
Return the OpenID-Connect configuration endpoint.
| RETURNS | DESCRIPTION |
|---|---|
AnyHttpUrl | None
|
The URL where Keycloak serves its OpenID-Connect configuration, or `None` when `guest_mode` is `True`. |
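A short sketch of the two derived endpoints; with the default guest_mode=True both properties return None, and the exact URL rendering depends on pydantic's URL normalization:

assert Authentication().openid_configuration_url is None

auth = Authentication(guest_mode=False)
print(auth.openid_configuration_url)
# e.g. http://keycloak:8080/realms/f13/.well-known/openid-configuration
print(auth.openid_token_url)
# e.g. http://keycloak:8080/realms/f13/protocol/openid-connect/token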
EnabledRoutes
Bases: BaseModel
Configuration for enabling/disabling optional routes.
Note: The 'core' route is always enabled and not configurable.
| ATTRIBUTE | DESCRIPTION |
|---|---|
chat |
Enable/disable the chat endpoint
TYPE:
|
rag |
Enable/disable the RAG (Retrieval-Augmented Generation) endpoint
TYPE:
|
summary |
Enable/disable the summary endpoint
TYPE:
|
feedback |
Enable/disable the feedback endpoint
TYPE:
|
transcription |
Enable/disable the transcription endpoint
TYPE:
|
Source code in docs/microservices/core/src/models/general.py
class EnabledRoutes(BaseModel):
"""Configuration for enabling/disabling optional routes.
Note: The 'core' route is always enabled and not configurable.
Attributes:
chat (bool): Enable/disable the chat endpoint
rag (bool): Enable/disable the RAG (Retrieval-Augmented Generation) endpoint
summary (bool): Enable/disable the summary endpoint
feedback (bool): Enable/disable the feedback endpoint
transcription (bool): Enable/disable the transcription endpoint
"""
model_config = ConfigDict(extra="ignore", frozen=True)
chat: bool = True
rag: bool = True
summary: bool = True
feedback: bool = True
transcription: bool = True
FeedbackDbSettings
Bases: BaseModel
Settings for the Feedback-Database.
| ATTRIBUTE | DESCRIPTION |
|---|---|
server |
Hostname of the database server
TYPE:
|
port |
Port number on which the database server is listening
TYPE:
|
dialect |
Database dialect type. Restricted to PostgreSQL since this is the only system we have tested and gained experience with so far.
TYPE:
|
db_name |
Name of the database
TYPE:
|
username |
Username for authenticating with the database
TYPE:
|
path_secret |
Path to the secret file containing the database password
TYPE:
|
Source code in docs/microservices/core/src/models/general.py
class FeedbackDbSettings(BaseModel):
"""Settings for the Feedback-Database.
Attributes:
server (str): Hostname of the database server
port (PositiveInt): Port number on which the database server is listening
dialect (Literal["postgresql"]): Database dialect type. Restricted to PostgreSQL
since this is the only system we have tested and gained experience with so far.
db_name (str): Name of the database
username (str): Username for authenticating with the database
path_secret (FilePath): Path to the secret file containing the database password
"""
model_config = ConfigDict(extra="ignore", frozen=True)
server: str = "feedback-db"
port: PositiveInt = 5432
dialect: Literal["postgresql"] = "postgresql"
db_name: str = "feedback"
username: str = "member"
path_secret: FilePath = Path("/core/secrets/feedback_db.secret")
@property
def url(self) -> str:
"""Construct the database URL using credentials from a secret file."""
with open(self.path_secret) as f:
password = f.read().splitlines()[0]
return f"{self.dialect}://{self.username}:{password}@{self.server}:{self.port}/{self.db_name}"
InterServiceCommunication
Bases: BaseModel
Configuration of all microservice communications.
| ATTRIBUTE | DESCRIPTION |
|---|---|
parser |
Default configuration for parsing microservice.
TYPE:
|
summary |
Default configuration for summary microservice.
TYPE:
|
chat |
Default configuration for chat microservice.
TYPE:
|
rag |
Default configuration for rag microservice.
TYPE:
|
transcription |
Default configuration for transcription microservice.
TYPE:
|
Source code in docs/microservices/core/src/models/general.py
class InterServiceCommunication(BaseModel):
"""Configuration of all microservice communications.
Attributes:
parser (PostConfig): Default configuration for parsing microservice.
summary (PostConfig): Default configuration for summary microservice.
chat (PostConfig): Default configuration for chat microservice.
rag (PostConfig): Default configuration for rag microservice.
transcription (PostConfig): Default configuration for transcription microservice.
"""
parser: PostConfig = PostConfig(timeout_in_s=200)
summary: PostConfig = PostConfig(timeout_in_s=600)
chat: PostConfig = PostConfig()
rag: PostConfig = PostConfig()
transcription: PostConfig = PostConfig(timeout_in_s=600)
LogLevel
Bases: StrEnum
Enum class specifying possible log levels.
Source code in docs/microservices/core/src/models/general.py
class LogLevel(StrEnum):
"""Enum class specifying possible log levels."""
CRITICAL = "CRITICAL"
ERROR = "ERROR"
WARNING = "WARNING"
INFO = "INFO"
DEBUG = "DEBUG"
@classmethod
def _missing_(cls, value: object) -> "LogLevel | None":
"""Convert strings to uppercase and recheck for existence."""
if isinstance(value, str):
value = value.upper()
for level in cls:
if level == value:
return level
return None
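Thanks to _missing_, lookups are effectively case-insensitive; a one-line sketch:

assert LogLevel("debug") is LogLevel.DEBUG
assert LogLevel("Warning") is LogLevel.WARNING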
PostConfig
Bases: BaseModel
Configuration for async_post request to other microservices.
The default values in this class can be overwritten by those values stated in configs/general.yml.
| ATTRIBUTE | DESCRIPTION |
|---|---|
model_config |
Used to ignore entries for other services that are defined in the config.
TYPE:
|
max_attempts |
Maximum number of request attempts before returning status code 424.
TYPE:
|
timeout_in_s |
Maximum waiting duration before timeout (in seconds).
TYPE:
|
Source code in docs/microservices/core/src/models/general.py
class PostConfig(BaseModel):
"""Configuration for async_post request to other microservices.
The default values in this class can be overwritten by those values stated in configs/general.yml.
Attributes:
model_config (ConfigDict): Used to ignore entries for other services that are defined in the config.
max_attempts (PositiveInt): Maximum number of request attempts before returning status code 424.
timeout_in_s (PositiveInt): Maximum waiting duration before timeout (in seconds).
"""
model_config = ConfigDict(extra="ignore")
max_attempts: PositiveInt = 3
timeout_in_s: PositiveInt = 180
Settings
Bases: BaseModel
General Settings for the service.
| ATTRIBUTE | DESCRIPTION |
|---|---|
model_config |
Used to ignore entries for other services that are defined in the config.
TYPE:
|
service_name |
Name of the service, e.g. 'core'.
TYPE:
|
authentication |
Authentication settings
TYPE:
|
n_uvicorn_workers |
Number of parallel uvicorn instances.
TYPE:
|
service_endpoints |
URLs of required services (e.g. rag, chat, feedback-db,...)
TYPE:
|
feedback_db |
Settings for feedback database
TYPE:
|
log_level |
Minimal level of logging output given.
TYPE:
|
log_file_max_bytes |
Maximum size of the log file in bytes.
TYPE:
|
log_file_backup_count |
Number of rotated log files to keep.
TYPE:
|
log_file |
Path of the log file.
TYPE:
|
inter_service_communication |
Configuration of communication with other services. |
enabled_routes |
Configuration for enabling/disabling optional routes.
TYPE:
|
| METHOD | DESCRIPTION |
|---|---|
ensure_log_dir |
Create the log directory after validation. |
Source code in docs/microservices/core/src/models/general.py
class Settings(BaseModel):
"""General Settings for the service.
Attributes:
model_config (ConfigDict): Used to ignore entries for other services that are defined in the config.
service_name (str): Name of the service, e.g. 'core'.
authentication (Authentication): Authentication settings
n_uvicorn_workers (PositiveInt): Number of parallel uvicorn instances.
service_endpoints (dict[str, AnyHttpUrl]): URLs of required services (e.g. rag, chat, feedback-db,...)
feedback_db (FeedbackDbSettings): Settings for feedback database
log_level (LogLevel): Minimal level of logging output given.
log_file_max_bytes (PositiveInt): Maximum size of the log file in bytes.
log_file_backup_count (PositiveInt): Number of rotated log files to keep.
log_file (FilePath): Path of the log file.
inter_service_communication (InterServiceCommunication): Configuration of communication with other services.
enabled_routes (EnabledRoutes): Configuration for enabling/disabling optional routes.
"""
model_config = ConfigDict(extra="ignore", frozen=True)
service_name: str = "Core"
service_descripton: str = (
"Coordination of the microservices and provision of central functions"
)
authentication: Authentication = Authentication()
allow_origins: list[AnyHttpUrl] = [HttpUrl("http://localhost:9999")]
n_uvicorn_workers: PositiveInt = 1
service_endpoints: dict[str, AnyHttpUrl]
feedback_db: FeedbackDbSettings = FeedbackDbSettings()
log_level: LogLevel = LogLevel.INFO
log_file_max_bytes: PositiveInt = 1 * 1024 * 1024
log_file_backup_count: PositiveInt = 3
log_file: FilePath = Path("/core/logs/log")
inter_service_communication: InterServiceCommunication = InterServiceCommunication()
enabled_routes: EnabledRoutes = EnabledRoutes()
@model_validator(mode="after")
def ensure_log_dir(self) -> "Settings":
"""Create the log directory after validation."""
self.log_file.parent.mkdir(parents=True, exist_ok=True)
return self
ensure_log_dir
Create the log directory after validation.
Source code in docs/microservices/core/src/models/general.py
settings
Load all settings from a central place, not hidden in utils.
| FUNCTION | DESCRIPTION |
|---|---|
get_config_dir |
Get the default config dir. |
get_general_settings |
Read the file 'general.yml' from the config dir. |
get_config_dir
Get the default config dir.
All needed configuration files are supposed to be found here. The directory is checked for existence and an 'Exception' is raised if it does not exist.
Source code in docs/microservices/core/src/settings.py
def get_config_dir() -> Path:
"""Get the default config dir.
All needed configuration files are supposed to be found here. The directory is checked for
existence and an 'Exception' is raised if it does not exist.
"""
path_to_config_dir = Path(__file__).parent / ".." / "configs"
if not path_to_config_dir.exists():
logger.fatal(f"Invalid path to config dir: '{path_to_config_dir}'")
raise Exception(f"Invalid path to config dir: '{path_to_config_dir}'")
return path_to_config_dir
get_general_settings
Read the file 'general.yml' from the config dir.
Source code in docs/microservices/core/src/settings.py
def get_general_settings() -> Settings:
"""Read the file 'general.yml' from the config dir."""
general_config_paths = get_config_dir() / "general.yml"
if not general_config_paths.exists():
logger.fatal(f"Invalid path to 'general.yml': '{general_config_paths}'")
raise Exception(f"Invalid path to 'general.yml': '{general_config_paths}'")
settings = load_all_configs(general_config_paths)
logger.debug("Loaded settings:%r", pprint.pformat(settings))
return settings
utils
Utils functions for logging, LLM availability check and configuration processing.
| MODULE | DESCRIPTION |
|---|---|
base_logger |
Set up the root logger for the entire application. This logger will log messages to the console and a file. |
process_configs |
Methods to load configs and check config integrity. |
base_logger
Set up the root logger for the entire application. This logger will log messages to the console and a file.
| FUNCTION | DESCRIPTION |
|---|---|
setup_logger |
Initialize the logger with the desired log level and add handlers. |
setup_logger
Initialize the logger with the desired log level and add handlers.
Sets up the root logger, which all other loggers inherit from. Adds file, console and exit handlers to the logger and sets the format.
Source code in docs/microservices/core/src/utils/base_logger.py
def setup_logger(settings: Settings) -> None:
"""Initialize the logger with the desired log level and add handlers.
Sets up the root logger, which all other loggers inherit from.
Adds file, console and exit handlers to the logger and sets the format.
"""
# root logger, all other loggers inherit from this
logger = logging.getLogger()
# create different handlers for log file and console
file_handler = logging.handlers.RotatingFileHandler(
filename=settings.log_file,
maxBytes=settings.log_file_max_bytes,
backupCount=settings.log_file_backup_count,
)
console_handler = logging.StreamHandler()
# define log format and set for each handler
formatter = logging.Formatter(
fmt="%(asctime)s - %(levelname)8s - %(module)s - %(funcName)s: %(message)s",
datefmt="%Y-%m-%d %H:%M:%S%z",
)
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)
# add handlers to the logger
logger.addHandler(file_handler)
logger.addHandler(console_handler)
logger.setLevel(settings.log_level)
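A hedged startup sketch; the import paths follow the module layout shown on this page but are assumptions, and the config files described under settings must be in place:

import logging

from src.settings import get_general_settings
from src.utils.base_logger import setup_logger

settings = get_general_settings()
setup_logger(settings)
logging.getLogger(__name__).info("Core service starting")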
process_configs
Methods to load configs and check config integrity.
| FUNCTION | DESCRIPTION |
|---|---|
load_all_configs |
Load config settings from respective paths. |
load_from_yml_in_pydantic_model |
Load config from 'list_of_yaml_paths' into given pydantic-Model. |
load_yaml |
Load yaml. |
load_all_configs
Load config settings from respective paths.
| PARAMETER | DESCRIPTION |
|---|---|
general_config_paths
|
Path to config, matching 'Settings'
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
Settings
|
Config loaded into their Pydantic Model. |
Source code in docs/microservices/core/src/utils/process_configs.py
def load_all_configs(
general_config_paths: Path,
) -> Settings:
"""Load config settings from respective paths.
Args:
general_config_paths (Path): Path to config, matching 'Settings'
Returns:
Config loaded into their Pydantic Model.
"""
settings = load_from_yml_in_pydantic_model(general_config_paths, Settings)
return settings
load_from_yml_in_pydantic_model
Load config from 'yaml_path' into the given pydantic model.
| PARAMETER | DESCRIPTION |
|---|---|
yaml_path
|
Yaml to load
TYPE:
|
pydantic_reference_model
|
Pydantic model class to load the YAML into.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
BaseModel
|
BaseModel derived pydantic data class. |
Source code in docs/microservices/core/src/utils/process_configs.py
def load_from_yml_in_pydantic_model(
yaml_path: Path, pydantic_reference_model: type[BaseModel]
) -> BaseModel:
"""Load config from 'yaml_path' into the given pydantic model.
Args:
yaml_path (Path): Yaml to load
pydantic_reference_model (type[BaseModel]): Pydantic model class to load the YAML into
Returns:
BaseModel derived pydantic data class.
"""
data = load_yaml(yaml_path)
try:
pydantic_class = pydantic_reference_model(**data)
logger.info(f"Config loaded from: '{yaml_path}'")
return pydantic_class
except ValidationError as e:
logger.critical(f"Error loading config: '{e}'")
raise e
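For illustration, the general settings can be loaded through this helper directly; the path mirrors the convention in settings.py and is used here as an assumption:

from pathlib import Path

settings = load_from_yml_in_pydantic_model(Path("configs/general.yml"), Settings)
print(settings.service_name)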
load_yaml
Load yaml.
| PARAMETER | DESCRIPTION |
|---|---|
yaml_path
|
Path to yaml
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
dict[str, Any]
|
Content of loaded yaml. |