Kimi-CLI MCP (Model Context Protocol) 工具实现
技术研究 人工智能 LLM
Model Context Protocol (MCP) 是一个开放协议,允许 AI 模型安全地与外部工具和数据源交互。Kimi-CLI 完整实现了 MCP 客户端功能,可以连接各种 MCP 服务器来扩展 AI 能力。
MCP 概述
Model Context Protocol (MCP) 是一个开放协议,允许 AI 模型安全地与外部工具和数据源交互。Kimi-CLI 完整实现了 MCP 客户端功能,可以连接各种 MCP 服务器来扩展 AI 能力。
1. MCP 架构
1.1 整体架构
┌─────────────────────────────────────────────────────────────┐
│ Kimi-CLI Agent │
├─────────────────────────────────────────────────────────────┤
│ KimiToolset │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ 内置工具 │ │ MCP 工具 │ │ 外部工具 │ │
│ │ (Python) │ │ (MCP协议) │ │ (Wire协议) │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────────┘
│
│ MCP 协议
▼
┌─────────────────────────────────────────────────────────────┐
│ MCP 服务器 │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Context7 │ │ Linear │ │ PostgreSQL │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────────┘
1.2 核心组件
KimiToolset
src/kimi_cli/soul/toolset.py 中的 KimiToolset 类管理所有工具,包括 MCP 工具:
class KimiToolset:
def __init__(self) -> None:
self._tool_dict: dict[str, ToolType] = {}
self._mcp_servers: dict[str, MCPServerInfo] = {}
self._mcp_loading_task: asyncio.Task[None] | None = None
def add(self, tool: ToolType) -> None:
"""添加工具到工具集"""
self._tool_dict[tool.name] = tool
@property
def mcp_servers(self) -> dict[str, MCPServerInfo]:
"""获取 MCP 服务器信息"""
return self._mcp_servers
MCPServerInfo
MCP 服务器信息的数据结构:
@dataclass(slots=True)
class MCPServerInfo:
status: Literal["pending", "connecting", "connected", "failed", "unauthorized"]
client: fastmcp.Client[Any]
tools: list[MCPTool[Any]]
MCPTool
MCP 工具的包装类:
class MCPTool[T: ClientTransport](CallableTool):
def __init__(
self,
server_name: str,
mcp_tool: mcp.Tool,
client: fastmcp.Client[T],
*,
runtime: Runtime,
**kwargs: Any,
):
super().__init__(
name=mcp_tool.name,
description=(
f"This is an MCP (Model Context Protocol) tool from MCP server `{server_name}`.\n\n"
f"{mcp_tool.description or 'No description provided.'}"
),
parameters=mcp_tool.inputSchema,
**kwargs,
)
self._mcp_tool = mcp_tool
self._client = client
self._runtime = runtime
self._timeout = timedelta(
milliseconds=runtime.config.mcp.client.tool_call_timeout_ms
)
2. MCP 工具加载流程
2.1 加载入口
在 load_agent 函数中加载 MCP 工具:
async def load_agent(
agent_file: Path,
runtime: Runtime,
*,
mcp_configs: list[MCPConfig] | list[dict[str, Any]],
) -> Agent:
# ... 加载其他工具 ...
if mcp_configs:
validated_mcp_configs: list[MCPConfig] = []
for mcp_config in mcp_configs:
validated_mcp_configs.append(
mcp_config
if isinstance(mcp_config, MCPConfig)
else MCPConfig.model_validate(mcp_config)
)
await toolset.load_mcp_tools(validated_mcp_configs, runtime)
2.2 加载详细流程
async def load_mcp_tools(
self,
mcp_configs: list[MCPConfig],
runtime: Runtime,
in_background: bool = True
) -> None:
import fastmcp
from fastmcp.mcp_config import MCPConfig
# 1. 创建 MCP 客户端
for mcp_config in mcp_configs:
if not mcp_config.mcpServers:
continue
for server_name, server_config in mcp_config.mcpServers.items():
client = fastmcp.Client(MCPConfig(mcpServers={server_name: server_config}))
self._mcp_servers[server_name] = MCPServerInfo(
status="pending",
client=client,
tools=[]
)
# 2. 连接并加载工具
async def _connect():
for server_name, server_info in self._mcp_servers.items():
server_info.status = "connecting"
try:
async with server_info.client as client:
for tool in await client.list_tools():
mcp_tool = MCPTool(server_name, tool, client, runtime=runtime)
server_info.tools.append(mcp_tool)
self.add(mcp_tool)
server_info.status = "connected"
logger.info("Connected MCP server: {server_name}", server_name=server_name)
except Exception as e:
logger.error("Failed to connect MCP server: {server_name}", server_name=server_name)
server_info.status = "failed"
# 3. 异步或同步加载
if in_background:
self._mcp_loading_task = asyncio.create_task(_connect())
else:
await _connect()
2.3 等待加载完成
async def wait_for_mcp_tools(self) -> None:
"""等待后台 MCP 工具加载完成"""
task = self._mcp_loading_task
if not task:
return
try:
await task
finally:
if self._mcp_loading_task is task and task.done():
self._mcp_loading_task = None
3. MCP 工具调用
3.1 调用流程
用户请求 → Agent → LLM → 工具调用 → Toolset.handle()
→ MCPTool.__call__() → 批准 → fastmcp.client.call_tool()
→ MCP 服务器 → 返回结果 → 转换 → Agent → 用户
3.2 MCPTool 实现
async def __call__(self, *args: Any, **kwargs: Any) -> ToolReturnValue:
# 1. 构建操作描述
description = f"Call MCP tool `{self._mcp_tool.name}`."
# 2. 请求用户批准
if not await self._runtime.approval.request(
self.name,
self._action_name,
description
):
return ToolRejectedError()
# 3. 调用 MCP 工具
try:
async with self._client as client:
result = await client.call_tool(
self._mcp_tool.name,
kwargs,
timeout=self._timeout,
raise_on_error=False,
)
return convert_mcp_tool_result(result)
except Exception as e:
exc_msg = str(e).lower()
if "timeout" in exc_msg or "timed out" in exc_msg:
return ToolError(
message=(
f"Timeout while calling MCP tool `{self._mcp_tool.name}`. "
"You may explain to the user that the timeout config is set too low."
),
brief="Timeout",
)
raise
3.3 结果转换
def convert_mcp_tool_result(result: CallToolResult) -> ToolReturnValue:
"""将 MCP 工具结果转换为 kosong 工具返回值"""
content: list[ContentPart] = []
for part in result.content:
content.append(convert_mcp_content(part))
if result.is_error:
return ToolError(
output=content,
message="Tool returned an error. The output may be error message or incomplete output",
brief="",
)
else:
return ToolOk(output=content)
4. MCP 服务器类型
4.1 HTTP 服务器
使用 HTTP 协议连接的 MCP 服务器:
case acp.schema.HttpMcpServer():
return {
"url": server.url,
"transport": "http",
"headers": {header.name: header.value for header in server.headers},
}
配置示例:
[mcp.servers.context7]
url = "https://mcp.context7.com/mcp"
transport = "http"
headers = { CONTEXT7_API_KEY = "your-key" }
4.2 SSE 服务器
使用 Server-Sent Events 协议连接的 MCP 服务器:
case acp.schema.SseMcpServer():
return {
"url": server.url,
"transport": "sse",
"headers": {header.name: header.value for header in server.headers},
}
配置示例:
[mcp.servers.my-server]
url = "https://my-server.com/mcp"
transport = "sse"
headers = { Authorization = "Bearer token" }
4.3 Stdio 服务器
使用标准输入/输出连接的本地 MCP 服务器:
case acp.schema.McpServerStdio():
return {
"command": server.command,
"args": server.args,
"env": {item.name: item.value for item in server.env},
"transport": "stdio",
}
配置示例:
[mcp.servers.chrome-devtools]
command = "npx"
args = ["-y", "chrome-devtools-mcp@latest"]
transport = "stdio"
5. OAuth 认证
5.1 OAuth 支持
MCP 服务器可以配置 OAuth 认证:
async def _check_oauth_tokens(server_url: str) -> bool:
"""检查 OAuth 令牌是否存在"""
try:
from fastmcp.client.auth.oauth import FileTokenStorage
storage = FileTokenStorage(server_url=server_url)
tokens = await storage.get_tokens()
return tokens is not None
except Exception:
return False
5.2 认证流程
oauth_servers: dict[str, str] = {}
# 识别 OAuth 服务器
for server_name, server_config in mcp_config.mcpServers.items():
if isinstance(server_config, RemoteMCPServer) and server_config.auth == "oauth":
oauth_servers[server_name] = server_config.url
# 检查令牌
for server_name, server_info in self._mcp_servers.items():
server_url = oauth_servers.get(server_name)
if not server_url:
continue
if not await _check_oauth_tokens(server_url):
logger.warning(
"Skipping OAuth MCP server '{server_name}': not authorized. "
"Run 'kimi mcp auth {server_name}' first.",
server_name=server_name,
)
server_info.status = "unauthorized"
5.3 认证命令
# 授权 MCP 服务器
kimi mcp auth linear
6. MCP 配置
6.1 配置文件位置
- 用户配置:
~/.kimi/config.toml - 临时配置: 通过
--mcp-config-file参数
6.2 配置格式
[mcp.client]
tool_call_timeout_ms = 60000 # 工具调用超时时间
[mcp.servers]
# HTTP 服务器
[mcp.servers.context7]
url = "https://mcp.context7.com/mcp"
transport = "http"
headers = { CONTEXT7_API_KEY = "your-key" }
# SSE 服务器
[mcp.servers.sse-server]
url = "https://sse-server.com/mcp"
transport = "sse"
headers = { Authorization = "Bearer token" }
# Stdio 服务器
[mcp.servers.chrome-devtools]
command = "npx"
args = ["-y", "chrome-devtools-mcp@latest"]
transport = "stdio"
# OAuth 服务器
[mcp.servers.linear]
url = "https://mcp.linear.app/mcp"
transport = "http"
auth = "oauth"
6.3 配置加载
MCP 配置从多个来源加载:
- ACP 客户端传递的配置
- 配置文件中的配置
- 命令行参数指定的配置
# 从 ACP 转换 MCP 配置
def acp_mcp_servers_to_mcp_config(mcp_servers: list[MCPServer]) -> MCPConfig:
if not mcp_servers:
return MCPConfig()
return MCPConfig.model_validate(
{"mcpServers": {
server.name: _convert_acp_mcp_server(server)
for server in mcp_servers
}}
)
7. MCP 命令行接口
7.1 kimi mcp 命令
# 添加服务器
kimi mcp add --transport http context7 https://mcp.context7.com/mcp
kimi mcp add --transport stdio chrome-devtools -- npx chrome-devtools-mcp@latest
kimi mcp add --transport http --auth oauth linear https://mcp.linear.app/mcp
# 列出服务器
kimi mcp list
# 删除服务器
kimi mcp remove context7
# 授权服务器
kimi mcp auth linear
7.2 查看连接状态
在 Kimi-CLI 运行时,输入 /mcp 查看已连接的服务器和加载的工具:
/mcp
Connected MCP servers:
- context7 (HTTP): 5 tools loaded
- linear (HTTP, OAuth): 12 tools loaded
- chrome-devtools (Stdio): 3 tools loaded
8. MCP 工具示例
8.1 Context7 MCP 工具
提供 API 文档和代码示例搜索:
工具列表:
- search_api_docs: 搜索 API 文档
- search_code_examples: 搜索代码示例
- get_api_details: 获取 API 详细信息
8.2 Linear MCP 工具
提供项目管理功能:
工具列表:
- list_issues: 列出问题
- create_issue: 创建问题
- update_issue: 更新问题
- add_comment: 添加评论
8.3 PostgreSQL MCP 工具
提供数据库访问:
工具列表:
- execute_query: 执行 SQL 查询
- list_tables: 列出表
- describe_table: 描述表结构
9. MCP 与其他系统的集成
9.1 ACP 集成
ACP 客户端可以传递 MCP 配置给 Kimi-CLI:
# src/kimi_cli/acp/mcp.py
def acp_mcp_servers_to_mcp_config(mcp_servers: list[MCPServer]) -> MCPConfig:
"""将 ACP MCP 服务器配置转换为 fastmcp MCP 配置"""
if not mcp_servers:
return MCPConfig()
try:
return MCPConfig.model_validate(
{"mcpServers": {
server.name: _convert_acp_mcp_server(server)
for server in mcp_servers
}}
)
except ValidationError as exc:
raise MCPConfigError(f"Invalid MCP config from ACP client: {exc}") from exc
9.2 Wire 协议集成
MCP 工具调用通过 Wire 协议传递批准请求:
# 请求批准
if not await self._runtime.approval.request(
self.name,
self._action_name,
description
):
return ToolRejectedError()
# 发送批准请求到 Wire
wire_request = ApprovalRequest(
id=request.id,
action=request.action,
description=request.description,
sender=request.sender,
tool_call_id=request.tool_call_id,
display=request.display,
)
wire_send(wire_request)
10. 错误处理
10.1 连接失败
except Exception as e:
logger.error(
"Failed to connect MCP server: {server_name}, error: {error}",
server_name=server_name,
error=e,
)
server_info.status = "failed"
return server_name, e
10.2 超时处理
except Exception as e:
exc_msg = str(e).lower()
if "timeout" in exc_msg or "timed out" in exc_msg:
return ToolError(
message=(
f"Timeout while calling MCP tool `{self._mcp_tool.name}`. "
"You may explain to the user that the timeout config is set too low."
),
brief="Timeout",
)
raise
10.3 未授权
if not await _check_oauth_tokens(server_url):
logger.warning(
"Skipping OAuth MCP server '{server_name}': not authorized. "
"Run 'kimi mcp auth {server_name}' first.",
server_name=server_name,
)
server_info.status = "unauthorized"
11. 性能优化
11.1 异步加载
MCP 工具在后台异步加载,不阻塞 Agent 启动:
if in_background:
self._mcp_loading_task = asyncio.create_task(_connect())
else:
await _connect()
11.2 并行连接
多个 MCP 服务器并行连接:
tasks = [
asyncio.create_task(_connect_server(server_name, server_info))
for server_name, server_info in self._mcp_servers.items()
if server_info.status == "pending"
]
results = await asyncio.gather(*tasks) if tasks else []
11.3 超时控制
配置工具调用超时:
[mcp.client]
tool_call_timeout_ms = 60000 # 60 秒
12. 安全考虑
12.1 批准机制
所有 MCP 工具调用都需要用户批准:
if not await self._runtime.approval.request(
self.name,
self._action_name,
description
):
return ToolRejectedError()
12.2 权限控制
通过 OAuth 实现细粒度权限控制:
if server_config.auth == "oauth":
# 检查 OAuth 令牌
if not await _check_oauth_tokens(server_url):
server_info.status = "unauthorized"
12.3 环境隔离
Stdio 服务器在独立进程中运行,通过标准输入输出通信:
case acp.schema.McpServerStdio():
return {
"command": server.command,
"args": server.args,
"env": {item.name: item.value for item in server.env},
"transport": "stdio",
}