
Core

core

MODULE DESCRIPTION
main

Define a default main entry point of the application.

src

Source code of the core containing endpoints of the microservices and utilities.

main

Define a default main entry point of the application.

FUNCTION DESCRIPTION
main

The main entry point of the application.

main

main()

The main entry point of the application.

Source code in docs/microservices/core/main.py
def main() -> None:
    """The main entry point of the application."""
    settings = get_general_settings()
    setup_logger(settings)

    uvicorn.run(
        "src.app:construct_app",
        factory=True,
        host="0.0.0.0",  # noqa: S104
        port=8000,
        log_level=settings.log_level.lower(),
        workers=settings.n_uvicorn_workers,
    )

src

Source code of the core containing endpoints of the microservices and utilities.

MODULE DESCRIPTION
WebsocketManager

WebSocketManager class keeping track of all websocket connections.

app

Initialize the app.

authentication

Expose the API for authentication in F13.

endpoints

Modules for sending HTTP requests to endpoints of internal microservices.

feedback

Models and functions to store feedback for chat, RAG and summary.

models

Data model classes for loading and validating API and configuration parameters.

settings

Load all settings from a central place, not hidden in utils.

utils

Utils functions for logging, LLM availability check and configuration processing.

WebsocketManager

WebSocketManager class keeping track of all websocket connections.

CLASS DESCRIPTION
WebSocketManager

WebSocketManager keeps track of all websocket connections and dispatches messages to the appropriate connection.

WebSocketManager

WebSocketManager keeps track of all websocket connections and dispatches messages to the appropriate connection.

ATTRIBUTE DESCRIPTION
open_websocket_handler

A dictionary mapping connection identifiers to their corresponding WebSocket handlers.

TYPE: dict

logger

Logger instance for logging events and errors; can be None if not set.

TYPE: Optional[Logger]

message_queue

A queue that holds messages to be sent to connected clients.

TYPE: Queue

sender_thread

Background thread responsible for processing the message queue and sending messages.

TYPE: Thread

METHOD DESCRIPTION
add_websocket

Add a websocket connection to the dictionary of open websocket connections.

check_connection_id_is_unique

Check whether a given connection ID is already in use.

critical

Send a logging message with CRITICAL level.

debug

Send a logging message with DEBUG level.

error

Send a logging message with ERROR level.

info

Send a logging message with INFO level.

remove_websocket

Remove a WebSocket connection from the active connections.

send_message

Put a message into the message queue.

set_logger

Set the Python logger to use for logging.

warning

Send a logging message with WARNING level.

Source code in docs/microservices/core/src/WebsocketManager.py
class WebSocketManager:
    """WebSocketManager keeps track of all websocket connections and dispatches messages to the appropriate connection.

    Attributes:
        open_websocket_handler (dict): A dictionary mapping connection identifiers to their corresponding WebSocket
                                       handlers.
        logger (Optional[logging.Logger]): Logger instance for logging events and errors; can be None if not set.
        message_queue (queue.Queue): A queue that holds messages to be sent to connected clients.
        sender_thread (threading.Thread): Background thread responsible for processing the message queue and sending
                                          messages.

    """

    def __init__(self) -> None:
        """Initialize the WebSocket message dispatcher.

        This constructor sets up internal structures required for handling
        WebSocket communication and message dispatching in a background thread.

        Attributes:
            open_websocket_handler (dict): A mapping of active WebSocket handlers.
            logger (Logger or None): Optional logger instance for internal logging.
            message_queue (queue.Queue): Queue for storing outgoing messages to be sent.
            sender_thread (threading.Thread): Background thread that runs `_run_loop`
                                            to process the message queue.

        """
        self.open_websocket_handler = dict()
        self.logger = None
        self.message_queue = queue.Queue()
        self.sender_thread = threading.Thread(target=self._run_loop)
        self.sender_thread.daemon = True
        self.sender_thread.start()

    def _run_loop(self) -> None:
        """Start the message sender in a second thread."""
        asyncio.run(self._send_messages())

    def check_connection_id_is_unique(self, connection_id: str) -> bool:
        """Check whether a given connection ID is already in use.

        Args:
            connection_id (str): The ID of the WebSocket connection.

        Returns:
            True if the connection ID is not yet in use, False otherwise.
        """
        return connection_id not in self.open_websocket_handler

    def add_websocket(self, websocket: WebSocket, connection_id: str) -> None:
        """Add a websocket connection to the dictionary of open websocket connections.

        Usually this method is invoked manually when a websocket connection is being established.

        Args:
            websocket (WebSocket): The websocket object to be added.
            connection_id (str): The ID of the websocket connection.

        """
        self.open_websocket_handler[str(connection_id)] = websocket
        if self.logger:
            self.logger.debug(f"WebSocket connection added: {connection_id}")

    def remove_websocket(self, connection_id: str) -> None:
        """Remove a WebSocket connection from the active connections.

        Typically called when a WebSocket is closed.

        Args:
            connection_id (str): Identifier of the WebSocket connection.

        """
        if connection_id in self.open_websocket_handler:
            del self.open_websocket_handler[connection_id]
            if self.logger:
                self.logger.debug(f"WebSocket connection removed: {connection_id}")

    def set_logger(self, logger: logging.Logger) -> None:
        """Set the Python logger to use for logging.

        Args:
            logger (logging.Logger): The logger object to use.

        """
        self.logger = logger

    async def _send_messages(self) -> NoReturn:
        """Pull messages from message queue and sends it to the websocket of a specific user."""
        while True:
            if not self.message_queue.empty():
                message = self.message_queue.get()
                try:
                    connection_id = message["connection_id"]
                    msg = message["message"]
                    ws = self.open_websocket_handler[connection_id]
                    await ws.send_text(msg)
                except Exception:
                    if self.logger:
                        self.logger.info(f"Websocket for {connection_id} not found")

            await asyncio.sleep(0.1)

    def send_message(self, message: str, connection_id: str) -> None:
        """Put a message into the message queue.

        Args:
            message (str): Text message.
            connection_id (str): Connection ID of the addressed WebSocket.

        """
        # ensure connection_id is a string
        connection_id = str(connection_id)

        # try to find the destination websocket
        try:
            if self.logger:
                self.logger.debug(
                    f"Sending Message via websocket connection {connection_id}: {message}"
                )

            self.message_queue.put({"message": message, "connection_id": connection_id})

        except Exception:
            if self.logger:
                self.logger.info(f"Websocket for {connection_id} not found")

    def debug(self, message: str, connection_id: str) -> None:
        """Send a logging message with DEBUG level.

        Args:
            message (str): The log message to send.
            connection_id (str): The ID of the connection associated with this log entry.

        """
        self._send_log_message("DEBUG", message, connection_id)

    def info(self, message: str, connection_id: str) -> None:
        """Send a logging message with INFO level.

        Args:
            message (str): The log message to send.
            connection_id (str): The ID of the connection associated with this log entry.

        """
        self._send_log_message("INFO", message, connection_id)

    def warning(self, message: str, connection_id: str) -> None:
        """Send a logging message with WARNING level.

        Args:
            message (str): The log message to send.
            connection_id (str): The ID of the connection associated with this log entry.

        """
        self._send_log_message("WARNING", message, connection_id)

    def error(self, message: str, connection_id: str) -> None:
        """Send a logging message with ERROR level.

        Args:
            message (str): The log message to send.
            connection_id (str): The ID of the connection associated with this log entry.

        """
        self._send_log_message("ERROR", message, connection_id)

    def critical(self, message: str, connection_id: str) -> None:
        """Send a logging message with CRITICAL level.

        Args:
            message (str): The log message to send.
            connection_id (str): The ID of the connection associated with this log entry.

        """
        self._send_log_message("CRITICAL", message, connection_id)

    def _send_log_message(
        self, msg_type: str, message: str, connection_id: str
    ) -> None:
        """Format to json and send as log message."""
        self.send_message(
            message=json.dumps(dict(type=msg_type, message=message)),
            connection_id=connection_id,
        )
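The queue-and-worker pattern used by the class can be sketched with a simplified, synchronous stand-in. `MiniManager` and `FakeSocket` below are hypothetical names for illustration only; the real class dispatches to Starlette `WebSocket` objects from an asyncio loop:

```python
import json
import queue
import threading


class FakeSocket:
    """Hypothetical stand-in for a Starlette WebSocket."""

    def __init__(self):
        self.sent = []

    def send_text(self, msg):
        self.sent.append(msg)


class MiniManager:
    """Simplified sketch of the manager's queue-and-worker dispatch."""

    def __init__(self):
        self.open_websocket_handler = {}
        self.message_queue = queue.Queue()
        worker = threading.Thread(target=self._run_loop, daemon=True)
        worker.start()

    def _run_loop(self):
        while True:
            # Block on the queue instead of polling, then dispatch by ID.
            item = self.message_queue.get()
            ws = self.open_websocket_handler.get(item["connection_id"])
            if ws is not None:
                ws.send_text(item["message"])
            self.message_queue.task_done()

    def add_websocket(self, ws, connection_id):
        self.open_websocket_handler[str(connection_id)] = ws

    def send_message(self, message, connection_id):
        self.message_queue.put(
            {"message": message, "connection_id": str(connection_id)}
        )


manager = MiniManager()
sock = FakeSocket()
manager.add_websocket(sock, "abc")
# Log messages are JSON-encoded as {"type": ..., "message": ...},
# matching _send_log_message in the listing above.
manager.send_message(json.dumps({"type": "INFO", "message": "hi"}), "abc")
manager.message_queue.join()  # wait until the worker has delivered it
print(sock.sent[0])
```

Blocking on `Queue.get` avoids the 0.1 s polling sleep of the original loop; the real implementation polls because it sends over async websockets inside `asyncio.run`.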
add_websocket
add_websocket(websocket, connection_id)

Add a websocket connection to the dictionary of open websocket connections.

Usually this method is invoked manually when a websocket connection is being established.

PARAMETER DESCRIPTION
websocket

The websocket object to be added.

TYPE: WebSocket

connection_id

The ID of the websocket connection.

TYPE: str

Source code in docs/microservices/core/src/WebsocketManager.py
def add_websocket(self, websocket: WebSocket, connection_id: str) -> None:
    """Add a websocket connection to the dictionary of open websocket connections.

    Usually this method is invoked manually when a websocket connection is being established.

    Args:
        websocket (WebSocket): The websocket object to be added.
        connection_id (str): The ID of the websocket connection.

    """
    self.open_websocket_handler[str(connection_id)] = websocket
    if self.logger:
        self.logger.debug(f"WebSocket connection added: {connection_id}")
check_connection_id_is_unique
check_connection_id_is_unique(connection_id)

Check whether a given connection ID is already in use.

PARAMETER DESCRIPTION
connection_id

The ID of the WebSocket connection.

TYPE: str

RETURNS DESCRIPTION
bool

True if the connection ID is not yet in use, False otherwise.

Source code in docs/microservices/core/src/WebsocketManager.py
def check_connection_id_is_unique(self, connection_id: str) -> bool:
    """Check whether a given connection ID is already in use.

    Args:
        connection_id (str): The ID of the WebSocket connection.

    Returns:
        True if the connection ID is not yet in use, False otherwise.
    """
    return connection_id not in self.open_websocket_handler
critical
critical(message, connection_id)

Send a logging message with CRITICAL level.

PARAMETER DESCRIPTION
message

The log message to send.

TYPE: str

connection_id

The ID of the connection associated with this log entry.

TYPE: str

Source code in docs/microservices/core/src/WebsocketManager.py
def critical(self, message: str, connection_id: str) -> None:
    """Send a logging message with CRITICAL level.

    Args:
        message (str): The log message to send.
        connection_id (str): The ID of the connection associated with this log entry.

    """
    self._send_log_message("CRITICAL", message, connection_id)
debug
debug(message, connection_id)

Send a logging message with DEBUG level.

PARAMETER DESCRIPTION
message

The log message to send.

TYPE: str

connection_id

The ID of the connection associated with this log entry.

TYPE: str

Source code in docs/microservices/core/src/WebsocketManager.py
def debug(self, message: str, connection_id: str) -> None:
    """Send a logging message with DEBUG level.

    Args:
        message (str): The log message to send.
        connection_id (str): The ID of the connection associated with this log entry.

    """
    self._send_log_message("DEBUG", message, connection_id)
error
error(message, connection_id)

Send a logging message with ERROR level.

PARAMETER DESCRIPTION
message

The log message to send.

TYPE: str

connection_id

The ID of the connection associated with this log entry.

TYPE: str

Source code in docs/microservices/core/src/WebsocketManager.py
def error(self, message: str, connection_id: str) -> None:
    """Send a logging message with ERROR level.

    Args:
        message (str): The log message to send.
        connection_id (str): The ID of the connection associated with this log entry.

    """
    self._send_log_message("ERROR", message, connection_id)
info
info(message, connection_id)

Send a logging message with INFO level.

PARAMETER DESCRIPTION
message

The log message to send.

TYPE: str

connection_id

The ID of the connection associated with this log entry.

TYPE: str

Source code in docs/microservices/core/src/WebsocketManager.py
def info(self, message: str, connection_id: str) -> None:
    """Send a logging message with INFO level.

    Args:
        message (str): The log message to send.
        connection_id (str): The ID of the connection associated with this log entry.

    """
    self._send_log_message("INFO", message, connection_id)
remove_websocket
remove_websocket(connection_id)

Remove a WebSocket connection from the active connections.

Typically called when a WebSocket is closed.

PARAMETER DESCRIPTION
connection_id

Identifier of the WebSocket connection.

TYPE: str

Source code in docs/microservices/core/src/WebsocketManager.py
def remove_websocket(self, connection_id: str) -> None:
    """Remove a WebSocket connection from the active connections.

    Typically called when a WebSocket is closed.

    Args:
        connection_id (str): Identifier of the WebSocket connection.

    """
    if connection_id in self.open_websocket_handler:
        del self.open_websocket_handler[connection_id]
        if self.logger:
            self.logger.debug(f"WebSocket connection removed: {connection_id}")
send_message
send_message(message, connection_id)

Put a message into the message queue.

PARAMETER DESCRIPTION
message

Text message.

TYPE: str

connection_id

Connection ID of the addressed WebSocket.

TYPE: str

Source code in docs/microservices/core/src/WebsocketManager.py
def send_message(self, message: str, connection_id: str) -> None:
    """Put a message into the message queue.

    Args:
        message (str): Text message.
        connection_id (str): Connection ID of the addressed WebSocket.

    """
    # ensure connection_id is a string
    connection_id = str(connection_id)

    # try to find the destination websocket
    try:
        if self.logger:
            self.logger.debug(
                f"Sending Message via websocket connection {connection_id}: {message}"
            )

        self.message_queue.put({"message": message, "connection_id": connection_id})

    except Exception:
        if self.logger:
            self.logger.info(f"Websocket for {connection_id} not found")
set_logger
set_logger(logger)

Set the Python logger to use for logging.

PARAMETER DESCRIPTION
logger

The logger object to use.

TYPE: Logger

Source code in docs/microservices/core/src/WebsocketManager.py
def set_logger(self, logger: logging.Logger) -> None:
    """Set the Python logger to use for logging.

    Args:
        logger (logging.Logger): The logger object to use.

    """
    self.logger = logger
warning
warning(message, connection_id)

Send a logging message with WARNING level.

PARAMETER DESCRIPTION
message

The log message to send.

TYPE: str

connection_id

The ID of the connection associated with this log entry.

TYPE: str

Source code in docs/microservices/core/src/WebsocketManager.py
def warning(self, message: str, connection_id: str) -> None:
    """Send a logging message with WARNING level.

    Args:
        message (str): The log message to send.
        connection_id (str): The ID of the connection associated with this log entry.

    """
    self._send_log_message("WARNING", message, connection_id)

app

Initialize the app.

CLASS DESCRIPTION
State

The lifespan state of the FastAPI application.

FUNCTION DESCRIPTION
construct_app

Construct the FastAPI application.

get_jwt_params_from_openid_config

Query the openid-configuration and extract all needed parameters for jwt.

lifespan

Hook to provide lifespan state to our FastAPI application.

register_routers

Register routers on the FastAPI app based on the configuration.

State

Bases: TypedDict

The lifespan state of the FastAPI application.

Source code in docs/microservices/core/src/app.py
class State(TypedDict):
    """The lifespan state of the FastAPI application."""

    settings: Settings
    jwks_client: None | jwt.PyJWKClient
    jwt_signign_algorithms: None | Sequence[str]
    db_session: Session
    permissions_cache: PermissionsCacheABC
construct_app
construct_app(lifespan=lifespan, settings=None)

Construct the FastAPI application.

Source code in docs/microservices/core/src/app.py
def construct_app(
    lifespan: Callable[[FastAPI], AsyncIterator[State]] = lifespan,
    settings: None | Settings = None,
) -> FastAPI:
    """Construct the FastAPI application."""
    app = FastAPI(lifespan=lifespan)

    if settings is None:
        settings = get_general_settings()

    app.title = settings.service_name
    app.description = settings.service_descripton
    app.version = "2.0.0"

    app.state.settings = settings
    if settings.enabled_routes.feedback:
        app.state.db_session = create_db_session(settings)
    else:
        app.state.db_session = None

    app.add_middleware(
        CORSMiddleware,
        allow_origins=[str(origin).rstrip("/") for origin in settings.allow_origins],
        allow_methods=["*"],
        allow_headers=["*"],
    )

    register_routers(app, settings)
    return app
get_jwt_params_from_openid_config
get_jwt_params_from_openid_config(settings)

Query the openid-configuration and extract all needed parameters for jwt.

PARAMETER DESCRIPTION
settings

The application settings.

TYPE: Settings

RETURNS DESCRIPTION
tuple[PyJWKClient, Sequence[str]]

A tuple of the JWKs client and the supported signing algorithms.

Source code in docs/microservices/core/src/app.py
def get_jwt_params_from_openid_config(
    settings: Settings,
) -> tuple[jwt.PyJWKClient, Sequence[str]]:
    """Query the openid-configuration and extract all needed parameters for jwt.

    Args:
        settings: The application settings.

    Returns:
        A tuple of the JWKs client and the supported signing algorithms.
    """
    logger.debug(
        "Querying openid-configuration from %s",
        settings.authentication.openid_configuration_url,
    )

    resp_oid_config = requests.get(
        settings.authentication.openid_configuration_url, timeout=5
    )
    oid_config = resp_oid_config.json()

    logger.debug(oid_config)

    if "error" in oid_config:
        logger.error(
            "Could not obtain OpenID configuration from %s: %s",
            settings.authentication.openid_configuration_url,
            oid_config["error"],
        )
        raise Exception(oid_config["error"])

    logger.debug("Decoded oidc config: %r", oid_config)

    jwks_client = jwt.PyJWKClient(oid_config["jwks_uri"])
    jwt_signign_algorithms = oid_config["id_token_signing_alg_values_supported"]

    return jwks_client, jwt_signign_algorithms
lifespan async
lifespan(app)

Hook to provide lifespan state to our FastAPI application.

Source code in docs/microservices/core/src/app.py
@contextlib.asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[State]:
    """Hook to provide lifespan state to our FastAPI application."""
    # 'app.state.settings' is set in 'construct_app'.
    settings = app.state.settings

    logger.debug("Running with settings:\n%s", settings)
    if settings.authentication.guest_mode:
        jwks_client = None
        jwt_signign_algorithms = None
    else:
        jwks_client, jwt_signign_algorithms = get_jwt_params_from_openid_config(
            settings
        )

    if settings.enabled_routes.feedback:
        db_session = app.state.db_session
        scheduler = BackgroundScheduler()
        scheduler.add_job(
            functools.partial(cleanup_userfeedback, db_session), "interval", minutes=60
        )
        scheduler.start()
    else:
        db_session = None
        scheduler = None

    yield {
        "settings": settings,
        "jwks_client": jwks_client,
        "jwt_signign_algorithms": jwt_signign_algorithms,
        "db_session": db_session,
        "permissions_cache": InMemoryPermissionsCache(),
    }

    if scheduler:
        scheduler.shutdown()
register_routers
register_routers(app, settings)

Register routers on the FastAPI app based on the configuration.

This function conditionally includes route handlers for various features (chat, RAG, summary, feedback, and transcription) into the FastAPI application based on their enabled status in the settings configuration. The core router is always included. Each router inclusion or exclusion is logged for monitoring.

PARAMETER DESCRIPTION
app

The FastAPI application instance to register routers on.

TYPE: FastAPI

settings

Configuration object containing enabled_routes settings that determine which routers should be registered.

TYPE: Settings

Source code in docs/microservices/core/src/app.py
def register_routers(app: FastAPI, settings: Settings) -> None:
    """Register routers on the FastAPI app based on the configuration.

    This function conditionally includes route handlers for various features
    (chat, RAG, summary, feedback, and transcription) into the FastAPI application
    based on their enabled status in the settings configuration. The core router
    is always included. Each router inclusion or exclusion is logged for monitoring.

    Args:
        app (FastAPI): The FastAPI application instance to register routers on.
        settings (Settings): Configuration object containing enabled_routes settings
                             that determine which routers should be registered.
    """
    app.include_router(core.router)
    logger.info("Core route enabled")

    if settings.enabled_routes.chat:
        app.include_router(chat.router)
        logger.info("Chat route enabled")
    else:
        logger.warning("Chat route disabled")

    if settings.enabled_routes.rag:
        app.include_router(rag.router)
        logger.info("RAG route enabled")
    else:
        logger.warning("RAG route disabled")

    if settings.enabled_routes.summary:
        app.include_router(summary.router)
        logger.info("Summary route enabled")
    else:
        logger.warning("Summary route disabled")

    if settings.enabled_routes.feedback:
        app.include_router(feedback.router)
        logger.info("Feedback route enabled")
    else:
        logger.warning("Feedback route disabled")

    if settings.enabled_routes.transcription:
        app.include_router(transcription.router)
        logger.info("Transcription route enabled")
    else:
        logger.warning("Transcription route disabled")

authentication

Expose the API for authentication in F13.

MODULE DESCRIPTION
authentication

Primitives for user authentication.

authorization

Primitives for user authorization.

common

Define common data types and utility functions used in several auth-related modules.

permissions_cache

Primitives for user authentication.

FUNCTION DESCRIPTION
add_auth_status_codes_to_responses

Add documentation for authorization related status codes.

add_auth_status_codes_to_responses
add_auth_status_codes_to_responses(responses)

Add documentation for authorization related status codes.

Source code in docs/microservices/core/src/authentication/__init__.py
def add_auth_status_codes_to_responses(
    responses: dict[int | str, dict[str, Any]],
) -> dict[int | str, dict[str, Any]]:
    """Add documentation for authorization related status codes."""
    responses[401] = {
        "description": "Missing authorization, invalid or expired token",
    }
    responses[403] = {
        "description": "Insufficient permissions to access the endpoint.",
    }
    return responses
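Since the helper only mutates and returns a plain dict, it can be exercised standalone. The snippet copies the function verbatim from the listing above and shows a typical call on an endpoint's OpenAPI `responses` mapping:

```python
from __future__ import annotations

from typing import Any


def add_auth_status_codes_to_responses(
    responses: dict[int | str, dict[str, Any]],
) -> dict[int | str, dict[str, Any]]:
    """Add documentation for authorization related status codes."""
    responses[401] = {
        "description": "Missing authorization, invalid or expired token",
    }
    responses[403] = {
        "description": "Insufficient permissions to access the endpoint.",
    }
    return responses


# Enrich an endpoint's existing 'responses' mapping in place.
responses = add_auth_status_codes_to_responses({200: {"description": "OK"}})
print(sorted(responses))  # → [200, 401, 403]
```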
authentication

Primitives for user authentication.

To authenticate a user we perform a User-Managed Access (UMA) request in Keycloak. Keycloak returns a Requesting Party token (RPT) which contains the permissions of the user.

CLASS DESCRIPTION
Permission

Enumeration of known permissions to F13.

FUNCTION DESCRIPTION
needs_permissions

Create a Security dependency that requires specific permissions.

Permission

Bases: StrEnum

Enumeration of known permissions to F13.

Source code in docs/microservices/core/src/authentication/authentication.py
class Permission(enum.StrEnum):
    """Enumeration of known permissions to F13."""

    CHAT = "chat"
    FEEDBACK = "feedback"
    SUMMARY = "summary"
    RAG_DATABASE = "rag-database"
    RAG_FILE = "rag-file"
    TRANSCRIPTION = "transcription"
needs_permissions
needs_permissions(perms)

Create a Security dependency that requires specific permissions.

PARAMETER DESCRIPTION
perms

List of required permission names.

TYPE: Sequence[Permission]

RETURNS DESCRIPTION
Security

Security dependency configured to check the specified permissions.

Source code in docs/microservices/core/src/authentication/authentication.py
def needs_permissions(perms: Sequence[Permission]) -> Security:
    """Create a Security dependency that requires specific permissions.

    Args:
        perms: List of required permission names.

    Returns:
        Security dependency configured to check the specified permissions.
    """
    return Security(_check_permissions, scopes=[perm.value for perm in perms])
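FastAPI resolves the `Security` dependency at request time, so the piece that can be shown standalone is the mapping from `Permission` members to OAuth2 scope strings. This sketch uses the pre-3.11 `str` + `Enum` equivalent of the source's `enum.StrEnum`:

```python
import enum


# Portable equivalent of the source's 'class Permission(enum.StrEnum)'.
class Permission(str, enum.Enum):
    CHAT = "chat"
    FEEDBACK = "feedback"
    SUMMARY = "summary"
    RAG_DATABASE = "rag-database"
    RAG_FILE = "rag-file"
    TRANSCRIPTION = "transcription"


# 'needs_permissions' forwards these values as OAuth2 scopes to FastAPI's
# Security dependency; the scope list itself is a list of plain strings.
scopes = [perm.value for perm in (Permission.CHAT, Permission.RAG_FILE)]
print(scopes)  # → ['chat', 'rag-file']
```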
authorization

Primitives for user authorization.

This extracts user information from a JSON Web Token (JWT) provided in the request.

CLASS DESCRIPTION
User

Model of a frontend enduser of F13.

FUNCTION DESCRIPTION
get_current_user

Construct a 'User' object from an access token.

User

Bases: BaseModel

Model of a frontend enduser of F13.

The relevant data can all be extracted from a Keycloak access_token.

METHOD DESCRIPTION
from_token

Construct a 'User' from a Keycloak token.

Source code in docs/microservices/core/src/authentication/authorization.py
class User(BaseModel):
    """Model of a frontend enduser of F13.

    The relevant data can all be extracted from a Keycloak `access_token`.
    """

    user_id: str
    session_id: str

    @classmethod
    def from_token(cls, decoded_token: AccessTokenContent) -> User:
        """Construct a 'User' from a Keycloak token."""
        logger.debug("Decoded token: %r", decoded_token)

        sub = decoded_token.get("sub")
        if not sub:
            raise ValueError("Token does not contain the 'sub' claim")

        session_id = decoded_token.get("sid")
        if not session_id:
            raise ValueError("Token does not contain the 'sid' claim")

        return cls(user_id=sub, session_id=session_id)
from_token classmethod
from_token(decoded_token)

Construct a 'User' from a Keycloak token.

Source code in docs/microservices/core/src/authentication/authorization.py
@classmethod
def from_token(cls, decoded_token: AccessTokenContent) -> User:
    """Construct a 'User' from a Keycloak token."""
    logger.debug("Decoded token: %r", decoded_token)

    sub = decoded_token.get("sub")
    if not sub:
        raise ValueError("Token does not contain the 'sub' claim")

    session_id = decoded_token.get("sid")
    if not session_id:
        raise ValueError("Token does not contain the 'sid' claim")

    return cls(user_id=sub, session_id=session_id)
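The claim checks in `from_token` can be exercised without Keycloak by feeding in a plain dict in place of a decoded token. `user_from_token` below is a hypothetical helper that mirrors the validation but returns a dict instead of the pydantic `User` model:

```python
def user_from_token(decoded_token: dict) -> dict:
    # Mirrors the claim validation in User.from_token, minus the model class.
    sub = decoded_token.get("sub")
    if not sub:
        raise ValueError("Token does not contain the 'sub' claim")

    session_id = decoded_token.get("sid")
    if not session_id:
        raise ValueError("Token does not contain the 'sid' claim")

    return {"user_id": sub, "session_id": session_id}


user = user_from_token({"sub": "user-123", "sid": "sess-456"})
print(user)  # → {'user_id': 'user-123', 'session_id': 'sess-456'}

try:
    user_from_token({"sub": "user-123"})  # token lacks the 'sid' claim
except ValueError as exc:
    print(exc)  # → Token does not contain the 'sid' claim
```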
get_current_user async
get_current_user(request, token)

Construct a 'User' object from an access token.

The token is expected to be passed as a "Bearer" token in the "Authorization" header.

Source code in docs/microservices/core/src/authentication/authorization.py
async def get_current_user(
    request: Request,
    token: Annotated[None | str, Depends(oauth2_scheme)],
) -> None | User:
    """Construct a 'User' object from an access token.

    The token is expected to be passed as a "Bearer" token in the "Authorization" header.
    """
    if request.state.settings.authentication.guest_mode:
        logger.debug(
            "Application is configured for guest-mode. Skipping authorization."
        )
        return None

    elif token is None:
        logger.debug("No token provided!")
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Not authenticated",
            headers={"WWW-Authenticate": "Bearer"},
        )

    else:
        logger.debug("Extract user data from token: %s", token)
        jwks_client = cast(jwt.PyJWKClient, request.state.jwks_client)
        signing_algos = request.state.jwt_signign_algorithms
        try:
            decoded = await decode_access_token(
                token,
                jwks_client,
                signing_algos,
                request.state.settings.authentication.audience,
            )
        except (jwt.DecodeError, jwt.PyJWKClientError) as exc:
            logger.debug("Error while decoding token %s", token, exc_info=exc)
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token"
            )
        except jwt.ExpiredSignatureError:
            logger.debug("Presented token has expired")
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED, detail="Expired signature"
            )
        return User.from_token(decoded)
common

Define common data types and utility functions used in several auth-related modules.

CLASS DESCRIPTION
AccessTokenContent

Encountered fields in a user's access token.

RequestingPartyToken

Decoded Requesting Party Token (RPT) from Keycloak.

FUNCTION DESCRIPTION
decode_access_token

Decode the given 'access_token'.

AccessTokenContent

Bases: TypedDict

Encountered fields in a user's access token.

Source code in docs/microservices/core/src/authentication/common.py
class AccessTokenContent(TypedDict):
    """Encountered fields in a users access token."""

    # We only care about sub and sid
    sub: str
    sid: str
RequestingPartyToken

Bases: TypedDict

Decoded Requesting Party Token (RPT) from Keycloak.

This and related data types only serve a documentary purpose. No validation or parsing happens to ensure that this is indeed the structure returned by Keycloak. The functions working with these types have to carry out these checks themselves.

ATTRIBUTE DESCRIPTION
exp

Token expiration timestamp (Unix epoch).

TYPE: int

sid

Session identifier.

TYPE: str

authorization

UMA authorization data containing permissions.

TYPE: _AuthorizationBlock

Source code in docs/microservices/core/src/authentication/common.py
class RequestingPartyToken(TypedDict):
    """Decoded Requesting Party Token (RPT) from Keycloak.

    This and related data types only serve a documentary purpose. No validation or parsing happens
    to ensure that this is indeed the structure returned by Keycloak. The functions working with
    these types must perform the necessary checks themselves.

    Attributes:
        exp: Token expiration timestamp (Unix epoch).
        sid: Session identifier.
        authorization: UMA authorization data containing permissions.
    """

    exp: int
    sid: str
    authorization: _AuthorizationBlock
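Because these TypedDicts are documentary only, callers must guard against missing claims at runtime. A minimal sketch of such a guard (the helper name and sample payload are hypothetical, chosen only to mirror the structure described above):

```python
from typing import TypedDict


class _Permission(TypedDict):
    rsname: str


def has_authorization_block(payload: dict) -> bool:
    """Return True when the decoded token carries a usable authorization claim."""
    return (
        "authorization" in payload
        and isinstance(payload["authorization"], dict)
        and "permissions" in payload["authorization"]
    )


# Hypothetical decoded token shaped like a Keycloak RPT
sample = {
    "exp": 1700000000,
    "sid": "abc-123",
    "authorization": {"permissions": [{"rsname": "documents"}]},
}

print(has_authorization_block(sample))        # True
print(has_authorization_block({"sid": "x"}))  # False
```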
decode_access_token async
decode_access_token(access_token, jwks_client, signing_algos, audience)

Decode the given 'access_token'.

If 'audience' is set to 'None' then audience verification is disabled entirely.

Source code in docs/microservices/core/src/authentication/common.py
async def decode_access_token(
    access_token: str,
    jwks_client: jwt.PyJWKClient,
    signing_algos: Sequence[str],
    audience: None | str,
) -> AccessTokenContent:
    """Decode the given 'access_token'.

    If 'audience' is set to 'None' then audience verification is disabled entirely.
    """
    # 'get_signing_key_from_jwt' eventually performs a request to Keycloak
    signing_key = await asyncio.to_thread(
        jwks_client.get_signing_key_from_jwt, access_token
    )

    options = {}

    if audience is None:
        options = {"verify_aud": False}

    decoded = jwt.decode(
        access_token,
        key=signing_key.key,
        algorithms=signing_algos,
        audience=audience,
        options=options,
    )

    return decoded
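The call into the `jwks_client` performs blocking network I/O, which is why `decode_access_token` wraps it in `asyncio.to_thread`. The pattern can be sketched in isolation with a stand-in blocking function (the names below are illustrative, not the real Keycloak client):

```python
import asyncio
import time


def fetch_signing_key(token: str) -> str:
    """Stand-in for the blocking jwks_client.get_signing_key_from_jwt call."""
    time.sleep(0.05)  # simulates the HTTP round trip to Keycloak
    return f"key-for-{token[:8]}"


async def main() -> str:
    # Offload the blocking call so the event loop stays responsive,
    # mirroring what decode_access_token does with asyncio.to_thread.
    return await asyncio.to_thread(fetch_signing_key, "eyJhbGciOiJSUzI1NiJ9")


print(asyncio.run(main()))  # key-for-eyJhbGci
```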
permissions_cache

Primitives for user authentication.

Authentication happens via OAuth2 from Keycloak.

CLASS DESCRIPTION
AuthCacheVal

Cached authorization data for a user session.

InMemoryPermissionsCache

In-memory cache for UMA RPT authorization data.

PermissionsCacheABC

Abstract cache interface for UMA RPT authorization data.

AuthCacheVal dataclass

Cached authorization data for a user session.

ATTRIBUTE DESCRIPTION
expires

When this cache entry expires.

TYPE: datetime

session_id

The user's session identifier.

TYPE: SessionId

rpt

The decoded Requesting Party Token.

TYPE: RequestingPartyToken

permissions

List of permission names extracted from the RPT.

TYPE: list[str]

raw_rpt

The raw JWT token string.

TYPE: str

METHOD DESCRIPTION
construct_from_rpt

Construct an AuthCacheVal from a decoded RPT.

Source code in docs/microservices/core/src/authentication/permissions_cache.py
@dataclasses.dataclass(frozen=True)
class AuthCacheVal:
    """Cached authorization data for a user session.

    Attributes:
        expires: When this cache entry expires.
        session_id: The user's session identifier.
        rpt: The decoded Requesting Party Token.
        permissions: List of permission names extracted from the RPT.
        raw_rpt: The raw JWT token string.
    """

    expires: datetime.datetime
    session_id: SessionId
    rpt: RequestingPartyToken
    permissions: list[str]
    raw_rpt: str

    @classmethod
    def construct_from_rpt(
        cls, rpt: RequestingPartyToken, raw_rpt: str
    ) -> AuthCacheVal:
        """Construct an AuthCacheVal from a decoded RPT.

        Args:
            rpt: The decoded Requesting Party Token.
            raw_rpt: The raw JWT token string.

        Returns:
            A new AuthCacheVal instance with extracted permissions.

        Raises:
            HTTPException: If required claims are missing from the RPT.
        """
        if "authorization" not in rpt:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Missing 'authorization' claim in RPT.",
            )
        if "permissions" not in rpt["authorization"]:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Missing 'permissions' field in authorization claim in RPT.",
            )
        permissions = [perm["rsname"] for perm in rpt["authorization"]["permissions"]]

        expires = datetime.datetime.fromtimestamp(rpt["exp"])
        session_id = rpt["sid"]

        return cls(expires, session_id, rpt, permissions, raw_rpt)
construct_from_rpt classmethod
construct_from_rpt(rpt, raw_rpt)

Construct an AuthCacheVal from a decoded RPT.

PARAMETER DESCRIPTION
rpt

The decoded Requesting Party Token.

TYPE: RequestingPartyToken

raw_rpt

The raw JWT token string.

TYPE: str

RETURNS DESCRIPTION
AuthCacheVal

A new AuthCacheVal instance with extracted permissions.

RAISES DESCRIPTION
HTTPException

If required claims are missing from the RPT.

Source code in docs/microservices/core/src/authentication/permissions_cache.py
@classmethod
def construct_from_rpt(
    cls, rpt: RequestingPartyToken, raw_rpt: str
) -> AuthCacheVal:
    """Construct an AuthCacheVal from a decoded RPT.

    Args:
        rpt: The decoded Requesting Party Token.
        raw_rpt: The raw JWT token string.

    Returns:
        A new AuthCacheVal instance with extracted permissions.

    Raises:
        HTTPException: If required claims are missing from the RPT.
    """
    if "authorization" not in rpt:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Missing 'authorization' claim in RPT.",
        )
    if "permissions" not in rpt["authorization"]:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Missing 'permissions' field in authorization claim in RPT.",
        )
    permissions = [perm["rsname"] for perm in rpt["authorization"]["permissions"]]

    expires = datetime.datetime.fromtimestamp(rpt["exp"])
    session_id = rpt["sid"]

    return cls(expires, session_id, rpt, permissions, raw_rpt)
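The extraction logic can be exercised standalone with a hand-built RPT. The payload below is a hypothetical example shaped like the Keycloak structure documented above, not real token data:

```python
import datetime

# Hypothetical decoded RPT, shaped like the structure described above
rpt = {
    "exp": 1700000000,
    "sid": "session-42",
    "authorization": {
        "permissions": [{"rsname": "chat"}, {"rsname": "documents"}]
    },
}

# The same claim checks construct_from_rpt performs before trusting the token
if "authorization" not in rpt or "permissions" not in rpt["authorization"]:
    raise ValueError("RPT is missing required claims")

permissions = [perm["rsname"] for perm in rpt["authorization"]["permissions"]]
expires = datetime.datetime.fromtimestamp(rpt["exp"])

print(permissions)  # ['chat', 'documents']
```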
InMemoryPermissionsCache dataclass

Bases: PermissionsCacheABC

In-memory cache for UMA RPT authorization data.

Stores authorization cache entries indexed by session ID with automatic expiration handling. This is a simple in-memory implementation suitable for smaller deployments.

Cleanup is triggered in the add method and is time based. It runs at most once per '_cleanup_interval', and only when no other request holds the '_lock'.

This cache should perform nicely for up to 100,000 users.

METHOD DESCRIPTION
add

Insert or replace a cached authorization entry.

get_by_session_id

Return the cached AuthCacheVal for this session_id, or None.

Source code in docs/microservices/core/src/authentication/permissions_cache.py
@dataclasses.dataclass
class InMemoryPermissionsCache(PermissionsCacheABC):
    """In-memory cache for UMA RPT authorization data.

    Stores authorization cache entries indexed by session ID with automatic expiration handling.
    This is a simple in-memory implementation suitable for smaller deployments.

    Cleanup is triggered in the `add` method and is time based. It runs at most once per
    '_cleanup_interval', and only when no other request holds the '_lock'.

    This cache should perform nicely for up to 100,000 users.
    """

    _cache: dict[SessionId, AuthCacheVal] = dataclasses.field(default_factory=dict)
    _last_cleanup: datetime.datetime = dataclasses.field(
        default_factory=datetime.datetime.now
    )
    _cleanup_interval: datetime.timedelta = dataclasses.field(
        default_factory=lambda: datetime.timedelta(minutes=5)
    )
    _lock: asyncio.Lock = dataclasses.field(default_factory=asyncio.Lock)

    async def add(self, val: AuthCacheVal) -> None:
        """Insert or replace a cached authorization entry.

        Also run the cleanup routine to remove expired entries.

        Args:
            val: The authorization cache value to store.
        """
        logger.debug("Added to permissions cache: %r", val.session_id)
        async with self._lock:
            self._cache[val.session_id] = val
        logger.debug("Current cache size: %s", len(self._cache))
        await self._check_cleanup()

    async def get_by_session_id(self, session_id: SessionId) -> None | AuthCacheVal:
        """Return the cached AuthCacheVal for this session_id, or None.

        Args:
            session_id: The session identifier to look up.

        Returns:
            The cached authorization value if found and not expired, None otherwise.
        """
        async with self._lock:
            entry = self._cache.get(session_id)
            if entry is None:
                return None

            if entry.expires <= datetime.datetime.now():
                self._cache.pop(session_id)
                return None

            return entry

    async def _check_cleanup(self) -> None:
        """Remove all expired cache entries.

        This is time based and runs after '_cleanup_interval' elapsed. To avoid blocking other
        requests we try to acquire the lock non-blocking.
        """
        now = datetime.datetime.now()
        if now - self._last_cleanup < self._cleanup_interval:
            return

        # Postpone cleanup if the lock is already held by another coroutine
        if self._lock.locked():
            logger.info("Could not acquire lock. Postponing cleanup")
            return

        async with self._lock:
            # Recheck to side-step race condition with 'self._lock.locked()'
            if now - self._last_cleanup < self._cleanup_interval:
                return
            logger.info("Cleaning up permissions cache.")
            expired_keys = [
                sid for sid, val in self._cache.items() if val.expires <= now
            ]
            for key in expired_keys:
                entry = self._cache.pop(key)
                logger.debug("Removed %r", entry.session_id)

            self._last_cleanup = now
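The opportunistic, time-based cleanup can be distilled into a minimal standalone sketch. This is a hypothetical toy cache, not the real implementation; it omits the interval recheck inside the lock and uses a zero interval so cleanup runs on every add:

```python
import asyncio
import datetime


class TinyExpiringCache:
    """Minimal sketch of the time-based cleanup pattern used above."""

    def __init__(self, cleanup_interval: datetime.timedelta) -> None:
        self._cache: dict[str, tuple[datetime.datetime, str]] = {}
        self._last_cleanup = datetime.datetime.now()
        self._cleanup_interval = cleanup_interval
        self._lock = asyncio.Lock()

    async def add(self, key: str, expires: datetime.datetime, value: str) -> None:
        async with self._lock:
            self._cache[key] = (expires, value)
        await self._check_cleanup()

    async def _check_cleanup(self) -> None:
        now = datetime.datetime.now()
        if now - self._last_cleanup < self._cleanup_interval:
            return
        if self._lock.locked():  # postpone rather than block other requests
            return
        async with self._lock:
            expired = [k for k, (exp, _) in self._cache.items() if exp <= now]
            for k in expired:
                self._cache.pop(k)
            self._last_cleanup = now


async def demo() -> int:
    cache = TinyExpiringCache(datetime.timedelta(seconds=0))
    past = datetime.datetime.now() - datetime.timedelta(seconds=1)
    future = datetime.datetime.now() + datetime.timedelta(hours=1)
    await cache.add("stale", past, "gone")
    await cache.add("fresh", future, "kept")
    # Expired entries are swept opportunistically; only "fresh" remains.
    return len(cache._cache)


print(asyncio.run(demo()))  # 1
```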
add async
add(val)

Insert or replace a cached authorization entry.

Also run the cleanup routine to remove expired entries.

PARAMETER DESCRIPTION
val

The authorization cache value to store.

TYPE: AuthCacheVal

Source code in docs/microservices/core/src/authentication/permissions_cache.py
async def add(self, val: AuthCacheVal) -> None:
    """Insert or replace a cached authorization entry.

    Also run the cleanup routine to remove expired entries.

    Args:
        val: The authorization cache value to store.
    """
    logger.debug("Added to permissions cache: %r", val.session_id)
    async with self._lock:
        self._cache[val.session_id] = val
    logger.debug("Current cache size: %s", len(self._cache))
    await self._check_cleanup()
get_by_session_id async
get_by_session_id(session_id)

Return the cached AuthCacheVal for this session_id, or None.

PARAMETER DESCRIPTION
session_id

The session identifier to look up.

TYPE: SessionId

RETURNS DESCRIPTION
None | AuthCacheVal

The cached authorization value if found and not expired, None otherwise.

Source code in docs/microservices/core/src/authentication/permissions_cache.py
async def get_by_session_id(self, session_id: SessionId) -> None | AuthCacheVal:
    """Return the cached AuthCacheVal for this session_id, or None.

    Args:
        session_id: The session identifier to look up.

    Returns:
        The cached authorization value if found and not expired, None otherwise.
    """
    async with self._lock:
        entry = self._cache.get(session_id)
        if entry is None:
            return None

        if entry.expires <= datetime.datetime.now():
            self._cache.pop(session_id)
            return None

        return entry
PermissionsCacheABC

Bases: ABC

Abstract cache interface for UMA RPT authorization data.

We want to keep this interface compatible with Redis. Note that Redis will invalidate expired tokens automatically. Thus, any implementation has to take care of cleanup itself rather than relying on FastAPI background tasks or similar.

METHOD DESCRIPTION
add

Insert or replace a cached authorization entry.

get_by_session_id

Return the cached AuthCacheVal for this session_id, or None.

Source code in docs/microservices/core/src/authentication/permissions_cache.py
class PermissionsCacheABC(abc.ABC):
    """Abstract cache interface for UMA RPT authorization data.

    We want to keep this compatible with Redis. Note that Redis will invalidate expired tokens
    automatically. Thus, any implementation has to take care of cleanup itself rather than relying
    on FastAPI background tasks or similar.
    """

    @abc.abstractmethod
    async def add(self, val: AuthCacheVal) -> None:
        """Insert or replace a cached authorization entry."""
        raise NotImplementedError

    @abc.abstractmethod
    async def get_by_session_id(self, session_id: SessionId) -> None | AuthCacheVal:
        """Return the cached AuthCacheVal for this session_id, or None."""
        raise NotImplementedError
add abstractmethod async
add(val)

Insert or replace a cached authorization entry.

Source code in docs/microservices/core/src/authentication/permissions_cache.py
@abc.abstractmethod
async def add(self, val: AuthCacheVal) -> None:
    """Insert or replace a cached authorization entry."""
    raise NotImplementedError
get_by_session_id abstractmethod async
get_by_session_id(session_id)

Return the cached AuthCacheVal for this session_id, or None.

Source code in docs/microservices/core/src/authentication/permissions_cache.py
@abc.abstractmethod
async def get_by_session_id(self, session_id: SessionId) -> None | AuthCacheVal:
    """Return the cached AuthCacheVal for this session_id, or None."""
    raise NotImplementedError

endpoints

Modules for sending HTTP requests to endpoints of internal microservices.

MODULE DESCRIPTION
chat

Endpoints of the chat-microservice.

core

General endpoints of f13 core.

feedback

Endpoints for Feedback (general, chat, rag and summary).

rag

Endpoints of the RAG-microservice.

summary

Endpoints of the summary-microservice.

transcription

Endpoints of the transcription-microservice including: upload, retrieval, word and upsertion of special terms.

chat

Endpoints of the chat-microservice.

FUNCTION DESCRIPTION
chat_completion

Chat completion endpoint.

chat_completion_json_stream

Chat completion endpoint with json-stream.

chat_completion_stream

Chat completion endpoint with text-stream.

get_chat_endpoint

Extract the chat endpoint from the 'settings'.

get_llms

Return model information of available LLMs.

chat_completion async
chat_completion(request, chat_input)

Chat completion endpoint.

PARAMETER DESCRIPTION
request

the currently performed request

TYPE: Request

chat_input

Input containing the chat message.

TYPE: ChatInput

RETURNS DESCRIPTION
ChatOutput

Output of the chat response.

Source code in docs/microservices/core/src/endpoints/chat.py
@router.post(
    "/completion",
    response_model=ChatOutput,
    summary="Chat completion endpoint.",
    description=(
        "Performs response for chat completions.\n\n"
        "The endpoint returns a single JSON response containing the chat output.\n\n"
    ),
    openapi_extra={
        "requestBody": {
            "content": {
                "application/json": {
                    "examples": ChatInput.model_config["json_schema_extra"][
                        "openapi_examples"
                    ],
                }
            },
        }
    },
    responses=add_auth_status_codes_to_responses(
        {
            200: {
                "description": "Successful chat response.",
                "content": {
                    "application/json": {
                        "examples": ChatOutput.model_config["json_schema_extra"][
                            "openapi_examples"
                        ],
                    },
                },
            },
            400: {"description": "Invalid language model."},
            408: {"description": "Request timeout of a dependency."},
            502: {"description": "Error accessing microservice or LLM client."},
            500: {"description": "Error processing answer of LLM client."},
        }
    ),
)
async def chat_completion(
    request: Request,
    chat_input: ChatInput,
) -> ChatOutput:
    """Chat completion endpoint.

    Args:
        request (Request): the currently performed request
        chat_input (ChatInput): Input containing the chat message.

    Returns:
        Output of the chat response.
    """
    settings = request.state.settings
    chat_endpoint = get_chat_endpoint(settings)
    url = f"{chat_endpoint}/completion"
    json = chat_input.model_dump()

    headers = {"Content-Type": "application/json", "Accept": "application/json"}

    post_config = settings.inter_service_communication.chat
    request_options = {
        "json": json,
        "headers": headers,
    }

    result = await async_post(
        url=url,
        response_model=ChatOutput,
        config=post_config,
        request_options=request_options,
        service_name="Chat",
    )

    return result
chat_completion_json_stream async
chat_completion_json_stream(request, chat_input)

Chat completion endpoint with json-stream.

PARAMETER DESCRIPTION
request

the currently performed request

TYPE: Request

chat_input

Input containing the chat message.

TYPE: ChatInput

RETURNS DESCRIPTION
StreamingResponse

Output of the chat response.

Source code in docs/microservices/core/src/endpoints/chat.py
@router.post(
    "/v2/completion/stream",
    response_class=StreamingResponse,
    summary="Chat completion endpoint with x-ndjson-stream.",
    description=(
        "Starts a streaming response for chat completions.\n\n"
        "The endpoint streams messages as NDJSON (`application/x-ndjson`) "
        "with different types: `response`, `reason`, and `error`."
    ),
    openapi_extra={
        "requestBody": {
            "content": {
                "application/json": {
                    "examples": ChatInput.model_config["json_schema_extra"][
                        "openapi_examples"
                    ],
                }
            },
        }
    },
    responses=add_auth_status_codes_to_responses(
        {
            200: {
                "description": "Streaming started successfully.",
                "content": {
                    "application/x-ndjson": {
                        "examples": ChatStreamOutput.model_config["json_schema_extra"][
                            "openapi_examples"
                        ],
                    },
                },
            },
            400: {"description": "Invalid language model."},
        }
    ),
)
async def chat_completion_json_stream(
    request: Request,
    chat_input: ChatInput,
) -> StreamingResponse:
    """Chat completion endpoint with json-stream.

    Args:
        request (Request): the currently performed request
        chat_input (ChatInput): Input containing the chat message.

    Returns:
        Output of the chat response.

    """
    settings = request.state.settings
    chat_endpoint = get_chat_endpoint(settings)
    url = f"{chat_endpoint}/v2/completion/stream"
    json = chat_input.model_dump()

    headers = {"Content-Type": "application/json", "Accept": "application/json"}

    post_config = settings.inter_service_communication.chat

    return StreamingResponse(
        async_post_stream(
            url=url,
            request_options={
                "json": json,
                "headers": headers,
            },
            config=post_config,
            service_name="Chat",
        ),
        media_type="application/x-ndjson",
        headers={
            "Connection": "keep-alive",
        },
    )
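A client consuming `/v2/completion/stream` parses one JSON document per line of the NDJSON body. A minimal sketch with hand-written sample lines follows; the `type` and `content` field names are assumptions for illustration, and only the message types `response`, `reason`, and `error` are documented above:

```python
import json

# Sample NDJSON lines as a client might receive them from the stream
raw_stream = [
    '{"type": "reason", "content": "Looking up context..."}',
    '{"type": "response", "content": "Hello, how can I help you today?"}',
]

responses = []
for line in raw_stream:
    msg = json.loads(line)  # one JSON document per line
    if msg["type"] == "response":
        responses.append(msg["content"])
    elif msg["type"] == "error":
        raise RuntimeError(msg["content"])

print("".join(responses))  # Hello, how can I help you today?
```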
chat_completion_stream async
chat_completion_stream(request, chat_input)

Chat completion endpoint with text-stream.

PARAMETER DESCRIPTION
request

the currently performed request

TYPE: Request

chat_input

Input containing the chat message.

TYPE: ChatInput

RETURNS DESCRIPTION
StreamingResponse

Output of the chat response.

Source code in docs/microservices/core/src/endpoints/chat.py
@router.post(
    "/completion/stream",
    response_class=StreamingResponse,
    summary="Chat completion endpoint with text-stream.",
    description=(
        "Starts a streaming response for chat completions.\n\n"
        "The endpoint streams messages as text (`text/event-stream`)."
    ),
    openapi_extra={
        "requestBody": {
            "content": {
                "application/json": {
                    "examples": ChatInput.model_config["json_schema_extra"][
                        "openapi_examples"
                    ],
                }
            },
        }
    },
    responses=add_auth_status_codes_to_responses(
        {
            200: {
                "description": "Streaming started successfully.",
                "content": {
                    "text/event-stream": {
                        "schema": {
                            "type": "string",
                            "example": "Hello, how can I help you today?\n\n",
                        },
                        "examples": {
                            "response": {
                                "summary": "Chat response",
                                "description": (
                                    "This is the standard output ",
                                    "returned by the chat model for a normal request.",
                                ),
                                "value": "Hello, how can I help you today?\n\n",
                            },
                            "error": {
                                "summary": "Error output",
                                "description": "This example shows the output returned when an error occurs.",
                                "value": "Es ist ein Fehler aufgetreten. Bitte versuchen Sie es später erneut.",
                            },
                        },
                    }
                },
            },
            400: {"description": "Invalid language model."},
        }
    ),
)
async def chat_completion_stream(
    request: Request,
    chat_input: ChatInput,
) -> StreamingResponse:
    """Chat completion endpoint with text-stream.

    Args:
        request (Request): the currently performed request
        chat_input (ChatInput): Input containing the chat message.

    Returns:
        Output of the chat response.
    """
    settings = request.state.settings
    chat_endpoint = get_chat_endpoint(settings)
    url = f"{chat_endpoint}/completion/stream"
    json = chat_input.model_dump()

    headers = {"Content-Type": "application/json", "Accept": "application/json"}

    post_config = settings.inter_service_communication.chat

    return StreamingResponse(
        async_post_stream(
            url=url,
            request_options={
                "json": json,
                "headers": headers,
            },
            config=post_config,
            service_name="Chat",
        ),
        media_type="text/event-stream",
        headers={
            "Connection": "keep-alive",
        },
    )
get_chat_endpoint
get_chat_endpoint(settings)

Extract the chat endpoint from the 'settings'.

Source code in docs/microservices/core/src/endpoints/chat.py
def get_chat_endpoint(settings: Settings) -> str:
    """Extract the rag endpoint from the 'settings'."""
    return str(settings.service_endpoints["chat"]).rstrip("/")
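The `rstrip("/")` matters because a configured endpoint URL may carry a trailing slash; without stripping it, the path joins in the handlers above would produce a double slash. For example (the host name is hypothetical):

```python
# A configured endpoint may or may not end with a slash
endpoint = str("http://chat-service:8000/").rstrip("/")
url = f"{endpoint}/completion"
print(url)  # http://chat-service:8000/completion
```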
get_llms async
get_llms(request)

Return model information of available LLMs.

PARAMETER DESCRIPTION
request

the currently performed request

TYPE: Request

RETURNS DESCRIPTION
list[dict]

The list of available models.

Source code in docs/microservices/core/src/endpoints/chat.py
@router.get(
    "/llms",
    summary="List available language models.",
    description=("Returns a list of available language models (LLMs).\n\n"),
    responses=add_auth_status_codes_to_responses(
        {
            200: {
                "description": "List of available LLMs.",
                "content": {
                    "application/json": {
                        "example": [
                            {
                                "label": "test_model:mock",
                                "is_remote": False,
                                "name": "test_model_mock",
                            },
                        ]
                    }
                },
            },
            500: {"description": "Internal server error accessing microservice"},
        },
    ),
)
async def get_llms(request: Request) -> list[dict]:
    """Return model information of available LLMs.

    Args:
        request (Request): the currently performed request

    Returns:
        The list of available models.
    """
    chat_endpoint = get_chat_endpoint(request.state.settings)
    async with httpx.AsyncClient(timeout=30) as client:
        response = await client.get(f"{chat_endpoint}/llms")
    result = response.json()

    return result
core

General endpoints of f13 core.

FUNCTION DESCRIPTION
health

Return a health check message.

websocket_endpoint

WebSocket endpoint to establish a bi-directional connection between clients and server.

health async
health()

Return a health check message.

RETURNS DESCRIPTION
dict[str, str]

The health check message as a dictionary.

Source code in docs/microservices/core/src/endpoints/core.py
@router.get(
    "/",
    summary="Health check endpoint",
    description=(
        "Returns a simple message indicating that the F13-Core service is running.\n\n"
        "Use this endpoint to verify that the service is alive and responsive."
    ),
    responses={
        200: {
            "description": "Health check successful",
            "content": {
                "application/json": {"example": {"status": "F13-Core is running"}}
            },
        },
        500: {"description": "Internal server error"},
    },
)
@router.get(
    "/health",
    summary="Health check endpoint",
    description=(
        "Returns a simple message indicating that the F13-Core service is running.\n\n"
        "Use this endpoint to verify that the service is alive and responsive."
    ),
    responses={
        200: {
            "description": "Health check successful",
            "content": {
                "application/json": {"example": {"status": "F13-Core is running"}}
            },
        },
        500: {"description": "Internal server error"},
    },
)
async def health() -> dict[str, str]:
    """Return a health check message.

    Returns:
        The health check message as a dictionary.
    """
    return {"status": "F13-Core is running"}
websocket_endpoint async
websocket_endpoint(websocket, connection_id, token=None)

WebSocket endpoint to establish a bi-directional connection between clients and server.

PARAMETER DESCRIPTION
websocket

The WebSocket connection to be established.

TYPE: WebSocket

connection_id

A client-defined connection ID.

TYPE: str

token

A JWT access token. If the application runs in guest_mode, the token is ignored.

TYPE: str DEFAULT: None

Source code in docs/microservices/core/src/endpoints/core.py
@router.websocket("/ws/{connection_id}")
async def websocket_endpoint(
    websocket: WebSocket,
    connection_id: str,
    token: None | str = None,
) -> None:
    """WebSocket endpoint to establish a bi-directional connection between clients and server.

    Args:
        websocket (WebSocket): The WebSocket connection to be established.
        connection_id (str): A client-defined connection ID.
        token (str): A JWT access token. If the application runs in guest_mode, the token is ignored.

    """
    current_user = await get_current_user(websocket, token)
    logger.debug("WS initiated via a token from %s", current_user)

    # If a connection with the given connection_id already exists: raise Exception
    if not websocket_manager.check_connection_id_is_unique(connection_id):
        raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION)

    # Accept Connection and update websocketHandler
    await websocket.accept()
    websocket_manager.add_websocket(websocket, connection_id)
    websocket_manager.debug(f"Welcome {connection_id}!", connection_id=connection_id)

    try:
        while True:
            message = await websocket.receive_text()
            websocket_manager.debug(
                f"{connection_id} says: {message}", connection_id=connection_id
            )
    except WebSocketDisconnect:
        websocket_manager.remove_websocket(connection_id)
    except Exception as e:
        logger.exception(e)
    finally:
        # Remove Handler, if connection closed
        websocket_manager.remove_websocket(connection_id)
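Note that `remove_websocket` runs both in the `WebSocketDisconnect` branch and in the `finally` block, so removal must be idempotent. A minimal stand-in for the manager's connection-ID bookkeeping (hypothetical, not the real WebSocketManager):

```python
class TinyConnectionRegistry:
    """Toy sketch of the connection-ID bookkeeping performed above."""

    def __init__(self) -> None:
        self._handlers: dict[str, object] = {}

    def check_connection_id_is_unique(self, connection_id: str) -> bool:
        return connection_id not in self._handlers

    def add_websocket(self, ws: object, connection_id: str) -> None:
        self._handlers[connection_id] = ws

    def remove_websocket(self, connection_id: str) -> None:
        # Called from both the disconnect handler and the finally block,
        # so removal must tolerate an already-removed ID.
        self._handlers.pop(connection_id, None)


registry = TinyConnectionRegistry()
assert registry.check_connection_id_is_unique("client-1")
registry.add_websocket(object(), "client-1")
assert not registry.check_connection_id_is_unique("client-1")
registry.remove_websocket("client-1")
registry.remove_websocket("client-1")  # idempotent, mirrors the finally path
```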
feedback

Endpoints for Feedback (general, chat, rag and summary).

FUNCTION DESCRIPTION
chat_feedback

Store chat-specific feedback in feedback-database.

general_feedback

Store general feedback in feedback-database.

rag_feedback

Store RAG-specific feedback in feedback-database.

summary_feedback

Store Summary-specific feedback in feedback-database.

chat_feedback async
chat_feedback(session, feedback)

Store chat-specific feedback in feedback-database.

PARAMETER DESCRIPTION
session

A database session object.

TYPE: Session

feedback

Chat feedback.

TYPE: ChatFeedback

RETURNS DESCRIPTION
bool

True if feedback was successfully stored.

RAISES DESCRIPTION
HTTPException

Error if general feedback was not stored.

Source code in docs/microservices/core/src/endpoints/feedback.py
@router.post(
    "/chat",
    response_model=bool,
    responses=add_auth_status_codes_to_responses(
        {
            400: {"description": "Invalid input."},
            422: {"description": "Validation error in the request."},
            500: {"description": "Error saving the feedback."},
        }
    ),
)
async def chat_feedback(session: SessionDep, feedback: ChatFeedback) -> bool:
    """Store chat-specific feedback in feedback-database.

    Args:
        session (Session): A database session object.
        feedback (ChatFeedback): Chat feedback.

    Returns:
        True if feedback was successfully stored.

    Raises:
        HTTPException: Error if general feedback was not stored.
    """
    try:
        return save_chat_feedback(session, feedback)
    except Exception as e:
        logger.error(f"Error while saving the Chat-feedback: {e}")
        raise HTTPException(
            status_code=500, detail="Fehler beim Speichern des Feedback."
        )
general_feedback async
general_feedback(session, feedback)

Store general feedback in feedback-database.

PARAMETER DESCRIPTION
session

A database session object.

TYPE: Session

feedback

General feedback.

TYPE: GeneralFeedback

RETURNS DESCRIPTION
bool

True if feedback was successfully stored.

RAISES DESCRIPTION
HTTPException

Error if general feedback was not stored.

Source code in docs/microservices/core/src/endpoints/feedback.py
@router.post(
    "/general",
    response_model=bool,
    responses=add_auth_status_codes_to_responses(
        {
            400: {"description": "Invalid input."},
            422: {"description": "Validation error in the request."},
            500: {"description": "Error saving the feedback."},
        }
    ),
)
async def general_feedback(session: SessionDep, feedback: GeneralFeedback) -> bool:
    """Store general feedback in feedback-database.

    Args:
        session (Session): A database session object.
        feedback (GeneralFeedback): General feedback.

    Returns:
        True if feedback was successfully stored.

    Raises:
        HTTPException: Error if general feedback was not stored.
    """
    try:
        return save_general_feedback(session, feedback)
    except Exception as e:
        logger.error(f"Error while saving the general feedback: {e}")
        raise HTTPException(
            status_code=500, detail="Fehler beim Speichern des Feedback."
        )
rag_feedback async
rag_feedback(session, feedback)

Store RAG-specific feedback in feedback-database.

PARAMETER DESCRIPTION
session

A database session object.

TYPE: Session

feedback

RAG feedback.

TYPE: RAGFeedback

RETURNS DESCRIPTION
bool

True if feedback was successfully stored.

RAISES DESCRIPTION
HTTPException

Error if RAG feedback was not stored.

Source code in docs/microservices/core/src/endpoints/feedback.py
@router.post(
    "/rag",
    response_model=bool,
    responses=add_auth_status_codes_to_responses(
        {
            400: {"description": "Invalid input."},
            422: {"description": "Validation error in the request."},
            500: {"description": "Error saving the feedback."},
        }
    ),
)
async def rag_feedback(session: SessionDep, feedback: RAGFeedback) -> bool:
    """Store RAG-specific feedback in feedback-database.

    Args:
        session (Session): A database session object.
        feedback (RAGFeedback): RAG feedback.

    Returns:
        True if feedback was successfully stored.

    Raises:
        HTTPException: Error if RAG feedback was not stored.
    """
    try:
        return save_rag_feedback(session, feedback)
    except Exception as e:
        logger.error(f"Error while saving the RAG-feedback: {e}")
        raise HTTPException(
            status_code=500, detail="Fehler beim Speichern des Feedback."
        )
summary_feedback async
summary_feedback(session, feedback)

Store Summary-specific feedback in feedback-database.

PARAMETER DESCRIPTION
session

A database session object.

TYPE: Session

feedback

Summary feedback.

TYPE: SummaryFeedback

RETURNS DESCRIPTION
bool

True if feedback was successfully stored.

RAISES DESCRIPTION
HTTPException

Error if summary feedback was not stored.

Source code in docs/microservices/core/src/endpoints/feedback.py
@router.post(
    "/summary",
    response_model=bool,
    responses=add_auth_status_codes_to_responses(
        {
            400: {"description": "Invalid input."},
            422: {"description": "Validation error in the request."},
            500: {"description": "Error saving the feedback."},
        }
    ),
)
async def summary_feedback(session: SessionDep, feedback: SummaryFeedback) -> bool:
    """Store Summary-specific feedback in feedback-database.

    Args:
        session (Session): A database session object.
        feedback (SummaryFeedback): Summary feedback.

    Returns:
        True if feedback was successfully stored.

    Raises:
        HTTPException: Error if summary feedback was not stored.
    """
    try:
        return save_summary_feedback(session, feedback)
    except Exception as e:
        logger.error(f"Error while saving the Summary-feedback: {e}")
        raise HTTPException(
            status_code=500, detail="Fehler beim Speichern des Feedback."
        )
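All three feedback handlers above follow the same pattern: delegate to a `save_*` helper and translate any failure into a 500 response. A minimal, self-contained sketch of that pattern (using `RuntimeError` as a stand-in for `fastapi.HTTPException(status_code=500)`):

```python
def store_feedback(save_fn, session, feedback, kind: str):
    """Return the save helper's result, or raise on failure.

    The real handlers raise fastapi.HTTPException(status_code=500) here;
    save_fn, session and feedback are placeholders for illustration.
    """
    try:
        return save_fn(session, feedback)
    except Exception as e:
        raise RuntimeError(f"Error while saving the {kind} feedback: {e}")
```

Centralizing the try/except like this would avoid repeating the same error handling in each endpoint, at the cost of one level of indirection.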
rag

Endpoints of the RAG-microservice.

FUNCTION DESCRIPTION
database_rag

RAG Endpoint for knowledge DB.

fetch_file_rag

Endpoint for RAG on a given list of files.

get_llms

Information about the language models available for RAG.

get_rag_endpoint

Extract the rag endpoint from the general settings.

get_sources

Available sources in the knowledge database.

no_database_ingestion

Ingestion endpoint; returns a 403 Forbidden error. For internal use only.

database_rag async
database_rag(request, _auth, rag_input)

RAG Endpoint for knowledge DB.

PARAMETER DESCRIPTION
request

the currently performed request

TYPE: Request

_auth

Authorization dependency that checks for required permissions.

TYPE: None

rag_input

Input containing RAG request.

TYPE: RAGInput

RETURNS DESCRIPTION
list[RAGOutput]

RAG response.

Source code in docs/microservices/core/src/endpoints/rag.py
@router.post(
    "/database",
    response_model=list[RAGOutput],
    summary="RAG knowledge database endpoint.",
    description=(
        "Performs a RAG (Retrieval-Augmented Generation) query on the knowledge database.\n\n"
        "The endpoint retrieves relevant documents based on the input filters and question, "
        "then generates answers using the specified language model."
    ),
    openapi_extra={
        "requestBody": {
            "content": {
                "application/json": {
                    "examples": RAGInput.model_config["json_schema_extra"][
                        "openapi_examples"
                    ],
                }
            }
        },
    },
    responses=add_auth_status_codes_to_responses(
        {
            200: {
                "description": "Successful RAG response with retrieved documents.",
                "content": {
                    "application/json": {
                        "examples": RAGOutput.model_config["json_schema_extra"][
                            "openapi_examples"
                        ],
                    }
                },
            },
            400: {"description": "Invalid language model."},
            408: {"description": "Request timeout of a dependency."},
            500: {"description": "Internal server error."},
            502: {"description": "Error accessing microservice."},
        }
    ),
)
async def database_rag(
    request: Request,
    _auth: Annotated[None, needs_permissions([Permission.RAG_DATABASE])],
    rag_input: RAGInput,
) -> list[RAGOutput]:
    """RAG Endpoint for knowledge DB.

    Args:
        request (Request): the currently performed request
        _auth: Authorization dependency that checks for required permissions.
        rag_input (RAGInput): Input containing RAG request.

    Returns:
        RAG response.
    """
    settings = request.state.settings
    post_config = settings.inter_service_communication.rag
    request_options = {
        "json": rag_input.model_dump(),
        "headers": {
            "Content-Type": "application/json",
            "Accept": "application/json",
        },
    }

    rag_endpoint = get_rag_endpoint(settings)
    result = await async_post(
        url=f"{rag_endpoint}/database",
        response_model=RAGOutput,
        config=post_config,
        request_options=request_options,
        service_name="RAG",
    )

    return result
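The handler above forwards the validated `RAGInput` as JSON to the RAG microservice. A sketch of the request options it builds, with illustrative values taken from the documented examples (the field names mirror the `RAGInput` construction shown in `fetch_file_rag`):

```python
# JSON payload the core forwards to the RAG service's /database endpoint.
# Values are illustrative; field names follow RAGInput.
rag_payload = {
    "request_timestamp": 1731252767,  # Unix timestamp of the request
    "question": "What did the parties decide?",
    "meta_data_filters": None,        # no source filtering
    "max_chunks_to_use": 5,           # cap on retrieved chunks
    "language_model": "test_model_mock",
}

request_options = {
    "json": rag_payload,
    "headers": {
        "Content-Type": "application/json",
        "Accept": "application/json",
    },
}
```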
fetch_file_rag async
fetch_file_rag(request, _auth, request_timestamp=Form(..., description='Unix timestamp of the request.', examples=[1731252767]), question=Form(..., description="The user's input question to be answered.", examples=['What did the parties decide?']), language_model=Form(..., description='Identifier for the language model to use.', examples=['test_model_mock']), max_chunks_to_use=Form(10, description='Optional upper limit on the number of text chunks used for the response.', examples=[5]), files=File(..., description='One or more files (pdf, txt, docx).'))

Endpoint for RAG on a given list of files.

PARAMETER DESCRIPTION
request

the currently performed request

TYPE: Request

_auth

Authorization dependency that checks for required permissions.

TYPE: None

request_timestamp

The timestamp of the request.

TYPE: int DEFAULT: Form(..., description='Unix timestamp of the request.', examples=[1731252767])

question

The question to be answered, encoded as FormData.

TYPE: str DEFAULT: Form(..., description="The user's input question to be answered.", examples=['What did the parties decide?'])

language_model

The selected language model. LLMs are defined in configs/llm_models.yml.

TYPE: str DEFAULT: Form(..., description='Identifier for the language model to use.', examples=['test_model_mock'])

max_chunks_to_use

The number of chunks to use for answer generation.

TYPE: int DEFAULT: Form(10, description='Optional upper limit on the number of text chunks used for the response.', examples=[5])

files

List of binary-encoded uploaded files (e.g., PDF, DOCX/ODT, TXT), sent as FormData.

TYPE: list DEFAULT: File(..., description='One or more files (pdf, txt, docx).')

RETURNS DESCRIPTION
list[RAGOutput]

The generated answer as an inference result.

Source code in docs/microservices/core/src/endpoints/rag.py
@router.post(
    "/file",
    response_model=list[RAGOutput],
    summary="RAG file endpoint with multiple uploads.",
    description=(
        "Performs a RAG (Retrieval-Augmented Generation) query based on uploaded files.\n\n"
        "The endpoint parses and chunks the uploaded files, retrieves relevant documents, "
        "and generates answers using the specified language model."
    ),
    responses=add_auth_status_codes_to_responses(
        {
            200: {
                "description": "Successful RAG response with retrieved documents.",
                "content": {
                    "application/json": {
                        "examples": RAGOutput.model_config["json_schema_extra"][
                            "openapi_examples"
                        ],
                    }
                },
            },
            400: {"description": "Invalid language model or request."},
            408: {"description": "Request timeout of a dependency."},
            500: {"description": "Internal server error."},
            502: {"description": "Error accessing microservice."},
        }
    ),
)
async def fetch_file_rag(
    request: Request,
    _auth: Annotated[None, needs_permissions([Permission.RAG_FILE])],
    request_timestamp: int = Form(
        ...,
        description="Unix timestamp of the request.",
        examples=[1731252767],
    ),
    question: str = Form(
        ...,
        description="The user's input question to be answered.",
        examples=["What did the parties decide?"],
    ),
    language_model: str = Form(
        ...,
        description="Identifier for the language model to use.",
        examples=["test_model_mock"],
    ),
    max_chunks_to_use: int | None = Form(
        10,
        description="Optional upper limit on the number of text chunks used for the response.",
        examples=[5],
    ),
    files: list[UploadFile] = File(
        ...,
        description="One or more files (pdf, txt, docx).",
    ),
) -> list[RAGOutput]:
    """Endpoint for RAG on a given list of files.

    Args:
        request (Request): the currently performed request
        _auth: Authorization dependency that checks for required permissions.
        request_timestamp (int): The timestamp of the request.
        question (str): The question to be answered, encoded as FormData.
        language_model (str): The selected language model. LLMs are defined in configs/llm_models.yml.
        max_chunks_to_use (int): The number of chunks to use for answer generation.
        files (list): List of binary-encoded uploaded files (e.g., PDF, DOCX/ODT, TXT), sent as FormData.

    Returns:
        The generated answer as an inference result.
    """
    settings = request.state.settings
    # validating input data
    rag_input = RAGInput(
        request_timestamp=request_timestamp,
        question=question,
        meta_data_filters=None,
        max_chunks_to_use=max_chunks_to_use,
        language_model=language_model,
    )

    # preparing uploaded files for sending to rag microservice
    files = [("files", (f.filename, await f.read(), f.content_type)) for f in files]

    post_config = settings.inter_service_communication.rag
    request_options = {
        "data": rag_input.model_dump(),
        "files": files,
    }

    rag_endpoint = get_rag_endpoint(request.state.settings)
    result = await async_post(
        url=f"{rag_endpoint}/file",
        response_model=RAGOutput,
        config=post_config,
        request_options=request_options,
        service_name="RAG",
    )

    return result
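Note how `fetch_file_rag` repackages the uploads before forwarding them: each file becomes a `("files", (filename, content, content_type))` tuple, so the RAG service receives all of them under the repeated multipart field name `files`. A sketch with illustrative in-memory files:

```python
# Illustrative uploads; in the handler, content comes from `await f.read()`.
uploads = [
    ("example.txt", b"hello world", "text/plain"),
    ("report.pdf", b"%PDF-1.4", "application/pdf"),
]

# Same repackaging as in fetch_file_rag: one ("files", ...) tuple per upload.
files = [("files", (name, data, ctype)) for name, data, ctype in uploads]
```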
get_llms async
get_llms(request, _authenticated_user)

Information about the language models available for RAG.

PARAMETER DESCRIPTION
request

the currently performed request

TYPE: Request

_authenticated_user

The (authenticated) user performing the request. This is 'None' if the application is configured for guest mode.

TYPE: User

RETURNS DESCRIPTION
list[dict]

List of available RAG models.

Source code in docs/microservices/core/src/endpoints/rag.py
@router.get(
    "/llms",
    summary="List available language models.",
    description=("Returns a list of available language models (LLMs).\n\n"),
    responses=add_auth_status_codes_to_responses(
        {
            200: {
                "description": "List of available LLMs.",
                "content": {
                    "application/json": {
                        "example": [
                            {
                                "label": "test_model:mock",
                                "is_remote": False,
                                "name": "test_model_mock",
                            },
                        ]
                    }
                },
            },
            500: {"description": "Internal server error accessing microservice"},
        }
    ),
)
async def get_llms(request: Request, _authenticated_user: UserDep) -> list[dict]:
    """LLM-Information of available Models for RAG.

    Args:
        request (Request): the currently performed request
        _authenticated_user (User): the (authenticated) user performing the request
            This is 'None' if the application is configured for guest mode.

    Returns:
        List of available RAG models.
    """
    rag_endpoint = get_rag_endpoint(request.state.settings)
    async with httpx.AsyncClient(timeout=30) as client:
        response = await client.get(f"{rag_endpoint}/llms")
    result = response.json()

    return result
get_rag_endpoint
get_rag_endpoint(settings)

Extract the rag endpoint from the general settings.

Source code in docs/microservices/core/src/endpoints/rag.py
def get_rag_endpoint(settings: Settings) -> str:
    """Extract the rag endpoint from the general settings."""
    return str(settings.service_endpoints["rag"]).rstrip("/")
get_sources async
get_sources(request, _auth)

Available sources in the knowledge database.

PARAMETER DESCRIPTION
request

the currently performed request

TYPE: Request

_auth

Authorization dependency that checks for required permissions.

TYPE: None

RETURNS DESCRIPTION
list[RAGDatabaseSources]

List of available database sources (e.g. Koalitionsvertrag, Drucksachen).

Source code in docs/microservices/core/src/endpoints/rag.py
@router.get(
    "/sources",
    response_model=list[RAGDatabaseSources],
    summary="List available RAG database sources.",
    description=(
        "Returns a list of all available sources in the knowledge database.\n\n"
        "Each source contains its name and whether date-based filtering is applied."
    ),
    responses=add_auth_status_codes_to_responses(
        {
            200: {
                "description": "List of available database sources.",
                "content": {
                    "application/json": {
                        "examples": RAGDatabaseSources.model_config[
                            "json_schema_extra"
                        ]["openapi_examples"],
                    }
                },
            },
            500: {"description": "Internal server error accessing microservice."},
        }
    ),
)
async def get_sources(
    request: Request,
    _auth: Annotated[None, needs_permissions([Permission.RAG_DATABASE])],
) -> list[RAGDatabaseSources]:
    """Available sources in the knowledge database.

    Args:
        request (Request): the currently performed request
        _auth: Authorization dependency that checks for required permissions.

    Returns:
        List of available database sources (e.g. Koalitionsvertrag, Drucksachen).
    """
    rag_endpoint = get_rag_endpoint(request.state.settings)
    async with httpx.AsyncClient(timeout=30) as client:
        response = await client.get(url=f"{rag_endpoint}/sources")
    result = response.json()

    return result
no_database_ingestion async
no_database_ingestion()

Ingestion endpoint; returns a 403 Forbidden error. For internal use only.

Source code in docs/microservices/core/src/endpoints/rag.py
@router.post(
    "/database_ingestion",
    responses={
        403: {
            "description": "Ingestion-enpoint only for internal use.",
        }
    },
)
async def no_database_ingestion() -> None:
    """Ingestion Endpoint, returns forbidden Error message. Only for internal use."""
    raise HTTPException(
        status_code=403,
        detail="This endpoint is for internal use only.",
    )
summary

Endpoints of the summary-microservice.

FUNCTION DESCRIPTION
get_llms

Information about the language models available for Summary.

get_summary_endpoint

Extract the summary endpoint from the general settings.

summarize_file

Summary of a pdf, docx or txt file including its parsing, cleaning and chunking.

summarize_text

Summary of text input, including basic text cleaning and chunking.

get_llms async
get_llms(request)

Information about the language models available for Summary.

PARAMETER DESCRIPTION
request

the currently performed request

TYPE: Request

RETURNS DESCRIPTION
list[dict]

List of available summary models.

Source code in docs/microservices/core/src/endpoints/summary.py
@router.get(
    "/llms",
    summary="List available language models.",
    description=("Returns a list of available language models (LLMs).\n\n"),
    responses=add_auth_status_codes_to_responses(
        {
            200: {
                "description": "List of available LLMs.",
                "content": {
                    "application/json": {
                        "example": [
                            {
                                "label": "test_model:mock",
                                "is_remote": False,
                                "name": "test_model_mock",
                            },
                        ]
                    }
                },
            },
            500: {"description": "Internal server error accessing microservice"},
        }
    ),
)
async def get_llms(request: Request) -> list[dict]:
    """LLM-Information of available Models for Summary.

    Args:
        request (Request): the currently performed request

    Returns:
        List of available summary models.
    """
    summary_endpoint = get_summary_endpoint(request.state.settings)
    async with httpx.AsyncClient(timeout=30) as client:
        response = await client.get(f"{summary_endpoint}/llms")
    result = response.json()

    return result
get_summary_endpoint
get_summary_endpoint(settings)

Extract the summary endpoint from the general settings.

Source code in docs/microservices/core/src/endpoints/summary.py
def get_summary_endpoint(settings: Settings) -> str:
    """Extract the summary endpoint from the general settings."""
    return str(settings.service_endpoints["summary"]).rstrip("/")
summarize_file async
summarize_file(request, file=File(..., description='Upload a PDF, DOCX, or TXT file.'), api_input=Depends(SummaryFileAPIInputParameters.as_form))

Summary of a pdf, docx or txt file including its parsing, cleaning and chunking.

PARAMETER DESCRIPTION
request

the currently performed request

TYPE: Request

file

Input file either pdf, docx or txt.

TYPE: UploadFile DEFAULT: File(..., description='Upload a PDF, DOCX, or TXT file.')

api_input

Containing the name of the language model, the desired summary length as a number of DIN-A4 pages, and the topics on which the summary should be focused.

TYPE: SummaryFileAPIInputParameters DEFAULT: Depends(as_form)

RETURNS DESCRIPTION
SummaryAPIOutput

The summary, a message to the user and the parsing output.

Source code in docs/microservices/core/src/endpoints/summary.py
@router.post(
    "/file",
    response_model=SummaryAPIOutput,
    summary="File summary endpoint.",
    description=(
        "Generates a summary for a PDF, DOCX, or TXT file.\n\n"
        "The endpoint parses, cleans, chunks the input file and then summarizes the text "
        "according to the requested output length, focusing the summary on the user-defined topics "
        "and using the specified language model."
    ),
    openapi_extra={
        "requestBody": {
            "content": {
                "application/json": {
                    "examples": SummaryFileAPIInputParameters.model_config[
                        "json_schema_extra"
                    ]["openapi_examples"],
                }
            }
        }
    },
    responses=add_auth_status_codes_to_responses(
        {
            200: {
                "description": "Successful summary generation.",
                "content": {
                    "application/json": {
                        "examples": SummaryAPIOutput.model_config["json_schema_extra"][
                            "openapi_examples"
                        ],
                    }
                },
            },
            400: {"description": "Invalid request."},
            424: {"description": "Failed dependency."},
            408: {"description": "Request timeout of a dependency."},
            502: {"description": "Error accessing microservice."},
        }
    ),
)
async def summarize_file(
    request: Request,
    file: UploadFile = File(..., description="Upload a PDF, DOCX, or TXT file."),
    api_input: SummaryFileAPIInputParameters = Depends(
        SummaryFileAPIInputParameters.as_form
    ),
) -> SummaryAPIOutput:
    """Summary of a pdf, docx or txt file including its parsing, cleaning and chunking.

    Args:
        request (Request): the currently performed request
        file (UploadFile): Input file either pdf, docx or txt.
        api_input (SummaryFileAPIInputParameters): Containing the name of the language model, desired length of
            summary output as number of DIN-A4 pages, topics on which the summary should be focused.

    Returns:
        The summary, a message to the user and the parsing output.
    """
    settings = request.state.settings
    summary_endpoint = get_summary_endpoint(settings)
    url = f"{summary_endpoint}/summary/file"
    files = dict(file=(file.filename, file.file, file.content_type))
    data = dict(
        language_model=api_input.language_model,
        output_length=api_input.output_length,
        topics=api_input.topics,
    )
    post_config = settings.inter_service_communication.summary
    request_options = {
        "files": files,
        "data": data,
    }

    result = await async_post(
        url=url,
        response_model=SummaryAPIOutput,
        config=post_config,
        request_options=request_options,
        service_name="Summary",
    )

    return result
summarize_text async
summarize_text(request, api_input)

Summary of text input, including basic text cleaning and chunking.

PARAMETER DESCRIPTION
request

the currently performed request

TYPE: Request

api_input

Containing the input text, the name of the language model, the desired summary length as a number of DIN-A4 pages, and the topics on which the summary should be focused.

TYPE: SummaryTextAPIInput

RETURNS DESCRIPTION
SummaryAPIOutput

The summary, a message to the user and the parsing output.

Source code in docs/microservices/core/src/endpoints/summary.py
@router.post(
    "/text",
    response_model=SummaryAPIOutput,
    summary="Text summary endpoint.",
    description=(
        "Generates a summary from a plain text input.\n\n"
        "The endpoint performs basic text cleaning, chunking, and then summarizes the text "
        "according to the requested output length, focusing the summary on the user-defined topics "
        "and using the specified language model."
    ),
    openapi_extra={
        "requestBody": {
            "content": {
                "application/json": {
                    "examples": SummaryTextAPIInput.model_config["json_schema_extra"][
                        "openapi_examples"
                    ],
                }
            }
        }
    },
    responses=add_auth_status_codes_to_responses(
        {
            200: {
                "description": "Successful summary generation.",
                "content": {
                    "application/json": {
                        "examples": SummaryAPIOutput.model_config["json_schema_extra"][
                            "openapi_examples"
                        ],
                    }
                },
            },
            400: {"description": "Invalid request."},
            424: {"description": "Failed dependency."},
            408: {"description": "Request timeout of a dependency."},
            502: {"description": "Error accessing microservice."},
        }
    ),
)
async def summarize_text(
    request: Request,
    api_input: SummaryTextAPIInput,
) -> SummaryAPIOutput:
    """Summary of text input including basic text cleaning and chunking of text input.

    Args:
        request (Request): the currently performed request
        api_input (SummaryTextAPIInput): Containing the input text, name of the language model, desired length of
            summary output as number of DIN-A4 pages, topics on which the summary should be focused.

    Returns:
        The summary, a message to the user and the parsing output.
    """
    settings = request.state.settings
    summary_endpoint = get_summary_endpoint(settings)
    url = f"{summary_endpoint}/summary/text"
    json = dict(
        text=api_input.text,
        language_model=api_input.language_model,
        output_length=float(api_input.output_length),
        topics=api_input.topics,
    )
    headers = {"Content-Type": "application/json", "Accept": "application/json"}

    post_config = settings.inter_service_communication.summary
    request_options = {
        "json": json,
        "headers": headers,
    }

    result = await async_post(
        url=url,
        response_model=SummaryAPIOutput,
        config=post_config,
        request_options=request_options,
        service_name="Summary",
    )

    return result
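The body forwarded to `/summary/text` mirrors `SummaryTextAPIInput`; note that the handler explicitly coerces `output_length` to `float` before sending. A sketch with illustrative values:

```python
# Body summarize_text forwards to the summary service; values are illustrative.
json_body = {
    "text": "Long meeting transcript to be condensed.",
    "language_model": "test_model_mock",
    "output_length": float(1),  # desired length in DIN-A4 pages, as float
    "topics": ["budget"],
}

headers = {"Content-Type": "application/json", "Accept": "application/json"}
```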
transcription

Endpoints of the transcription-microservice, including upload, retrieval, Word (DOCX) export and upsertion of special terms.

FUNCTION DESCRIPTION
check_transcription_exists

Verify if transcription exists for given job ID.

get_transcription

Retrieve transcription for given job ID.

get_transcription_endpoint

Extract the transcription endpoint from the 'settings'.

post_transcription_docx_or_zip

Retrieve stored transcriptions as a DOCX document or ZIP archive.

translate

Translate the translation_chunks to German.

upload

Upload file for transcription endpoint.

upload_special_terms

Upsert new special terms into the database.

check_transcription_exists async
check_transcription_exists(request, job_ids)

Verify if transcription exists for given job ID.

Can be used for periodic checks. No retrieval of transcription.

PARAMETER DESCRIPTION
request

The currently performed request.

TYPE: Request

job_ids

List of transcription job IDs to check.

TYPE: CheckTranscriptionExistsRequest

RETURNS DESCRIPTION
TranscriptionStatusResponse

Contains job_id and status of the job.

TYPE: TranscriptionStatusResponse

RAISES DESCRIPTION
HTTPException

If no job_id is provided in the request body.

Source code in docs/microservices/core/src/endpoints/transcription.py
@router.post(
    "/check_transcription_exists",
    summary="Check if transcriptions exists and retrieve status",
    description="Check whether transcriptions exist for the given job IDs. Returns booleans and status.",
    openapi_extra={
        "requestBody": {
            "content": {
                "application/json": {
                    "examples": CheckTranscriptionExistsRequest.model_config[
                        "json_schema_extra"
                    ]["openapi_examples"],
                }
            },
        }
    },
    response_model=TranscriptionStatusResponse,
    responses=add_auth_status_codes_to_responses(
        {
            200: {
                "description": "Successful status check.",
                "content": {
                    "application/json": {
                        "examples": TranscriptionStatusResponse.model_config[
                            "json_schema_extra"
                        ]["openapi_examples"]
                    }
                },
            },
            400: {"description": "Payload must contain at least one job_id."},
            500: {"description": "Internal Server Error"},
        },
    ),
)
async def check_transcription_exists(
    request: Request,
    job_ids: CheckTranscriptionExistsRequest,
) -> TranscriptionStatusResponse:
    """Verify if transcription exists for given job ID.

    Can be used for periodic checks. No retrieval of transcription.

    Args:
        request (Request): The currently performed request.
        job_ids (CheckTranscriptionExistsRequest): List of transcription job IDs to check.

    Returns:
        TranscriptionStatusResponse: Contains job_id and status of the job.

    Raises:
        HTTPException: If no job_id is provided in the request body.
    """
    settings = request.state.settings
    transcription_endpoint = get_transcription_endpoint(settings)
    url: str = f"{transcription_endpoint}/check_transcription_exists"

    job_id_list = job_ids.root
    if not job_id_list:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Payload must contain at least one job_id.",
        )

    request_options = {
        "json": [str(job_id) for job_id in job_id_list],
    }
    post_config = settings.inter_service_communication.transcription

    result = await async_post(
        url=url,
        response_model=TranscriptionStatusResponse,
        request_options=request_options,
        config=post_config,
        service_name="Transcription",
    )
    if result is None:
        return TranscriptionStatusResponse(
            job_statuses=[
                {
                    "job_id": str(job_id),
                    "exists": False,
                    "status": "unavailable",
                }
                for job_id in job_id_list
            ]
        )
    return result
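When `async_post` returns `None` (the transcription service is unreachable), the handler degrades gracefully: every requested job is reported as non-existent with status `"unavailable"` instead of failing the whole request. A sketch of that fallback payload, with randomly generated illustrative job IDs:

```python
import uuid

# Illustrative job IDs; in the handler these come from the request body.
job_id_list = [uuid.uuid4(), uuid.uuid4()]

# Same fallback shape as check_transcription_exists builds on a None result.
fallback = {
    "job_statuses": [
        {"job_id": str(job_id), "exists": False, "status": "unavailable"}
        for job_id in job_id_list
    ]
}
```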
get_transcription async
get_transcription(request, job_id=Path(..., example=TranscriptionBaseResponse.model_config['json_schema_extra']['openapi_examples']['standard']['value']['job_id']))

Retrieve transcription for given job ID.

PARAMETER DESCRIPTION
request

The currently performed request.

TYPE: Request

job_id

Unique identifier for the transcription job.

TYPE: UUID DEFAULT: Path(..., example=model_config['json_schema_extra']['openapi_examples']['standard']['value']['job_id'])

RETURNS DESCRIPTION
TranscriptionReceiveResponse

Contains job_id, transcription text if found, None if not found, translation and matched special terms.

TYPE: TranscriptionReceiveResponse

Source code in docs/microservices/core/src/endpoints/transcription.py
@router.get(
    "/retrieve_transcription/{job_id}",
    summary="Retrieve transcription",
    description="""
    Retrieve the transcription text for the given job ID.
    The transcription remains stored until deleted by the recurring deletion process.""",
    response_model=TranscriptionReceiveResponse,
    responses=add_auth_status_codes_to_responses(
        {
            200: {
                "description": "Transcription retrieved (possibly empty)",
                "content": {
                    "application/json": {
                        "examples": TranscriptionReceiveResponse.model_config[
                            "json_schema_extra"
                        ]["openapi_examples"]
                    }
                },
            },
            404: {
                "description": "No transcription found for the given job ID.",
                "content": {
                    "application/json": {
                        "example": {"detail": "No transcription found for job_id=..."}
                    }
                },
            },
            500: {"description": "Internal Server Error"},
        },
    ),
)
async def get_transcription(
    request: Request,
    job_id: UUID = Path(
        ...,
        example=TranscriptionBaseResponse.model_config["json_schema_extra"][
            "openapi_examples"
        ]["standard"]["value"]["job_id"],
    ),
) -> TranscriptionReceiveResponse:
    """Retrieve transcription for given job ID.

    Args:
        request (Request): The currently performed request.
        job_id (UUID): Unique identifier for the transcription job.

    Returns:
        TranscriptionReceiveResponse: Contains the job_id, the transcription text
            (or None if none was found), the translation, and any matched special terms.
    """
    settings = request.state.settings
    transcription_endpoint = get_transcription_endpoint(settings)
    url = f"{transcription_endpoint}/retrieve_transcription/{job_id}"

    async with httpx.AsyncClient(timeout=30) as client:
        response = await client.get(url=url)

    payload = response.json()

    if response.status_code != status.HTTP_200_OK:
        error_detail = (
            payload.get("detail", response.text)
            if isinstance(payload, dict)
            else response.text
        )
        raise HTTPException(status_code=response.status_code, detail=error_detail)

    return TranscriptionReceiveResponse(**payload)
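The non-200 branch above forwards the upstream error detail when the body is JSON and falls back to the raw response text otherwise. Isolated as a standalone helper (the name `extract_error_detail` is hypothetical, not part of the service), the logic looks like this:

```python
def extract_error_detail(payload: object, fallback_text: str) -> str:
    # Prefer a JSON body's "detail" field; fall back to the raw response text
    # for non-dict payloads (e.g. plain-text error pages).
    if isinstance(payload, dict):
        return payload.get("detail", fallback_text)
    return fallback_text


print(extract_error_detail({"detail": "No transcription found for job_id=..."}, "raw"))
print(extract_error_detail("<html>502 Bad Gateway</html>", "<html>502 Bad Gateway</html>"))
```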
get_transcription_endpoint
get_transcription_endpoint(settings)

Extract the transcription endpoint from the 'settings'.

Source code in docs/microservices/core/src/endpoints/transcription.py
def get_transcription_endpoint(settings: Settings) -> str:
    """Extract the transcription endpoint from the 'settings'."""
    return str(settings.service_endpoints["transcription"]).rstrip("/")
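The `rstrip("/")` normalization matters because the configured endpoint may or may not carry a trailing slash; without it, URL composition would produce a double slash. A minimal sketch:

```python
def normalize_endpoint(raw: str) -> str:
    # Drop any trailing slash so that f"{endpoint}/path" never produces "//path".
    return str(raw).rstrip("/")


base = normalize_endpoint("http://transcription:8080/")
print(f"{base}/translate")  # → http://transcription:8080/translate
```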
post_transcription_docx_or_zip async
post_transcription_docx_or_zip(request, items)

Retrieve stored transcriptions as a DOCX document or a ZIP archive.

PARAMETER DESCRIPTION
request

The currently performed request.

TYPE: Request

items

The requested transcripts with speaker rename mapping.

TYPE: list[CreateWordFilePayload]

RETURNS DESCRIPTION
StreamingResponse

DOCX/ZIP file as downloadable response.

TYPE: StreamingResponse

RAISES DESCRIPTION
HTTPException

If the payload is empty.

Source code in docs/microservices/core/src/endpoints/transcription.py
@router.post(
    "/download_docx",
    summary="Download a single DOCX or a ZIP of DOCX transcriptions",
    description="Send a list of items with uuid and speaker mapping. If list has one item, returns a single DOCX. \
        If multiple, returns a ZIP.",
    response_class=StreamingResponse,
    openapi_extra={
        "requestBody": {
            "content": {
                "application/json": {
                    "examples": CreateWordFilePayload.model_config["json_schema_extra"][
                        "openapi_examples"
                    ]
                }
            }
        }
    },
    responses=add_auth_status_codes_to_responses(
        {
            200: {
                "description": "DOCX or ZIP file",
                "content": {
                    "application/vnd.openxmlformats-officedocument.wordprocessingml.document": {
                        "schema": {"type": "string", "format": "binary"}
                    },
                    "application/zip": {
                        "schema": {"type": "string", "format": "binary"}
                    },
                },
            },
            400: {"description": "Payload must contain at least one item."},
            404: {"description": "No transcription found for the given job ID."},
            500: {"description": "Internal Server Error"},
        },
    ),
)
async def post_transcription_docx_or_zip(
    request: Request,
    items: list[CreateWordFilePayload],
) -> StreamingResponse:
    """Retrieve stored transcriptions as a DOCX document or a ZIP archive.

    Args:
        request (Request): The currently performed request.
        items (list[CreateWordFilePayload]): The requested transcripts with speaker rename mapping.

    Returns:
        StreamingResponse: DOCX/ZIP file as downloadable response.

    Raises:
        HTTPException: If the payload is empty.

    """
    settings = request.state.settings
    transcription_endpoint = get_transcription_endpoint(settings)
    url = f"{transcription_endpoint}/download_docx"

    if not items:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Payload must contain at least one item.",
        )

    client = httpx.AsyncClient(timeout=PostConfig().timeout_in_s)
    resp_context_manager = client.stream("POST", url, json=jsonable_encoder(items))
    resp = await resp_context_manager.__aenter__()

    if resp.status_code != status.HTTP_200_OK:
        close_streaming_client_context_manager(resp, resp_context_manager, client)

    headers = {
        k: v
        for k, v in resp.headers.items()
        if k.lower()
        in {
            "content-type",
            "content-disposition",
            "content-length",
        }
    }

    return StreamingResponse(
        stream_bytes_from_streaming_response(resp, resp_context_manager, client),
        headers=headers,
        media_type=headers.get("content-type", "application/octet-stream"),
        status_code=resp.status_code,
    )
translate async
translate(request, translation_text)

Translate the given text to German.

PARAMETER DESCRIPTION
request

The currently performed request.

TYPE: Request

translation_text

The text to translate.

TYPE: TranslateRequest

RETURNS DESCRIPTION
translation

The full combined translated text.

TYPE: TranslateAPIOutput

Source code in docs/microservices/core/src/endpoints/transcription.py
@router.post(
    "/translate",
    summary="Translate text to German",
    description=("Uses an LLM to translate a text to German."),
    response_model=TranslateAPIOutput,
    openapi_extra={
        "requestBody": {
            "content": {
                "application/json": {
                    "examples": TranslateRequest.model_config["json_schema_extra"][
                        "openapi_examples"
                    ]
                }
            }
        }
    },
    responses=add_auth_status_codes_to_responses(
        {
            200: {
                "description": "Successful translation response.",
                "content": {
                    "application/json": {
                        "examples": {
                            "translation": TranslateAPIOutput.model_config[
                                "json_schema_extra"
                            ]["openapi_examples"],
                        }
                    }
                },
            },
            404: {"description": "Job not found or no chunks available"},
            500: {"description": "Internal Server Error"},
        },
    ),
)
async def translate(
    request: Request,
    translation_text: TranslateRequest,
) -> TranslateAPIOutput:
    """Translate the given text to German.

    Args:
        request (Request): The currently performed request.
        translation_text (TranslateRequest): The text to translate.

    Returns:
        translation (TranslateAPIOutput): The full combined translated text.

    """
    settings = request.state.settings
    transcription_endpoint = get_transcription_endpoint(settings)
    url = f"{transcription_endpoint}/translate"

    post_config = settings.inter_service_communication.transcription

    request_options = {
        "json": translation_text.model_dump(),
    }
    result = await async_post(
        url=url,
        response_model=TranslateAPIOutput,
        config=post_config,
        request_options=request_options,
    )
    return result
upload async
upload(request, file=File(..., description='Audio or video file to transcribe.'))

Upload a file for transcription.

PARAMETER DESCRIPTION
request

The currently performed request.

TYPE: Request

file

Uploaded file from client.

TYPE: File DEFAULT: File(..., description='Audio or video file to transcribe.')

RETURNS DESCRIPTION
TranscriptionBaseResponse

The unique job ID for tracking the transcription status.

TYPE: TranscriptionBaseResponse

Source code in docs/microservices/core/src/endpoints/transcription.py
@router.post(
    "/upload",
    summary="Upload file for transcription",
    description="Upload an audio/video file. Returns a job ID to track transcription status.",
    status_code=status.HTTP_201_CREATED,
    response_model=TranscriptionBaseResponse,
    responses=add_auth_status_codes_to_responses(
        {
            201: {
                "description": "Successful upload response.",
                "content": {
                    "application/json": {
                        "examples": TranscriptionBaseResponse.model_config[
                            "json_schema_extra"
                        ]["openapi_examples"]
                    }
                },
            },
            400: {"description": "There was an error parsing the body."},
            413: {"description": "File too large."},
            415: {
                "description": "Uploaded file is invalid or in an unsupported state."
            },
            500: {"description": "Internal Server Error"},
        },
    ),
)
async def upload(
    request: Request,
    file: UploadFile = File(
        ...,
        description="Audio or video file to transcribe.",
    ),
) -> TranscriptionBaseResponse:
    """Upload a file for transcription.

    Args:
        request (Request): The currently performed request.
        file (File): Uploaded file from client.

    Returns:
        TranscriptionBaseResponse: The unique job ID for tracking the transcription status.
    """
    settings = request.state.settings
    transcription_endpoint = get_transcription_endpoint(settings)
    url = f"{transcription_endpoint}/upload"

    files = [("file", (file.filename, file.file, file.content_type))]

    request_options = {
        "files": files,
    }
    post_config = settings.inter_service_communication.transcription

    result = await async_post(
        url=url,
        response_model=TranscriptionBaseResponse,
        config=post_config,
        request_options=request_options,
        service_name="Transcription",
    )

    return result
upload_special_terms async
upload_special_terms(request, file=File(..., description='CSV/TXT with headers: Fachbegriff, Bedeutung, Themenbereich'))

Upsert new special terms into the database.

PARAMETER DESCRIPTION
request

The currently performed request.

TYPE: Request

file

Uploaded TXT/CSV file.

TYPE: File DEFAULT: File(..., description='CSV/TXT with headers: Fachbegriff, Bedeutung, Themenbereich')

RETURNS DESCRIPTION
UpsertTermsAPIOutput

The number of upserted terms and details about any problems.

TYPE: UpsertTermsAPIOutput

Source code in docs/microservices/core/src/endpoints/transcription.py
@router.post(
    "/special_terms/upload",
    summary="Upload/update special terms (CSV/TXT)",
    description="""Accepts a file with columns (Fachbegriff, Bedeutung, Themenbereich) and
     forwards it to the transcription to upsert the special-terms DB.""",
    response_model=UpsertTermsAPIOutput,
    openapi_extra={
        "requestBody": {
            "content": {
                "text/plain": {
                    "examples": {
                        "inline_example": {
                            "summary": "Simple CSV/TXT content",
                            "description": "Example file content with the required headers and a few entries.",
                            "value": (
                                "Fachbegriff, Bedeutung, Themenbereich\n"
                                "API, Schnittstelle, IT\n"
                                "und, Verbindungswort, Konjunktion"
                            ),
                        }
                    }
                }
            }
        }
    },
    responses=add_auth_status_codes_to_responses(
        {
            200: {
                "description": "Upsert completed",
                "content": {
                    "application/json": {
                        "examples": UpsertTermsAPIOutput.model_config[
                            "json_schema_extra"
                        ]["openapi_examples"]
                    }
                },
            },
            400: {"description": "There was an error parsing the body."},
            422: {"description": "Missing required columns."},
            500: {"description": "Internal Server Error"},
        },
    ),
)
async def upload_special_terms(
    request: Request,
    file: UploadFile = File(
        ..., description="CSV/TXT with headers: Fachbegriff, Bedeutung, Themenbereich"
    ),
) -> UpsertTermsAPIOutput:
    """Upsert new special terms into the database.

    Args:
        request (Request): The currently performed request.
        file (File): Uploaded TXT/CSV file.

    Returns:
        UpsertTermsAPIOutput: The number of upserted terms and details about any problems.
    """
    settings = request.state.settings
    transcription_endpoint = get_transcription_endpoint(settings)
    upload_url = f"{transcription_endpoint}/special_terms/upload"
    files = [("file", (file.filename, file.file, file.content_type or "text/plain"))]

    post_config = settings.inter_service_communication.transcription

    request_options = {
        "files": files,
    }
    result = await async_post(
        url=upload_url,
        response_model=UpsertTermsAPIOutput,
        config=post_config,
        request_options=request_options,
    )
    return result
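The required column headers (Fachbegriff, Bedeutung, Themenbereich) are validated on the transcription side, which is presumably what triggers the 422 response. A sketch of such a check (the helper `has_required_columns` is illustrative, not part of the service):

```python
import csv
import io

REQUIRED_COLUMNS = {"Fachbegriff", "Bedeutung", "Themenbereich"}


def has_required_columns(file_text: str) -> bool:
    # Read only the first row and compare it against the required headers.
    reader = csv.reader(io.StringIO(file_text))
    try:
        header = {column.strip() for column in next(reader)}
    except StopIteration:  # empty file
        return False
    return REQUIRED_COLUMNS <= header


sample = "Fachbegriff, Bedeutung, Themenbereich\nAPI, Schnittstelle, IT\n"
print(has_required_columns(sample))  # → True
```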

feedback

Models and functions to store feedback for chat, RAG and summary.

CLASS DESCRIPTION
TableChatFeedback

Feedback for Chat.

TableGeneralFeedback

General Feedback.

TableRAGFeedback

Feedback for RAG.

TableSummaryFeedback

Feedback for Summary.

FUNCTION DESCRIPTION
cleanup_userfeedback

Delete feedback entries older than given number of days.

create_db_session

Create the basic DB engine and session.

get_db_session

A SQLAlchemy database session for the feedback database bound in a context manager.

save_chat_feedback

Store a chat feedback in the database.

save_general_feedback

Store the general feedback in the database.

save_rag_feedback

Save RAG feedback in the database.

save_summary_feedback

Store a summary feedback in the database.

TableChatFeedback

Bases: Base

Feedback for Chat.

Source code in docs/microservices/core/src/feedback.py
class TableChatFeedback(Base):
    """Feedback for Chat."""

    __tablename__ = "feedback_chat"

    id = Column(Integer, primary_key=True, autoincrement=True)
    timestamp = Column(DateTime(timezone=True), default=func.now())
    feedback = Column(Enum(FeedbackRating))
    feedback_text = Column(Text)

    user_prompt = Column(Text)
    chat_answer = Column(Text)

    llm_label = Column(Text)
    llm_name = Column(Text)
    llm_is_remote = Column(Boolean)
TableGeneralFeedback

Bases: Base

General Feedback.

Source code in docs/microservices/core/src/feedback.py
class TableGeneralFeedback(Base):
    """General Feedback."""

    __tablename__ = "feedback_general"

    id = Column(Integer, primary_key=True, autoincrement=True)
    timestamp = Column(DateTime(timezone=True), default=func.now())
    feedback = Column(Enum(FeedbackRating))
    feedback_text = Column(Text)
TableRAGFeedback

Bases: Base

Feedback for RAG.

Source code in docs/microservices/core/src/feedback.py
class TableRAGFeedback(Base):
    """Feedback for RAG."""

    __tablename__ = "feedback_rag"

    id = Column(Integer, primary_key=True, autoincrement=True)
    timestamp = Column(DateTime(timezone=True), default=func.now())
    feedback = Column(Enum(FeedbackRating))
    feedback_text = Column(Text)

    question = Column(Text)
    source_type = Column(Text)
    rag_answer = Column(Text)
    source_chunks = Column(JSON)

    llm_label = Column(Text)
    llm_name = Column(Text)
    llm_is_remote = Column(Boolean)
TableSummaryFeedback

Bases: Base

Feedback for Summary.

Source code in docs/microservices/core/src/feedback.py
class TableSummaryFeedback(Base):
    """Feedback for Summary."""

    __tablename__ = "feedback_summary"

    id = Column(Integer, primary_key=True, autoincrement=True)
    timestamp = Column(DateTime(timezone=True), default=func.now())
    feedback = Column(Enum(FeedbackRating))
    feedback_text = Column(Text)

    source_type = Column(Text)
    text_length = Column(Integer)
    desired_summary_length = Column(Integer)
    summary_length = Column(Integer)

    llm_label = Column(Text)
    llm_name = Column(Text)
    llm_is_remote = Column(Boolean)
cleanup_userfeedback
cleanup_userfeedback(session, retention_days=90)

Delete feedback entries older than given number of days.

PARAMETER DESCRIPTION
session

A database session

TYPE: Session

retention_days

Number of days to retain entries. Entries older than this will be deleted. Defaults to 90.

TYPE: int DEFAULT: 90

RETURNS DESCRIPTION
bool

True if the commit was successful.

RAISES DESCRIPTION
SQLAlchemyError

Error if the commit fails.

Source code in docs/microservices/core/src/feedback.py
def cleanup_userfeedback(
    session: SessionDep,
    retention_days: int = 90,
) -> bool:
    """Delete feedback entries older than given number of days.

    Args:
        session (Session): A database session
        retention_days (int): Number of days to retain entries.
            Entries older than this will be deleted. Defaults to 90.

    Returns:
        True if the commit was successful.

    Raises:
        SQLAlchemyError: Error if the commit fails.
    """
    tables = [
        TableGeneralFeedback,
        TableChatFeedback,
        TableRAGFeedback,
        TableSummaryFeedback,
    ]

    date_threshold = datetime.now(tz=UTC) - timedelta(days=retention_days)

    for table in tables:
        result = session.query(table).filter(table.timestamp < date_threshold).delete()
        logger.info(
            f"Cleaned up user feedback in Table {table.__tablename__}: {result} datapoints deleted."
        )

    # commit changes or rollback if something went wrong
    try:
        session.commit()
        logger.debug("Old feedback entries successfully removed.")
        return True
    except SQLAlchemyError as e:
        session.rollback()
        logger.error(f"Failed to clean up feedback: {e}")
        raise
    finally:
        session.close()
create_db_session
create_db_session(settings)

Create the basic DB engine and session.

RETURNS DESCRIPTION
sessionmaker[Session]

A session factory for the feedback database

RAISES DESCRIPTION
SQLAlchemyError

If the connection to the database fails.

Source code in docs/microservices/core/src/feedback.py
def create_db_session(settings: Settings) -> sessionmaker[Session]:
    """Create the basic DB engine and session.

    Returns:
        A session factory for the feedback database

    Raises:
        SQLAlchemyError: If the connection to the database fails.
    """
    engine = create_engine(settings.feedback_db.url)

    # Check connection
    try:
        with engine.connect() as __:
            logger.debug("Connection to feedback-db OK")

    except SQLAlchemyError as e:
        logger.warning(f"Connection to feedback-db failed with error: {e}")
        raise e

    Base.metadata.create_all(engine)

    return sessionmaker(bind=engine)
get_db_session async
get_db_session(request)

A SQLAlchemy database session for the feedback database bound in a context manager.

Yields: An active SQLAlchemy session connected to the feedback database.

Source code in docs/microservices/core/src/feedback.py
async def get_db_session(request: Request) -> Session:
    """A SQLAlchemy database session for the feedback database bound in a context manager.

    Yields: An active SQLAlchemy session connected to the feedback database.
    """
    session = request.state.db_session

    with session.begin() as this_session:
        yield this_session
save_chat_feedback
save_chat_feedback(session, chat_feedback)

Store a chat feedback in the database.

This function creates a new record in the feedback_chat table. The timestamp field will be automatically set to the current timestamp by SQLAlchemy. The id field will be auto-incremented.

PARAMETER DESCRIPTION
session

A database session

TYPE: Session

chat_feedback

The feedback to be stored.

TYPE: ChatFeedback

RETURNS DESCRIPTION
bool

True if the commit was successful.

RAISES DESCRIPTION
SQLAlchemyError

Error if the commit fails.

Source code in docs/microservices/core/src/feedback.py
def save_chat_feedback(
    session: SessionDep,
    chat_feedback: ChatFeedback,
) -> bool:
    """Store a chat feedback in the database.

    This function creates a new record in the feedback_chat table.
    The timestamp field will be automatically set to the current timestamp by SQLAlchemy.
    The id field will be auto-incremented.

    Args:
        session (Session): A database session
        chat_feedback (ChatFeedback): The feedback to be stored.

    Returns:
        True if the commit was successful.

    Raises:
        SQLAlchemyError: Error if the commit fails.
    """
    # Create a new record; the timestamp will be set automatically by SQLAlchemy to the current timestamp
    # id will be auto-incremented
    new_entry = TableChatFeedback(
        feedback=chat_feedback.feedback,
        feedback_text=chat_feedback.feedback_text,
        user_prompt=chat_feedback.user_prompt,
        chat_answer=chat_feedback.chat_answer,
        llm_label=chat_feedback.llm.label,
        llm_name=chat_feedback.llm.name,
        llm_is_remote=chat_feedback.llm.is_remote,
    )

    # commit changes or rollback if something went wrong
    try:
        session.add(new_entry)
        session.commit()
        logger.debug("Feedback (chat) successfully saved in database")
        return True
    except SQLAlchemyError as e:
        session.rollback()
        logger.error(f"Failed to save feedback: {e}")
        raise
    finally:
        session.close()
save_general_feedback
save_general_feedback(session, general_feedback)

Store the general feedback in the database.

PARAMETER DESCRIPTION
session

A database session

TYPE: Session

general_feedback

The general feedback to be saved.

TYPE: GeneralFeedback

RETURNS DESCRIPTION
bool

True if the commit was successful.

RAISES DESCRIPTION
SQLAlchemyError

Error if the commit fails.

Source code in docs/microservices/core/src/feedback.py
def save_general_feedback(
    session: SessionDep,
    general_feedback: GeneralFeedback,
) -> bool:
    """Store the general feedback in the database.

    Args:
        session (Session): A database session
        general_feedback (GeneralFeedback): The general feedback to be saved.

    Returns:
        True if the commit was successful.

    Raises:
        SQLAlchemyError: Error if the commit fails.
    """
    new_entry = TableGeneralFeedback(
        feedback=general_feedback.feedback,
        feedback_text=general_feedback.feedback_text,
    )

    # commit changes or rollback if something went wrong
    try:
        session.add(new_entry)
        session.commit()
        logger.debug("Feedback (general) successfully saved in database")
        return True
    except SQLAlchemyError as e:
        session.rollback()
        logger.error(f"Failed to save feedback: {e}")
        raise
    finally:
        session.close()
save_rag_feedback
save_rag_feedback(session, rag_feedback)

Save RAG feedback in the database.

PARAMETER DESCRIPTION
session

A database session

TYPE: Session

rag_feedback

The feedback to be stored.

TYPE: RAGFeedback

RETURNS DESCRIPTION
bool

True if the commit was successful.

RAISES DESCRIPTION
SQLAlchemyError

Error if the commit fails.

Source code in docs/microservices/core/src/feedback.py
def save_rag_feedback(
    session: SessionDep,
    rag_feedback: RAGFeedback,
) -> bool:
    """Save RAG feedback in the database.

    Args:
        session (Session): A database session
        rag_feedback (RAGFeedback): The feedback to be stored.

    Returns:
        True if the commit was successful.

    Raises:
        SQLAlchemyError: Error if the commit fails.
    """
    new_entry = TableRAGFeedback(
        feedback=rag_feedback.feedback,
        feedback_text=rag_feedback.feedback_text,
        question=rag_feedback.question,
        rag_answer=rag_feedback.rag_answer,
        source_type=rag_feedback.source_type,
        source_chunks=json.dumps(rag_feedback.source_chunks),
        llm_label=rag_feedback.llm.label,
        llm_name=rag_feedback.llm.name,
        llm_is_remote=rag_feedback.llm.is_remote,
    )

    # commit changes or rollback if something went wrong
    try:
        session.add(new_entry)
        session.commit()
        logger.debug("Feedback (rag) successfully saved in database")
        return True
    except SQLAlchemyError as e:
        session.rollback()
        logger.error(f"Failed to save feedback: {e}")
        raise
    finally:
        session.close()
save_summary_feedback
save_summary_feedback(session, summary_feedback)

Store a summary feedback in the database.

This function creates a new record in the feedback_summary table. The timestamp field will be automatically set to the current timestamp by SQLAlchemy. The id field will be auto-incremented.

PARAMETER DESCRIPTION
session

A database session

TYPE: Session

summary_feedback

The feedback to be stored.

TYPE: SummaryFeedback

RETURNS DESCRIPTION
bool

True if the commit was successful.

RAISES DESCRIPTION
SQLAlchemyError

Error if the commit fails.

Source code in docs/microservices/core/src/feedback.py
def save_summary_feedback(
    session: SessionDep,
    summary_feedback: SummaryFeedback,
) -> bool:
    """Store a summary feedback in the database.

    This function creates a new record in the feedback_summary table.
    The timestamp field will be automatically set to the
    current timestamp by SQLAlchemy.
    The id field will be auto-incremented.

    Args:
        session (Session): A database session
        summary_feedback (SummaryFeedback): The feedback to be stored.

    Returns:
        True if the commit was successful.

    Raises:
        SQLAlchemyError: Error if the commit fails.
    """
    # Create a new record; the timestamp will be set automatically by SQLAlchemy to the current timestamp
    # id will be auto-incremented
    new_entry = TableSummaryFeedback(
        feedback=summary_feedback.feedback,
        feedback_text=summary_feedback.feedback_text,
        source_type=summary_feedback.source_type,
        text_length=summary_feedback.text_length,
        desired_summary_length=summary_feedback.desired_summary_length,
        summary_length=summary_feedback.summary_length,
        llm_label=summary_feedback.llm.label,
        llm_name=summary_feedback.llm.name,
        llm_is_remote=summary_feedback.llm.is_remote,
    )

    # commit changes or rollback if something went wrong
    try:
        session.add(new_entry)
        session.commit()
        logger.debug("Feedback (summary) successfully saved in database")
        return True
    except SQLAlchemyError as e:
        session.rollback()
        logger.error(f"Failed to save feedback: {e}")
        raise
    finally:
        session.close()

models

Data model classes for loading and validating API and configuration parameters.

MODULE DESCRIPTION
api_input

pydantic Models for API input parameters.

api_output

pydantic Models for API output parameters.

general

Load and check Settings from yml.

api_input

pydantic Models for API input parameters.

CLASS DESCRIPTION
ChatFeedback

Model defining the input of the feedback for the chat.

ChatInput

Model defining the input of a valid chat request.

ChatMessage

Message input model used to store the content of chat messages.

CheckTranscriptionExistsRequest

Request body for checking transcription status by job IDs.

CreateWordFilePayload

Input payload describing how to build a Word transcript file.

FeedbackLLM

Model validating metadata for the LLM used in feedback.

FeedbackRating

Enum class specifying possible feedback ratings.

GeneralFeedback

Model defining the structure of feedback for all services.

MetadataFilter

Model defining the structure of a metadata filter.

RAGFeedback

Model defining the input of the feedback for the RAG.

RAGInput

Model defining the input of a valid RAG request.

RestrictValuesForDBModel

Model validating string fields before database insertion.

SpeakerItem

Speaker mapping used when generating word files.

SummaryFeedback

Model defining the input of the feedback for the summary.

SummaryFileAPIInputParameters

Model defining the input parameters as part of a valid Summary request for file endpoint.

SummaryTextAPIInput

Model defining the input of a valid Summary request for text endpoint.

TranslateRequest

Request body for translating free text.

ChatFeedback

Bases: GeneralFeedback

Model defining the input of the feedback for the chat.

ATTRIBUTE DESCRIPTION
user_prompt

The original user prompt.

TYPE: str

chat_answer

The model’s answer being rated.

TYPE: str

llm

Information about the model used.

TYPE: FeedbackLLM

Source code in docs/microservices/core/src/models/api_input.py
class ChatFeedback(GeneralFeedback):
    """Model defining the input of the feedback for the chat.

    Attributes:
        user_prompt (str): The original user prompt.
        chat_answer (str): The model’s answer being rated.
        llm (FeedbackLLM): Information about the model used.
    """

    user_prompt: str
    chat_answer: str
    llm: FeedbackLLM
ChatInput

Bases: BaseModel

Model defining the input of a valid chat request.

ATTRIBUTE DESCRIPTION
new_message

The new user message to be processed.

TYPE: ChatMessage

chat_history

List of previous chat messages forming the conversation context.

TYPE: list[ChatMessage]

language_model

The identifier of the language model to use.

TYPE: str

request_timestamp

Timestamp of the request.

TYPE: int

Source code in docs/microservices/core/src/models/api_input.py
class ChatInput(BaseModel):
    """Model defining the input of a valid chat request.

    Attributes:
        new_message (ChatMessage): The new user message to be processed.
        chat_history (list[ChatMessage]): List of previous chat messages forming the conversation context.
        language_model (str): The identifier of the language model to use.
        request_timestamp (int): Timestamp of the request.
    """

    new_message: ChatMessage
    chat_history: list[ChatMessage] = []
    language_model: str
    request_timestamp: int

    @property
    def as_list(self) -> list[dict[str, str]]:
        """Transforms the chat history plus the new message into a list of dictionaries containing the role and message.

        Returns:
            Each dictionary contains keys 'role' and 'content'.
        """
        chat_history_list = [
            {"role": message.role, "content": message.content}
            for message in self.chat_history
        ]
        chat_history_list.append(
            {"role": self.new_message.role, "content": self.new_message.content}
        )
        return chat_history_list

    model_config = ConfigDict(
        json_schema_extra={
            "openapi_examples": {
                "standard": {
                    "summary": "Simple chat input",
                    "description": "Standard input with short chat history.",
                    "value": {
                        "new_message": {
                            "role": "user",
                            "content": "What's the weather like today?",
                        },
                        "chat_history": [
                            {"role": "user", "content": "Hi"},
                            {
                                "role": "assistant",
                                "content": "Hello! How can I help you today?",
                            },
                        ],
                        "language_model": "test_model_mock",
                        "request_timestamp": 1731252767,
                    },
                }
            }
        }
    )
as_list property
as_list

Transforms the chat history plus the new message into a list of dictionaries containing the role and message.

RETURNS DESCRIPTION
list[dict[str, str]]

Each dictionary contains keys 'role' and 'content'.
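The flattening performed by `as_list` can be exercised in isolation. The sketch below re-declares trimmed-down versions of `ChatInput` and `ChatMessage` (assuming pydantic v2; the OpenAPI example config is omitted for brevity):

```python
from typing import Literal

from pydantic import BaseModel


class ChatMessage(BaseModel):
    """Minimal stand-in for the ChatMessage model documented below."""

    content: str
    role: Literal["user", "system", "assistant"] = "user"


class ChatInput(BaseModel):
    """Trimmed-down ChatInput exposing only the as_list property."""

    new_message: ChatMessage
    chat_history: list[ChatMessage] = []

    @property
    def as_list(self) -> list[dict[str, str]]:
        # Flatten history plus the new message into role/content dictionaries,
        # the shape most chat-completion APIs expect.
        messages = [
            {"role": m.role, "content": m.content} for m in self.chat_history
        ]
        messages.append(
            {"role": self.new_message.role, "content": self.new_message.content}
        )
        return messages


chat = ChatInput(
    new_message=ChatMessage(content="What's the weather like today?"),
    chat_history=[
        ChatMessage(content="Hi"),
        ChatMessage(role="assistant", content="Hello! How can I help you today?"),
    ],
)
print(chat.as_list)
```

The new message always lands last, so the list preserves conversation order.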

ChatMessage

Bases: BaseModel

Message input model used to store the content of chat messages.

ATTRIBUTE DESCRIPTION
content

The textual content of the message.

TYPE: str

role

The role of the message sender. Must be one of "system", "user", "assistant". Defaults to "user".

TYPE: str

Source code in docs/microservices/core/src/models/api_input.py
class ChatMessage(BaseModel):
    """Message input model used to store the content of chat messages.

    Attributes:
        content (str): The textual content of the message.
        role (str): The role of the message sender. Must be one of "system", "user", "assistant".
            Defaults to "user".
    """

    content: str
    role: Literal["user", "system", "assistant"] = "user"
CheckTranscriptionExistsRequest

Bases: RootModel[list[UUID]]

Request body for checking transcription status by job IDs.

ATTRIBUTE DESCRIPTION
root

A list of transcription job identifiers.

TYPE: list[UUID]

Source code in docs/microservices/core/src/models/api_input.py
class CheckTranscriptionExistsRequest(RootModel[list[UUID]]):
    """Request body for checking transcription status by job IDs.

    Attributes:
        root (list[UUID]): A list of transcription job identifiers.
    """

    model_config = ConfigDict(
        json_schema_extra={
            "openapi_examples": {
                "multiple_job_ids": {
                    "summary": "Check two job IDs",
                    "description": "Request a status check for multiple transcription jobs.",
                    "value": [
                        "123e4567-e89b-12d3-a456-426614174000",
                        "550e8400-e29b-41d4-a716-446655440000",
                    ],
                }
            }
        }
    )
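Because the model derives from `RootModel`, the request body is a bare JSON array rather than an object. A minimal sketch (assuming pydantic v2, OpenAPI examples omitted):

```python
from uuid import UUID

from pydantic import RootModel


class CheckTranscriptionExistsRequest(RootModel[list[UUID]]):
    """Re-declared here without the OpenAPI example config for brevity."""


# The raw JSON array is parsed into UUID objects on the .root attribute.
req = CheckTranscriptionExistsRequest.model_validate(
    [
        "123e4567-e89b-12d3-a456-426614174000",
        "550e8400-e29b-41d4-a716-446655440000",
    ]
)
print(req.root)
```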
CreateWordFilePayload

Bases: BaseModel

Input payload describing how to build a Word transcript file.

ATTRIBUTE DESCRIPTION
uuid

Unique identifier of the transcription job this Word export belongs to.

TYPE: UUID

speaker

Mapping of speaker IDs to their (possibly edited) names used in the document.

TYPE: list[SpeakerItem]

Source code in docs/microservices/core/src/models/api_input.py
class CreateWordFilePayload(BaseModel):
    """Input payload describing how to build a Word transcript file.

    Attributes:
        uuid (UUID): Unique identifier of the transcription job this Word export belongs to.
        speaker (list[SpeakerItem]): Mapping of speaker IDs to their (possibly edited) names used in the document.
    """

    uuid: UUID
    speaker: list[SpeakerItem] = Field(default_factory=list)

    model_config = ConfigDict(
        json_schema_extra={
            "openapi_examples": {
                "single_docx": {
                    "summary": "Single DOCX export",
                    "description": "Request a DOCX export for one transcription job including speaker mapping.",
                    "value": [
                        {
                            "uuid": "123e4567-e89b-12d3-a456-426614174000",
                            "speaker": [
                                {
                                    "speaker-name-ID": "SPEAKER_00",
                                    "speaker-name-edited": "Redner 1",
                                },
                                {
                                    "speaker-name-ID": "SPEAKER_01",
                                    "speaker-name-edited": "Rednerin 2",
                                },
                            ],
                        }
                    ],
                }
            }
        }
    )
FeedbackLLM

Bases: RestrictValuesForDBModel

Model validating metadata for the LLM used in feedback.

ATTRIBUTE DESCRIPTION
is_remote

Indicates whether the model was hosted remotely.

TYPE: bool

name

The internal name of the LLM (e.g. 'llama3_70b_cloud').

TYPE: str

label

The label shown to the user for the model.

TYPE: str

Source code in docs/microservices/core/src/models/api_input.py
class FeedbackLLM(RestrictValuesForDBModel):
    """Model validating metadata for the LLM used in feedback.

    Attributes:
        is_remote (bool): Indicates whether the model was hosted remotely.
        name (str): The internal name of the LLM (e.g. 'llama3_70b_cloud').
        label (str): The label shown to the user for the model.
    """

    is_remote: bool
    name: str
    label: str
FeedbackRating

Bases: StrEnum

Enum class specifying possible feedback ratings.

Source code in docs/microservices/core/src/models/api_input.py
class FeedbackRating(StrEnum):
    """Enum class specifying possible feedback ratings."""

    POSITIVE = "POSITIVE"
    NEUTRAL = "NEUTRAL"
    NEGATIVE = "NEGATIVE"
GeneralFeedback

Bases: RestrictValuesForDBModel

Model defining the structure of feedback for all services.

ATTRIBUTE DESCRIPTION
feedback

The general rating of the output.

TYPE: FeedbackRating

feedback_text

Optional free-text feedback from the user.

TYPE: str

Source code in docs/microservices/core/src/models/api_input.py
class GeneralFeedback(RestrictValuesForDBModel):
    """Model defining the structure of feedback for all services.

    Attributes:
        feedback (FeedbackRating): The general rating of the output.
        feedback_text (str): Optional free-text feedback from the user.
    """

    feedback: FeedbackRating
    feedback_text: str
MetadataFilter

Bases: BaseModel

Model defining the structure of a metadata filter.

ATTRIBUTE DESCRIPTION
source

The data source to filter on.

TYPE: str | None

start_date

The earliest date for filtering documents.

TYPE: date | None

end_date

The latest date for filtering documents.

TYPE: date | None

METHOD DESCRIPTION
serialize_date

Ensure dates are serialized into ISO format.

Source code in docs/microservices/core/src/models/api_input.py
class MetadataFilter(BaseModel):
    """Model defining the structure of a metadata filter.

    Attributes:
        source (str | None): The data source to filter on.
        start_date (datetime.date | None): The earliest date for filtering documents.
        end_date (datetime.date | None): The latest date for filtering documents.
    """

    source: str | None = None
    start_date: datetime.date | None = None
    end_date: datetime.date | None = None

    @field_serializer("start_date", "end_date")
    def serialize_date(self, value: datetime.date) -> str | None:
        """Ensure dates are serialized into ISO format."""
        return value.isoformat() if value else None
serialize_date
serialize_date(value)

Ensure dates are serialized into ISO format.

Source code in docs/microservices/core/src/models/api_input.py
@field_serializer("start_date", "end_date")
def serialize_date(self, value: datetime.date) -> str | None:
    """Ensure dates are serialized into ISO format."""
    return value.isoformat() if value else None
RAGFeedback

Bases: GeneralFeedback

Model defining the input of the feedback for the RAG.

ATTRIBUTE DESCRIPTION
question

The question asked by the user.

TYPE: str

source_type

The data source type.

TYPE: str

rag_answer

The generated answer provided by the RAG system.

TYPE: str

source_chunks

The document chunks used to generate the answer.

TYPE: list[dict]

llm

Information about the model used.

TYPE: FeedbackLLM

Source code in docs/microservices/core/src/models/api_input.py
class RAGFeedback(GeneralFeedback):
    """Model defining the input of the feedback for the RAG.

    Attributes:
        question (str): The question asked by the user.
        source_type (str): The data source type.
        rag_answer (str): The generated answer provided by the RAG system.
        source_chunks (list[dict]): The document chunks used to generate the answer.
        llm (FeedbackLLM): Information about the model used.
    """

    question: str
    source_type: str
    rag_answer: str
    source_chunks: list[dict]
    llm: FeedbackLLM
RAGInput

Bases: BaseModel

Model defining the input of a valid RAG request.

ATTRIBUTE DESCRIPTION
question

The user question to be answered using retrieved sources.

TYPE: str

meta_data_filters

List of metadata filters.

TYPE: list[MetadataFilter] | None

language_model

The identifier of the language model to use.

TYPE: str

request_timestamp

Timestamp of the request.

TYPE: int

max_chunks_to_use

Optional limit for the number of retrieved chunks used in generation.

TYPE: int | None

Source code in docs/microservices/core/src/models/api_input.py
class RAGInput(BaseModel):
    """Model defining the input of a valid RAG request.

    Attributes:
        question (str): The user question to be answered using retrieved sources.
        meta_data_filters (list[MetadataFilter] | None): List of metadata filters.
        language_model (str): The identifier of the language model to use.
        request_timestamp (int): Timestamp of the request.
        max_chunks_to_use (int | None): Optional limit for the number of retrieved chunks used in generation.
    """

    question: str
    #: The RAG input contains a list of filters, where each filter applies per source.
    #: A document needs to fulfill the requirements of at least one of those filters;
    #: in other words, the filters are combined with OR.
    #: If no filters are given (empty list or None), the whole database is used.
    meta_data_filters: list[MetadataFilter] | None = None
    language_model: str
    request_timestamp: int
    max_chunks_to_use: int | None = None

    model_config = ConfigDict(
        json_schema_extra={
            "openapi_examples": {
                "simple": {
                    "summary": "Simple RAG input",
                    "description": "A minimal example of a RAG request with a question and default model, no filters.",
                    "value": {
                        "question": "What did the political parties decide regarding the pension system?",
                        "meta_data_filters": [],
                        "language_model": "test_model_mock",
                        "request_timestamp": 1731252767,
                        "max_chunks_to_use": None,
                    },
                },
                "with_filters": {
                    "summary": "RAG input with a single metadata filter",
                    "description": "Example of a RAG request that applies a metadata filter to restrict the sources.",
                    "value": {
                        "question": "Between which parties was the coalition agreement concluded?",
                        "meta_data_filters": [
                            {"source": "Coalition Agreement"},
                        ],
                        "language_model": "test_model_mock",
                        "request_timestamp": 1731252767,
                        "max_chunks_to_use": 5,
                    },
                },
            }
        }
    )
RestrictValuesForDBModel

Bases: BaseModel

Model validating string fields before database insertion.

METHOD DESCRIPTION
validate_values

Checks input values for null bytes and strips whitespace.

Source code in docs/microservices/core/src/models/api_input.py
class RestrictValuesForDBModel(BaseModel):
    """Model validating string fields before database insertion."""

    @field_validator("*", mode="before")
    def validate_values(cls, v: object) -> object:  # noqa: N805
        """Checks input values for null bytes and strips whitespace."""
        if isinstance(v, str):
            if "\x00" in v:
                raise ValueError("String contains null bytes, not allowed.")
            v = v.strip()
        return v
validate_values
validate_values(v)

Checks input values for null bytes and strips whitespace.

Source code in docs/microservices/core/src/models/api_input.py
@field_validator("*", mode="before")
def validate_values(cls, v: object) -> object:  # noqa: N805
    """Checks input values for null bytes and strips whitespace."""
    if isinstance(v, str):
        if "\x00" in v:
            raise ValueError("String contains null bytes, not allowed.")
        v = v.strip()
    return v
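Because the validator is registered on `"*"` with `mode="before"`, every string field of every subclass is checked before type coercion. The sketch below uses a hypothetical `Note` subclass purely for demonstration (assuming pydantic v2):

```python
from pydantic import BaseModel, ValidationError, field_validator


class RestrictValuesForDBModel(BaseModel):
    """Base class whose wildcard validator runs on every field."""

    @field_validator("*", mode="before")
    def validate_values(cls, v: object) -> object:  # noqa: N805
        if isinstance(v, str):
            if "\x00" in v:
                raise ValueError("String contains null bytes, not allowed.")
            v = v.strip()
        return v


class Note(RestrictValuesForDBModel):
    """Hypothetical subclass used only for this demonstration."""

    text: str


# Surrounding whitespace is stripped before validation.
print(Note(text="  hello  ").text)

# Null bytes are rejected outright.
try:
    Note(text="bad\x00value")
except ValidationError:
    print("null bytes rejected")
```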
SpeakerItem

Bases: BaseModel

Speaker mapping used when generating Word files.

ATTRIBUTE DESCRIPTION
speaker_name_edited

Human-readable speaker name that may be corrected/edited for output.

TYPE: str

speaker_name_id

Stable identifier used to map transcript segments to a speaker.

TYPE: str

Source code in docs/microservices/core/src/models/api_input.py
class SpeakerItem(BaseModel):
    """Speaker mapping used when generating word files.

    Attributes:
        speaker_name_edited (str): Human-readable speaker name that may be corrected/edited for output.
        speaker_name_id (str): Stable identifier used to map transcript segments to a speaker.
    """

    speaker_name_edited: str = Field("", alias="speaker-name-edited")
    speaker_name_id: str = Field(..., alias="speaker-name-ID")
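The field aliases bridge the hyphenated keys used on the wire and the snake_case Python attribute names. A minimal sketch of round-tripping a payload (assuming pydantic v2):

```python
from pydantic import BaseModel, Field


class SpeakerItem(BaseModel):
    speaker_name_edited: str = Field("", alias="speaker-name-edited")
    speaker_name_id: str = Field(..., alias="speaker-name-ID")


# Payloads use the hyphenated aliases, not the Python attribute names.
item = SpeakerItem.model_validate(
    {"speaker-name-ID": "SPEAKER_00", "speaker-name-edited": "Redner 1"}
)
print(item.speaker_name_id, item.speaker_name_edited)

# Dumping by alias restores the wire format.
print(item.model_dump(by_alias=True))
```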
SummaryFeedback

Bases: GeneralFeedback

Model defining the input of the feedback for the summary.

ATTRIBUTE DESCRIPTION
source_type

Indicates the origin of the summarized text.

TYPE: str

text_length

The number of characters in the parsed input text.

TYPE: conint

desired_summary_length

Desired length of the summary in characters.

TYPE: confloat

summary_length

Actual number of characters in the generated summary.

TYPE: conint

llm

Information about the model used.

TYPE: FeedbackLLM

Source code in docs/microservices/core/src/models/api_input.py
class SummaryFeedback(GeneralFeedback):
    """Model defining the input of the feedback for the summary.

    Attributes:
        source_type (str): Indicates the origin of the summarized text.
        text_length (conint): The number of characters in the parsed input text.
        desired_summary_length (confloat): Desired length of the summary in characters.
        summary_length (conint): Actual number of characters in the generated summary.
        llm (FeedbackLLM): Information about the model used.
    """

    source_type: str

    # len(parsing_output) from the Summary output
    text_length: conint(ge=0, le=MAX_DB_FLOAT)

    # output_length from the Summary input is given in pages, hence * 4000 to obtain the number of characters
    desired_summary_length: confloat(ge=0, le=MAX_DB_FLOAT)

    # len(summary_length) from the Summary output
    summary_length: conint(ge=0, le=MAX_DB_FLOAT)
    llm: FeedbackLLM
SummaryFileAPIInputParameters

Bases: BaseModel

Model defining the input parameters as part of a valid Summary request for file endpoint.

Due to technical reasons, "file (UploadFile)" cannot be part of this pydantic model even though it is part of the file endpoint's input.

ATTRIBUTE DESCRIPTION
language_model

The name or identifier of the language model to use.

TYPE: str

output_length

Desired length of summary output as number of DIN-A4 pages. Default is 0, which will lead to no summary length restrictions. This is the fastest option since the LLM will decide which length is the most suitable one.

TYPE: NonNegativeFloat

topics

User input text stating the topics the summary should focus on. Default is None, which will lead to a general summary without any focus topics.

TYPE: str | None

METHOD DESCRIPTION
as_form

Creates an instance of SummaryFileAPIInputParameters from multipart form-data.

Source code in docs/microservices/core/src/models/api_input.py
class SummaryFileAPIInputParameters(BaseModel):
    """Model defining the input parameters as part of a valid Summary request for file endpoint.

    Note: Due to technical reasons "file (Uploadfile)" can not be part of this pydantic model
          even though it is part of the file endpoints input.

    Attributes:
        language_model (str): The name or identifier of the language model to use.
        output_length (NonNegativeFloat, optional): Desired length of summary output as number of DIN-A4 pages.
            Default is 0, which will lead to no summary length restrictions. This is the fastest option since
            the LLM will decide which length is the most suitable one.
        topics (str | None): User input text stating the topics the summary should focus on.
            Default is None, which will lead to a general summary without any focus topics.
    """

    language_model: str
    output_length: NonNegativeFloat = 0
    topics: str | None = None

    model_config = ConfigDict(
        json_schema_extra={
            "openapi_examples": {
                "simple_summary": {
                    "summary": "Simple summary request",
                    "description": (
                        "Example input for a summary without predefined output length or focus topic settings."
                    ),
                    "value": {
                        "file": "tests/data/txt-testfile.txt",
                        "language_model": "test_model_mock",
                    },
                },
                "fast_summary": {
                    "summary": "Fast summary request",
                    "description": (
                        "Example input for a fast summary. Using a txt file without predefined output length but "
                        "with focus topic setting."
                    ),
                    "value": {
                        "file": "tests/data/txt-testfile.txt",
                        "language_model": "test_model_mock",
                        "topics": "Städte",
                    },
                },
                "output_length_summary": {
                    "summary": "Summary request with output length",
                    "description": "Example input for a summary with predefined output length.",
                    "value": {
                        "file": "tests/data/txt-testfile.txt",
                        "language_model": "test_model_mock",
                        "output_length": 2.5,
                    },
                },
                "focus_topic_summary": {
                    "summary": "Summary request with focus topic",
                    "description": "Example input for a summary with focus on a specific topic.",
                    "value": {
                        "file": "tests/data/pdf-testfile.pdf",
                        "language_model": "test_model_mock",
                        "topics": "Open Source, Community-Gedanke",
                    },
                },
            }
        }
    )

    @classmethod
    def as_form(
        cls,
        language_model: str = Form(
            ...,
            description="The name or identifier of the language model to use.",
            example="test_model_mock",
        ),
        output_length: float = Form(
            0,
            description=(
                "Desired summary length in DIN-A4 pages. "
                "Default 0 = no restriction and high performance."
            ),
            example=3.0,
            ge=0,
        ),
        topics: str | None = Form(
            None,
            description=(
                "Comma-separated topics the summary should focus on. "
                "Default None = general summary without any focus."
            ),
            example="public administration, artificial intelligence, digitization",
        ),
    ) -> Self:
        """Creates an instance of `SummaryFileAPIInputParameters` from multipart form-data.

        FastAPI does not automatically map Pydantic models from form-data in
        multipart requests. This helper enables the model to be used together
        with file uploads by defining how form parameters should be parsed.

        Args:
            language_model (str): Selected language model.
            output_length (float): Desired summary length in pages.
            topics (str | None): User-defined focus topics.

        Returns:
            A validated input parameter set.
        """
        return cls(
            language_model=language_model,
            output_length=output_length,
            topics=topics,
        )
as_form classmethod
as_form(language_model=Form(..., description='The name or identifier of the language model to use.', example='test_model_mock'), output_length=Form(0, description='Desired summary length in DIN-A4 pages. Default 0 = no restriction and high performance.', example=3.0, ge=0), topics=Form(None, description='Comma-separated topics the summary should focus on. Default None = general summary without any focus.', example='public administration, artificial intelligence, digitization'))

Creates an instance of SummaryFileAPIInputParameters from multipart form-data.

FastAPI does not automatically map Pydantic models from form-data in multipart requests. This helper enables the model to be used together with file uploads by defining how form parameters should be parsed.

PARAMETER DESCRIPTION
language_model

Selected language model.

TYPE: str DEFAULT: Form(..., description='The name or identifier of the language model to use.', example='test_model_mock')

output_length

Desired summary length in pages.

TYPE: float DEFAULT: Form(0, description='Desired summary length in DIN-A4 pages. Default 0 = no restriction and high performance.', example=3.0, ge=0)

topics

User-defined focus topics.

TYPE: str | None DEFAULT: Form(None, description='Comma-separated topics the summary should focus on. Default None = general summary without any focus.', example='public administration, artificial intelligence, digitization')

RETURNS DESCRIPTION
Self

A validated input parameter set.

Source code in docs/microservices/core/src/models/api_input.py
@classmethod
def as_form(
    cls,
    language_model: str = Form(
        ...,
        description="The name or identifier of the language model to use.",
        example="test_model_mock",
    ),
    output_length: float = Form(
        0,
        description=(
            "Desired summary length in DIN-A4 pages. "
            "Default 0 = no restriction and high performance."
        ),
        example=3.0,
        ge=0,
    ),
    topics: str | None = Form(
        None,
        description=(
            "Comma-separated topics the summary should focus on. "
            "Default None = general summary without any focus."
        ),
        example="public administration, artificial intelligence, digitization",
    ),
) -> Self:
    """Creates an instance of `SummaryFileAPIInputParameters` from multipart form-data.

    FastAPI does not automatically map Pydantic models from form-data in
    multipart requests. This helper enables the model to be used together
    with file uploads by defining how form parameters should be parsed.

    Args:
        language_model (str): Selected language model.
        output_length (float): Desired summary length in pages.
        topics (str | None): User-defined focus topics.

    Returns:
        A validated input parameter set.
    """
    return cls(
        language_model=language_model,
        output_length=output_length,
        topics=topics,
    )
SummaryTextAPIInput

Bases: BaseModel

Model defining the input of a valid Summary request for text endpoint.

ATTRIBUTE DESCRIPTION
text

The text content to be summarized.

TYPE: str

language_model

The name or identifier of the language model to use.

TYPE: str

output_length

Desired length of summary output as number of DIN-A4 pages. Default is 0, which will lead to no summary length restrictions. This is the fastest option since the LLM will decide which length is the most suitable one.

TYPE: NonNegativeFloat

topics

User input text stating the topics the summary should focus on. Default is None, which will lead to a general summary without any focus topics.

TYPE: str | None

Source code in docs/microservices/core/src/models/api_input.py
class SummaryTextAPIInput(BaseModel):
    """Model defining the input of a valid Summary request for text endpoint.

    Attributes:
        text (str): The text content to be summarized.

        language_model (str): The name or identifier of the language model to use.
        output_length (NonNegativeFloat, optional): Desired length of summary output as number of DIN-A4 pages.
            Default is 0, which will lead to no summary length restrictions. This is the fastest option since
            the LLM will decide which length is the most suitable one.
        topics (str | None): User input text stating the topics the summary should focus on.
            Default is None, which will lead to a general summary without any focus topics.
    """

    text: str

    language_model: str
    output_length: NonNegativeFloat = 0
    topics: str | None = None

    model_config = ConfigDict(
        json_schema_extra={
            "openapi_examples": {
                "simple_summary": {
                    "summary": "Simple summary request",
                    "description": (
                        "Example input for a summary without predefined output length or focus topic settings."
                    ),
                    "value": {
                        "text": (
                            "F13 ist eine vollständig souveräne und Modell-agnostische KI-Assistenz, die auf eigener "
                            "Infrastruktur betrieben werden kann und von der Verwaltung für die Verwaltung entwickelt "
                            "wurde. F13 stellt eine Vielzahl modularer, flexibel einsetzbarer KI-Funktionen bereit – "
                            "zum Verarbeiten von Informationen, Erstellen von Texten, oder zum Recherchieren. Nutzende "
                            "können mit F13 gezielt, wirksam und sicher Generative Künstliche Intelligenz bei "
                            "täglichen Büroarbeiten einsetzen. Mit dem Übergang in eine Open-Source-Weiterentwicklung "
                            "wird F13 seit Juli 2025 als gemeinschaftliches Vorhaben geöffnet: Der Quellcode steht "
                            "frei zur Verfügung, die Nutzung ist offen. Interessierte Verwaltungen, Organisationen "
                            "sowie Entwicklerinnen und Entwickler sind eingela-den, das System für eigene Bedarfe "
                            "anzupassen, weiterzuentwickeln und diese Entwicklungen wiederum Open Source "
                            "bereitzustellen."
                        ),
                        "language_model": "test_model_mock",
                    },
                },
                "fast_summary": {
                    "summary": "Fast summary request",
                    "description": (
                        "Example input for a fast summary without predefined output length but with focus topic "
                        "setting."
                    ),
                    "value": {
                        "text": (
                            "F13 ist eine vollständig souveräne und Modell-agnostische KI-Assistenz, die auf eigener "
                            "Infrastruktur betrieben werden kann und von der Verwaltung für die Verwaltung entwickelt "
                            "wurde. F13 stellt eine Vielzahl modularer, flexibel einsetzbarer KI-Funktionen bereit – "
                            "zum Verarbeiten von Informationen, Erstellen von Texten, oder zum Recherchieren. Nutzende "
                            "können mit F13 gezielt, wirksam und sicher Generative Künstliche Intelligenz bei "
                            "täglichen Büroarbeiten einsetzen. Mit dem Übergang in eine Open-Source-Weiterentwicklung "
                            "wird F13 seit Juli 2025 als gemeinschaftliches Vorhaben geöffnet: Der Quellcode steht "
                            "frei zur Verfügung, die Nutzung ist offen. Interessierte Verwaltungen, Organisationen "
                            "sowie Entwicklerinnen und Entwickler sind eingela-den, das System für eigene Bedarfe "
                            "anzupassen, weiterzuentwickeln und diese Entwicklungen wiederum Open Source "
                            "bereitzustellen."
                        ),
                        "language_model": "test_model_mock",
                        "topics": "KI, Papier",
                    },
                },
                "output_length_summary": {
                    "summary": "Summary request with output length",
                    "description": "Example input for a summary with predefined output length",
                    "value": {
                        "text": (
                            "F13 ist eine vollständig souveräne und Modell-agnostische KI-Assistenz, die auf eigener "
                            "Infrastruktur betrieben werden kann und von der Verwaltung für die Verwaltung entwickelt "
                            "wurde. F13 stellt eine Vielzahl modularer, flexibel einsetzbarer KI-Funktionen bereit – "
                            "zum Verarbeiten von Informationen, Erstellen von Texten, oder zum Recherchieren. Nutzende "
                            "können mit F13 gezielt, wirksam und sicher Generative Künstliche Intelligenz bei "
                            "täglichen Büroarbeiten einsetzen. Mit dem Übergang in eine Open-Source-Weiterentwicklung "
                            "wird F13 seit Juli 2025 als gemeinschaftliches Vorhaben geöffnet: Der Quellcode steht "
                            "frei zur Verfügung, die Nutzung ist offen. Interessierte Verwaltungen, Organisationen "
                            "sowie Entwicklerinnen und Entwickler sind eingela-den, das System für eigene Bedarfe "
                            "anzupassen, weiterzuentwickeln und diese Entwicklungen wiederum Open Source "
                            "bereitzustellen."
                        ),
                        "output_length": 2.5,
                        "language_model": "test_model_mock",
                    },
                },
                "focus_topic_summary": {
                    "summary": "Summary request with focus topic",
                    "description": "Example input for a summary with focus on a specific topic.",
                    "value": {
                        "text": (
                            "F13 ist eine vollständig souveräne und Modell-agnostische KI-Assistenz, die auf eigener "
                            "Infrastruktur betrieben werden kann und von der Verwaltung für die Verwaltung entwickelt "
                            "wurde. F13 stellt eine Vielzahl modularer, flexibel einsetzbarer KI-Funktionen bereit – "
                            "zum Verarbeiten von Informationen, Erstellen von Texten, oder zum Recherchieren. Nutzende "
                            "können mit F13 gezielt, wirksam und sicher Generative Künstliche Intelligenz bei "
                            "täglichen Büroarbeiten einsetzen. Mit dem Übergang in eine Open-Source-Weiterentwicklung "
                            "wird F13 seit Juli 2025 als gemeinschaftliches Vorhaben geöffnet: Der Quellcode steht "
                            "frei zur Verfügung, die Nutzung ist offen. Interessierte Verwaltungen, Organisationen "
                            "sowie Entwicklerinnen und Entwickler sind eingeladen, das System für eigene Bedarfe "
                            "anzupassen, weiterzuentwickeln und diese Entwicklungen wiederum Open Source "
                            "bereitzustellen."
                        ),
                        "topics": "Open Source, Community-Gedanke",
                        "language_model": "test_model_mock",
                    },
                },
            }
        }
    )
TranslateRequest

Bases: BaseModel

Request body for translating free text.

ATTRIBUTE DESCRIPTION
text

Raw text that should be translated to the configured target language.

TYPE: str

Source code in docs/microservices/core/src/models/api_input.py
class TranslateRequest(BaseModel):
    """Request body for translating free text.

    Attributes:
        text (str): Raw text that should be translated to the configured target language.
    """

    text: str

    model_config = ConfigDict(
        json_schema_extra={
            "openapi_examples": {
                "translate_to_german": {
                    "summary": "Translate a short text to German",
                    "description": "Minimal request translating free text to the target language (German).",
                    "value": {
                        "text": (
                            "Hello everyone, welcome to our weekly status meeting. "
                            "Please summarize the action items."
                        ),
                    },
                }
            }
        }
    )
api_output

Pydantic models for API output parameters.

CLASS DESCRIPTION
ChatOutput

Chat response model of chat output.

ChatStreamOutput

Chat stream response model of chat output.

InnerTranscriptionReceiveResponse

Inner model for receiving transcription output.

MatchedTermItem

Matched term entry with meaning and occurrence count.

RAGDatabaseSources

Model defining Database-Sources.

RAGOutput

RAG response model defining RAG output.

RAGSourceDocument

Model defining source object of RAG output.

SummaryAPIOutput

Summary response output of summary generation.

TranscriptionBaseResponse

Base model for transcription responses containing a job ID.

TranscriptionDialogItem

Dialog item with speaker metadata, timestamp, and parts.

TranscriptionDialogPart

Single dialog part containing the spoken text and its score.

TranscriptionReceiveResponse

Response model for receiving transcription output.

TranscriptionStatusResponse

Response model for the transcription existence check endpoint.

TranslateAPIOutput

Output of translation API.

UpsertTermsAPIOutput

Output of upsert term API.

ChatOutput

Bases: BaseModel

Chat response model of chat output.

ATTRIBUTE DESCRIPTION
response

The generated chat response.

TYPE: str

reason

Optional reasoning or explanation for the response.

TYPE: str | None

Source code in docs/microservices/core/src/models/api_output.py
class ChatOutput(BaseModel):
    """Chat response model of chat output.

    Attributes:
        response (str): The generated chat response.
        reason (str | None): Optional reasoning or explanation for the response.
    """

    response: str
    reason: str | None = None

    model_config = ConfigDict(
        json_schema_extra={
            "openapi_examples": {
                "simple": {
                    "summary": "Simple chat response",
                    "description": "Used for models that produce a simple response without reasoning.",
                    "value": {
                        "response": "The weather is nice today.",
                    },
                },
                "reasoning": {
                    "summary": "Response with reasoning",
                    "description": "Used for reasoning-enabled models, showing both the answer and its explanation.",
                    "value": {
                        "response": "The weather is nice today.",
                        "reason": "It is sunny outside.",
                    },
                },
            }
        }
    )
ChatStreamOutput

Bases: BaseModel

Chat stream response model of chat output.

ATTRIBUTE DESCRIPTION
type

The kind of output. One of 'reason', 'response', or 'error'.

TYPE: str

content

Partial text content of the stream if type != error.

TYPE: str | None

finish_reason

Optional finish reason from the model.

TYPE: str | None

error

Error message if type == 'error'.

TYPE: str | None

Source code in docs/microservices/core/src/models/api_output.py
class ChatStreamOutput(BaseModel):
    """Chat stream response model of chat output.

    Attributes:
        type (str): The kind of output. One of 'reason', 'response', or 'error'.
        content (str | None): Partial text content of the stream if type != error.
        finish_reason (str | None): Optional finish reason from the model.
        error (str | None): Error message if type == 'error'.
    """

    type: Literal["reason", "response", "error"]
    content: str | None = None
    finish_reason: str | None = None
    error: str | None = None

    model_config = ConfigDict(
        json_schema_extra={
            "openapi_examples": {
                "response": {
                    "summary": "Chat response",
                    "description": "Standard response returned by the chat model when a normal message is processed.",
                    "value": {
                        "type": "response",
                        "content": "Hello, how can I help you today?",
                        "finish_reason": None,
                    },
                },
                "reason": {
                    "summary": "Reason output",
                    "description": "Response including the reasoning or explanation of the model's output.",
                    "value": {
                        "type": "reason",
                        "content": "User said hello. I will answer politely.",
                        "finish_reason": None,
                    },
                },
                "error": {
                    "summary": "Error output",
                    "description": (
                        "Output returned when an error occurs, ",
                        "e.g., an internal server error during processing.",
                    ),
                    "value": {
                        "type": "error",
                        "error": "Internal Error during streaming.",
                    },
                },
            }
        }
    )
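
The `type` field acts as a discriminator between partial answer text, reasoning text, and errors. A minimal sketch of how a client might fold a stream of such chunks into a final answer; the chunk dicts and the `fold_stream` helper below are illustrative, not part of the source:

```python
# Sketch of consuming ChatStreamOutput-shaped chunks, dispatching on the
# `type` discriminator: 'response' and 'reason' content is accumulated
# separately, 'error' aborts the stream.
def fold_stream(chunks: list[dict]) -> dict:
    response, reason = [], []
    for chunk in chunks:
        if chunk["type"] == "error":
            raise RuntimeError(chunk["error"])
        target = response if chunk["type"] == "response" else reason
        target.append(chunk["content"])
    return {"response": "".join(response), "reason": "".join(reason)}

result = fold_stream([
    {"type": "reason", "content": "User greeted me. ", "finish_reason": None},
    {"type": "response", "content": "Hello, ", "finish_reason": None},
    {"type": "response", "content": "how can I help?", "finish_reason": "stop"},
])
```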
InnerTranscriptionReceiveResponse

Bases: TranscriptionBaseResponse

Inner model for receiving transcription output.

ATTRIBUTE DESCRIPTION
job_id

Unique identifier for the transcription job (inherited)

TYPE: UUID

dialog

The transcription data with speakers, timestamps, and parts

TYPE: list[TranscriptionDialogItem]

translation

The translation or None if not available

TYPE: str | None

matched_terms

The matched special terms

TYPE: dict[str, list[MatchedTermItem]] | None

Source code in docs/microservices/core/src/models/api_output.py
class InnerTranscriptionReceiveResponse(TranscriptionBaseResponse):
    """Inner model for receiving transcription output.

    Attributes:
        job_id (UUID): Unique identifier for the transcription job (inherited)
        dialog (list[TranscriptionDialogItem]): The transcription data with speakers, timestamps, and parts
        translation (str | None): The translation or None if not available
        matched_terms (dict[str, list[MatchedTermItem]] | None): The matched special terms
    """

    dialog: list[TranscriptionDialogItem] = Field(
        ...,
        description="The transcription data with speakers, timestamps, and parts",
    )
    translation: str | None = Field(
        None, description="The translation or None if not available"
    )
    matched_terms: dict[str, list[MatchedTermItem]] | None = Field(
        None, description="The matched special terms"
    )
MatchedTermItem

Bases: BaseModel

Matched term entry with meaning and occurrence count.

ATTRIBUTE DESCRIPTION
fachbegriff

The special term.

TYPE: str

bedeutung

The meaning of the special term.

TYPE: str

count

The number of occurrences of the term.

TYPE: int

Source code in docs/microservices/core/src/models/api_output.py
class MatchedTermItem(BaseModel):
    """Matched term entry with meaning and occurrence count.

    Attributes:
        fachbegriff (str): The special term.
        bedeutung (str): The meaning of the special term.
        count (int): The number of occurrences of the term.
    """

    fachbegriff: str = Field(..., alias="Fachbegriff")
    bedeutung: str = Field(..., alias="Bedeutung")
    count: int

    model_config = ConfigDict(populate_by_name=True)
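
Because `populate_by_name` is enabled, the model accepts both the German alias keys (`Fachbegriff`, `Bedeutung`) and the snake_case field names. A plain-Python sketch of the same alias normalization; the `ALIASES` table mirrors the `Field(..., alias=...)` declarations above and is not part of the source:

```python
# Minimal sketch of the alias handling pydantic performs for MatchedTermItem:
# incoming payloads may use German alias keys, the model stores snake_case
# field names.  Keys without an alias (e.g. "count") pass through unchanged.
ALIASES = {"Fachbegriff": "fachbegriff", "Bedeutung": "bedeutung"}

def normalize_matched_term(payload: dict) -> dict:
    return {ALIASES.get(key, key): value for key, value in payload.items()}

term = normalize_matched_term(
    {"Fachbegriff": "API", "Bedeutung": "Schnittstelle", "count": 3}
)
```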
RAGDatabaseSources

Bases: BaseModel

Model defining Database-Sources.

ATTRIBUTE DESCRIPTION
name

Name of the database source.

TYPE: str

date_filter

Indicates whether date-based filtering is applied.

TYPE: bool

Source code in docs/microservices/core/src/models/api_output.py
class RAGDatabaseSources(BaseModel):
    """Model defining Database-Sources.

    Attributes:
        name (str): Name of the database source.
        date_filter (bool): Indicates whether date-based filtering is applied.
    """

    name: str
    date_filter: bool

    model_config = ConfigDict(
        json_schema_extra={
            "openapi_examples": {
                "standard": {
                    "summary": "Standard database source",
                    "description": "A typical database source with date-based filtering enabled.",
                    "value": {
                        "name": "Coalition Agreement",
                        "date_filter": False,
                    },
                },
            }
        }
    )
RAGOutput

Bases: BaseModel

RAG response model defining RAG output.

ATTRIBUTE DESCRIPTION
question

The user question that triggered the retrieval.

TYPE: str

answer

The generated answer based on the retrieved sources.

TYPE: str

sources

List of source documents used in the response.

TYPE: list[RAGSourceDocument]

Source code in docs/microservices/core/src/models/api_output.py
class RAGOutput(BaseModel):
    """RAG response model defining RAG output.

    Attributes:
        question (str): The user question that triggered the retrieval.
        answer (str): The generated answer based on the retrieved sources.
        sources (list[RAGSourceDocument]): List of source documents used in the response.
    """

    question: str
    answer: str
    sources: list[RAGSourceDocument]

    model_config = ConfigDict(
        json_schema_extra={
            "openapi_examples": {
                "simple": {
                    "summary": "RAG output example",
                    "description": (
                        "Example of a RAG response including the user question, ",
                        "answer and retrieved sources.",
                    ),
                    "value": {
                        "question": "What did the political parties decide regarding the pension system?",
                        "answer": "The parties decided to improve the pension system.",
                        "sources": [
                            {
                                "content": "We decided to improve the pension system.",
                                "meta": {
                                    "source": "Coalition Agreement",
                                    "date": "2025-11-07",
                                },
                                "url": "https://example.com/coalitionagreement.pdf",
                            }
                        ],
                    },
                }
            }
        }
    )
RAGSourceDocument

Bases: BaseModel

Model defining source object of RAG output.

ATTRIBUTE DESCRIPTION
content

The text content of the retrieved document.

TYPE: str

meta

Metadata about the document (e.g. title, date, source type).

TYPE: dict

url

Optional URL pointing to the original document.

TYPE: str | None

Source code in docs/microservices/core/src/models/api_output.py
class RAGSourceDocument(BaseModel):
    """Model defining source object of RAG output.

    Attributes:
        content (str): The text content of the retrieved document.
        meta (dict): Metadata about the document (e.g. title, date, source type).
        url (str | None): Optional URL pointing to the original document.
    """

    content: str
    meta: dict
    url: str | None = None
SummaryAPIOutput

Bases: BaseModel

Summary response output of summary generation.

ATTRIBUTE DESCRIPTION
summary

The generated summary text.

TYPE: str

parsed_text

The parsed and preprocessed source text used for summarization.

TYPE: str

warning_msg

Optional warning message (e.g. about text length of the generated summary).

TYPE: str

Source code in docs/microservices/core/src/models/api_output.py
class SummaryAPIOutput(BaseModel):
    """Summary response output of summary generation.

    Attributes:
        summary (str): The generated summary text.
        parsed_text (str): The parsed and preprocessed source text used for summarization.
        warning_msg (str): Optional warning message (e.g. about text length of the generated summary).
    """

    summary: str
    parsed_text: str
    warning_msg: str

    model_config = ConfigDict(
        json_schema_extra={
            "openapi_examples": {
                "summary_output": {
                    "summary": "Summary output with warning",
                    "description": "Example showing a summary with a warning message.",
                    "value": {
                        "summary": "This is the generated summary of the document.",
                        "parsed_text": "Original source text preprocessed for summarization.",
                        "warning_msg": "This is a message to the user encompassing hints or warnings.",
                    },
                },
            }
        }
    )
TranscriptionBaseResponse

Bases: BaseModel

Base model for transcription responses containing a job ID.

ATTRIBUTE DESCRIPTION
job_id

Unique identifier for the job

TYPE: UUID

Source code in docs/microservices/core/src/models/api_output.py
class TranscriptionBaseResponse(BaseModel):
    """Base model for transcription responses containing a job ID.

    Attributes:
        job_id (UUID): Unique identifier for the job
    """

    job_id: UUID

    model_config = ConfigDict(
        json_schema_extra={
            "openapi_examples": {
                "standard": {
                    "summary": "Simple upload output.",
                    "description": "Example of a standard upload response.",
                    "value": {
                        "job_id": "123e4567-e89b-12d3-a456-426614174000",
                    },
                },
            }
        }
    )
TranscriptionDialogItem

Bases: BaseModel

Dialog item with speaker metadata, timestamp, and parts.

ATTRIBUTE DESCRIPTION
speaker_name

The name of the speaker.

TYPE: str

time_stamp

The timestamp of the dialog item.

TYPE: str

part

The parts of the dialog item.

TYPE: list[TranscriptionDialogPart]

Source code in docs/microservices/core/src/models/api_output.py
class TranscriptionDialogItem(BaseModel):
    """Dialog item with speaker metadata, timestamp, and parts.

    Attributes:
        speaker_name (str): The name of the speaker.
        time_stamp (str): The timestamp of the dialog item.
        part (list[TranscriptionDialogPart]): The parts of the dialog item.
    """

    speaker_name: str = Field(..., alias="speaker-name")
    time_stamp: str = Field(..., alias="time-stamp")
    part: list[TranscriptionDialogPart]

    model_config = ConfigDict(populate_by_name=True)
TranscriptionDialogPart

Bases: BaseModel

Single dialog part containing the spoken text and its score.

ATTRIBUTE DESCRIPTION
text

The spoken text of this part.

TYPE: str

score

The confidence score of the transcription for this part.

TYPE: int

Source code in docs/microservices/core/src/models/api_output.py
class TranscriptionDialogPart(BaseModel):
    """Single dialog part containing the spoken text and its score.

    Attributes:
        text (str): The spoken text of this part.
        score (int): The confidence score of the transcription for this part.
    """

    text: str
    score: int
TranscriptionReceiveResponse

Bases: BaseModel

Response model for receiving transcription output.

ATTRIBUTE DESCRIPTION
transcript

The transcription payload.

TYPE: InnerTranscriptionReceiveResponse

Source code in docs/microservices/core/src/models/api_output.py
class TranscriptionReceiveResponse(BaseModel):
    """Response model for receiving transcription output.

    Attributes:
        transcript (InnerTranscriptionReceiveResponse): The transcription payload.
    """

    transcript: InnerTranscriptionReceiveResponse

    model_config = ConfigDict(
        json_schema_extra={
            "openapi_examples": {
                "standard": {
                    "summary": "Simple receive transcription response.",
                    "description": "Example of a standard receive transcription response.",
                    "value": {
                        "transcript": {
                            "job_id": "Loremui32423",
                            "dialog": [
                                {
                                    "speaker-name": "",
                                    "time-stamp": "0:00-0:00",
                                    "part": [
                                        {
                                            "text": "No audio found in this file",
                                            "score": 100,
                                        }
                                    ],
                                }
                            ],
                            "translation": "Übersetztes Transkript",
                            "matched_terms": {
                                "IT": [
                                    {
                                        "Fachbegriff": "API",
                                        "Bedeutung": "Schnittstelle",
                                        "count": 3,
                                    },
                                ]
                            },
                        }
                    },
                },
            }
        }
    )
TranscriptionStatusResponse

Bases: BaseModel

Response model for the transcription existence check endpoint.

ATTRIBUTE DESCRIPTION
response

Mapping of job IDs to their status

TYPE: dict

Source code in docs/microservices/core/src/models/api_output.py
class TranscriptionStatusResponse(BaseModel):
    """Response model for the transcription existence check endpoint.

    Attributes:
        response (dict): Mapping of job IDs to their status
    """

    response: dict[UUID, str]

    model_config = ConfigDict(
        json_schema_extra={
            "openapi_examples": {
                "standard": {
                    "summary": "Simple status response output.",
                    "description": "Example of a standard status response.",
                    "value": {
                        "response": {
                            "123e4567-e89b-12d3-a456-426614174000": "done",
                            "550e8400-e29b-41d4-a716-446655440000": "non_existent",
                        },
                    },
                },
            }
        }
    )
TranslateAPIOutput

Bases: BaseModel

Output of translation API.

ATTRIBUTE DESCRIPTION
translation

The translated text.

TYPE: str

Source code in docs/microservices/core/src/models/api_output.py
class TranslateAPIOutput(BaseModel):
    """Output of translation API.

    Attributes:
        translation (str): The translated text.
    """

    translation: str

    model_config = ConfigDict(
        json_schema_extra={
            "openapi_examples": {
                "standard": {
                    "summary": "Simple translation output.",
                    "description": "Example of a standard translation response.",
                    "value": {
                        "translation": "Übersetzter Gesamttext…",
                    },
                },
            }
        }
    )
UpsertTermsAPIOutput

Bases: BaseModel

Output of upsert term API.

ATTRIBUTE DESCRIPTION
updated

Number of terms that were updated.

TYPE: int

detail

Optional detail message, especially in case of errors.

TYPE: str | None

Source code in docs/microservices/core/src/models/api_output.py
class UpsertTermsAPIOutput(BaseModel):
    """Output of upsert term API.

    Attributes:
        updated (int): Number of terms that were updated.
        detail (str | None): Optional detail message, especially in case of errors.
    """

    updated: int
    detail: str | None = None

    model_config = ConfigDict(
        json_schema_extra={
            "openapi_examples": {
                "standard": {
                    "summary": "Simple upsert terms output.",
                    "description": "Example of a standard upsert terms response.",
                    "value": {
                        "updated": 3,
                        "detail": None,
                    },
                },
            }
        }
    )
general

Load and check Settings from yml.

CLASS DESCRIPTION
Authentication

Authentication settings group.

EnabledRoutes

Configuration for enabling/disabling optional routes.

FeedbackDbSettings

Settings for the Feedback-Database.

InterServiceCommunication

Configuration of all microservice communications.

LogLevel

Enum class specifying possible log levels.

PostConfig

Configuration for async_post request to other microservices.

Settings

General Settings for the service.

Authentication

Bases: BaseModel

Authentication settings group.

ATTRIBUTE DESCRIPTION
model_config

Used to ignore other services, which are defined in the config.

TYPE: ConfigDict

guest_mode

Disable authentication

TYPE: bool

keycloak_base_url

Base URL of the Keycloak identity provider.

TYPE: AnyHttpUrl

keycloak_realm

Keycloak realm to use for authentication.

TYPE: str

audience

Expected JWT audience ("aud") claim; used to validate access tokens.

TYPE: str | None

keycloak_client

Keycloak client identifier used by this service.

TYPE: str

openid_configuration_url

The endpoint to query the OpenID-Connect configuration. This is not needed when 'guest_mode' is set to 'True'.

TYPE: AnyHttpUrl | None

Source code in docs/microservices/core/src/models/general.py
class Authentication(BaseModel):
    """Authentication settings group.

    Attributes:
        model_config (ConfigDict): Used to ignore other services, which are defined in the config.
        guest_mode (bool): Disable authentication
        keycloak_base_url (AnyHttpUrl): Base URL of the Keycloak identity provider.
        keycloak_realm (str): Keycloak realm to use for authentication.
        audience (str): Expected JWT audience ("aud") claim; used to validate access tokens.
        openid_configuration_url (AnyHttpUrl | None): The endpoint to query the OpenID-Connect configuration.
            This is not needed when 'guest_mode' is set to 'True'.
    """

    model_config = ConfigDict(extra="ignore", frozen=True)

    guest_mode: bool = True
    keycloak_base_url: AnyHttpUrl = "http://keycloak:8080"
    keycloak_realm: str = "f13"
    audience: None | str = "f13-api"
    keycloak_client: str = "f13-api"

    @property
    def openid_configuration_url(self) -> AnyHttpUrl | None:
        """Return the OpenID-Connect configuration endpoint.

        Returns:
            The URL where Keycloak serves its OpenID-Connect configuration, or `None` when `guest_mode` is `True`.
        """
        if self.guest_mode:
            return None

        return HttpUrl(
            f"{self.keycloak_base_url}/realms/{self.keycloak_realm}/.well-known/openid-configuration"
        )

    @property
    def openid_token_url(self) -> None | AnyHttpUrl:
        """Return the OpenID Connect token endpoint URL.

        Returns:
            The Keycloak token endpoint URL, or None if in guest mode.
        """
        if self.guest_mode:
            return None

        return HttpUrl(
            f"{self.keycloak_base_url}/realms/{self.keycloak_realm}/protocol/openid-connect/token"
        )
openid_configuration_url property
openid_configuration_url

Return the OpenID-Connect configuration endpoint.

RETURNS DESCRIPTION
AnyHttpUrl | None

The URL where Keycloak serves its OpenID-Connect configuration, or None when guest_mode is True.

openid_token_url property
openid_token_url

Return the OpenID Connect token endpoint URL.

RETURNS DESCRIPTION
None | AnyHttpUrl

The Keycloak token endpoint URL, or None if in guest mode.

EnabledRoutes

Bases: BaseModel

Configuration for enabling/disabling optional routes.

Note: The 'core' route is always enabled and not configurable.

ATTRIBUTE DESCRIPTION
chat

Enable/disable the chat endpoint

TYPE: bool

rag

Enable/disable the RAG (Retrieval-Augmented Generation) endpoint

TYPE: bool

summary

Enable/disable the summary endpoint

TYPE: bool

feedback

Enable/disable the feedback endpoint

TYPE: bool

transcription

Enable/disable the transcription endpoint

TYPE: bool

Source code in docs/microservices/core/src/models/general.py
class EnabledRoutes(BaseModel):
    """Configuration for enabling/disabling optional routes.

    Note: The 'core' route is always enabled and not configurable.

    Attributes:
        chat (bool): Enable/disable the chat endpoint
        rag (bool): Enable/disable the RAG (Retrieval-Augmented Generation) endpoint
        summary (bool): Enable/disable the summary endpoint
        feedback (bool): Enable/disable the feedback endpoint
        transcription (bool): Enable/disable the transcription endpoint
    """

    model_config = ConfigDict(extra="ignore", frozen=True)

    chat: bool = True
    rag: bool = True
    summary: bool = True
    feedback: bool = True
    transcription: bool = True
FeedbackDbSettings

Bases: BaseModel

Settings for the Feedback-Database.

ATTRIBUTE DESCRIPTION
server

Hostname of the database server

TYPE: str

port

Port number on which the database server is listening

TYPE: PositiveInt

dialect

Database dialect type. Restricted to PostgreSQL since this is the only system we have tested and gained experience with so far.

TYPE: Literal['postgresql']

db_name

Name of the database

TYPE: str

username

Username for authenticating with the database

TYPE: str

path_secret

Path to the secret file containing the database password

TYPE: FilePath

Source code in docs/microservices/core/src/models/general.py
class FeedbackDbSettings(BaseModel):
    """Settings for the Feedback-Database.

    Attributes:
        server (str): Hostname of the database server
        port (PositiveInt): Port number on which the database server is listening
        dialect (Literal["postgresql"]): Database dialect type. Restricted to PostgreSQL
            since this is the only system we have tested and gained experience with so far.
        db_name (str): Name of the database
        username (str): Username for authenticating with the database
        path_secret (FilePath): Path to the secret file containing the database password
    """

    model_config = ConfigDict(extra="ignore", frozen=True)

    server: str = "feedback-db"
    port: PositiveInt = 5432
    dialect: Literal["postgresql"] = "postgresql"
    db_name: str = "feedback"
    username: str = "member"
    path_secret: FilePath = Path("/core/secrets/feedback_db.secret")

    @property
    def url(self) -> str:
        """Construct the database URL using credentials from a secret file."""
        with open(self.path_secret) as f:
            password = f.read().splitlines()[0]

        return f"{self.dialect}://{self.username}:{password}@{self.server}:{self.port}/{self.db_name}"
url property
url

Construct the database URL using credentials from a secret file.
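
The property reads the password from the first line of the secret file and splices it into a SQLAlchemy-style DSN. A self-contained sketch using the default values shown above; a throwaway temp file stands in for the real `/core/secrets/feedback_db.secret`:

```python
import tempfile
from pathlib import Path

# Sketch of FeedbackDbSettings.url: password comes from the first line of
# the secret file and is spliced into a SQLAlchemy-style DSN.
def build_db_url(dialect: str, username: str, server: str, port: int,
                 db_name: str, path_secret: Path) -> str:
    password = path_secret.read_text().splitlines()[0]
    return f"{dialect}://{username}:{password}@{server}:{port}/{db_name}"

# Throwaway secret file standing in for the real secret path.
with tempfile.NamedTemporaryFile("w", suffix=".secret", delete=False) as f:
    f.write("s3cret\n")

url = build_db_url("postgresql", "member", "feedback-db", 5432, "feedback", Path(f.name))
```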

InterServiceCommunication

Bases: BaseModel

Configuration of all microservice communications.

ATTRIBUTE DESCRIPTION
parser

Default configuration for parsing microservice.

TYPE: PostConfig

summary

Default configuration for summary microservice.

TYPE: PostConfig

chat

Default configuration for chat microservice.

TYPE: PostConfig

rag

Default configuration for rag microservice.

TYPE: PostConfig

Source code in docs/microservices/core/src/models/general.py
class InterServiceCommunication(BaseModel):
    """Configuration of all microservice communications.

    Attributes:
        parser (PostConfig): Default configuration for parsing microservice.
        summary (PostConfig): Default configuration for summary microservice.
        chat (PostConfig): Default configuration for chat microservice.
        rag (PostConfig): Default configuration for rag microservice.
    """

    parser: PostConfig = PostConfig(timeout_in_s=200)
    summary: PostConfig = PostConfig(timeout_in_s=600)
    chat: PostConfig = PostConfig()
    rag: PostConfig = PostConfig()
    transcription: PostConfig = PostConfig(timeout_in_s=600)
LogLevel

Bases: StrEnum

Enum class specifying possible log levels.

Source code in docs/microservices/core/src/models/general.py
class LogLevel(StrEnum):
    """Enum class specifying possible log levels."""

    CRITICAL = "CRITICAL"
    ERROR = "ERROR"
    WARNING = "WARNING"
    INFO = "INFO"
    DEBUG = "DEBUG"

    @classmethod
    def _missing_(cls, value: object) -> "LogLevel | None":
        """Convert strings to uppercase and recheck for existence."""
        if isinstance(value, str):
            value = value.upper()
            for level in cls:
                if level == value:
                    return level
        return None
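
The `_missing_` hook makes lookups case-insensitive, so a lowercase level from configuration still resolves to a member. A standalone sketch; the source uses `StrEnum` (Python 3.11+), while the `str, Enum` mixin below behaves the same for the comparison in `_missing_`, since members compare equal to their string values:

```python
from enum import Enum

# Standalone sketch of LogLevel's case-insensitive lookup via _missing_.
class LogLevel(str, Enum):
    CRITICAL = "CRITICAL"
    ERROR = "ERROR"
    WARNING = "WARNING"
    INFO = "INFO"
    DEBUG = "DEBUG"

    @classmethod
    def _missing_(cls, value: object) -> "LogLevel | None":
        if isinstance(value, str):
            value = value.upper()
            for level in cls:
                if level == value:
                    return level
        return None
```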
PostConfig

Bases: BaseModel

Configuration for async_post request to other microservices.

The default values in this class can be overwritten by those values stated in configs/general.yml.

ATTRIBUTE DESCRIPTION
model_config

Used to ignore other services, which are defined in the config.

TYPE: ConfigDict

max_attempts

Maximal number of requests before returning status code 424.

TYPE: PositiveInt

timeout_in_s

Maximum waiting duration before timeout (in seconds).

TYPE: PositiveInt

Source code in docs/microservices/core/src/models/general.py
class PostConfig(BaseModel):
    """Configuration for async_post request to other microservices.

    The default values in this class can be overwritten by those values stated in configs/general.yml.

    Attributes:
        model_config (ConfigDict): Used to ignore other services, which are defined in the config.
        max_attempts (PositiveInt): Maximal number of requests before returning status code 424.
        timeout_in_s (PositiveInt): Maximum waiting duration before timeout (in seconds).
    """

    model_config = ConfigDict(extra="ignore")
    max_attempts: PositiveInt = 3
    timeout_in_s: PositiveInt = 180
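
How `max_attempts` is consumed by the async_post helper is not shown in this section; a minimal synchronous sketch of the retry behaviour the docstring describes (falling back to status 424 once attempts are exhausted) might look like this. `post_fn` is a hypothetical stand-in for one HTTP request:

```python
# Hypothetical sketch of how PostConfig.max_attempts could drive a retry
# loop; the real async_post helper is not part of this section.
def post_with_retries(post_fn, max_attempts: int = 3) -> dict:
    last_error = None
    for _ in range(max_attempts):
        try:
            return {"status": 200, "body": post_fn()}
        except Exception as exc:  # network error, timeout, ...
            last_error = exc
    # All attempts failed: report 424 Failed Dependency, as documented.
    return {"status": 424, "error": str(last_error)}

attempts = []
def flaky() -> str:
    attempts.append(1)
    if len(attempts) < 3:
        raise RuntimeError("transient failure")
    return "ok"

result = post_with_retries(flaky)
```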
Settings

Bases: BaseModel

General Settings for the service.

ATTRIBUTE DESCRIPTION
model_config

Used to ignore other services, which are defined in the config.

TYPE: ConfigDict

service_name

Name of the service, e.g. 'Core'

TYPE: str

authentication

Authentication settings

TYPE: Authentication

n_uvicorn_workers

Number of parallel uvicorn instances.

TYPE: PositiveInt

service_endpoints

URLs of required services (e.g. rag, chat, feedback-db,...)

TYPE: dict[str, AnyHttpUrl]

feedback_db

Settings for feedback database

TYPE: FeedbackDbSettings

log_level

Minimal level of logging output given.

TYPE: LogLevel

log_file_max_bytes

Max file size for the logfile

TYPE: PositiveInt

log_file_backup_count

Number of log-files to loop over

TYPE: PositiveInt

log_file

Write logfile there.

TYPE: FilePath

inter_service_communication

Configuration of communication with other services.

TYPE: InterServiceCommunication

enabled_routes

Configuration for enabling/disabling optional routes.

TYPE: EnabledRoutes

METHOD DESCRIPTION
ensure_log_dir

Create the log directory after validation.

Source code in docs/microservices/core/src/models/general.py
class Settings(BaseModel):
    """General Settings for the service.

    Attributes:
        model_config (ConfigDict): Used to ignore other services, which are defined in the config.
        service_name (str): Name of the service, e.g. 'Core'
        authentication (Authentication): Authentication settings
        n_uvicorn_workers (PositiveInt): Number of parallel uvicorn instances.
        service_endpoints (dict[str, AnyHttpUrl]): URLs of required services (e.g. rag, chat, feedback-db,...)
        feedback_db (FeedbackDbSettings): Settings for feedback database
        log_level (LogLevel): Minimum level of log output that is emitted.
        log_file_max_bytes (PositiveInt): Maximum size of the logfile in bytes.
        log_file_backup_count (PositiveInt): Number of rotated log files to keep.
        log_file (FilePath): Path where the logfile is written.
        inter_service_communication (InterServiceCommunication): Configuration of communication with other services.
        enabled_routes (EnabledRoutes): Configuration for enabling/disabling optional routes.
    """

    model_config = ConfigDict(extra="ignore", frozen=True)

    service_name: str = "Core"
    service_description: str = (
        "Coordination of the microservices and provision of central functions"
    )

    authentication: Authentication = Authentication()
    allow_origins: list[AnyHttpUrl] = [HttpUrl("http://localhost:9999")]

    n_uvicorn_workers: PositiveInt = 1

    service_endpoints: dict[str, AnyHttpUrl]
    feedback_db: FeedbackDbSettings = FeedbackDbSettings()

    log_level: LogLevel = LogLevel.INFO
    log_file_max_bytes: PositiveInt = 1 * 1024 * 1024
    log_file_backup_count: PositiveInt = 3
    log_file: FilePath = Path("/core/logs/log")

    inter_service_communication: InterServiceCommunication = InterServiceCommunication()
    enabled_routes: EnabledRoutes = EnabledRoutes()

    @model_validator(mode="after")
    def ensure_log_dir(self) -> "Settings":
        """Create the log directory after validation."""
        self.log_file.parent.mkdir(parents=True, exist_ok=True)
        return self
ensure_log_dir
ensure_log_dir()

Create the log directory after validation.

Source code in docs/microservices/core/src/models/general.py
@model_validator(mode="after")
def ensure_log_dir(self) -> "Settings":
    """Create the log directory after validation."""
    self.log_file.parent.mkdir(parents=True, exist_ok=True)
    return self
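The validator above relies on `Path.mkdir(parents=True, exist_ok=True)` being idempotent, so re-validating a `Settings` instance never fails on an already-existing log directory. A minimal stand-alone sketch of that behavior (the temp path is hypothetical, not the service's real log location):

```python
import tempfile
from pathlib import Path

# Hypothetical log file path inside a throwaway temp directory
tmp_root = Path(tempfile.mkdtemp())
log_file = tmp_root / "core" / "logs" / "log"

# Mirrors ensure_log_dir: create all parent directories, tolerate repeats
log_file.parent.mkdir(parents=True, exist_ok=True)
log_file.parent.mkdir(parents=True, exist_ok=True)  # second call is a no-op

print(log_file.parent.is_dir())
```

Without `exist_ok=True` the second call would raise `FileExistsError`, which is why the validator is safe to run on every model construction.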

settings

Load all settings from a central place, not hidden in utils.

FUNCTION DESCRIPTION
get_config_dir

Get the default config dir.

get_general_settings

Read the file 'general.yml' from the config dir.

get_config_dir
get_config_dir()

Get the default config dir.

All required configuration files are expected to be found here. The directory is checked for existence and an 'Exception' is raised if it does not exist.

Source code in docs/microservices/core/src/settings.py
def get_config_dir() -> Path:
    """Get the default config dir.

    All required configuration files are expected to be found here. The directory is
    checked for existence and an 'Exception' is raised if it does not exist.
    """
    path_to_config_dir = Path(__file__).parent / ".." / "configs"
    if not path_to_config_dir.exists():
        logger.fatal(f"Invalid path to config dir: '{path_to_config_dir}'")
        raise Exception(f"Invalid path to config dir: '{path_to_config_dir}'")

    return path_to_config_dir
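The pattern used here — resolve a path relative to the current file and fail fast if it is missing — can be exercised in isolation. The following sketch uses a hypothetical `resolve_dir` helper and a temp directory; it is not part of the service's API:

```python
import tempfile
from pathlib import Path


def resolve_dir(base: Path, *parts: str) -> Path:
    """Resolve a directory relative to 'base'; raise if it does not exist."""
    candidate = (base / Path(*parts)).resolve()
    if not candidate.exists():
        raise FileNotFoundError(f"Invalid path to config dir: '{candidate}'")
    return candidate


# Hypothetical layout: a 'configs' directory under some base directory
base = Path(tempfile.mkdtemp())
(base / "configs").mkdir()

config_dir = resolve_dir(base, "configs")
print(config_dir.name)
```

Resolving against `Path(__file__).parent`, as `get_config_dir` does, keeps the lookup independent of the process's working directory.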
get_general_settings
get_general_settings()

Read the file 'general.yml' from the config dir.

Source code in docs/microservices/core/src/settings.py
def get_general_settings() -> Settings:
    """Read the file 'general.yml' from the config dir."""
    general_config_paths = get_config_dir() / "general.yml"
    if not general_config_paths.exists():
        logger.fatal(f"Invalid path to 'general.yml': '{general_config_paths}'")
        raise Exception(f"Invalid path to 'general.yml': '{general_config_paths}'")

    settings = load_all_configs(general_config_paths)
    logger.debug("Loaded settings: %r", pprint.pformat(settings))
    return settings

utils

Utils functions for logging, LLM availability check and configuration processing.

MODULE DESCRIPTION
base_logger

Set up the root logger for the entire application. This logger will log messages to the console and a file.

process_configs

Methods to load configs and run config-integrity checks.

base_logger

Set up the root logger for the entire application. This logger will log messages to the console and a file.

FUNCTION DESCRIPTION
setup_logger

Initialize the logger with the desired log level and add handlers.

setup_logger
setup_logger(settings)

Initialize the logger with the desired log level and add handlers.

Sets up the root logger, which all other loggers inherit from. Adds file, console and exit handlers to the logger and sets the format.

Source code in docs/microservices/core/src/utils/base_logger.py
def setup_logger(settings: Settings) -> None:
    """Initialize the logger with the desired log level and add handlers.

    Sets up the root logger, which all other loggers inherit from.
    Adds file, console and exit handlers to the logger and sets the format.
    """
    # root logger, all other loggers inherit from this
    logger = logging.getLogger()

    # create different handlers for log file and console
    file_handler = logging.handlers.RotatingFileHandler(
        filename=settings.log_file,
        maxBytes=settings.log_file_max_bytes,
        backupCount=settings.log_file_backup_count,
    )
    console_handler = logging.StreamHandler()

    # define log format and set for each handler
    formatter = logging.Formatter(
        fmt="%(asctime)s - %(levelname)8s - %(module)s - %(funcName)s: %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S%z",
    )
    file_handler.setFormatter(formatter)
    console_handler.setFormatter(formatter)

    # add handlers to the logger
    logger.addHandler(file_handler)
    logger.addHandler(console_handler)

    logger.setLevel(settings.log_level)
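Because all loggers obtained via `logging.getLogger(name)` propagate to the root logger, attaching handlers once at startup is enough for the whole application. A self-contained usage sketch (the path, size, and backup count are illustrative, not the service's real settings):

```python
import logging
import logging.handlers
import tempfile
from pathlib import Path

log_file = Path(tempfile.mkdtemp()) / "log"

root = logging.getLogger()
root.setLevel(logging.INFO)

file_handler = logging.handlers.RotatingFileHandler(
    filename=log_file,
    maxBytes=1 * 1024 * 1024,  # rotate at ~1 MiB (illustrative)
    backupCount=3,             # keep three rotated files (illustrative)
)
file_handler.setFormatter(logging.Formatter(
    fmt="%(asctime)s - %(levelname)8s - %(module)s - %(funcName)s: %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S%z",
))
root.addHandler(file_handler)

# A named child logger propagates to the root handlers automatically
logging.getLogger("child").info("hello")
file_handler.flush()
print("hello" in log_file.read_text())
```

Note that `RotatingFileHandler` rolls the file over once `maxBytes` is exceeded, keeping at most `backupCount` old files, which matches the `log_file_max_bytes` and `log_file_backup_count` settings above.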
process_configs

Methods to load configs and run config-integrity checks.

FUNCTION DESCRIPTION
load_all_configs

Load config settings from respective paths.

load_from_yml_in_pydantic_model

Load config from 'yaml_path' into the given pydantic model.

load_yaml

Load yaml.

load_all_configs
load_all_configs(general_config_paths)

Load config settings from respective paths.

PARAMETER DESCRIPTION
general_config_paths

Path to config, matching 'Settings'

TYPE: Path

RETURNS DESCRIPTION
Settings

Config loaded into their Pydantic Model.

Source code in docs/microservices/core/src/utils/process_configs.py
def load_all_configs(
    general_config_paths: Path,
) -> Settings:
    """Load config settings from respective paths.

    Args:
        general_config_paths (Path): Path to config, matching 'Settings'

    Returns:
        Config loaded into their Pydantic Model.
    """
    settings = load_from_yml_in_pydantic_model(general_config_paths, Settings)

    return settings
load_from_yml_in_pydantic_model
load_from_yml_in_pydantic_model(yaml_path, pydantic_reference_model)

Load config from 'yaml_path' into the given pydantic model.

PARAMETER DESCRIPTION
yaml_path

Yaml to load

TYPE: Path

pydantic_reference_model

pydantic model to load yaml into

TYPE: BaseModel

RETURNS DESCRIPTION
BaseModel

BaseModel derived pydantic data class.

Source code in docs/microservices/core/src/utils/process_configs.py
def load_from_yml_in_pydantic_model(
    yaml_path: Path, pydantic_reference_model: BaseModel
) -> BaseModel:
    """Load config from 'yaml_path' into the given pydantic model.

    Args:
        yaml_path (Path): Yaml to load
        pydantic_reference_model (BaseModel): pydantic model to load yaml into

    Returns:
        BaseModel derived pydantic data class.
    """
    data = load_yaml(yaml_path)

    try:
        pydantic_class = pydantic_reference_model(**data)
        logger.info(f"Config loaded from: '{yaml_path}'")
        return pydantic_class

    except ValidationError as e:
        logger.critical(f"Error loading config: '{e}'")
        raise e
load_yaml
load_yaml(yaml_path)

Load yaml.

PARAMETER DESCRIPTION
yaml_path

Path to yaml

TYPE: Path

RETURNS DESCRIPTION
dict[str, Any]

Content of loaded yaml.

Source code in docs/microservices/core/src/utils/process_configs.py
def load_yaml(yaml_path: Path) -> dict[str, Any]:
    """Load yaml.

    Args:
        yaml_path (Path): Path to the yaml file

    Returns:
        Content of loaded yaml.
    """
    if not yaml_path.exists():
        logger.error(f"Invalid path: '{yaml_path}'")
        raise FileNotFoundError(f"Invalid path: '{yaml_path}'")

    with open(yaml_path) as file:
        return yaml.safe_load(file)
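A usage sketch of the load path, assuming PyYAML is installed (as the service itself requires); the file name and keys below are hypothetical, not the service's actual config:

```python
import tempfile
from pathlib import Path

import yaml  # PyYAML, assumed available as in the service itself

# Write a minimal, hypothetical config and load it back
config_path = Path(tempfile.mkdtemp()) / "general.yml"
config_path.write_text("service_name: Core\nn_uvicorn_workers: 2\n")

# Same guard as load_yaml: fail fast on a missing file
if not config_path.exists():
    raise FileNotFoundError(f"Invalid path: '{config_path}'")

data = yaml.safe_load(config_path.read_text())
print(data)
```

`yaml.safe_load` only constructs plain Python types (dicts, lists, strings, numbers), which is why it is preferred over `yaml.load` for untrusted or externally supplied config files.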