Skip to content

Server API Reference

server

MCP server decorator API.

McpServerModule

Namespace for the :meth:for_root class factory.

for_root staticmethod

for_root(
    server_cls,
    *,
    transport="ws",
    server_info=None,
    capabilities=None,
    providers=None,
    imports=None,
    exports=None,
    log_level="debug",
    mounts=None,
    proxies=None,
    instrument_otel=None,
)

Build a Lauren @module that wires server_cls into the MCP stack.

Parameters

server_cls: A class decorated with @mcp_server. Must have a __mcp_server_meta__ attribute attached by the decorator. transport: "ws" — WebSocket only (default). "sse" — legacy HTTP+SSE only (MCP 2024-11-05). "streamable" — Streamable HTTP only (MCP 2025-03-26). "both" — WebSocket + legacy HTTP+SSE. "all" — WebSocket + Streamable HTTP. (Legacy SSE and Streamable HTTP share the POST / route, so they cannot be mounted together on one path.) server_info: Optional :class:~lauren_mcp._types.Implementation describing this server; defaults to Implementation(name=server_cls.__name__, version="1.0.0"). capabilities: Optional :class:~lauren_mcp._types.ServerCapabilities override. When None the capabilities are inferred from which @mcp_tool / @mcp_resource / @mcp_prompt methods the class exposes (with listChanged: True and logging enabled). providers: Extra Lauren providers to add to the generated module. Use this to make services visible to server_cls via constructor injection. imports: Extra Lauren @module classes to import into the generated module. exports: Extra types to export from the generated module. log_level: Minimum severity for client-bound log notifications emitted via ctx.log() ("debug" | "info" | "warning" | "error"). Clients may raise it at runtime with logging/setLevel. mounts: [(OtherServerCls, "prefix_"), ...] — expose another @mcp_server class's tools / resources / prompts through this server, names prefixed to avoid collisions. Colliding names raise :class:~lauren_mcp.McpToolNameCollision at startup. proxies: [(client, "prefix_"), ...] — connect each :class:~lauren_mcp.McpClientProtocol at startup and re-export the remote server's tools under the prefix. Calls are forwarded over the client; connections close at shutdown. instrument_otel: True — always instrument with OpenTelemetry (raises ImportError if opentelemetry-api is not installed). False — never instrument. None — auto-detect: instrument if opentelemetry-api is installed (default).

Returns

type A @module class ready to be imported by the root application module.

Raises

TypeError If server_cls was not decorated with @mcp_server. ValueError If transport is not one of the accepted spellings.

Source code in src/lauren_mcp/server/_module.py
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
@staticmethod
def for_root(
    server_cls: type,
    *,
    transport: str = "ws",
    server_info: Implementation | None = None,
    capabilities: ServerCapabilities | None = None,
    providers: list[Any] | None = None,
    imports: list[Any] | None = None,
    exports: list[Any] | None = None,
    log_level: str = "debug",
    mounts: list[tuple[type, str]] | None = None,
    proxies: list[tuple[Any, str]] | None = None,
    instrument_otel: bool | None = None,
) -> type:
    """Build a Lauren ``@module`` that wires *server_cls* into the MCP stack.

    Parameters
    ----------
    server_cls:
        A class decorated with ``@mcp_server``.  Must have a
        ``__mcp_server_meta__`` attribute attached by the decorator.
    transport:
        ``"ws"`` — WebSocket only (default).
        ``"sse"`` — legacy HTTP+SSE only (MCP 2024-11-05).
        ``"streamable"`` — Streamable HTTP only (MCP 2025-03-26).
        ``"both"`` — WebSocket + legacy HTTP+SSE.
        ``"all"`` — WebSocket + Streamable HTTP.  (Legacy SSE and
        Streamable HTTP share the ``POST /`` route, so they cannot be
        mounted together on one path.)
    server_info:
        Optional :class:`~lauren_mcp._types.Implementation` describing this
        server; defaults to ``Implementation(name=server_cls.__name__,
        version="1.0.0")``.
    capabilities:
        Optional :class:`~lauren_mcp._types.ServerCapabilities` override.
        When ``None`` the capabilities are inferred from which
        ``@mcp_tool`` / ``@mcp_resource`` / ``@mcp_prompt`` methods the
        class exposes (with ``listChanged: True`` and logging enabled).
    providers:
        Extra Lauren providers to add to the generated module.  Use this
        to make services visible to *server_cls* via constructor injection.
    imports:
        Extra Lauren ``@module`` classes to import into the generated module.
    exports:
        Extra types to export from the generated module.
    log_level:
        Minimum severity for client-bound log notifications emitted via
        ``ctx.log()`` (``"debug"`` | ``"info"`` | ``"warning"`` |
        ``"error"``).  Clients may raise it at runtime with
        ``logging/setLevel``.
    mounts:
        ``[(OtherServerCls, "prefix_"), ...]`` — expose another
        ``@mcp_server`` class's tools / resources / prompts through this
        server, names prefixed to avoid collisions.  Colliding names
        raise :class:`~lauren_mcp.McpToolNameCollision` at startup.
    proxies:
        ``[(client, "prefix_"), ...]`` — connect each
        :class:`~lauren_mcp.McpClientProtocol` at startup and re-export
        the remote server's tools under the prefix.  Calls are forwarded
        over the client; connections close at shutdown.
    instrument_otel:
        ``True``  — always instrument with OpenTelemetry (raises
        ``ImportError`` if ``opentelemetry-api`` is not installed).
        ``False`` — never instrument.
        ``None``  — auto-detect: instrument if ``opentelemetry-api`` is
        installed (default).

    Returns
    -------
    type
        A ``@module`` class ready to be imported by the root application
        module.

    Raises
    ------
    TypeError
        If *server_cls* was not decorated with ``@mcp_server``.
    ValueError
        If *transport* is not one of the accepted spellings.
    """
    # ------------------------------------------------------------------
    # 1. Validate server_cls and transport
    # ------------------------------------------------------------------
    server_meta: McpServerMeta | None = getattr(server_cls, MCP_SERVER_META, None)
    if server_meta is None:
        raise TypeError(
            f"{server_cls!r} is not an MCP server class. "
            "Decorate it with @mcp_server before passing to McpServerModule.for_root()."
        )
    effective_transport = transport or server_meta.transport
    if effective_transport not in _TRANSPORTS:
        raise ValueError(
            f"Unknown transport {effective_transport!r}; expected one of {_TRANSPORTS}"
        )

    # ------------------------------------------------------------------
    # 2. Collect tool / resource / prompt / lifespan / completion metadata
    # ------------------------------------------------------------------
    tools: list[McpToolMeta] = []
    resources: list[McpResourceMeta] = []
    prompts: list[McpPromptMeta] = []
    completions: list[McpCompletionMeta] = []
    lifespan_meta: McpLifespanMeta | None = None

    # Import decorator-reader helper once for the loop below.
    from ._decorators import _read_method_decorators as _rmd  # noqa: PLC0415

    for attr_name in dir(server_cls):
        try:
            attr = getattr(server_cls, attr_name)
        except AttributeError:
            continue

        tool_meta: McpToolMeta | None = getattr(attr, MCP_TOOL_META, None)
        if tool_meta is not None:
            # Re-read all 4 decorator attrs at for_root() time from the
            # fully-decorated method (all outer decorators already applied).
            _deco = _rmd(attr)
            if _deco["guards"] and not tool_meta.guards:
                tool_meta.guards = _deco["guards"]
            if _deco["interceptors"] and not tool_meta.interceptors:
                tool_meta.interceptors = _deco["interceptors"]
            if _deco["exception_handlers"] and not tool_meta.exception_handlers:
                tool_meta.exception_handlers = _deco["exception_handlers"]
            if _deco["tool_metadata"] and not tool_meta.tool_metadata:
                tool_meta.tool_metadata = _deco["tool_metadata"]
            tools.append(tool_meta)

        resource_meta: McpResourceMeta | None = getattr(attr, MCP_RESOURCE_META, None)
        if resource_meta is not None:
            _deco = _rmd(attr)
            if _deco["guards"] and not resource_meta.guards:
                resource_meta.guards = _deco["guards"]
            if _deco["interceptors"] and not resource_meta.interceptors:
                resource_meta.interceptors = _deco["interceptors"]
            if _deco["exception_handlers"] and not resource_meta.exception_handlers:
                resource_meta.exception_handlers = _deco["exception_handlers"]
            if _deco["tool_metadata"] and not resource_meta.tool_metadata:
                resource_meta.tool_metadata = _deco["tool_metadata"]
            resources.append(resource_meta)

        prompt_meta: McpPromptMeta | None = getattr(attr, MCP_PROMPT_META, None)
        if prompt_meta is not None:
            _deco = _rmd(attr)
            if _deco["guards"] and not prompt_meta.guards:
                prompt_meta.guards = _deco["guards"]
            if _deco["interceptors"] and not prompt_meta.interceptors:
                prompt_meta.interceptors = _deco["interceptors"]
            if _deco["exception_handlers"] and not prompt_meta.exception_handlers:
                prompt_meta.exception_handlers = _deco["exception_handlers"]
            if _deco["tool_metadata"] and not prompt_meta.tool_metadata:
                prompt_meta.tool_metadata = _deco["tool_metadata"]
            prompts.append(prompt_meta)

        completion_meta_val: McpCompletionMeta | None = getattr(attr, MCP_COMPLETION_META, None)
        if completion_meta_val is not None:
            completions.append(completion_meta_val)

        ls_meta: McpLifespanMeta | None = getattr(attr, MCP_LIFESPAN_META, None)
        if ls_meta is not None:
            if lifespan_meta is not None:
                raise TypeError(
                    f"{server_cls.__name__} declares more than one @mcp_lifespan "
                    "method; merge them into a single generator."
                )
            lifespan_meta = ls_meta

    # ------------------------------------------------------------------
    # 3. Build ServerCapabilities (auto or caller-supplied)
    # ------------------------------------------------------------------
    if capabilities is None:
        resolved_caps = ServerCapabilities(
            tools={"listChanged": True} if tools else None,
            resources={"listChanged": True, "subscribe": True} if resources else None,
            prompts={"listChanged": True} if prompts else None,
            logging={},
        )
    else:
        resolved_caps = capabilities

    # Whether to include completions capability in the initialize response.
    # Stored separately because ServerCapabilities doesn't (yet) have a
    # completions field — avoids touching _types.py.
    _has_completions = bool(completions)

    # ------------------------------------------------------------------
    # 4. Resolve server_info
    # ------------------------------------------------------------------
    resolved_server_info: Implementation = server_info or Implementation(
        name=server_cls.__name__,
        version="1.0.0",
    )

    # ------------------------------------------------------------------
    # 5. Build transport controller(s)
    #
    # All Lauren ``@use_*`` metadata declared on *server_cls* — guards,
    # interceptors, middlewares, encoder, exception_handlers, and user
    # metadata (@set_metadata) — is propagated onto the generated
    # transport controllers via ``propagate_metadata(server_cls)``.
    # ------------------------------------------------------------------
    path: str = server_meta.path

    controllers: list[type] = []
    if effective_transport in ("ws", "both", "all"):
        controllers.append(mcp_ws_controller(path, source=server_cls))
    if effective_transport in ("sse", "both"):
        controllers.append(mcp_http_sse_controller(path, source=server_cls))
    if effective_transport in ("streamable", "all"):
        controllers.append(mcp_streamable_http_controller(path, source=server_cls))

    # ------------------------------------------------------------------
    # 6. Capture all resolved values in closure-friendly locals
    # ------------------------------------------------------------------
    _tools = tools
    _resources = resources
    _prompts = prompts
    _completions = completions
    _lifespan_meta = lifespan_meta
    _resolved_caps = resolved_caps
    _resolved_server_info = resolved_server_info
    _log_level = log_level
    _instrument_otel = instrument_otel
    _server_metadata: dict[str, Any] = dict(
        getattr(server_cls, "__lauren_metadata__", None) or {}
    )

    # ------------------------------------------------------------------
    # 7. Build the handler-registrar injectable.
    # ------------------------------------------------------------------
    @injectable(scope=Scope.SINGLETON)
    class _McpHandlerRegistrar:
        """Singleton that wires handler coroutines onto the dispatcher."""

        def __init__(
            self,
            dispatcher: McpDispatcher,
            registry: McpConnectionRegistry,
            catalog: McpCatalogManager,
            subscriptions: ResourceSubscriptionManager,
            server_instance: server_cls,  # type: ignore[valid-type]
        ) -> None:
            self._dispatcher = dispatcher
            self._registry = registry
            self._catalog = catalog
            self._subscriptions = subscriptions
            self._server_instance = server_instance
            self._lifespan_gen: Any = None
            self._lifespan_ctx: dict[str, Any] = {}
            self._log_state = LogLevelState(_log_level)
            self._container: Any = None  # populated at @post_construct time via gc

        @post_construct
        async def _register_handlers(self) -> None:
            """Run the lifespan hook and wire all MCP handlers."""
            dispatcher = self._dispatcher
            srv = self._server_instance
            catalog = self._catalog
            sub_mgr = self._subscriptions

            # Discover the DI container that created this singleton via gc.
            # This runs once at startup and is O(n) in live objects — acceptable
            # for initialisation cost.  Allows per-tool guards/interceptors to
            # resolve their DI dependencies without any user-side wiring.
            try:
                import gc  # noqa: PLC0415

                from lauren import DIContainer  # noqa: PLC0415

                for _obj in gc.get_objects():
                    if isinstance(_obj, DIContainer) and any(
                        v is self for v in _obj._singletons.values()
                    ):
                        self._container = _obj
                        break
            except Exception:  # noqa: BLE001
                pass  # Container discovery failed — guards will be skipped

            # --- lifespan ---
            if _lifespan_meta is not None:
                gen = getattr(srv, _lifespan_meta.method_name)()
                ctx = await gen.__anext__()
                if ctx is None:
                    ctx = {}
                if not isinstance(ctx, dict):
                    raise TypeError(
                        "@mcp_lifespan generator must yield a dict (or None), "
                        f"got {type(ctx).__name__}"
                    )
                self._lifespan_gen = gen
                self._lifespan_ctx = ctx

            # --- catalog seeding (silent: broadcast fn not yet attached) ---
            for t in _tools:
                catalog.register_tool(t)
            for r in _resources:
                catalog.register_resource(r)
            for p in _prompts:
                catalog.register_prompt(p)
            catalog.set_broadcast_fn(self._registry.broadcast_method)

            # --- initialize ---
            _si = _resolved_server_info
            _sc = _resolved_caps

            async def _initialize_handler(params: dict[str, Any] | None) -> dict[str, Any]:
                params = params or {}
                client_caps_raw = params.get("capabilities") or {}
                client_info_raw = params.get("clientInfo") or {}
                client_caps = ClientCapabilities(
                    roots=client_caps_raw.get("roots"),
                    sampling=client_caps_raw.get("sampling"),
                    elicitation=client_caps_raw.get("elicitation"),
                    experimental=client_caps_raw.get("experimental"),
                )
                client_info = Implementation(
                    name=client_info_raw.get("name", "unknown"),
                    version=client_info_raw.get("version", "0.0.0"),
                )
                init_params = InitializeParams(
                    protocolVersion=params.get("protocolVersion", "2024-11-05"),
                    capabilities=client_caps,
                    clientInfo=client_info,
                )
                result = build_initialize_result(init_params, _si, _sc)
                caps_dict: dict[str, Any] = {}
                if result.capabilities.tools is not None:
                    caps_dict["tools"] = result.capabilities.tools
                if result.capabilities.resources is not None:
                    caps_dict["resources"] = result.capabilities.resources
                if result.capabilities.prompts is not None:
                    caps_dict["prompts"] = result.capabilities.prompts
                if result.capabilities.logging is not None:
                    caps_dict["logging"] = result.capabilities.logging
                if result.capabilities.experimental is not None:
                    caps_dict["experimental"] = result.capabilities.experimental
                # completions is not a field on ServerCapabilities yet; handle separately
                if _has_completions:
                    caps_dict["completions"] = {}
                return {
                    "protocolVersion": result.protocolVersion,
                    "capabilities": caps_dict,
                    "serverInfo": {
                        "name": result.serverInfo.name,
                        "version": result.serverInfo.version,
                    },
                    **(
                        {"instructions": result.instructions}
                        if result.instructions is not None
                        else {}
                    ),
                }

            dispatcher.register("initialize", _initialize_handler)

            # --- logging/setLevel ---
            log_state = self._log_state

            async def _set_level(params: dict[str, Any] | None) -> dict[str, Any]:
                level = (params or {}).get("level")
                if level not in VALID_LOG_LEVELS:
                    raise ValueError(f"Invalid log level: {level!r}")
                log_state.level = level
                return {}

            dispatcher.register("logging/setLevel", _set_level)

            # --- shared context factory ---
            context_factory = make_context_factory(
                _server_metadata,
                lifespan_getter=lambda: self._lifespan_ctx,
                log_level_state=log_state,
            )

            from lauren_mcp._types import JsonRpcRequest as _Req  # noqa: PLC0415

            # --- tools (catalog-backed; registered even when empty so
            #     dynamically added tools work) ---
            _tl_inner = make_tools_list_handler(catalog.list_tools)
            _tc_inner = make_tools_call_handler(
                srv,
                catalog.list_tools,
                context_factory=context_factory,
                dispatcher=dispatcher,
                container=self._container,
                owning_module=_McpModule,
                server_metadata=_server_metadata,
            )

            async def _tools_list(params: dict[str, Any] | None) -> dict[str, Any]:
                return await _tl_inner(_Req(method="tools/list", params=params))

            async def _tools_call(params: dict[str, Any] | None) -> dict[str, Any]:
                return await _tc_inner(_Req(method="tools/call", params=params))

            dispatcher.register("tools/list", _tools_list)
            dispatcher.register("tools/call", _tools_call)

            # --- resources ---
            _rl_inner = make_resources_list_handler(catalog.list_resources)
            _rr_inner = make_resources_read_handler(
                srv,
                catalog.list_resources,
                container=self._container,
                owning_module=_McpModule,
                server_metadata=_server_metadata,
            )

            async def _resources_list(params: dict[str, Any] | None) -> dict[str, Any]:
                return await _rl_inner(_Req(method="resources/list", params=params))

            async def _resources_read(params: dict[str, Any] | None) -> dict[str, Any]:
                return await _rr_inner(_Req(method="resources/read", params=params))

            dispatcher.register("resources/list", _resources_list)
            dispatcher.register("resources/read", _resources_read)

            # --- resources/subscribe, resources/unsubscribe ---
            async def _resources_subscribe(
                params: dict[str, Any] | None,
            ) -> dict[str, Any]:
                from lauren_mcp._server._binding import CURRENT_BINDING  # noqa: PLC0415

                p = params or {}
                uri = p.get("uri")
                if not uri:
                    raise ValueError("resources/subscribe requires 'uri'")
                binding = CURRENT_BINDING.get()
                session_key = (binding.session_id or "unknown") if binding else "unknown"
                send_notification = binding.send_notification if binding else None
                if send_notification is None:
                    raise ValueError("resources/subscribe requires a push-capable transport")
                # Wrap the dict-based send_notification into a raw-string SendFn
                import json as _json  # noqa: PLC0415

                _sn = send_notification

                async def _raw_send(raw: str) -> None:
                    await _sn(_json.loads(raw))

                sub_mgr.subscribe(uri, session_key, _raw_send)
                return {}

            async def _resources_unsubscribe(
                params: dict[str, Any] | None,
            ) -> dict[str, Any]:
                from lauren_mcp._server._binding import CURRENT_BINDING  # noqa: PLC0415

                p = params or {}
                uri = p.get("uri")
                if not uri:
                    raise ValueError("resources/unsubscribe requires 'uri'")
                binding = CURRENT_BINDING.get()
                session_key = (binding.session_id or "unknown") if binding else "unknown"
                sub_mgr.unsubscribe(uri, session_key)
                return {}

            dispatcher.register("resources/subscribe", _resources_subscribe)
            dispatcher.register("resources/unsubscribe", _resources_unsubscribe)

            # --- prompts ---
            _pl_inner = make_prompts_list_handler(catalog.list_prompts)
            _pg_inner = make_prompts_get_handler(srv, catalog.list_prompts)

            async def _prompts_list(params: dict[str, Any] | None) -> dict[str, Any]:
                return await _pl_inner(_Req(method="prompts/list", params=params))

            async def _prompts_get(params: dict[str, Any] | None) -> dict[str, Any]:
                return await _pg_inner(_Req(method="prompts/get", params=params))

            dispatcher.register("prompts/list", _prompts_list)
            dispatcher.register("prompts/get", _prompts_get)

            # --- completion/complete ---
            if _completions:
                _cc_inner = make_completion_handler(srv, _completions)

                async def _completion_complete(
                    params: dict[str, Any] | None,
                ) -> dict[str, Any]:
                    return await _cc_inner(_Req(method="completion/complete", params=params))

                dispatcher.register("completion/complete", _completion_complete)

            # --- OpenTelemetry instrumentation ---
            from lauren_mcp._server._otel import (  # noqa: PLC0415
                instrument_dispatcher,
                is_otel_available,
            )

            effective_otel = _instrument_otel
            if effective_otel is None:
                effective_otel = is_otel_available()

            if effective_otel:
                if not is_otel_available():
                    raise ImportError(
                        "instrument_otel=True requires opentelemetry-api; "
                        "install it with: pip install 'lauren-mcp[otel]'"
                    )
                instrument_dispatcher(dispatcher)

        @pre_destruct
        async def _shutdown(self) -> None:
            """Close the lifespan generator at server shutdown."""
            gen = self._lifespan_gen
            self._lifespan_gen = None
            if gen is not None:
                await gen.aclose()

    # With ``from __future__ import annotations`` all annotations are
    # stored as strings.  Lauren's DI evaluates them via
    # ``typing.get_type_hints()``, which looks up names in the module
    # globals — ``server_cls`` is a local, so it won't be found.
    # Override the annotation with the actual class object so the DI
    # compiler can locate the provider.
    _McpHandlerRegistrar.__init__.__annotations__["server_instance"] = server_cls

    _McpHandlerRegistrar.__name__ = f"McpHandlerRegistrar[{server_cls.__name__}]"
    _McpHandlerRegistrar.__qualname__ = _McpHandlerRegistrar.__name__

    # ------------------------------------------------------------------
    # 8. Build the @module class — a thin container; all lifecycle
    #    logic lives in _McpHandlerRegistrar above.
    # ------------------------------------------------------------------
    from lauren.reflect import (  # noqa: PLC0415
        reflect_guards,
        reflect_interceptors,
        reflect_middlewares,
    )

    _composition_providers: list[type] = []
    if mounts:
        from ._composition import make_mount_binder  # noqa: PLC0415

        for mounted_cls, prefix in mounts:
            _composition_providers.append(mounted_cls)
            _composition_providers.append(make_mount_binder(mounted_cls, prefix))
    if proxies:
        from ._composition import make_proxy_binder  # noqa: PLC0415

        for proxy_client, prefix in proxies:
            _composition_providers.append(make_proxy_binder(proxy_client, prefix))

    _existing_extra = set(providers or [])
    _auto_guard_providers: list[type] = [
        cls
        for cls in (
            *reflect_guards(server_cls),
            *reflect_interceptors(server_cls),
            *reflect_middlewares(server_cls),
        )
        if cls not in _existing_extra
    ]
    _auto_guard_set = set(_auto_guard_providers)

    # Collect per-method guard, interceptor, and exception_handler classes
    # not already in providers — auto-register them so DI can resolve them.
    _method_level_providers: list[type] = []
    for _meta_item in (*_tools, *_resources, *_prompts):
        for _cls in (
            *getattr(_meta_item, "guards", ()),
            *getattr(_meta_item, "interceptors", ()),
            *getattr(_meta_item, "exception_handlers", ()),
        ):
            if (
                isinstance(_cls, type)
                and _cls not in _existing_extra
                and _cls not in _auto_guard_set
                and _cls not in _method_level_providers
            ):
                _method_level_providers.append(_cls)

    _all_providers = [
        server_cls,
        McpDispatcher,
        SseSessionStore,
        StreamableSessionStore,
        McpConnectionRegistry,
        McpCatalogManager,
        ResourceSubscriptionManager,
        _McpHandlerRegistrar,
        *_composition_providers,
        *_auto_guard_providers,
        *_method_level_providers,
        *(providers or []),
    ]
    _all_imports = list(imports or [])
    _all_exports = list(exports or [])

    @module(
        providers=_all_providers,
        imports=_all_imports,
        exports=_all_exports,
        controllers=controllers,
    )
    class _McpModule:
        """Auto-generated Lauren module for MCP server integration."""

    _McpModule.__name__ = f"McpModule[{server_cls.__name__}]"
    _McpModule.__qualname__ = (
        f"McpServerModule.for_root.<locals>._McpModule[{server_cls.__name__}]"
    )
    # Expose the registrar for tests that need to wire handlers without DI.
    _McpModule._handler_registrar_cls = _McpHandlerRegistrar  # type: ignore[attr-defined]

    return _McpModule

mcp_server

mcp_server(path, *, transport='ws')

Class decorator that marks a class as an MCP server.

Applies @injectable(scope=Scope.SINGLETON) from Lauren so the class participates in DI, and attaches :class:McpServerMeta as an attribute.

Parameters:

Name Type Description Default
path str

The mount path for the MCP server endpoint (e.g. "/mcp").

required
transport str

One of "ws", "sse", "streamable", "both", or "all".

'ws'
Source code in src/lauren_mcp/server/_decorators.py
def mcp_server(path: str, *, transport: str = "ws") -> Callable[[type], type]:
    """Class decorator that marks a class as an MCP server.

    Applies ``@injectable(scope=Scope.SINGLETON)`` from Lauren so the class
    participates in DI, and attaches :class:`McpServerMeta` as an attribute.

    Args:
        path: The mount path for the MCP server endpoint (e.g. ``"/mcp"``).
        transport: One of ``"ws"``, ``"sse"``, ``"streamable"``, ``"both"``,
            or ``"all"``.
    """

    def decorator(cls: type) -> type:
        from lauren import Scope, injectable

        injectable(scope=Scope.SINGLETON)(cls)
        setattr(cls, MCP_SERVER_META, McpServerMeta(path=path, transport=transport))
        return cls

    return decorator

mcp_tool

mcp_tool(
    *,
    name=None,
    description=None,
    title=None,
    annotations=None,
    timeout=None,
    tags=None,
    meta=None,
    output_schema=None,
    structured_output=None,
    strict=True,
)

Method decorator that exposes a coroutine as an MCP tool.

Parameters:

Name Type Description Default
name str | None

Override the tool name (defaults to the method name).

None
description str | None

Override the tool description (defaults to docstring).

None
title str | None

Human-readable display name shown in client UIs (distinct from the machine-readable name).

None
annotations ToolAnnotations | None

Behavioural hints (:class:ToolAnnotations) transmitted to clients.

None
timeout float | None

Per-call execution deadline in seconds; exceeding it fails the call with an internal error.

None
tags frozenset[str] | set[str] | None

Categorical tags included in the tool's tools/list entry.

None
meta dict[str, Any] | None

Opaque metadata forwarded to clients under _meta.

None
output_schema Any

JSON Schema dict or Pydantic model class describing the tool's structured output; advertised as outputSchema.

None
structured_output bool | None

Control auto-detection of output schema. None (default) auto-detects structured types (Pydantic, dataclass, TypedDict, msgspec.Struct); True forces schema generation even for primitives; False disables auto-detection entirely.

None
strict bool

When True (default), validates the tool name against the MCP specification (SEP-986). Set False to allow legacy names.

True
Source code in src/lauren_mcp/server/_decorators.py
def mcp_tool(
    *,
    name: str | None = None,
    description: str | None = None,
    title: str | None = None,
    annotations: ToolAnnotations | None = None,
    timeout: float | None = None,
    tags: frozenset[str] | set[str] | None = None,
    meta: dict[str, Any] | None = None,
    output_schema: Any = None,
    structured_output: bool | None = None,
    strict: bool = True,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Method decorator that exposes a coroutine as an MCP tool.

    Args:
        name: Override the tool name (defaults to the method name).
        description: Override the tool description (defaults to docstring).
        title: Human-readable display name shown in client UIs (distinct from
            the machine-readable ``name``).
        annotations: Behavioural hints (:class:`ToolAnnotations`) transmitted
            to clients.
        timeout: Per-call execution deadline in seconds; exceeding it fails
            the call with an internal error.
        tags: Categorical tags included in the tool's ``tools/list`` entry.
        meta: Opaque metadata forwarded to clients under ``_meta``.
        output_schema: JSON Schema dict or Pydantic model class describing the
            tool's structured output; advertised as ``outputSchema``.
        structured_output: Control auto-detection of output schema.  ``None``
            (default) auto-detects structured types (Pydantic, dataclass,
            TypedDict, msgspec.Struct); ``True`` forces schema generation even
            for primitives; ``False`` disables auto-detection entirely.
        strict: When ``True`` (default), validates the tool name against the
            MCP specification (SEP-986).  Set ``False`` to allow legacy names.
    """

    def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
        (
            auto_name,
            auto_desc,
            schema,
            context_param,
            param_descs,
            pipe_chains,
            bg_tasks_param,
            depends_params,
            header_params,
            state_params,
        ) = _build_schema(fn)
        resolved_name = name if name is not None else auto_name
        resolved_desc = description if description is not None else auto_desc

        _validate_tool_name(resolved_name, strict=strict)

        # Resolve explicit output_schema first; fall back to auto-detection
        if output_schema is not None:
            resolved_output_schema = _resolve_output_schema(output_schema)
        else:
            try:
                hints = typing.get_type_hints(fn)
            except Exception:
                hints = {}
            return_annotation = hints.get("return", inspect.Parameter.empty)
            resolved_output_schema = _auto_output_schema(return_annotation, structured_output)

        # Backward-compat: also populate param_specs with raw FieldDescriptor /
        # _ParamSpec objects for code that still accesses meta.param_specs.
        _param_specs: dict[str, Any] = {}
        try:
            _hints_extra = typing.get_type_hints(fn, include_extras=True)
        except Exception:
            _hints_extra = {}
        _sig = inspect.signature(fn)
        for _pname, _param in _sig.parameters.items():
            if _pname == "self":
                continue
            _ann = _hints_extra.get(_pname, _param.annotation)
            _spec = _extract_lauren_annotation(_ann)
            if _spec is not None:
                _param_specs[_pname] = _spec

        # Read @use_guards / @use_interceptors / @use_exception_handlers / @set_metadata
        _method_deco = _read_method_decorators(fn)

        tool_meta = McpToolMeta(
            name=resolved_name,
            description=resolved_desc,
            input_schema=schema,
            method_name=fn.__name__,
            context_param_name=context_param,
            reads_context=context_param is not None,
            annotations=annotations,
            output_schema=resolved_output_schema,
            timeout=timeout,
            tags=frozenset(tags) if tags else frozenset(),
            meta=dict(meta) if meta else {},
            param_descriptions=param_descs,
            structured_output=structured_output,
            title=title,
            pipe_chains=pipe_chains,
            bg_tasks_param=bg_tasks_param,
            depends_params=depends_params,
            header_params=header_params,
            state_params=state_params,
            param_specs=_param_specs,
            guards=_method_deco["guards"],
            interceptors=_method_deco["interceptors"],
            exception_handlers=_method_deco["exception_handlers"],
            tool_metadata=_method_deco["tool_metadata"],
        )
        setattr(fn, MCP_TOOL_META, tool_meta)
        return fn

    return decorator

mcp_resource

mcp_resource(
    uri_template,
    *,
    name=None,
    description=None,
    title=None,
    mime_type=None,
    annotations=None,
)

Method decorator that exposes a coroutine as an MCP resource.

Parameters:

Name Type Description Default
uri_template str

A URI template with {param} placeholders. Also supports {+param} / {param*} multi-segment placeholders and a {?p1,p2} optional query-parameter suffix.

required
name str | None

Resource name (defaults to the method name).

None
description str | None

Human-readable description (defaults to docstring).

None
title str | None

Human-readable display name shown in client UIs.

None
mime_type str | None

Optional MIME type hint (e.g. "text/plain").

None
annotations ResourceAnnotations | None

Audience and priority hints (:class:ResourceAnnotations) transmitted to clients.

None
Source code in src/lauren_mcp/server/_decorators.py
def mcp_resource(
    uri_template: str,
    *,
    name: str | None = None,
    description: str | None = None,
    title: str | None = None,
    mime_type: str | None = None,
    annotations: ResourceAnnotations | None = None,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Method decorator that exposes a coroutine as an MCP resource.

    Args:
        uri_template: A URI template with ``{param}`` placeholders.  Also
            supports ``{+param}`` / ``{param*}`` multi-segment placeholders
            and a ``{?p1,p2}`` optional query-parameter suffix.
        name: Resource name (defaults to the method name).
        description: Human-readable description (defaults to docstring).
        title: Human-readable display name shown in client UIs.
        mime_type: Optional MIME type hint (e.g. ``"text/plain"``).
        annotations: Audience and priority hints (:class:`ResourceAnnotations`)
            transmitted to clients.
    """

    def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
        from ._uri import compile_uri_template

        resolved_name = name if name is not None else fn.__name__
        top_desc, _ = parse_docstring(fn)
        resolved_desc = description if description is not None else top_desc

        compiled = compile_uri_template(uri_template)
        try:
            hints = typing.get_type_hints(fn, include_extras=True)
        except Exception:
            hints = {}
        hints.pop("return", None)

        # Extract pipe chains, BackgroundTasks, Depends, Header, State from hints
        pipe_chains: dict[str, list[Any]] = {}
        bg_tasks_param: str | None = None
        depends_params: dict[str, Any] = {}
        header_params: dict[str, HeaderParamSpec] = {}
        state_params: dict[str, type] = {}
        clean_hints: dict[str, Any] = {}

        sig = inspect.signature(fn)
        for param_name, annotation in hints.items():
            if param_name == "self":
                continue
            if _is_context_annotation(annotation):
                continue
            if _is_background_tasks_annotation(annotation):
                if bg_tasks_param is None:
                    bg_tasks_param = param_name
                # Don't include in clean_hints — handler injects this
                continue
            if _is_depends_annotation(annotation):
                provider = _extract_depends_callable(annotation)
                if provider is not None:
                    depends_params[param_name] = provider
                continue
            if _is_header_annotation(annotation):
                coerce_to = _extract_header_type(annotation)
                is_optional = _is_optional_header(annotation)
                param = sig.parameters.get(param_name)
                default = param.default if param is not None else inspect.Parameter.empty
                pipe_chain = _extract_header_pipe_chain(annotation)
                header_params[param_name] = HeaderParamSpec(
                    header_name=_param_to_header_name(param_name),
                    coerce_to=coerce_to,
                    default=default,
                    is_optional=is_optional,
                    pipe_chain=pipe_chain,
                )
                continue
            if _is_state_annotation(annotation):
                state_type = _extract_state_type(annotation)
                state_params[param_name] = state_type
                continue
            base_type, fd, pipes = _extract_lauren_hint(annotation)
            if pipes:
                pipe_chains[param_name] = list(pipes)
            # Store the base type (stripped of Lauren markers) for coerce_params
            clean_hints[param_name] = base_type

        # Read @use_guards / @use_interceptors / @use_exception_handlers / @set_metadata
        _method_deco = _read_method_decorators(fn)

        resource_meta = McpResourceMeta(
            uri_template=uri_template,
            name=resolved_name,
            description=resolved_desc,
            mime_type=mime_type,
            method_name=fn.__name__,
            query_params=list(compiled.query_params),
            param_type_hints=clean_hints,
            annotations=annotations,
            title=title,
            pipe_chains=pipe_chains,
            bg_tasks_param=bg_tasks_param,
            depends_params=depends_params,
            header_params=header_params,
            state_params=state_params,
            guards=_method_deco["guards"],
            interceptors=_method_deco["interceptors"],
            exception_handlers=_method_deco["exception_handlers"],
            tool_metadata=_method_deco["tool_metadata"],
        )
        setattr(fn, MCP_RESOURCE_META, resource_meta)
        return fn

    return decorator

mcp_prompt

mcp_prompt(name=None, *, description=None, title=None)

Method decorator that exposes a coroutine as an MCP prompt.

Parameters:

Name Type Description Default
name str | None

Prompt name (defaults to the method name).

None
description str | None

Human-readable description (defaults to docstring).

None
title str | None

Human-readable display name shown in client UIs.

None
Source code in src/lauren_mcp/server/_decorators.py
def mcp_prompt(
    name: str | None = None,
    *,
    description: str | None = None,
    title: str | None = None,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Method decorator that exposes a coroutine as an MCP prompt.

    Args:
        name: Prompt name (defaults to the method name).
        description: Human-readable description (defaults to docstring).
        title: Human-readable display name shown in client UIs.
    """

    def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
        resolved_name = name if name is not None else fn.__name__
        top_desc, param_descs = parse_docstring(fn)
        resolved_desc = description if description is not None else top_desc

        sig = inspect.signature(fn)
        arguments: list[dict[str, Any]] = []
        for param_name, param in sig.parameters.items():
            if param_name == "self":
                continue
            arg_entry: dict[str, Any] = {
                "name": param_name,
                "description": param_descs.get(param_name),
                "required": param.default is inspect.Parameter.empty,
            }
            arguments.append(arg_entry)

        # Read @use_guards / @use_interceptors / @use_exception_handlers / @set_metadata
        _method_deco = _read_method_decorators(fn)

        prompt_meta = McpPromptMeta(
            name=resolved_name,
            description=resolved_desc,
            arguments=arguments,
            method_name=fn.__name__,
            title=title,
            guards=_method_deco["guards"],
            interceptors=_method_deco["interceptors"],
            exception_handlers=_method_deco["exception_handlers"],
            tool_metadata=_method_deco["tool_metadata"],
        )
        setattr(fn, MCP_PROMPT_META, prompt_meta)
        return fn

    return decorator

SchemaBuilder

SchemaBuilder()

Builds JSON Schema fragments from Python type annotations.

One builder instance is shared across all parameters of a function so Pydantic model definitions are accumulated once into defs and can be attached to the top-level schema as $defs.

Source code in src/lauren_mcp/server/_schema.py
def __init__(self) -> None:
    self.defs: dict[str, dict[str, Any]] = {}

build

build(annotation)

Return a JSON Schema fragment for annotation.

Unknown types degrade to {} (unconstrained) rather than raising.

Source code in src/lauren_mcp/server/_schema.py
def build(self, annotation: Any) -> dict[str, Any]:
    """Return a JSON Schema fragment for *annotation*.

    Unknown types degrade to ``{}`` (unconstrained) rather than raising.
    """
    if annotation is inspect.Parameter.empty:
        return {"type": "string"}

    # TypedDict must be checked before the generic isinstance(type) path:
    # its metaclass behaviour differs across Python versions.
    if typing.is_typeddict(annotation):
        return self._build_typeddict(annotation)

    # Annotated[T, Field(...), ...] — build T then layer constraints
    if typing.get_origin(annotation) is typing.Annotated:
        base, *extras = typing.get_args(annotation)
        schema = self.build(base)
        for extra in extras:
            self._apply_metadata(schema, extra)
        return schema

    origin = typing.get_origin(annotation)

    if origin in (Union, types.UnionType):
        return self._build_union(typing.get_args(annotation))

    if origin is Literal:
        return self._build_literal(typing.get_args(annotation))

    if origin in (list, typing.List):  # noqa: UP006
        args = typing.get_args(annotation)
        schema = {"type": "array"}
        if args:
            schema["items"] = self.build(args[0])
        return schema

    if origin in (set, frozenset, typing.Set, typing.FrozenSet):  # noqa: UP006
        args = typing.get_args(annotation)
        schema = {"type": "array", "uniqueItems": True}
        if args:
            schema["items"] = self.build(args[0])
        return schema

    if origin in (tuple, typing.Tuple):  # noqa: UP006
        return self._build_tuple(typing.get_args(annotation))

    if origin in (dict, typing.Dict):  # noqa: UP006
        args = typing.get_args(annotation)
        schema = {"type": "object"}
        if len(args) == 2:
            schema["additionalProperties"] = self.build(args[1])
        return schema

    if isinstance(annotation, type):
        if annotation in _PRIMITIVES:
            return dict(_PRIMITIVES[annotation])
        if issubclass(annotation, enum.Enum):
            return self._build_enum(annotation)
        if _PYDANTIC_AVAILABLE and issubclass(annotation, BaseModel):
            return self._build_pydantic(annotation)
        if _MSGSPEC_AVAILABLE and issubclass(annotation, msgspec.Struct):
            return self._build_msgspec(annotation)
        if dataclasses.is_dataclass(annotation):
            return self._build_dataclass(annotation)
        if annotation in (list, set, frozenset):
            return {"type": "array"}
        if annotation is dict:
            return {"type": "object"}
        if not _PYDANTIC_AVAILABLE and hasattr(annotation, "model_fields"):
            _logger.warning(
                "Pydantic is not installed; emitting unconstrained object schema for %r",
                annotation,
            )
            return {"type": "object"}
        if not _MSGSPEC_AVAILABLE and hasattr(annotation, "__struct_fields__"):
            _logger.warning(
                "msgspec is not installed; emitting unconstrained object schema for %r",
                annotation,
            )
            return {"type": "object"}

    return {}

FileResource

FileResource(
    path,
    uri,
    *,
    name=None,
    description=None,
    mime_type=None,
)

Expose a local file as a static MCP resource.

MIME type is auto-detected from the file extension when mime_type is omitted. Files whose MIME type starts with "text/" or is one of the common text-like types (application/json, application/xml) are served as UTF-8 strings; everything else is returned as a :class:~lauren_mcp._types.BlobResource.

Parameters:

Name Type Description Default
path str | Path

Absolute or relative path to the file on disk.

required
uri str

The URI the resource is reachable under (e.g. "file:///data/report.pdf").

required
name str | None

Resource name for resources/list (defaults to the file's basename).

None
description str | None

Human-readable description (optional).

None
mime_type str | None

Explicit MIME type; auto-detected from extension when None.

None
Source code in src/lauren_mcp/server/_builtin_resources.py
def __init__(
    self,
    path: str | Path,
    uri: str,
    *,
    name: str | None = None,
    description: str | None = None,
    mime_type: str | None = None,
) -> None:
    self._path = Path(path)
    self._uri = uri
    self._name = name or self._path.name
    self._description = description
    self._mime_type = mime_type or _detect_mime(self._path)

read async

read()

Read the file and return text or :class:BlobResource.

Source code in src/lauren_mcp/server/_builtin_resources.py
async def read(self) -> str | BlobResource:
    """Read the file and return text or :class:`BlobResource`."""
    data = self._path.read_bytes()
    if self._is_text():
        return data.decode("utf-8", errors="replace")
    return BlobResource(data=data, mime_type=self._mime_type or "application/octet-stream")

as_mcp_resource_meta

as_mcp_resource_meta()

Return an :class:McpResourceMeta whose method_name points to read.

Source code in src/lauren_mcp/server/_builtin_resources.py
def as_mcp_resource_meta(self) -> McpResourceMeta:
    """Return an :class:`McpResourceMeta` whose ``method_name`` points to
    ``read``."""
    meta = McpResourceMeta(
        uri_template=self._uri,
        name=self._name,
        description=self._description,
        mime_type=self._mime_type,
        method_name="read",
        query_params=[],
        param_type_hints={},
    )
    # Bind this instance so make_resources_read_handler resolves the target.
    meta._bound_instance = self  # type: ignore[attr-defined]
    return meta

HttpResource

HttpResource(
    url,
    uri,
    *,
    name=None,
    description=None,
    mime_type=None,
    headers=None,
    timeout=30.0,
)

Fetch an HTTP URL and expose the response body as an MCP resource.

Requires the [http] extra (httpx).

Parameters:

Name Type Description Default
url str

The upstream URL to fetch on each resources/read.

required
uri str

The URI the resource is reachable under.

required
name str | None

Resource name (defaults to url).

None
description str | None

Human-readable description (optional).

None
mime_type str | None

Explicit MIME type; taken from the response Content-Type header when None.

None
headers dict[str, str] | None

Extra HTTP headers forwarded with every request.

None
timeout float

HTTP request timeout in seconds (default 30.0).

30.0
Source code in src/lauren_mcp/server/_builtin_resources.py
def __init__(
    self,
    url: str,
    uri: str,
    *,
    name: str | None = None,
    description: str | None = None,
    mime_type: str | None = None,
    headers: dict[str, str] | None = None,
    timeout: float = 30.0,
) -> None:
    self._url = url
    self._uri = uri
    self._name = name or url
    self._description = description
    self._mime_type = mime_type
    self._headers = headers or {}
    self._timeout = timeout

read async

read()

Fetch the upstream URL and return its body.

Source code in src/lauren_mcp/server/_builtin_resources.py
async def read(self) -> str | BlobResource:
    """Fetch the upstream URL and return its body."""
    try:
        import httpx  # noqa: PLC0415
    except ImportError as exc:
        raise ImportError(
            "HttpResource requires httpx; install it with: pip install 'lauren-mcp[http]'"
        ) from exc
    async with httpx.AsyncClient(timeout=self._timeout) as client:
        response = await client.get(self._url, headers=self._headers)
        response.raise_for_status()
        effective_mime = (
            self._mime_type
            or response.headers.get("content-type", "application/octet-stream")
            .split(";")[0]
            .strip()
        )
        if effective_mime.startswith("text/") or effective_mime in (
            "application/json",
            "application/xml",
        ):
            return str(response.text)
        return BlobResource(data=response.content, mime_type=effective_mime)

DirectoryResource

DirectoryResource(
    path,
    uri,
    *,
    name=None,
    description=None,
    pattern="*",
    recursive=False,
    include_hidden=False,
)

List files in a directory as a JSON array resource.

Returns a JSON-serialised list of relative file paths matching pattern. The response MIME type is always "application/json".

Parameters:

Name Type Description Default
path str | Path

Root directory to list.

required
uri str

The URI the resource is reachable under.

required
name str | None

Resource name (defaults to the directory's basename).

None
description str | None

Human-readable description (optional).

None
pattern str

Glob pattern relative to path (default "*").

'*'
recursive bool

When True uses rglob instead of glob.

False
include_hidden bool

When False (default) entries whose name starts with . are excluded.

False
Source code in src/lauren_mcp/server/_builtin_resources.py
def __init__(
    self,
    path: str | Path,
    uri: str,
    *,
    name: str | None = None,
    description: str | None = None,
    pattern: str = "*",
    recursive: bool = False,
    include_hidden: bool = False,
) -> None:
    self._path = Path(path)
    self._uri = uri
    self._name = name or self._path.name
    self._description = description
    self._pattern = pattern
    self._recursive = recursive
    self._include_hidden = include_hidden

read async

read()

Return JSON array of relative paths matching the pattern.

Source code in src/lauren_mcp/server/_builtin_resources.py
async def read(self) -> str:
    """Return JSON array of relative paths matching the pattern."""
    import json  # noqa: PLC0415

    method = self._path.rglob if self._recursive else self._path.glob
    entries = sorted(
        str(p.relative_to(self._path))
        for p in method(self._pattern)
        if p.is_file() and (self._include_hidden or not p.name.startswith("."))
    )
    return json.dumps(entries)

McpToolContext dataclass

McpToolContext(
    tool_name,
    tool_use_id=None,
    headers=None,
    execution_context=None,
    session_id=None,
    metadata=dict(),
    state=dict(),
    extras=dict(),
    lifespan_context=dict(),
    _progress_token=None,
    _send_notification=None,
    _client_rpc=None,
    _client_capabilities=None,
    _log_level_state=None,
    _cancel_event=None,
)

Context injected into an @mcp_tool method when a parameter is annotated with McpToolContext.

The object is immutable so tool authors cannot accidentally mutate shared transport state. The state bag is mutable per-call scratch space and extras is the extension bag for integrations (lauren-ai stores its AgentContext under extras["agent_context"]).

cancel_requested property

cancel_requested

An asyncio.Event set when the client cancels this call.

The event is created lazily on first access and stored in the (normally immutable) dataclass via object.__setattr__. This is safe because the event is local to this call instance and is only written once (by the dispatcher, before task.cancel() is called).

Tools should treat this as a read-only hint: check cancel_requested.is_set() between work units and return early for graceful shutdown. The containing asyncio.Task will still be hard-cancelled shortly after the event fires.

.. note:: The event is never set on the legacy HTTP+SSE transport (which does not implement $/cancelRequest).

report_progress async

report_progress(progress, total=None, message=None)

Send notifications/progress to the client.

No-op when the client did not supply a progressToken in the tools/call request, or when the transport has no notification channel.

Parameters:

Name Type Description Default
progress float | int

Current progress value (e.g. number of items processed).

required
total float | int | None

Optional upper bound. When omitted the client treats progress as indeterminate.

None
message str | None

Optional human-readable status string displayed alongside the progress indicator (e.g. "Scanning 3 of 10 files").

None
Source code in src/lauren_mcp/_server/_context.py
async def report_progress(
    self,
    progress: float | int,
    total: float | int | None = None,
    message: str | None = None,
) -> None:
    """Send ``notifications/progress`` to the client.

    No-op when the client did not supply a ``progressToken`` in the
    ``tools/call`` request, or when the transport has no notification
    channel.

    Args:
        progress: Current progress value (e.g. number of items processed).
        total: Optional upper bound.  When omitted the client treats
            progress as indeterminate.
        message: Optional human-readable status string displayed alongside
            the progress indicator (e.g. ``"Scanning 3 of 10 files"``).
    """
    if self._progress_token is None or self._send_notification is None:
        return
    params: dict[str, Any] = {
        "progressToken": self._progress_token,
        "progress": progress,
    }
    if total is not None:
        params["total"] = total
    if message is not None:
        params["message"] = message
    await self._send_notification(
        {"jsonrpc": "2.0", "method": "notifications/progress", "params": params}
    )

log async

log(level, message, data=None)

Send a structured notifications/message log entry to the client.

Dropped silently when below the server's minimum level or when the transport has no notification channel.

Source code in src/lauren_mcp/_server/_context.py
async def log(self, level: LogLevel, message: str, data: dict[str, Any] | None = None) -> None:
    """Send a structured ``notifications/message`` log entry to the client.

    Dropped silently when below the server's minimum level or when the
    transport has no notification channel.
    """
    if self._send_notification is None:
        return
    if self._log_level_state is not None and not self._log_level_state.allows(level):
        return
    payload: dict[str, Any] = {"message": message}
    if data:
        payload["extra"] = data
    await self._send_notification(
        {
            "jsonrpc": "2.0",
            "method": "notifications/message",
            "params": {"level": level, "logger": self.tool_name, "data": payload},
        }
    )

notice async

notice(message, data=None)

Send a notice-level log notification.

Use for normal but significant conditions that operators should be aware of (e.g. a configuration override taking effect, a fallback path used).

Source code in src/lauren_mcp/_server/_context.py
async def notice(self, message: str, data: dict[str, Any] | None = None) -> None:
    """Send a ``notice``-level log notification.

    Use for normal but significant conditions that operators should be aware
    of (e.g. a configuration override taking effect, a fallback path used).
    """
    await self.log("notice", message, data)

critical async

critical(message, data=None)

Send a critical-level log notification.

Use for conditions that require immediate attention but have not yet caused complete service failure (e.g. a primary data source is down and a fallback is active).

Source code in src/lauren_mcp/_server/_context.py
async def critical(self, message: str, data: dict[str, Any] | None = None) -> None:
    """Send a ``critical``-level log notification.

    Use for conditions that require immediate attention but have not yet
    caused complete service failure (e.g. a primary data source is down and
    a fallback is active).
    """
    await self.log("critical", message, data)

sample async

sample(
    messages,
    *,
    max_tokens=1024,
    system_prompt=None,
    temperature=None,
    stop_sequences=None,
    model_preferences=None,
    include_context="none",
    result_type=None,
    tools=None,
    tool_choice=None,
    max_tool_iterations=10,
)

Ask the connected MCP client to run an LLM call on our behalf.

Returns a :class:CreateMessageResult, or — when result_type is a Pydantic model class — an instance of that model parsed from the reply text.

When tools is supplied the client's LLM may respond with a ToolUseContent block instead of a TextContent block. ctx.sample() does not execute tools automatically. The caller is responsible for handling ToolUseContent responses and building the agentic loop.

Parameters

tools: Tool descriptors to pass to the LLM. Each entry may be a :class:~lauren_mcp._types.ToolSchema, a :class:~lauren_mcp.server._meta.McpToolMeta (auto-converted), or a raw dict with name, description, and inputSchema keys. None means no tools are passed (single-turn text/image only). tool_choice: Forwarded to the client as-is. None omits the field (client default). max_tool_iterations: Advisory upper bound on agentic loop iterations passed to the client in CreateMessageParams.metadata["max_tool_iterations"]. Default: 10.

Raises

McpSamplingNotAvailable Client did not advertise sampling capability, or did not advertise tools support within sampling when tools= is supplied, or the transport does not support server-to-client requests.

Source code in src/lauren_mcp/_server/_context.py
async def sample(
    self,
    messages: str | list[Any],  # list[SamplingMessage]
    *,
    max_tokens: int = 1024,
    system_prompt: str | None = None,
    temperature: float | None = None,
    stop_sequences: list[str] | None = None,
    model_preferences: dict[str, Any] | None = None,
    include_context: Literal["none", "thisServer", "allServers"] = "none",
    result_type: type[Any] | None = None,
    # New agentic loop parameters:
    tools: list[Any] | None = None,
    tool_choice: dict[str, Any] | None = None,
    max_tool_iterations: int = 10,
) -> Any:
    """Ask the connected MCP client to run an LLM call on our behalf.

    Returns a :class:`CreateMessageResult`, or — when *result_type* is a
    Pydantic model class — an instance of that model parsed from the
    reply text.

    When *tools* is supplied the client's LLM may respond with a
    ``ToolUseContent`` block instead of a ``TextContent`` block.
    ``ctx.sample()`` does **not** execute tools automatically.  The caller
    is responsible for handling ``ToolUseContent`` responses and building
    the agentic loop.

    Parameters
    ----------
    tools:
        Tool descriptors to pass to the LLM.  Each entry may be a
        :class:`~lauren_mcp._types.ToolSchema`, a
        :class:`~lauren_mcp.server._meta.McpToolMeta` (auto-converted), or a
        raw ``dict`` with ``name``, ``description``, and ``inputSchema`` keys.
        ``None`` means no tools are passed (single-turn text/image only).
    tool_choice:
        Forwarded to the client as-is.  ``None`` omits the field (client default).
    max_tool_iterations:
        Advisory upper bound on agentic loop iterations passed to the client
        in ``CreateMessageParams.metadata["max_tool_iterations"]``.  Default: 10.

    Raises
    ------
    McpSamplingNotAvailable
        Client did not advertise ``sampling`` capability, or did not advertise
        ``tools`` support within ``sampling`` when ``tools=`` is supplied, or
        the transport does not support server-to-client requests.
    """
    if self._client_rpc is None:
        raise McpSamplingNotAvailable("This transport cannot deliver server-to-client requests")
    caps = self._client_capabilities
    if caps is None or caps.sampling is None:
        raise McpSamplingNotAvailable(
            "The connected client did not advertise the 'sampling' capability"
        )

    # Capability check for tool-enabled sampling
    if tools is not None:
        sampling_caps = caps.sampling
        if not (isinstance(sampling_caps, dict) and sampling_caps.get("tools")):
            raise McpSamplingNotAvailable(
                "The connected client does not support tool-enabled sampling "
                "('tools' not set in sampling capability). "
                "Pass tools=None or upgrade the client."
            )

    if isinstance(messages, str):
        messages = [SamplingMessage(role="user", content=TextContent(text=messages))]

    # Build metadata: include max_tool_iterations advisory when tools are used
    metadata: dict[str, Any] | None = model_preferences
    if tools is not None:
        metadata = {**(model_preferences or {}), "max_tool_iterations": max_tool_iterations}

    # Coerce tools to wire dicts
    coerced_tools = _coerce_tools(tools) if tools is not None else None

    params = CreateMessageParams(
        messages=messages,
        maxTokens=max_tokens,
        systemPrompt=system_prompt,
        includeContext=include_context,
        temperature=temperature,
        stopSequences=stop_sequences or [],
        modelPreferences=model_preferences,
        metadata=metadata,
    )
    params_dict = params.to_dict()
    # Add tools / toolChoice to the wire dict (not yet in CreateMessageParams dataclass)
    if coerced_tools is not None:
        params_dict["tools"] = coerced_tools
    if tool_choice is not None:
        params_dict["toolChoice"] = tool_choice
    raw = await self._client_rpc("sampling/createMessage", params_dict)
    result = CreateMessageResult.from_dict(raw if isinstance(raw, dict) else {})
    if result_type is None:
        return result
    try:
        data = json.loads(result.text)
        return _convert_result(data, result_type)
    except Exception as exc:
        raise ValueError(
            f"Sampling reply could not be parsed as {result_type.__name__}: {exc}"
        ) from exc

elicit async

elicit(message, response_type=None)

Ask the connected MCP client to prompt its user for input.

response_type may be None (approval only), str, bool, int, float, a Literal[...], an Enum subclass, list[str] (multi-select string array), or a flat Pydantic model / dataclass / TypedDict whose fields are all of the above scalar types.

Raises :class:McpElicitationNotAvailable when the client did not advertise the elicitation capability or the transport cannot carry server-to-client requests (legacy SSE).

Source code in src/lauren_mcp/_server/_context.py
async def elicit(
    self,
    message: str,
    response_type: Any = None,
) -> ElicitResult:
    """Ask the connected MCP client to prompt its user for input.

    *response_type* may be ``None`` (approval only), ``str``, ``bool``,
    ``int``, ``float``, a ``Literal[...]``, an ``Enum`` subclass,
    ``list[str]`` (multi-select string array), or a flat Pydantic model /
    dataclass / TypedDict whose fields are all of the above scalar types.

    Raises :class:`McpElicitationNotAvailable` when the client did not
    advertise the ``elicitation`` capability or the transport cannot carry
    server-to-client requests (legacy SSE).
    """
    if self._client_rpc is None:
        raise McpElicitationNotAvailable(
            "This transport cannot deliver server-to-client requests"
        )
    caps = self._client_capabilities
    if caps is None or caps.elicitation is None:
        raise McpElicitationNotAvailable(
            "The connected client did not advertise the 'elicitation' capability"
        )

    params: dict[str, Any] = {"message": message}
    schema = build_elicitation_schema(response_type)
    if schema is not None:
        params["requestedSchema"] = schema
    raw = await self._client_rpc("elicitation/create", params)
    return ElicitResult.from_dict(raw if isinstance(raw, dict) else {})

elicit_url async

elicit_url(message, url, *, elicitation_id=None)

Direct the user to an external URL and await completion.

The server sends elicitation/create with requestedUrl (and elicitationId) rather than requestedSchema. The client opens the URL in a browser; the user completes the external flow; the client responds with {"action": "accept"} or {"action": "cancel"}.

Parameters

message: Human-readable prompt shown to the user before the URL is opened. url: The URL to open. elicitation_id: An opaque string identifying this elicitation instance. Auto-generated as a UUID4 hex string when not provided.

Returns

UrlElicitResult action is "accept" (flow completed) or "cancel" (user dismissed or flow was abandoned).

Raises

McpUrlElicitationNotAvailable Client did not advertise the urlElicitation sub-capability, elicitation capability is absent, or the transport does not support server-to-client requests (e.g. legacy HTTP+SSE).

Source code in src/lauren_mcp/_server/_context.py
async def elicit_url(
    self,
    message: str,
    url: str,
    *,
    elicitation_id: str | None = None,
) -> Any:  # returns UrlElicitResult
    """Direct the user to an external URL and await completion.

    The server sends ``elicitation/create`` with ``requestedUrl`` (and
    ``elicitationId``) rather than ``requestedSchema``.  The client opens
    the URL in a browser; the user completes the external flow; the client
    responds with ``{"action": "accept"}`` or ``{"action": "cancel"}``.

    Parameters
    ----------
    message:
        Human-readable prompt shown to the user before the URL is opened.
    url:
        The URL to open.
    elicitation_id:
        An opaque string identifying this elicitation instance.
        Auto-generated as a UUID4 hex string when not provided.

    Returns
    -------
    UrlElicitResult
        ``action`` is ``"accept"`` (flow completed) or ``"cancel"``
        (user dismissed or flow was abandoned).

    Raises
    ------
    McpUrlElicitationNotAvailable
        Client did not advertise the ``urlElicitation`` sub-capability,
        ``elicitation`` capability is absent, or the transport does not
        support server-to-client requests (e.g. legacy HTTP+SSE).
    """
    from lauren_mcp._types import (  # noqa: PLC0415
        McpUrlElicitationNotAvailable,
        UrlElicitResult,
    )

    # -- capability gate --
    caps = self._client_capabilities
    if caps is None or caps.elicitation is None:
        raise McpUrlElicitationNotAvailable(
            "The connected client did not advertise the 'elicitation' capability"
        )
    elicitation_caps = caps.elicitation
    if not (isinstance(elicitation_caps, dict) and elicitation_caps.get("urlElicitation")):
        raise McpUrlElicitationNotAvailable(
            "The connected client does not support URL elicitation "
            "('urlElicitation' not set in elicitation capability)"
        )
    if self._client_rpc is None:
        raise McpUrlElicitationNotAvailable(
            "This transport cannot deliver server-to-client requests"
        )

    import uuid  # noqa: PLC0415

    eid = elicitation_id if elicitation_id is not None else uuid.uuid4().hex

    rpc_params: dict[str, Any] = {
        "message": message,
        "requestedUrl": url,
        "elicitationId": eid,
    }
    raw = await self._client_rpc("elicitation/create", rpc_params)
    return UrlElicitResult.from_dict(raw if isinstance(raw, dict) else {})

McpCatalogManager

McpCatalogManager()

SINGLETON holding the live tool / resource / prompt catalogue.

The catalogue is seeded from decorator metadata at startup and can be mutated at runtime. Every mutation after :meth:set_broadcast_fn fires the matching notifications/*/list_changed broadcast.

Source code in src/lauren_mcp/_server/_catalog.py
def __init__(self) -> None:
    self._tools: dict[str, Any] = {}
    self._resources: dict[str, Any] = {}
    self._prompts: dict[str, Any] = {}
    self._broadcast_fn: BroadcastFn | None = None

set_broadcast_fn

set_broadcast_fn(fn)

Attach the broadcast hook; mutations before this stay silent.

Source code in src/lauren_mcp/_server/_catalog.py
def set_broadcast_fn(self, fn: BroadcastFn | None) -> None:
    """Attach the broadcast hook; mutations before this stay silent."""
    self._broadcast_fn = fn

ResourceSubscriptionManager

ResourceSubscriptionManager()

SINGLETON that tracks per-URI subscriptions and broadcasts update events.

Session keys come from :class:~lauren_mcp._server._registry.McpConnectionRegistry (WS transport) or the session_id allocated by :class:~lauren_mcp._server._session.SseSessionStore / :class:~lauren_mcp._server._streamable.StreamableSessionStore.

Source code in src/lauren_mcp/_server/_subscriptions.py
def __init__(self) -> None:
    # uri -> {session_key -> send_fn}
    self._subscriptions: dict[str, dict[str, SendFn]] = {}

subscription_count property

subscription_count

Total number of active subscriptions across all URIs.

subscribe

subscribe(uri, session_key, send_fn)

Register send_fn as the delivery channel for session_key on uri.

Source code in src/lauren_mcp/_server/_subscriptions.py
def subscribe(self, uri: str, session_key: str, send_fn: SendFn) -> None:
    """Register *send_fn* as the delivery channel for *session_key* on *uri*."""
    self._subscriptions.setdefault(uri, {})[session_key] = send_fn

unsubscribe

unsubscribe(uri, session_key)

Remove one subscription. No-op if not present.

Source code in src/lauren_mcp/_server/_subscriptions.py
def unsubscribe(self, uri: str, session_key: str) -> None:
    """Remove one subscription.  No-op if not present."""
    uri_subs = self._subscriptions.get(uri)
    if uri_subs is not None:
        uri_subs.pop(session_key, None)
        if not uri_subs:
            del self._subscriptions[uri]

unsubscribe_all

unsubscribe_all(session_key)

Remove all subscriptions for session_key (called on disconnect).

Source code in src/lauren_mcp/_server/_subscriptions.py
def unsubscribe_all(self, session_key: str) -> None:
    """Remove all subscriptions for *session_key* (called on disconnect)."""
    empty: list[str] = []
    for uri, subs in self._subscriptions.items():
        subs.pop(session_key, None)
        if not subs:
            empty.append(uri)
    for uri in empty:
        del self._subscriptions[uri]

get_subscribers

get_subscribers(uri)

Return a snapshot of subscribers for uri.

Source code in src/lauren_mcp/_server/_subscriptions.py
def get_subscribers(self, uri: str) -> dict[str, SendFn]:
    """Return a snapshot of subscribers for *uri*."""
    return dict(self._subscriptions.get(uri, {}))

notify_updated async

notify_updated(uri)

Broadcast notifications/resources/updated to all subscribers of uri.

Source code in src/lauren_mcp/_server/_subscriptions.py
async def notify_updated(self, uri: str) -> None:
    """Broadcast ``notifications/resources/updated`` to all subscribers of *uri*."""
    subs = self._subscriptions.get(uri)
    if not subs:
        return
    payload = json.dumps(
        {
            "jsonrpc": "2.0",
            "method": "notifications/resources/updated",
            "params": {"uri": uri},
        }
    )
    results = await asyncio.gather(
        *(fn(payload) for fn in list(subs.values())),
        return_exceptions=True,
    )
    for result in results:
        if isinstance(result, Exception):
            _logger.warning("ResourceSubscription: send failed — %s", result)

EventStore

Bases: ABC

Abstract store for SSE event persistence and replay.

Implementations must be thread-safe if the event loop runs in a thread pool; for asyncio single-loop deployments this is not required.

store_event abstractmethod async

store_event(session_id, event_id, data)

Persist a single SSE event for session_id.

Parameters

session_id: The Streamable HTTP session identifier. event_id: The assigned id: field value, e.g. "sess123:7". data: The raw data: payload (JSON string).

Source code in src/lauren_mcp/_server/_event_store.py
@abstractmethod
async def store_event(self, session_id: str, event_id: str, data: str) -> None:
    """Persist a single SSE event for *session_id*.

    Parameters
    ----------
    session_id:
        The Streamable HTTP session identifier.
    event_id:
        The assigned ``id:`` field value, e.g. ``"sess123:7"``.
    data:
        The raw ``data:`` payload (JSON string).
    """

replay_events_after abstractmethod async

replay_events_after(session_id, last_event_id, send)

Replay all stored events after last_event_id for session_id.

Calls send(event_id, data) for each replayed event in order.

Parameters

session_id: The session whose events should be replayed. last_event_id: The last event ID the client received, or None to replay all stored events from the beginning. send: Async callback invoked once per replayed event.

Source code in src/lauren_mcp/_server/_event_store.py
@abstractmethod
async def replay_events_after(
    self,
    session_id: str,
    last_event_id: str | None,
    send: Callable[[str, str], Awaitable[None]],
) -> None:
    """Replay all stored events after *last_event_id* for *session_id*.

    Calls ``send(event_id, data)`` for each replayed event in order.

    Parameters
    ----------
    session_id:
        The session whose events should be replayed.
    last_event_id:
        The last event ID the client received, or ``None`` to replay
        all stored events from the beginning.
    send:
        Async callback invoked once per replayed event.
    """

InMemoryEventStore

InMemoryEventStore(*, max_events=1000)

Bases: EventStore

In-memory event store for single-process deployments.

Events are stored in a per-session bounded deque. Older events are dropped when the deque reaches max_events to prevent unbounded memory growth.

Parameters

max_events: Maximum number of events retained per session. Defaults to 1000.

Source code in src/lauren_mcp/_server/_event_store.py
def __init__(self, *, max_events: int = 1000) -> None:
    self._max_events = max_events
    # session_id -> deque of (event_id, data) tuples
    self._store: dict[str, deque[tuple[str, str]]] = {}

evict_session

evict_session(session_id)

Remove all events for session_id (call when the session ends).

Source code in src/lauren_mcp/_server/_event_store.py
def evict_session(self, session_id: str) -> None:
    """Remove all events for *session_id* (call when the session ends)."""
    self._store.pop(session_id, None)

TransportSecuritySettings dataclass

TransportSecuritySettings(
    enable_dns_rebinding_protection=True,
    allowed_hosts=list(),
    allowed_origins=list(),
)

Host/Origin validation settings for HTTP MCP transports.

Parameters

enable_dns_rebinding_protection: Master switch. When False the guard is a no-op. allowed_hosts: Accepted Host header values. Each entry may be an exact host ("example.com") or a host with a wildcard port ("example.com:*"). "localhost" and "127.0.0.1" are always implicitly allowed when the list is empty. allowed_origins: Accepted Origin header values for cross-origin POST requests. An empty list means only same-origin (i.e. requests with no Origin header or an Origin matching an allowed_hosts entry pass through).

is_host_allowed

is_host_allowed(host)

Check if Host header is in allowed_hosts (supports 'host:*' wildcard port).

Source code in src/lauren_mcp/_server/_transport_security.py
def is_host_allowed(self, host: str) -> bool:
    """Check if Host header is in allowed_hosts (supports 'host:*' wildcard port)."""
    return _host_allowed(host, self.allowed_hosts)

is_origin_allowed

is_origin_allowed(origin)

Check if Origin is in allowed_origins or is None (same-origin).

Source code in src/lauren_mcp/_server/_transport_security.py
def is_origin_allowed(self, origin: str | None) -> bool:
    """Check if Origin is in allowed_origins or is None (same-origin)."""
    if origin is None:
        return True
    return _origin_allowed(origin, self)