Testing¶
This guide covers the testing patterns used in the lauren-mcp test suite and
shows you how to apply them to your own server and client code.
1. Test structure overview¶
tests/
unit/ — pure unit tests (no subprocess, no network)
integration/ — in-process tests with real DI containers and test HTTP clients
end_to_end/ — full-stack tests: subprocess server + connected McpStdioClient
docs/ — E2E tests for every code example in the documentation
All tests use pytest with asyncio_mode = "auto" so every async def
test_* function is awaited automatically.
2. Testing a tool handler directly (unit)¶
The fastest way to test tool logic is to instantiate your server class and call the method directly — no subprocess, no network, no DI:
import pytest
from lauren_mcp import mcp_server, mcp_tool
BOOKS = [
{"id": 1, "title": "Clean Code", "author": "Martin"},
{"id": 2, "title": "Pragmatic Programmer", "author": "Thomas"},
]
# Example server exposing two read-only tools over the in-memory BOOKS list.
@mcp_server("/mcp")
class BookServer:
    @mcp_tool()
    async def search(self, query: str) -> list:
        """Search books by title."""
        needle = query.lower()
        # Case-insensitive substring match on the title.
        return [book for book in BOOKS if needle in book["title"].lower()]

    @mcp_tool()
    async def get_book(self, book_id: int) -> dict | None:
        """Get a book by ID."""
        for book in BOOKS:
            if book["id"] == book_id:
                return book
        # No entry carries this ID.
        return None
@pytest.fixture
def server():
    """Provide a ``BookServer`` instance built directly, without any DI."""
    instance = BookServer()
    return instance
async def test_search_returns_matching_books(server):
    """A lowercase query returns exactly the one matching book."""
    matches = await server.search("clean")
    titles = [book["title"] for book in matches]
    assert titles == ["Clean Code"]
async def test_search_case_insensitive(server):
    """Upper- and lower-case queries return the same results."""
    upper_hits = await server.search("CLEAN")
    lower_hits = await server.search("clean")
    assert upper_hits == lower_hits
async def test_get_book_not_found(server):
    """Looking up an ID that is not in ``BOOKS`` returns ``None``."""
    missing = await server.get_book(9999)
    assert missing is None
3. Testing with McpToolContext injection¶
Tools that accept a McpToolContext parameter get it injected by the dispatcher
at runtime. In unit tests, wire the handler directly using make_tools_call_handler
and set CURRENT_BINDING to supply transport-specific state:
from lauren_mcp import mcp_tool, McpToolContext
from lauren_mcp._server._binding import CURRENT_BINDING, TransportBinding
from lauren_mcp._types import JsonRpcRequest
from lauren_mcp.server._handlers import make_context_factory, make_tools_call_handler
from lauren_mcp.server._meta import MCP_TOOL_META
# Minimal server whose only tool echoes back what it reads from the context.
class MyServer:
    @mcp_tool()
    async def whoami(self, ctx: McpToolContext) -> dict:
        """Return caller identity."""
        session = ctx.session_id
        lifespan_state = ctx.lifespan_context
        return {"session_id": session, "lifespan": lifespan_state}
async def test_context_injected():
    """``whoami`` sees the session ID from the binding and the lifespan state."""
    tool_meta = getattr(MyServer.whoami, MCP_TOOL_META)
    # make_context_factory takes server-level metadata and a lifespan getter.
    ctx_factory = make_context_factory(
        {"team": "core"},
        lifespan_getter=lambda: {"db": "conn"},
    )
    call_tool = make_tools_call_handler(
        MyServer(), [tool_meta], context_factory=ctx_factory
    )

    # CURRENT_BINDING is a ContextVar; set it for the call and always restore it.
    active_binding = TransportBinding(session_id="sess-42")
    reset_token = CURRENT_BINDING.set(active_binding)
    try:
        request = JsonRpcRequest(
            method="tools/call", id=1, params={"name": "whoami"}
        )
        response = await call_tool(request)
    finally:
        CURRENT_BINDING.reset(reset_token)

    structured = response["structuredContent"]
    assert structured["session_id"] == "sess-42"
    assert structured["lifespan"] == {"db": "conn"}
4. Testing progress notifications and logging¶
Mock send_notification in the TransportBinding to assert that your tool
sends the expected progress and log payloads:
from lauren_mcp import mcp_tool, McpToolContext
from lauren_mcp._server._binding import CURRENT_BINDING, TransportBinding
from lauren_mcp._types import JsonRpcRequest
from lauren_mcp.server._handlers import make_context_factory, make_tools_call_handler
from lauren_mcp.server._meta import MCP_TOOL_META
# Server with one tool that emits a progress update per item and a final log line.
class WorkServer:
    @mcp_tool()
    async def process(self, n: int, ctx: McpToolContext) -> str:
        """Process n items, reporting progress."""
        # Progress is reported 1-based: 1..n out of n.
        for done in range(1, n + 1):
            await ctx.report_progress(done, total=n)
        await ctx.info("done", {"items": n})
        return "ok"
async def test_progress_and_log():
    """``process`` sends three progress notifications and one info log message."""
    captured = []

    async def record(message):
        # Stand-in for the transport's notification sender.
        captured.append(message)

    def with_method(method_name):
        return [msg for msg in captured if msg["method"] == method_name]

    tool_meta = getattr(WorkServer.process, MCP_TOOL_META)
    call_tool = make_tools_call_handler(
        WorkServer(), [tool_meta], context_factory=make_context_factory()
    )
    active_binding = TransportBinding(
        send_notification=record,
    )
    reset_token = CURRENT_BINDING.set(active_binding)
    try:
        call_params = {
            "name": "process",
            "arguments": {"n": 3},
            "_meta": {"progressToken": "p-1"},
        }
        request = JsonRpcRequest(method="tools/call", id=1, params=call_params)
        await call_tool(request)
    finally:
        CURRENT_BINDING.reset(reset_token)

    progress = with_method("notifications/progress")
    assert len(progress) == 3
    assert progress[-1]["params"]["progress"] == 3

    logs = with_method("notifications/message")
    assert len(logs) == 1
    assert logs[0]["params"]["level"] == "info"
5. Testing lifespan (@mcp_lifespan)¶
The lifespan generator runs inside _McpHandlerRegistrar._register_handlers().
You can test it without the full DI stack using the build_wired_dispatcher
helper:
from lauren_mcp import mcp_server, mcp_tool, McpToolContext
from lauren_mcp.server import mcp_lifespan
from lauren_mcp.server._module import McpServerModule
from lauren_mcp._server._catalog import McpCatalogManager
from lauren_mcp._server._registry import McpConnectionRegistry
from lauren_mcp._server._dispatcher import McpDispatcher
from lauren_mcp._types import JsonRpcRequest
async def build_wired_dispatcher(server_cls, **kwargs):
    """Wire handlers without starting a Lauren app — useful in unit tests.

    Args:
        server_cls: The server class to instantiate and register handlers for.
        **kwargs: Forwarded unchanged to ``McpServerModule.for_root``.

    Returns:
        A ``(dispatcher, server)`` tuple: the dispatcher after handler
        registration and the server instance it was wired to.
    """
    module_def = McpServerModule.for_root(server_cls, **kwargs)

    dispatcher = McpDispatcher()
    dispatcher._register_builtins()

    server = server_cls()
    # The module carries the registrar class; build it by hand in place of DI.
    registrar = module_def._handler_registrar_cls(
        dispatcher,
        McpConnectionRegistry(),
        McpCatalogManager(),
        server,
    )
    await registrar._register_handlers()
    return dispatcher, server
# Server whose lifespan yields a fake connection that the tool reads back.
@mcp_server("/mcp")
class DbServer:
    @mcp_lifespan
    async def lifespan(self):
        fake_connection = {"url": "sqlite:///:memory:"}
        lifespan_state = {"db": fake_connection}
        try:
            yield lifespan_state
        finally:
            pass  # close connection

    @mcp_tool()
    async def ping_db(self, ctx: McpToolContext) -> str:
        """Ping the database."""
        state = ctx.lifespan_context
        db = state.get("db")
        return f"connected: {db['url']}"
async def test_lifespan_context_reaches_tool():
    """The value yielded by the lifespan is visible to ``ping_db``."""
    dispatcher, _server = await build_wired_dispatcher(DbServer)

    # Initialize
    handshake_params = {
        "protocolVersion": "2025-03-26",
        "capabilities": {},
        "clientInfo": {"name": "test", "version": "0"},
    }
    await dispatcher.dispatch(
        JsonRpcRequest(method="initialize", id=0, params=handshake_params)
    )

    # Call the tool
    ping_request = JsonRpcRequest(
        method="tools/call",
        id=1,
        params={"name": "ping_db", "arguments": {}},
    )
    response = await dispatcher.dispatch(ping_request)

    reply_text = response.result["content"][0]["text"]
    assert "sqlite:///:memory:" in reply_text
6. Testing with a Lauren app (integration)¶
For testing with real WebSocket connections, wrap your module in a Lauren app
and use TestClient + WsTestClient:
import json
import pytest
from lauren import LaurenFactory, module
from lauren.testing import TestClient, WsTestClient
from lauren_mcp import McpServerModule, mcp_server, mcp_tool
# Single-tool server used to exercise the WebSocket transport.
@mcp_server("/mcp")
class CalcServer:
    @mcp_tool()
    async def add(self, a: int, b: int) -> int:
        """Add two numbers."""
        total = a + b
        return total
@pytest.fixture(scope="module")
def app():
    """Build a Lauren app serving ``CalcServer`` over the ``ws`` transport."""
    calc_module = McpServerModule.for_root(CalcServer, transport="ws")

    @module(imports=[calc_module])
    class App:
        pass

    # LaurenFactory.create builds the DI container.
    built = LaurenFactory.create(App)
    # TestClient(app) triggers @post_construct hooks (including handler
    # registration); it must come after create().
    TestClient(built)
    return built
async def test_add_tool(app):
    """Handshake over the WebSocket, then call ``add`` with 3 and 4."""

    def frame(message):
        # Prefix each message with the JSON-RPC version and serialize it.
        return json.dumps({"jsonrpc": "2.0", **message})

    initialize = {
        "id": 0,
        "method": "initialize",
        "params": {
            "protocolVersion": "2025-03-26",
            "capabilities": {},
            "clientInfo": {"name": "test", "version": "0"},
        },
    }
    add_call = {
        "id": 1,
        "method": "tools/call",
        "params": {"name": "add", "arguments": {"a": 3, "b": 4}},
    }

    async with WsTestClient(app).connect("/mcp/ws") as ws:
        # Handshake
        await ws.send_text(frame(initialize))
        await ws.receive_text()
        await ws.send_text(frame({"method": "notifications/initialized"}))

        # Tool call
        await ws.send_text(frame(add_call))
        reply = json.loads(await ws.receive_text())

    assert reply["result"]["content"][0]["text"] == "7"
Critical: Always call TestClient(app) after LaurenFactory.create(app). It triggers @post_construct hooks. Without it, initialize will fail with McpCallError: Method not found: 'initialize'.
7. Testing Streamable HTTP transport¶
Use TestClient(app).post("/mcp/", ...) to drive the Streamable HTTP transport.
The first request must be initialize, which returns an mcp-session-id header.
Subsequent requests must include that header:
import json
import pytest
from lauren import LaurenFactory, module
from lauren.testing import TestClient
from lauren_mcp import McpServerModule, mcp_server, mcp_tool
# Single-tool server used to exercise the Streamable HTTP transport.
@mcp_server("/mcp")
class EchoServer:
    @mcp_tool()
    async def echo(self, message: str) -> str:
        """Echo a message."""
        reply = message
        return reply
@pytest.fixture(scope="module")
def app():
    """Build a Lauren app serving ``EchoServer`` over the ``streamable`` transport."""
    echo_module = McpServerModule.for_root(EchoServer, transport="streamable")

    @module(imports=[echo_module])
    class App:
        pass

    built = LaurenFactory.create(App)
    # Constructed only for its side effect; must come after create().
    TestClient(built)
    return built
def _post(client, body, session_id=None):
    """POST ``body`` as JSON to ``/mcp/``.

    The ``mcp-session-id`` header is added only when ``session_id`` is truthy.
    Returns whatever ``client.post`` returns.
    """
    session_header = {"mcp-session-id": session_id} if session_id else {}
    headers = {"content-type": "application/json", **session_header}
    encoded = json.dumps(body).encode()
    return client.post("/mcp/", content=encoded, headers=headers)
async def test_echo_tool_streamable(app):
    """Initialize a session over HTTP, then call ``echo`` within that session."""
    http = TestClient(app)

    # Initialize — no session header yet
    initialize = {
        "jsonrpc": "2.0",
        "id": 0,
        "method": "initialize",
        "params": {
            "protocolVersion": "2025-03-26",
            "capabilities": {},
            "clientInfo": {"name": "test", "version": "0"},
        },
    }
    init_resp = _post(http, initialize)
    assert init_resp.status_code == 200
    session_id = init_resp.header("mcp-session-id")

    # Complete handshake
    initialized = {"jsonrpc": "2.0", "method": "notifications/initialized"}
    _post(http, initialized, session_id)

    # Tool call
    echo_call = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "tools/call",
        "params": {"name": "echo", "arguments": {"message": "hello"}},
    }
    call_resp = _post(http, echo_call, session_id)
    assert call_resp.status_code == 200
    body = call_resp.json()
    assert body["result"]["content"][0]["text"] == "hello"
Key behaviours to test:
- Request without mcp-session-id → HTTP 400
- Unknown mcp-session-id → HTTP 404
- DELETE /mcp/ with session header → HTTP 204 (terminates the session)
- Accept: text/event-stream header → response is an SSE stream
8. Testing with mock clients (unit)¶
Use unittest.mock.AsyncMock to test code that consumes McpClientProtocol
without spawning any subprocess:
from unittest.mock import AsyncMock, MagicMock
from lauren_mcp._types import ToolSchema
def make_mock_client(*tool_names: str):
    """Build a mock client that advertises one tool per name in ``tool_names``.

    ``connect``, ``close`` and ``list_tools`` are ``AsyncMock`` objects;
    ``call_tool`` is a plain coroutine function that echoes the tool name and
    arguments back as a single text content item.
    """

    async def call_tool(name, args):
        return {"content": [{"type": "text", "text": f"{name}:{args}"}], "isError": False}

    schemas = []
    for name in tool_names:
        schemas.append(
            ToolSchema(
                name=name,
                description=f"Tool {name}",
                inputSchema={"type": "object", "properties": {}, "required": []},
            )
        )

    client = MagicMock()
    client.connect = AsyncMock()
    client.close = AsyncMock()
    client.list_tools = AsyncMock(return_value=schemas)
    client.call_tool = call_tool
    return client
async def test_mock_client_list_tools():
    """The mock client lists exactly the tools it was built with."""
    client = make_mock_client("search", "get_item")
    await client.connect()
    listed = await client.list_tools()
    listed_names = {tool.name for tool in listed}
    assert listed_names == {"search", "get_item"}
    await client.close()
9. Testing with a real subprocess (end-to-end)¶
For full-stack testing write your server as a standalone stdio script and
launch it with McpServer.stdio. Set max_retries=0 to get immediate failures
rather than 30-second hangs if the script crashes.
# tests/end_to_end/test_book_server.py
from __future__ import annotations
import asyncio, json, os, sys, tempfile, textwrap
import pytest
from lauren_mcp import McpServer
# Source of a standalone stdio server script. The fixtures write it to a
# temporary file and launch it as a subprocess. Docstrings inside the script
# are single-quoted so they cannot terminate the enclosing '''...''' literal.
_BOOK_SERVER = textwrap.dedent('''\
    import sys, json, asyncio
    from lauren_mcp.server._decorators import mcp_server, mcp_tool
    from lauren_mcp.server._meta import MCP_TOOL_META
    from lauren_mcp.server._handlers import make_tools_list_handler, make_tools_call_handler
    from lauren_mcp._server._dispatcher import McpDispatcher
    from lauren_mcp._types import JsonRpcRequest

    BOOKS = [{"id": 1, "title": "Clean Code", "author": "Martin"}]

    @mcp_server('/mcp')
    class BookServer:
        @mcp_tool()
        async def search(self, query: str) -> list:
            'Search books.'
            return [b for b in BOOKS if query.lower() in b['title'].lower()]

    async def main():
        dispatcher = McpDispatcher()
        dispatcher._register_builtins()
        server = BookServer()
        # Collect the metadata of every @mcp_tool-decorated attribute.
        tools = [getattr(getattr(BookServer, n), MCP_TOOL_META)
                 for n in dir(BookServer)
                 if hasattr(getattr(BookServer, n, None), MCP_TOOL_META)]

        async def _init(p):
            return {"protocolVersion":"2025-03-26",
                    "capabilities":{"tools":{}},"serverInfo":{"name":"book","version":"1.0.0"}}
        dispatcher.register('initialize', _init)

        tl = make_tools_list_handler(tools)
        tc = make_tools_call_handler(server, tools)
        async def _tl(p): return await tl(JsonRpcRequest(method='tools/list', params=p))
        async def _tc(p): return await tc(JsonRpcRequest(method='tools/call', params=p))
        dispatcher.register('tools/list', _tl)
        dispatcher.register('tools/call', _tc)

        # We are inside a coroutine, so ask for the running loop explicitly.
        loop = asyncio.get_running_loop()
        reader = asyncio.StreamReader()
        await loop.connect_read_pipe(lambda: asyncio.StreamReaderProtocol(reader), sys.stdin)

        # One JSON-RPC message per stdin line; an empty read means EOF.
        while True:
            line = await reader.readline()
            if not line:
                break
            msg = json.loads(line.decode().strip())
            req = JsonRpcRequest(method=msg.get('method',''), id=msg.get('id'),
                                 params=msg.get('params'))
            resp = await dispatcher.dispatch(req)
            print(resp.to_json(), flush=True)

    asyncio.run(main())
''')
@pytest.fixture
def book_server_cmd():
    """Write the server script to a temp ``.py`` file and yield the launch command.

    The file is deleted during fixture teardown.
    """
    script_file = tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False)
    # delete=False keeps the file on disk after it is closed so the
    # subprocess can open it by name.
    with script_file:
        script_file.write(_BOOK_SERVER)
    script_path = script_file.name
    yield [sys.executable, script_path]
    os.unlink(script_path)
@pytest.fixture
async def book_client(book_server_cmd):
    """Yield a client connected to the book server subprocess; close it afterwards."""
    # max_retries=0 prevents 30-second hangs if the server script crashes
    stdio_client = McpServer.stdio(
        book_server_cmd,
        startup_timeout=10.0,
        max_retries=0,
    )
    pending_connect = stdio_client.connect()
    await asyncio.wait_for(pending_connect, timeout=10.0)
    yield stdio_client
    await stdio_client.close()
async def test_list_tools_includes_search(book_client):
    """The subprocess server advertises a tool named ``search``."""
    listed = await asyncio.wait_for(book_client.list_tools(), timeout=5.0)
    tool_names = [tool.name for tool in listed]
    assert "search" in tool_names
async def test_search_returns_matching_books(book_client):
    """Calling ``search`` through the client returns the matching book as JSON text."""
    pending_call = book_client.call_tool("search", {"query": "clean"})
    result = await asyncio.wait_for(pending_call, timeout=5.0)
    first_item = result["content"][0]
    books = json.loads(first_item["text"])
    assert books[0]["title"] == "Clean Code"
Note: Subprocess scripts use single-quoted docstrings to avoid terminating the outer '''...''' string literal.
10. Marking tests that need live services¶
Use @pytest.mark.eval for tests that require an external network service.
These are excluded from the default run:
import pytest
from lauren_mcp import McpServer
@pytest.mark.eval
async def test_filesystem_server_live():
    """Requires: npx and @modelcontextprotocol/server-filesystem."""
    client = McpServer.stdio(
        ["npx", "-y", "@modelcontextprotocol/server-filesystem", "/tmp"],
        max_retries=0,
    )
    await client.connect()
    try:
        tools = await client.list_tools()
        assert any(t.name == "list_directory" for t in tools)
    finally:
        # Close even when listing or the assertion fails, so a failing test
        # does not leave the connected client open.
        await client.close()
Run eval tests explicitly by selecting the marker, for example: uv run --no-sync pytest -m eval
11. Running the test suite¶
# Unit tests only (fastest, ~1 s)
uv run --no-sync pytest tests/unit -q
# Unit + integration (~10 s)
uv run --no-sync pytest tests/unit tests/integration -q
# Full suite including e2e and docs examples
uv run --no-sync pytest -q
# Specific test file
uv run --no-sync pytest tests/integration/test_mcp_streamable_http.py -v
# With coverage report
uv run --no-sync pytest --cov=src/lauren_mcp --cov-report=term-missing
# Via nox (this runs the Python 3.12 test session)
uv run --no-sync nox -s tests-3.12
Next steps¶
- Error handling — test error conditions
- Multiple servers — test multi-server composition