Model Context Protocol (MCP) 基于一个灵活、可扩展的架构,使 LLM 应用程序与集成之间的通信无缝衔接。本文档涵盖了核心架构组件和概念。
MCP 遵循客户端-服务器架构,其中:
- 主机是发起连接的 LLM 应用程序(如 Claude Desktop 或 IDE)。
- 客户端在主机应用程序内部与服务器保持 1:1 的连接。
- 服务器为客户端提供上下文、工具和提示。
核心组件
协议层
协议层处理消息帧、请求/响应链接以及高级通信模式。
class Protocol<Request, Notification, Result> {
// 处理传入请求
setRequestHandler<T>(schema: T, handler: (request: T, extra: RequestHandlerExtra) => Promise<Result>): void
// 处理传入通知
setNotificationHandler<T>(schema: T, handler: (notification: T) => Promise<void>): void
// 发送请求并等待响应
request<T>(request: Request, schema: T, options?: RequestOptions): Promise<T>
// 发送单向通知
notification(notification: Notification): Promise<void>
}
class Protocol<Request, Notification, Result> {
// 处理传入请求
setRequestHandler<T>(schema: T, handler: (request: T, extra: RequestHandlerExtra) => Promise<Result>): void
// 处理传入通知
setNotificationHandler<T>(schema: T, handler: (notification: T) => Promise<void>): void
// 发送请求并等待响应
request<T>(request: Request, schema: T, options?: RequestOptions): Promise<T>
// 发送单向通知
notification(notification: Notification): Promise<void>
}
class Session(BaseSession[RequestT, NotificationT, ResultT]):
async def send_request(
self,
request: RequestT,
result_type: type[Result]
) -> Result:
"""
发送请求并等待响应。如果响应包含错误,则引发 McpError。
"""
# 请求处理实现
async def send_notification(
self,
notification: NotificationT
) -> None:
"""发送不需要响应的单向通知。"""
# 通知处理实现
async def _received_request(
self,
responder: RequestResponder[ReceiveRequestT, ResultT]
) -> None:
"""处理来自另一端的传入请求。"""
# 请求处理实现
async def _received_notification(
self,
notification: ReceiveNotificationT
) -> None:
"""处理来自另一端的传入通知。"""
# 通知处理实现
关键类包括:
传输层
传输层处理客户端和服务器之间的实际通信。MCP 支持多种传输机制:
-
标准输入输出传输
-
HTTP 与 SSE 传输
- 使用服务器发送事件 (SSE) 进行服务器到客户端的消息传递。
- 使用 HTTP POST 进行客户端到服务器的消息传递。
所有传输都使用 JSON-RPC 2.0 来交换消息。有关 Model Context Protocol 消息格式的详细信息,请参阅规范。
消息类型
MCP 有以下主要消息类型:
-
请求期望从另一端获得响应:
interface Request {
method: string;
params?: { ... };
}
-
结果是请求的成功响应:
interface Result {
[key: string]: unknown;
}
-
错误表示请求失败:
interface Error {
code: number;
message: string;
data?: unknown;
}
-
通知是不期望响应的单向消息:
interface Notification {
method: string;
params?: { ... };
}
连接生命周期
1. 初始化
- 客户端发送
initialize
请求,包含协议版本和能力。
- 服务器响应其协议版本和能力。
- 客户端发送
initialized
通知作为确认。
- 正常消息交换开始。
2. 消息交换
初始化后,支持以下模式:
- 请求-响应:客户端或服务器发送请求,另一端响应。
- 通知:任一方发送单向消息。
3. 终止
任一方可以终止连接:
- 通过
close()
进行干净关闭。
- 传输断开。
- 错误情况。
错误处理
MCP 定义了以下标准错误代码:
enum ErrorCode {
// 标准 JSON-RPC 错误代码
ParseError = -32700,
InvalidRequest = -32600,
MethodNotFound = -32601,
InvalidParams = -32602,
InternalError = -32603
}
SDK 和应用程序可以在 -32000 以上定义自己的错误代码。
错误通过以下方式传播:
- 对请求的错误响应。
- 传输上的错误事件。
- 协议级别的错误处理程序。
实现示例
以下是一个实现 MCP 服务器的基本示例:
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
const server = new Server({
name: "example-server",
version: "1.0.0"
}, {
capabilities: {
resources: {}
}
});
// 处理请求
server.setRequestHandler(ListResourcesRequestSchema, async () => {
return {
resources: [
{
uri: "example://resource",
name: "Example Resource"
}
]
};
});
// 连接传输
const transport = new StdioServerTransport();
await server.connect(transport);
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
const server = new Server({
name: "example-server",
version: "1.0.0"
}, {
capabilities: {
resources: {}
}
});
// 处理请求
server.setRequestHandler(ListResourcesRequestSchema, async () => {
return {
resources: [
{
uri: "example://resource",
name: "Example Resource"
}
]
};
});
// 连接传输
const transport = new StdioServerTransport();
await server.connect(transport);
import asyncio
import mcp.types as types
from mcp.server import Server
from mcp.server.stdio import stdio_server
app = Server("example-server")
@app.list_resources()
async def list_resources() -> list[types.Resource]:
return [
types.Resource(
uri="example://resource",
name="Example Resource"
)
]
async def main():
async with stdio_server() as streams:
await app.run(
streams[0],
streams[1],
app.create_initialization_options()
)
if __name__ == "__main__":
asyncio.run(main)
最佳实践
传输选择
-
本地通信
- 对本地进程使用标准输入输出传输。
- 对同一台机器上的通信高效。
- 简单的进程管理。
-
远程通信
- 对需要 HTTP 兼容性的场景使用 SSE。
- 考虑包括认证和授权的安全影响。
消息处理
-
请求处理
- 彻底验证输入。
- 使用类型安全的模式。
- 优雅地处理错误。
- 实现超时。
-
进度报告
- 对长时间操作使用进度令牌。
- 增量报告进度。
- 在已知时包括总进度。
-
错误管理
- 使用适当的错误代码。
- 包括有用的错误消息。
- 在错误时清理资源。
安全考虑
-
传输安全
- 对远程连接使用 TLS。
- 验证连接来源。
- 在需要时实现认证。
-
消息验证
- 验证所有传入消息。
- 清理输入。
- 检查消息大小限制。
- 验证 JSON-RPC 格式。
-
资源保护
- 实现访问控制。
- 验证资源路径。
- 监控资源使用。
- 限速请求。
-
错误处理
- 不要泄露敏感信息。
- 记录与安全相关的错误。
- 实现适当的清理。
- 处理 DoS 场景。
调试和监控
-
日志记录
- 记录协议事件。
- 跟踪消息流。
- 监控性能。
- 记录错误。
-
诊断
- 实现健康检查。
- 监控连接状态。
- 跟踪资源使用。
- 分析性能。
-
测试
- 测试不同的传输。
- 验证错误处理。
- 检查边缘情况。
- 对服务器进行负载测试。