Skip to content

Client API Reference

lauren_mcp

lauren-mcp — Model Context Protocol server and client for Lauren applications.

McpServer

Static factory for creating MCP clients for different transports.

Usage::

# stdio subprocess
client = McpServer.stdio(["python", "-m", "my_mcp_server"])

# WebSocket (requires lauren-mcp[ws])
client = McpServer.ws("ws://localhost:8000/mcp/ws")

# HTTP + SSE, legacy 2024-11-05 transport (requires lauren-mcp[sse])
client = McpServer.http("http://localhost:8000/mcp")

# Streamable HTTP, 2025-03-26 transport (requires lauren-mcp[sse])
client = McpServer.streamable_http("http://localhost:8000/mcp")

await client.connect()
tools = await client.list_tools()
await client.close()

All factories accept these optional keyword arguments:

protocol_version Protocol version to request during the handshake (defaults to the latest the library supports). roots Static list of :class:~lauren_mcp.Root or a callable returning the current roots; advertised via the roots capability. progress_handler / log_handler / list_changed_handler Callbacks invoked when the server pushes the matching notification. resource_updated_handler Callback invoked with the resource URI string whenever the server pushes a notifications/resources/updated notification. Only fired for resources that the client has subscribed to via :meth:~McpClientProtocol.subscribe_resource. sampling_handler / elicitation_handler Callbacks answering server-initiated sampling/createMessage / elicitation/create requests. sampling_tools When True and a sampling_handler is provided, advertises {"tools": True} in the sampling capability during the handshake, indicating the client can handle tool-use sampling loops.

After connect(), all clients expose:

set_logging_level(level) Send logging/setLevel to the server. level is one of "debug", "info", "notice", "warning", "error", "critical", "alert", "emergency". subscribe_resource(uri) / unsubscribe_resource(uri) Subscribe / unsubscribe to notifications/resources/updated for the given URI. complete(ref, argument) Request completion suggestions (completion/complete).

stdio staticmethod

stdio(
    command,
    *,
    max_retries=3,
    startup_timeout=10.0,
    **feature_kwargs,
)

Create an MCP stdio client that launches command as a subprocess.

Parameters

command: Argv sequence, e.g. ["python", "-m", "myserver"]. max_retries: Subprocess restart attempts on unexpected EOF. startup_timeout: Seconds to wait for the initialize handshake response.

Source code in src/lauren_mcp/_client/_factory.py
@staticmethod
def stdio(
    command: list[str] | tuple[str, ...],
    *,
    max_retries: int = 3,
    startup_timeout: float = 10.0,
    **feature_kwargs: Any,
) -> McpClientProtocol:
    """Create an MCP stdio client that launches *command* as a subprocess.

    Parameters
    ----------
    command:
        Argv sequence, e.g. ``["python", "-m", "myserver"]``.
    max_retries:
        Subprocess restart attempts on unexpected EOF.
    startup_timeout:
        Seconds to wait for the ``initialize`` handshake response.
    """
    from ._stdio import McpStdioClient

    return McpStdioClient(
        command,
        max_retries=max_retries,
        startup_timeout=startup_timeout,
        **feature_kwargs,
    )

ws staticmethod

ws(
    url,
    *,
    headers=None,
    max_retries=3,
    startup_timeout=10.0,
    **feature_kwargs,
)

Create an MCP WebSocket client.

Requires pip install 'lauren-mcp[ws]'.

Parameters

url: Full WebSocket URL, e.g. "ws://localhost:8000/mcp/ws". headers: Optional extra HTTP headers sent during the upgrade handshake. max_retries: Reconnect attempts after unexpected disconnect. startup_timeout: Seconds to wait for the initialize handshake response.

Source code in src/lauren_mcp/_client/_factory.py
@staticmethod
def ws(
    url: str,
    *,
    headers: dict[str, str] | None = None,
    max_retries: int = 3,
    startup_timeout: float = 10.0,
    **feature_kwargs: Any,
) -> McpClientProtocol:
    """Create an MCP WebSocket client.

    Requires ``pip install 'lauren-mcp[ws]'``.

    Parameters
    ----------
    url:
        Full WebSocket URL, e.g. ``"ws://localhost:8000/mcp/ws"``.
    headers:
        Optional extra HTTP headers sent during the upgrade handshake.
    max_retries:
        Reconnect attempts after unexpected disconnect.
    startup_timeout:
        Seconds to wait for the ``initialize`` handshake response.
    """
    from ._ws import McpWebSocketClient

    return McpWebSocketClient(
        url,
        headers=headers,
        max_retries=max_retries,
        startup_timeout=startup_timeout,
        **feature_kwargs,
    )

http staticmethod

http(
    url,
    *,
    headers=None,
    auth=None,
    max_retries=3,
    startup_timeout=10.0,
    **feature_kwargs,
)

Create an MCP HTTP+SSE client (legacy 2024-11-05 transport).

Requires pip install 'lauren-mcp[sse]'.

For servers speaking the 2025-03-26 Streamable HTTP transport use :meth:streamable_http instead.

Parameters

url: Base URL of the MCP HTTP+SSE server, e.g. "http://localhost:8000/mcp". headers: Optional extra HTTP headers included in every request. auth: Optional httpx-compatible auth object (e.g. :class:~lauren_mcp._client._oauth.ClientCredentialsProvider). max_retries: Reconnect attempts after SSE stream closes unexpectedly. startup_timeout: Seconds to wait for the initialize handshake response.

Source code in src/lauren_mcp/_client/_factory.py
@staticmethod
def http(
    url: str,
    *,
    headers: dict[str, str] | None = None,
    auth: Any = None,
    max_retries: int = 3,
    startup_timeout: float = 10.0,
    **feature_kwargs: Any,
) -> McpClientProtocol:
    """Create an MCP HTTP+SSE client (legacy 2024-11-05 transport).

    Requires ``pip install 'lauren-mcp[sse]'``.

    For servers speaking the 2025-03-26 Streamable HTTP transport use
    :meth:`streamable_http` instead.

    Parameters
    ----------
    url:
        Base URL of the MCP HTTP+SSE server, e.g.
        ``"http://localhost:8000/mcp"``.
    headers:
        Optional extra HTTP headers included in every request.
    auth:
        Optional ``httpx``-compatible auth object (e.g.
        :class:`~lauren_mcp._client._oauth.ClientCredentialsProvider`).
    max_retries:
        Reconnect attempts after SSE stream closes unexpectedly.
    startup_timeout:
        Seconds to wait for the ``initialize`` handshake response.
    """
    from ._sse import McpHttpSseClient

    return McpHttpSseClient(
        url,
        headers=headers,
        auth=auth,
        max_retries=max_retries,
        startup_timeout=startup_timeout,
        **feature_kwargs,
    )

streamable_http staticmethod

streamable_http(
    url,
    *,
    headers=None,
    auth=None,
    max_retries=3,
    startup_timeout=10.0,
    **feature_kwargs,
)

Create an MCP Streamable HTTP client (2025-03-26 transport).

Requires pip install 'lauren-mcp[sse]'.

Parameters

url: Base URL of the MCP endpoint, e.g. "http://localhost:8000/mcp". headers: Optional extra HTTP headers included in every request. auth: Optional httpx-compatible auth object (e.g. :class:~lauren_mcp._client._oauth.ClientCredentialsProvider). max_retries: Reconnect attempts after the connection drops. startup_timeout: Seconds to wait for the initialize handshake response.

Source code in src/lauren_mcp/_client/_factory.py
@staticmethod
def streamable_http(
    url: str,
    *,
    headers: dict[str, str] | None = None,
    auth: Any = None,
    max_retries: int = 3,
    startup_timeout: float = 10.0,
    **feature_kwargs: Any,
) -> McpClientProtocol:
    """Create an MCP Streamable HTTP client (2025-03-26 transport).

    Requires ``pip install 'lauren-mcp[sse]'``.

    Parameters
    ----------
    url:
        Base URL of the MCP endpoint, e.g. ``"http://localhost:8000/mcp"``.
    headers:
        Optional extra HTTP headers included in every request.
    auth:
        Optional ``httpx``-compatible auth object (e.g.
        :class:`~lauren_mcp._client._oauth.ClientCredentialsProvider`).
    max_retries:
        Reconnect attempts after the connection drops.
    startup_timeout:
        Seconds to wait for the ``initialize`` handshake response.
    """
    from ._streamable import McpStreamableHttpClient

    return McpStreamableHttpClient(
        url,
        headers=headers,
        auth=auth,
        max_retries=max_retries,
        startup_timeout=startup_timeout,
        **feature_kwargs,
    )

McpClientProtocol

Bases: ABC

Abstract interface for all MCP transport clients.

Concrete implementations must provide transport-specific connect / close logic and override all abstract methods. The protocol methods map one-to-one to MCP JSON-RPC methods.

connect abstractmethod async

connect()

Establish the transport connection and complete the MCP handshake.

Must be called before any protocol method. Calling connect() on an already-connected client has implementation-defined behaviour (either a no-op or a re-connect).

Source code in src/lauren_mcp/_client/_protocol.py
@abstractmethod
async def connect(self) -> None:
    """Establish the transport connection and complete the MCP handshake.

    Must be called before any protocol method.  Calling connect()
    on an already-connected client has implementation-defined
    behaviour (either a no-op or a re-connect).
    """

close abstractmethod async

close()

Tear down the transport connection gracefully.

Cancels any pending in-flight requests, closes the underlying socket / pipe, and cleans up background tasks.

Source code in src/lauren_mcp/_client/_protocol.py
@abstractmethod
async def close(self) -> None:
    """Tear down the transport connection gracefully.

    Cancels any pending in-flight requests, closes the underlying
    socket / pipe, and cleans up background tasks.
    """

list_tools abstractmethod async

list_tools()

Retrieve the server's tool catalogue (tools/list).

Returns a list of :class:~lauren_mcp._types.ToolSchema objects describing each available tool.

Source code in src/lauren_mcp/_client/_protocol.py
@abstractmethod
async def list_tools(self) -> list[ToolSchema]:
    """Retrieve the server's tool catalogue (``tools/list``).

    Returns a list of :class:`~lauren_mcp._types.ToolSchema` objects
    describing each available tool.
    """

call_tool abstractmethod async

call_tool(name, arguments=None)

Invoke a tool on the server (tools/call).

Parameters

name: The tool name as reported by :meth:list_tools. arguments: Keyword arguments to pass to the tool. Must conform to the tool's inputSchema.

Returns

Any The raw result value from the server's response.

Source code in src/lauren_mcp/_client/_protocol.py
@abstractmethod
async def call_tool(
    self,
    name: str,
    arguments: dict[str, Any] | None = None,
) -> Any:
    """Invoke a tool on the server (``tools/call``).

    Parameters
    ----------
    name:
        The tool name as reported by :meth:`list_tools`.
    arguments:
        Keyword arguments to pass to the tool.  Must conform to the
        tool's ``inputSchema``.

    Returns
    -------
    Any
        The raw result value from the server's response.
    """

list_resources abstractmethod async

list_resources()

Retrieve the server's resource catalogue (resources/list).

Returns a list of :class:~lauren_mcp._types.ResourceSchema objects.

Source code in src/lauren_mcp/_client/_protocol.py
@abstractmethod
async def list_resources(self) -> list[ResourceSchema]:
    """Retrieve the server's resource catalogue (``resources/list``).

    Returns a list of :class:`~lauren_mcp._types.ResourceSchema`
    objects.
    """

read_resource abstractmethod async

read_resource(uri)

Read a resource by URI (resources/read).

Parameters

uri: The exact URI of the resource to read, which may be a concrete instantiation of a URI template returned by :meth:list_resources.

Returns

Any The raw contents returned by the server.

Source code in src/lauren_mcp/_client/_protocol.py
@abstractmethod
async def read_resource(self, uri: str) -> Any:
    """Read a resource by URI (``resources/read``).

    Parameters
    ----------
    uri:
        The exact URI of the resource to read, which may be a
        concrete instantiation of a URI template returned by
        :meth:`list_resources`.

    Returns
    -------
    Any
        The raw contents returned by the server.
    """

subscribe_resource abstractmethod async

subscribe_resource(uri)

Subscribe to change notifications for the resource at uri.

Sends resources/subscribe with {"uri": uri}. After a successful subscription the server will push notifications/resources/updated whenever the resource changes.

The server returns METHOD_NOT_FOUND if it does not support subscriptions; this is surfaced as :class:McpCallError with code == -32601.

Parameters

uri: The exact resource URI previously returned by :meth:list_resources.

Source code in src/lauren_mcp/_client/_protocol.py
@abstractmethod
async def subscribe_resource(self, uri: str) -> None:
    """Subscribe to change notifications for the resource at *uri*.

    Sends ``resources/subscribe`` with ``{"uri": uri}``.  After a
    successful subscription the server will push
    ``notifications/resources/updated`` whenever the resource changes.

    The server returns ``METHOD_NOT_FOUND`` if it does not support
    subscriptions; this is surfaced as :class:`McpCallError` with
    ``code == -32601``.

    Parameters
    ----------
    uri:
        The exact resource URI previously returned by :meth:`list_resources`.
    """

unsubscribe_resource abstractmethod async

unsubscribe_resource(uri)

Cancel a previously established resource subscription.

Sends resources/unsubscribe with {"uri": uri}. The server will stop pushing notifications/resources/updated for this URI.

Parameters

uri: The URI that was passed to :meth:subscribe_resource.

Source code in src/lauren_mcp/_client/_protocol.py
@abstractmethod
async def unsubscribe_resource(self, uri: str) -> None:
    """Cancel a previously established resource subscription.

    Sends ``resources/unsubscribe`` with ``{"uri": uri}``.  The server
    will stop pushing ``notifications/resources/updated`` for this URI.

    Parameters
    ----------
    uri:
        The URI that was passed to :meth:`subscribe_resource`.
    """

list_prompts abstractmethod async

list_prompts()

Retrieve the server's prompt catalogue (prompts/list).

Returns a list of :class:~lauren_mcp._types.PromptSchema objects.

Source code in src/lauren_mcp/_client/_protocol.py
@abstractmethod
async def list_prompts(self) -> list[PromptSchema]:
    """Retrieve the server's prompt catalogue (``prompts/list``).

    Returns a list of :class:`~lauren_mcp._types.PromptSchema`
    objects.
    """

get_prompt abstractmethod async

get_prompt(name, arguments=None)

Retrieve a rendered prompt from the server (prompts/get).

Parameters

name: The prompt name as reported by :meth:list_prompts. arguments: String arguments to substitute into the prompt template.

Returns

Any The raw GetPromptResult value from the server.

Source code in src/lauren_mcp/_client/_protocol.py
@abstractmethod
async def get_prompt(
    self,
    name: str,
    arguments: dict[str, str] | None = None,
) -> Any:
    """Retrieve a rendered prompt from the server (``prompts/get``).

    Parameters
    ----------
    name:
        The prompt name as reported by :meth:`list_prompts`.
    arguments:
        String arguments to substitute into the prompt template.

    Returns
    -------
    Any
        The raw GetPromptResult value from the server.
    """

ping abstractmethod async

ping()

Send a ping request and await the empty {} response.

Useful for connection health-checks and keep-alive probing. Raises :class:McpCallError (or a subclass) on failure.

Source code in src/lauren_mcp/_client/_protocol.py
@abstractmethod
async def ping(self) -> None:
    """Send a ``ping`` request and await the empty ``{}`` response.

    Useful for connection health-checks and keep-alive probing.
    Raises :class:`McpCallError` (or a subclass) on failure.
    """

set_logging_level abstractmethod async

set_logging_level(level)

Ask the server to change its minimum log-notification threshold.

Sends logging/setLevel with {"level": level}. The server will suppress notifications/message entries below level from that point forward.

Parameters

level: One of "debug", "info", "notice", "warning", "error", "critical", "alert", "emergency".

Raises

ValueError If level is not one of the accepted strings. McpCallError If the server returns a JSON-RPC error response.

Source code in src/lauren_mcp/_client/_protocol.py
@abstractmethod
async def set_logging_level(self, level: str) -> None:
    """Ask the server to change its minimum log-notification threshold.

    Sends ``logging/setLevel`` with ``{"level": level}``.  The server
    will suppress ``notifications/message`` entries below *level* from
    that point forward.

    Parameters
    ----------
    level:
        One of ``"debug"``, ``"info"``, ``"notice"``, ``"warning"``,
        ``"error"``, ``"critical"``, ``"alert"``, ``"emergency"``.

    Raises
    ------
    ValueError
        If *level* is not one of the accepted strings.
    McpCallError
        If the server returns a JSON-RPC error response.
    """

complete abstractmethod async

complete(ref, argument)

Request completion suggestions from the server (completion/complete).

Parameters

ref: A reference object identifying the completable item, e.g. {"type": "ref/prompt", "name": "greet"}. argument: The argument being completed, e.g. {"name": "nam", "value": "Jo"}.

Returns

Any The raw completion result from the server.

Source code in src/lauren_mcp/_client/_protocol.py
@abstractmethod
async def complete(self, ref: dict[str, Any], argument: dict[str, Any]) -> Any:
    """Request completion suggestions from the server (``completion/complete``).

    Parameters
    ----------
    ref:
        A reference object identifying the completable item, e.g.
        ``{"type": "ref/prompt", "name": "greet"}``.
    argument:
        The argument being completed, e.g.
        ``{"name": "nam", "value": "Jo"}``.

    Returns
    -------
    Any
        The raw completion result from the server.
    """

McpServerConfig dataclass

McpServerConfig(alias, client)

Named wrapper pairing an alias string with an MCP client.

McpToolBridge

McpToolBridge(servers)

SINGLETON that manages MCP client lifecycles and populates ToolRegistry.

This class is optional — it is only usable when lauren-ai is installed. Instantiate via AgentModule.for_root(mcp_servers=[...]).

Note: this class deliberately does NOT use @injectable from lauren so that it can be used without lauren-ai as a standalone orchestration helper.

Source code in src/lauren_mcp/_bridge.py
def __init__(self, servers: list[McpServerConfig]) -> None:
    self._servers = servers
    self._registry: Any = None  # set via set_registry before connect_all
    self._watch_tasks: list[asyncio.Task[None]] = []

set_registry

set_registry(registry)

Attach a ToolRegistry (or any object with register_mcp_server) to this bridge.

Must be called before :meth:connect_all if tool registration is desired.

Source code in src/lauren_mcp/_bridge.py
def set_registry(self, registry: Any) -> None:
    """Attach a ToolRegistry (or any object with register_mcp_server) to this bridge.

    Must be called before :meth:`connect_all` if tool registration is desired.
    """
    self._registry = registry

connect_all async

connect_all()

Connect every configured MCP server and load tools into the registry.

For each configured server the method:

  1. Calls client.connect() to perform the MCP initialize handshake.
  2. Calls client.list_tools() to retrieve all available tool schemas.
  3. Registers those tools via registry.register_mcp_server(alias, tools, client) (skipped when no registry has been attached via :meth:set_registry).

Failures in individual servers are caught and logged at ERROR level so that a single broken server does not prevent the remaining servers from loading.

Source code in src/lauren_mcp/_bridge.py
async def connect_all(self) -> None:
    """Connect every configured MCP server and load tools into the registry.

    For each configured server the method:

    1. Calls ``client.connect()`` to perform the MCP initialize handshake.
    2. Calls ``client.list_tools()`` to retrieve all available tool schemas.
    3. Registers those tools via ``registry.register_mcp_server(alias, tools, client)``
       (skipped when no registry has been attached via :meth:`set_registry`).

    Failures in individual servers are caught and logged at ERROR level so
    that a single broken server does not prevent the remaining servers from
    loading.
    """
    for cfg in self._servers:
        try:
            await cfg.client.connect()
            tools = await cfg.client.list_tools()
            if self._registry is not None:
                self._registry.register_mcp_server(cfg.alias, tools, cfg.client)
            logger.info(
                "MCP bridge: loaded %d tools from '%s'",
                len(tools),
                cfg.alias,
            )
            for tool in tools:
                logger.info("  %s__%s", cfg.alias, tool.name)
        except Exception as exc:  # noqa: BLE001
            logger.error(
                "MCP bridge: failed to connect '%s': %s",
                cfg.alias,
                exc,
            )

disconnect_all async

disconnect_all()

Cancel all watch tasks and close every configured MCP client.

Exceptions raised by individual client.close() calls are silently suppressed so that all clients receive a close attempt regardless of whether earlier ones failed.

Source code in src/lauren_mcp/_bridge.py
async def disconnect_all(self) -> None:
    """Cancel all watch tasks and close every configured MCP client.

    Exceptions raised by individual ``client.close()`` calls are silently
    suppressed so that all clients receive a close attempt regardless of
    whether earlier ones failed.
    """
    for task in self._watch_tasks:
        task.cancel()
    for cfg in self._servers:
        try:  # noqa: SIM105
            await cfg.client.close()
        except Exception:  # noqa: BLE001
            pass

_ClientFeaturesMixin

Notification handlers, roots, and protocol-version state for clients.

Consuming classes must provide _send_raw(obj) -> Awaitable[None].

protocol_version property

protocol_version

The protocol version negotiated with the server.

Raises RuntimeError before :meth:connect completes.

on_progress

on_progress(handler)

Register a handler for notifications/progress.

Source code in src/lauren_mcp/_client/_features.py
def on_progress(self, handler: NotificationHandler) -> Unsubscribe:
    """Register a handler for ``notifications/progress``."""
    self._progress_handlers.append(handler)
    return lambda: self._discard(self._progress_handlers, handler)

on_log

on_log(handler)

Register a handler for notifications/message (server logs).

Source code in src/lauren_mcp/_client/_features.py
def on_log(self, handler: NotificationHandler) -> Unsubscribe:
    """Register a handler for ``notifications/message`` (server logs)."""
    self._log_handlers.append(handler)
    return lambda: self._discard(self._log_handlers, handler)

on_list_changed

on_list_changed(handler)

Register a handler for tool/resource/prompt list_changed.

Source code in src/lauren_mcp/_client/_features.py
def on_list_changed(self, handler: ListChangedHandler) -> Unsubscribe:
    """Register a handler for tool/resource/prompt ``list_changed``."""
    self._list_changed_handlers.append(handler)
    return lambda: self._discard(self._list_changed_handlers, handler)

on_resource_updated

on_resource_updated(handler)

Register a handler for notifications/resources/updated.

Returns a zero-arg callable that removes the handler when called.

Parameters

handler: Callable invoked with the resource URI string whenever a subscribed resource changes.

Source code in src/lauren_mcp/_client/_features.py
def on_resource_updated(self, handler: ResourceUpdatedHandler) -> Unsubscribe:
    """Register a handler for ``notifications/resources/updated``.

    Returns a zero-arg callable that removes the handler when called.

    Parameters
    ----------
    handler:
        Callable invoked with the resource URI string whenever a subscribed
        resource changes.
    """
    self._resource_updated_handlers.append(handler)
    return lambda: self._discard(self._resource_updated_handlers, handler)

notify_roots_changed async

notify_roots_changed()

Send notifications/roots/list_changed to the server.

Only meaningful when dynamic roots (a callable) were supplied.

Source code in src/lauren_mcp/_client/_features.py
async def notify_roots_changed(self) -> None:
    """Send ``notifications/roots/list_changed`` to the server.

    Only meaningful when dynamic roots (a callable) were supplied.
    """
    if self._roots is None:
        raise RuntimeError("notify_roots_changed() requires roots to be configured")
    await self._send_raw(  # type: ignore[attr-defined]
        {"jsonrpc": "2.0", "method": "notifications/roots/list_changed"}
    )

ClientCredentialsProvider

ClientCredentialsProvider(
    token_endpoint,
    client_id,
    client_secret,
    scopes=None,
    storage=None,
    extra_params=None,
)

httpx.AsyncAuth-compatible OAuth 2.0 client-credentials token provider.

Fetches a bearer token from token_endpoint using the client_credentials grant, caches it in storage, and automatically refreshes it when the cache misses (or is about to expire). On a 401 response the provider invalidates the cache and retries the request once.

Usage::

auth = ClientCredentialsProvider(
    token_endpoint="https://auth.example.com/oauth/token",
    client_id="my-service",
    client_secret="s3cr3t",
    scopes=["mcp.read", "mcp.write"],
)
client = McpServer.streamable_http("https://api.example.com/mcp", auth=auth)
await client.connect()

Parameters

token_endpoint: Full URL of the token endpoint. client_id: OAuth client identifier. client_secret: OAuth client secret. scopes: Optional list of scope strings to request. storage: Token cache backend. Defaults to :class:InMemoryTokenStorage. extra_params: Additional form fields to include in the token request body (e.g. {"audience": "https://api.example.com"} for Auth0).

Source code in src/lauren_mcp/_client/_oauth.py
def __init__(
    self,
    token_endpoint: str,
    client_id: str,
    client_secret: str,
    scopes: list[str] | None = None,
    storage: TokenStorage | None = None,
    extra_params: dict[str, str] | None = None,
) -> None:
    if not _HTTPX_AVAILABLE:
        raise ImportError(
            "Install lauren-mcp[sse] to use ClientCredentialsProvider: "
            "pip install 'lauren-mcp[sse]'"
        )
    self._token_endpoint = token_endpoint
    self._client_id = client_id
    self._client_secret = client_secret
    self._scopes = scopes or []
    self._storage: TokenStorage = storage or InMemoryTokenStorage()
    self._extra_params = extra_params or {}
    self._lock = asyncio.Lock()

get_token async

get_token()

Return a valid bearer token, fetching a new one if necessary.

Source code in src/lauren_mcp/_client/_oauth.py
async def get_token(self) -> str:
    """Return a valid bearer token, fetching a new one if necessary."""
    cached = await self._storage.get_token()
    if cached is not None:
        return cached
    async with self._lock:
        # Double-checked locking: another coroutine may have fetched it.
        cached = await self._storage.get_token()
        if cached is not None:
            return cached
        return await self._fetch_token()

async_auth_flow async

async_auth_flow(request)

Attach a Bearer token; on 401 invalidate and retry once.

Source code in src/lauren_mcp/_client/_oauth.py
async def async_auth_flow(self, request: Any) -> AsyncGenerator[Any, Any]:
    """Attach a Bearer token; on ``401`` invalidate and retry once."""
    token = await self.get_token()
    request.headers["Authorization"] = f"Bearer {token}"
    response = yield request
    # On 401, flush the cache and retry once with a fresh token.
    if response is not None and response.status_code == 401:
        await self._storage.set_token("", expires_in=0)
        token = await self._fetch_token()
        request.headers["Authorization"] = f"Bearer {token}"
        yield request

InMemoryTokenStorage

InMemoryTokenStorage()

Simple in-process token cache with TTL tracking.

Source code in src/lauren_mcp/_client/_oauth.py
def __init__(self) -> None:
    self._token: str | None = None
    self._expires_at: float | None = None  # absolute monotonic seconds

McpStreamableHttpClient

McpStreamableHttpClient(
    url,
    *,
    headers=None,
    auth=None,
    max_retries=3,
    startup_timeout=10.0,
    client_info=None,
    **feature_kwargs,
)

Bases: _McpBaseRemoteClient

MCP client speaking the 2025-03-26 Streamable HTTP transport.

All messages POST to the single MCP endpoint. Responses arrive either as direct application/json bodies or as text/event-stream bodies (when the server streams notifications before the final response). A background GET stream is opened after the handshake to receive server-push notifications.

Requires httpx::

pip install 'lauren-mcp[sse]'
Source code in src/lauren_mcp/_client/_streamable.py
def __init__(
    self,
    url: str,
    *,
    headers: dict[str, str] | None = None,
    auth: Any = None,
    max_retries: int = 3,
    startup_timeout: float = 10.0,
    client_info: Implementation | None = None,
    **feature_kwargs: Any,
) -> None:
    if not _HTTPX_AVAILABLE:
        raise ImportError(
            "Install lauren-mcp[sse] to use Streamable HTTP MCP transport: "
            "pip install 'lauren-mcp[sse]'"
        )
    super().__init__(
        client_info=client_info,
        headers=headers,
        max_retries=max_retries,
        startup_timeout=startup_timeout,
        **feature_kwargs,
    )
    self._url = url.rstrip("/")
    self._auth = auth
    self._session_id: str | None = None
    self._http_client: httpx.AsyncClient | None = None
    self._push_task: asyncio.Task[None] | None = None

connect async

connect()

Open the transport and complete the MCP handshake.

Source code in src/lauren_mcp/_client/_streamable.py
async def connect(self) -> None:
    """Open the transport and complete the MCP handshake."""
    await self._start_connection()
    await self._handshake()
    # Open the optional server-push channel once a session exists.
    if self._session_id is not None:
        self._push_task = asyncio.create_task(self._push_loop())

close async

close()

Terminate the session and close the HTTP client.

Source code in src/lauren_mcp/_client/_streamable.py
async def close(self) -> None:
    """Terminate the session and close the HTTP client."""
    self._fail_all_pending("Client closed")
    await self._close_connection()