Logo
热心市民王先生

Kimi-CLI MCP (Model Context Protocol) 工具实现

技术研究 人工智能 LLM

Model Context Protocol (MCP) 是一个开放协议,允许 AI 模型安全地与外部工具和数据源交互。Kimi-CLI 完整实现了 MCP 客户端功能,可以连接各种 MCP 服务器来扩展 AI 能力。

MCP 概述

Model Context Protocol (MCP) 是一个开放协议,允许 AI 模型安全地与外部工具和数据源交互。Kimi-CLI 完整实现了 MCP 客户端功能,可以连接各种 MCP 服务器来扩展 AI 能力。

1. MCP 架构

1.1 整体架构

┌─────────────────────────────────────────────────────────────┐
│                    Kimi-CLI Agent                          │
├─────────────────────────────────────────────────────────────┤
│  KimiToolset                                             │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐  │
│  │  内置工具     │  │ MCP 工具     │  │ 外部工具     │  │
│  │ (Python)     │  │ (MCP协议)    │  │ (Wire协议)   │  │
│  └──────────────┘  └──────────────┘  └──────────────┘  │
└─────────────────────────────────────────────────────────────┘

                            │ MCP 协议

┌─────────────────────────────────────────────────────────────┐
│                  MCP 服务器                                │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐  │
│  │  Context7    │  │  Linear      │  │  PostgreSQL  │  │
│  └──────────────┘  └──────────────┘  └──────────────┘  │
└─────────────────────────────────────────────────────────────┘

1.2 核心组件

KimiToolset

src/kimi_cli/soul/toolset.py 中的 KimiToolset 类管理所有工具,包括 MCP 工具:

class KimiToolset:
    def __init__(self) -> None:
        self._tool_dict: dict[str, ToolType] = {}
        self._mcp_servers: dict[str, MCPServerInfo] = {}
        self._mcp_loading_task: asyncio.Task[None] | None = None
    
    def add(self, tool: ToolType) -> None:
        """添加工具到工具集"""
        self._tool_dict[tool.name] = tool
    
    @property
    def mcp_servers(self) -> dict[str, MCPServerInfo]:
        """获取 MCP 服务器信息"""
        return self._mcp_servers

MCPServerInfo

MCP 服务器信息的数据结构:

@dataclass(slots=True)
class MCPServerInfo:
    status: Literal["pending", "connecting", "connected", "failed", "unauthorized"]
    client: fastmcp.Client[Any]
    tools: list[MCPTool[Any]]

MCPTool

MCP 工具的包装类:

class MCPTool[T: ClientTransport](CallableTool):
    def __init__(
        self,
        server_name: str,
        mcp_tool: mcp.Tool,
        client: fastmcp.Client[T],
        *,
        runtime: Runtime,
        **kwargs: Any,
    ):
        super().__init__(
            name=mcp_tool.name,
            description=(
                f"This is an MCP (Model Context Protocol) tool from MCP server `{server_name}`.\n\n"
                f"{mcp_tool.description or 'No description provided.'}"
            ),
            parameters=mcp_tool.inputSchema,
            **kwargs,
        )
        self._mcp_tool = mcp_tool
        self._client = client
        self._runtime = runtime
        self._timeout = timedelta(
            milliseconds=runtime.config.mcp.client.tool_call_timeout_ms
        )

2. MCP 工具加载流程

2.1 加载入口

load_agent 函数中加载 MCP 工具:

async def load_agent(
    agent_file: Path,
    runtime: Runtime,
    *,
    mcp_configs: list[MCPConfig] | list[dict[str, Any]],
) -> Agent:
    # ... 加载其他工具 ...
    
    if mcp_configs:
        validated_mcp_configs: list[MCPConfig] = []
        for mcp_config in mcp_configs:
            validated_mcp_configs.append(
                mcp_config
                if isinstance(mcp_config, MCPConfig)
                else MCPConfig.model_validate(mcp_config)
            )
        await toolset.load_mcp_tools(validated_mcp_configs, runtime)

2.2 加载详细流程

async def load_mcp_tools(
    self, 
    mcp_configs: list[MCPConfig], 
    runtime: Runtime, 
    in_background: bool = True
) -> None:
    import fastmcp
    from fastmcp.mcp_config import MCPConfig
    
    # 1. 创建 MCP 客户端
    for mcp_config in mcp_configs:
        if not mcp_config.mcpServers:
            continue
            
        for server_name, server_config in mcp_config.mcpServers.items():
            client = fastmcp.Client(MCPConfig(mcpServers={server_name: server_config}))
            self._mcp_servers[server_name] = MCPServerInfo(
                status="pending",
                client=client,
                tools=[]
            )
    
    # 2. 连接并加载工具
    async def _connect():
        for server_name, server_info in self._mcp_servers.items():
            server_info.status = "connecting"
            try:
                async with server_info.client as client:
                    for tool in await client.list_tools():
                        mcp_tool = MCPTool(server_name, tool, client, runtime=runtime)
                        server_info.tools.append(mcp_tool)
                        self.add(mcp_tool)
                
                server_info.status = "connected"
                logger.info("Connected MCP server: {server_name}", server_name=server_name)
            except Exception as e:
                logger.error("Failed to connect MCP server: {server_name}", server_name=server_name)
                server_info.status = "failed"
    
    # 3. 异步或同步加载
    if in_background:
        self._mcp_loading_task = asyncio.create_task(_connect())
    else:
        await _connect()

2.3 等待加载完成

async def wait_for_mcp_tools(self) -> None:
    """等待后台 MCP 工具加载完成"""
    task = self._mcp_loading_task
    if not task:
        return
    try:
        await task
    finally:
        if self._mcp_loading_task is task and task.done():
            self._mcp_loading_task = None

3. MCP 工具调用

3.1 调用流程

用户请求 → Agent → LLM → 工具调用 → Toolset.handle()
         → MCPTool.__call__() → 批准 → fastmcp.client.call_tool()
         → MCP 服务器 → 返回结果 → 转换 → Agent → 用户

3.2 MCPTool 实现

async def __call__(self, *args: Any, **kwargs: Any) -> ToolReturnValue:
    # 1. 构建操作描述
    description = f"Call MCP tool `{self._mcp_tool.name}`."
    
    # 2. 请求用户批准
    if not await self._runtime.approval.request(
        self.name, 
        self._action_name, 
        description
    ):
        return ToolRejectedError()
    
    # 3. 调用 MCP 工具
    try:
        async with self._client as client:
            result = await client.call_tool(
                self._mcp_tool.name,
                kwargs,
                timeout=self._timeout,
                raise_on_error=False,
            )
            return convert_mcp_tool_result(result)
    except Exception as e:
        exc_msg = str(e).lower()
        if "timeout" in exc_msg or "timed out" in exc_msg:
            return ToolError(
                message=(
                    f"Timeout while calling MCP tool `{self._mcp_tool.name}`. "
                    "You may explain to the user that the timeout config is set too low."
                ),
                brief="Timeout",
            )
        raise

3.3 结果转换

def convert_mcp_tool_result(result: CallToolResult) -> ToolReturnValue:
    """将 MCP 工具结果转换为 kosong 工具返回值"""
    content: list[ContentPart] = []
    for part in result.content:
        content.append(convert_mcp_content(part))
    
    if result.is_error:
        return ToolError(
            output=content,
            message="Tool returned an error. The output may be error message or incomplete output",
            brief="",
        )
    else:
        return ToolOk(output=content)

4. MCP 服务器类型

4.1 HTTP 服务器

使用 HTTP 协议连接的 MCP 服务器:

case acp.schema.HttpMcpServer():
    return {
        "url": server.url,
        "transport": "http",
        "headers": {header.name: header.value for header in server.headers},
    }

配置示例:

[mcp.servers.context7]
url = "https://mcp.context7.com/mcp"
transport = "http"
headers = { CONTEXT7_API_KEY = "your-key" }

4.2 SSE 服务器

使用 Server-Sent Events 协议连接的 MCP 服务器:

case acp.schema.SseMcpServer():
    return {
        "url": server.url,
        "transport": "sse",
        "headers": {header.name: header.value for header in server.headers},
    }

配置示例:

[mcp.servers.my-server]
url = "https://my-server.com/mcp"
transport = "sse"
headers = { Authorization = "Bearer token" }

4.3 Stdio 服务器

使用标准输入/输出连接的本地 MCP 服务器:

case acp.schema.McpServerStdio():
    return {
        "command": server.command,
        "args": server.args,
        "env": {item.name: item.value for item in server.env},
        "transport": "stdio",
    }

配置示例:

[mcp.servers.chrome-devtools]
command = "npx"
args = ["-y", "chrome-devtools-mcp@latest"]
transport = "stdio"

5. OAuth 认证

5.1 OAuth 支持

MCP 服务器可以配置 OAuth 认证:

async def _check_oauth_tokens(server_url: str) -> bool:
    """检查 OAuth 令牌是否存在"""
    try:
        from fastmcp.client.auth.oauth import FileTokenStorage
        
        storage = FileTokenStorage(server_url=server_url)
        tokens = await storage.get_tokens()
        return tokens is not None
    except Exception:
        return False

5.2 认证流程

oauth_servers: dict[str, str] = {}

# 识别 OAuth 服务器
for server_name, server_config in mcp_config.mcpServers.items():
    if isinstance(server_config, RemoteMCPServer) and server_config.auth == "oauth":
        oauth_servers[server_name] = server_config.url

# 检查令牌
for server_name, server_info in self._mcp_servers.items():
    server_url = oauth_servers.get(server_name)
    if not server_url:
        continue
    if not await _check_oauth_tokens(server_url):
        logger.warning(
            "Skipping OAuth MCP server '{server_name}': not authorized. "
            "Run 'kimi mcp auth {server_name}' first.",
            server_name=server_name,
        )
        server_info.status = "unauthorized"

5.3 认证命令

# 授权 MCP 服务器
kimi mcp auth linear

6. MCP 配置

6.1 配置文件位置

  • 用户配置: ~/.kimi/config.toml
  • 临时配置: 通过 --mcp-config-file 参数

6.2 配置格式

[mcp.client]
tool_call_timeout_ms = 60000  # 工具调用超时时间

[mcp.servers]
# HTTP 服务器
[mcp.servers.context7]
url = "https://mcp.context7.com/mcp"
transport = "http"
headers = { CONTEXT7_API_KEY = "your-key" }

# SSE 服务器
[mcp.servers.sse-server]
url = "https://sse-server.com/mcp"
transport = "sse"
headers = { Authorization = "Bearer token" }

# Stdio 服务器
[mcp.servers.chrome-devtools]
command = "npx"
args = ["-y", "chrome-devtools-mcp@latest"]
transport = "stdio"

# OAuth 服务器
[mcp.servers.linear]
url = "https://mcp.linear.app/mcp"
transport = "http"
auth = "oauth"

6.3 配置加载

MCP 配置从多个来源加载:

  1. ACP 客户端传递的配置
  2. 配置文件中的配置
  3. 命令行参数指定的配置
# 从 ACP 转换 MCP 配置
def acp_mcp_servers_to_mcp_config(mcp_servers: list[MCPServer]) -> MCPConfig:
    if not mcp_servers:
        return MCPConfig()
    
    return MCPConfig.model_validate(
        {"mcpServers": {
            server.name: _convert_acp_mcp_server(server) 
            for server in mcp_servers
        }}
    )

7. MCP 命令行接口

7.1 kimi mcp 命令

# 添加服务器
kimi mcp add --transport http context7 https://mcp.context7.com/mcp
kimi mcp add --transport stdio chrome-devtools -- npx chrome-devtools-mcp@latest
kimi mcp add --transport http --auth oauth linear https://mcp.linear.app/mcp

# 列出服务器
kimi mcp list

# 删除服务器
kimi mcp remove context7

# 授权服务器
kimi mcp auth linear

7.2 查看连接状态

在 Kimi-CLI 运行时,输入 /mcp 查看已连接的服务器和加载的工具:

/mcp
Connected MCP servers:
- context7 (HTTP): 5 tools loaded
- linear (HTTP, OAuth): 12 tools loaded
- chrome-devtools (Stdio): 3 tools loaded

8. MCP 工具示例

8.1 Context7 MCP 工具

提供 API 文档和代码示例搜索:

工具列表:
- search_api_docs: 搜索 API 文档
- search_code_examples: 搜索代码示例
- get_api_details: 获取 API 详细信息

8.2 Linear MCP 工具

提供项目管理功能:

工具列表:
- list_issues: 列出问题
- create_issue: 创建问题
- update_issue: 更新问题
- add_comment: 添加评论

8.3 PostgreSQL MCP 工具

提供数据库访问:

工具列表:
- execute_query: 执行 SQL 查询
- list_tables: 列出表
- describe_table: 描述表结构

9. MCP 与其他系统的集成

9.1 ACP 集成

ACP 客户端可以传递 MCP 配置给 Kimi-CLI:

# src/kimi_cli/acp/mcp.py
def acp_mcp_servers_to_mcp_config(mcp_servers: list[MCPServer]) -> MCPConfig:
    """将 ACP MCP 服务器配置转换为 fastmcp MCP 配置"""
    if not mcp_servers:
        return MCPConfig()
    
    try:
        return MCPConfig.model_validate(
            {"mcpServers": {
                server.name: _convert_acp_mcp_server(server) 
                for server in mcp_servers
            }}
        )
    except ValidationError as exc:
        raise MCPConfigError(f"Invalid MCP config from ACP client: {exc}") from exc

9.2 Wire 协议集成

MCP 工具调用通过 Wire 协议传递批准请求:

# 请求批准
if not await self._runtime.approval.request(
    self.name, 
    self._action_name, 
    description
):
    return ToolRejectedError()

# 发送批准请求到 Wire
wire_request = ApprovalRequest(
    id=request.id,
    action=request.action,
    description=request.description,
    sender=request.sender,
    tool_call_id=request.tool_call_id,
    display=request.display,
)
wire_send(wire_request)

10. 错误处理

10.1 连接失败

except Exception as e:
    logger.error(
        "Failed to connect MCP server: {server_name}, error: {error}",
        server_name=server_name,
        error=e,
    )
    server_info.status = "failed"
    return server_name, e

10.2 超时处理

except Exception as e:
    exc_msg = str(e).lower()
    if "timeout" in exc_msg or "timed out" in exc_msg:
        return ToolError(
            message=(
                f"Timeout while calling MCP tool `{self._mcp_tool.name}`. "
                "You may explain to the user that the timeout config is set too low."
            ),
            brief="Timeout",
        )
    raise

10.3 未授权

if not await _check_oauth_tokens(server_url):
    logger.warning(
        "Skipping OAuth MCP server '{server_name}': not authorized. "
        "Run 'kimi mcp auth {server_name}' first.",
        server_name=server_name,
    )
    server_info.status = "unauthorized"

11. 性能优化

11.1 异步加载

MCP 工具在后台异步加载,不阻塞 Agent 启动:

if in_background:
    self._mcp_loading_task = asyncio.create_task(_connect())
else:
    await _connect()

11.2 并行连接

多个 MCP 服务器并行连接:

tasks = [
    asyncio.create_task(_connect_server(server_name, server_info))
    for server_name, server_info in self._mcp_servers.items()
    if server_info.status == "pending"
]
results = await asyncio.gather(*tasks) if tasks else []

11.3 超时控制

配置工具调用超时:

[mcp.client]
tool_call_timeout_ms = 60000  # 60 秒

12. 安全考虑

12.1 批准机制

所有 MCP 工具调用都需要用户批准:

if not await self._runtime.approval.request(
    self.name, 
    self._action_name, 
    description
):
    return ToolRejectedError()

12.2 权限控制

通过 OAuth 实现细粒度权限控制:

if server_config.auth == "oauth":
    # 检查 OAuth 令牌
    if not await _check_oauth_tokens(server_url):
        server_info.status = "unauthorized"

12.3 环境隔离

Stdio 服务器在独立进程中运行,通过标准输入输出通信:

case acp.schema.McpServerStdio():
    return {
        "command": server.command,
        "args": server.args,
        "env": {item.name: item.value for item in server.env},
        "transport": "stdio",
    }