Logo
热心市民王先生

Kimi-CLI 工具注册和调用机制

技术研究 人工智能 LLM

Kimi-CLI 的工具系统采用依赖注入和异步调用的架构,支持动态工具加载和灵活的工具调度。核心组件包括: - 工具定义 工具接口和实现 - 工具注册 工具发现和注册机制 - 工具调用 工具执行和结果处理 - 工具调度 LLM 到工具的桥接

概述

Kimi-CLI 的工具系统采用依赖注入和异步调用的架构,支持动态工具加载和灵活的工具调度。核心组件包括:

  • 工具定义: 工具接口和实现
  • 工具注册: 工具发现和注册机制
  • 工具调用: 工具执行和结果处理
  • 工具调度: LLM 到工具的桥接

1. 工具定义

1.1 工具基类

工具继承自 CallableToolCallableTool2

# rust/kosong/src/tooling/mod.rs
pub trait CallableTool: Send + Sync {
    fn name(&self) -> &str;
    fn description(&self) -> &str;
    fn parameters(&self) -> Value;
    
    async fn call(&self, args: Value) -> Result<ToolReturnValue>;
}

pub trait CallableTool2<T>: Send + Sync {
    fn name(&self) -> &str;
    fn description(&self) -> &str;
    fn parameters(&self) -> Value;
    
    async fn call(&self, params: T) -> Result<ToolReturnValue>;
}

1.2 工具返回值

from kosong.tooling import ToolOk, ToolError

# 成功返回
return ToolOk(
    output=[TextPart(text="操作成功的结果")]
)

# 错误返回
return ToolError(
    message="操作失败的描述",
    brief="简短错误信息",
    output=[TextPart(text="错误详情")]
)

# 拒绝返回(用户拒绝批准)
return ToolRejectedError()

1.3 工具参数模型

使用 Pydantic 定义参数模型:

from pydantic import BaseModel, Field

class Params(BaseModel):
    query: str = Field(description="搜索查询文本")
    limit: int = Field(
        default=5,
        ge=1,
        le=20,
        description="返回结果数量"
    )
    include_content: bool = Field(
        default=False,
        description="是否包含网页内容"
    )

1.4 工具描述

工具描述从 Markdown 文件加载:

from kimi_cli.tools.utils import load_desc

class SearchWeb(CallableTool2[Params]):
    name: str = "SearchWeb"
    description: str = load_desc(
        Path(__file__).parent / "search.md", 
        {}
    )

search.md:

WebSearch tool allows you to search on the internet to get latest information, including news, documents, release notes, blog posts, papers, etc.

2. 工具注册

2.1 工具集初始化

KimiToolset 管理所有工具:

# src/kimi_cli/soul/toolset.py
class KimiToolset:
    def __init__(self) -> None:
        self._tool_dict: dict[str, ToolType] = {}
        self._mcp_servers: dict[str, MCPServerInfo] = {}
        self._mcp_loading_task: asyncio.Task[None] | None = None
    
    def add(self, tool: ToolType) -> None:
        """添加工具到工具集"""
        self._tool_dict[tool.name] = tool

2.2 工具发现和加载

从配置文件加载工具:

# 在 agent.yaml 中配置
tools:
  - "kimi_cli.tools.multiagent:Task"
  - "kimi_cli.tools.shell:Shell"
  - "kimi_cli.tools.file:ReadFile"
  - "kimi_cli.tools.web:SearchWeb"

加载实现:

def load_tools(self, tool_paths: list[str], dependencies: dict[type[Any], Any]) -> None:
    good_tools: list[str] = []
    bad_tools: list[str] = []
    
    for tool_path in tool_paths:
        try:
            tool = self._load_tool(tool_path, dependencies)
        except SkipThisTool:
            logger.info("Skipping tool: {tool_path}", tool_path=tool_path)
            continue
        
        if tool:
            self.add(tool)
            good_tools.append(tool_path)
        else:
            bad_tools.append(tool_path)
    
    logger.info("Loaded tools: {good_tools}", good_tools=good_tools)
    if bad_tools:
        raise InvalidToolError(f"Invalid tools: {bad_tools}")

2.3 依赖注入机制

通过类型注解自动注入依赖:

@staticmethod
def _load_tool(tool_path: str, dependencies: dict[type[Any], Any]) -> ToolType | None:
    logger.debug("Loading tool: {tool_path}", tool_path=tool_path)
    
    # 1. 解析模块和类名
    module_name, class_name = tool_path.rsplit(":", 1)
    
    # 2. 导入模块
    try:
        module = importlib.import_module(module_name)
    except ImportError:
        return None
    
    # 3. 获取工具类
    tool_cls = getattr(module, class_name, None)
    if tool_cls is None:
        return None
    
    # 4. 注入依赖
    args: list[Any] = []
    if "__init__" in tool_cls.__dict__:
        for param in inspect.signature(tool_cls).parameters.values():
            if param.kind == inspect.Parameter.KEYWORD_ONLY:
                break
            if param.annotation not in dependencies:
                raise ValueError(f"Tool dependency not found: {param.annotation}")
            args.append(dependencies[param.annotation])
    
    return tool_cls(*args)

2.4 依赖字典

运行时构建依赖字典:

# src/kimi_cli/soul/agent.py
tool_deps = {
    KimiToolset: toolset,
    Runtime: runtime,
    Config: runtime.config,
    BuiltinSystemPromptArgs: runtime.builtin_args,
    Session: runtime.session,
    DenwaRenji: runtime.denwa_renji,
    Approval: runtime.approval,
    LaborMarket: runtime.labor_market,
    Environment: runtime.environment,
}

toolset.load_tools(tools, tool_deps)

2.5 工具查找

@overload
def find(self, tool_name_or_type: str) -> ToolType | None: ...

@overload
def find[T: ToolType](self, tool_name_or_type: type[T]) -> T | None: ...

def find(self, tool_name_or_type: str | type[ToolType]) -> ToolType | None:
    if isinstance(tool_name_or_type, str):
        return self._tool_dict.get(tool_name_or_type)
    else:
        for tool in self._tool_dict.values():
            if isinstance(tool, tool_name_or_type):
                return tool
    return None

3. 工具调用流程

3.1 调用链路

用户请求 → Agent → LLM 生成 → 工具调用 → Toolset.handle()
         → 找到工具 → 解析参数 → 调用工具 → 返回结果
         → Agent → LLM → 响应 → 用户

3.2 Kosong Step 函数

rust/kosong/src/lib.rs:

pub async fn step(
    chat_provider: &dyn ChatProvider,
    system_prompt: &str,
    toolset: &dyn Toolset,
    history: &[Message],
    message_part_tx: Option<mpsc::UnboundedSender<StreamedMessagePart>>,
    tool_result_tx: Option<mpsc::UnboundedSender<ToolResult>>,
) -> Result<StepResult, ChatProviderError> {
    let mut tool_calls = Vec::new();
    let mut tool_result_receivers: HashMap<String, oneshot::Receiver<anyhow::Result<ToolResult>>> =
        HashMap::new();

    // 生成 LLM 响应
    let result = {
        let tool_calls_ref = &mut tool_calls;
        let tool_results_ref = &mut tool_result_receivers;
        let tool_result_tx = tool_result_tx.clone();

        let mut on_tool_call = move |tool_call: ToolCall| {
            tool_calls_ref.push(tool_call.clone());
            let (result_tx, result_rx) = oneshot::channel();
            tool_results_ref.insert(tool_call.id.clone(), result_rx);
            
            let result = toolset.handle(tool_call.clone());
            match result {
                ToolResultFuture::Immediate(res) => {
                    if let Some(tx) = tool_result_tx.as_ref() {
                        let _ = tx.send(res.clone());
                    }
                    let _ = result_tx.send(Ok(res));
                }
                ToolResultFuture::Pending(fut) => {
                    let tool_result_tx = tool_result_tx.clone();
                    let tool_call_id = tool_call.id.clone();
                    tokio::spawn(async move {
                        let result = match fut.await {
                            Ok(res) => res,
                            Err(err) => ToolResult {
                                tool_call_id,
                                return_value: tool_runtime_error(&err.to_string()),
                            },
                        };
                        if let Some(tx) = tool_result_tx {
                            let _ = tx.send(result.clone());
                        }
                        let _ = result_tx.send(Ok(result));
                    });
                }
            }
        };

        generate::generate(
            chat_provider,
            system_prompt,
            toolset.tools(),
            history,
            message_part_tx,
            Some(&mut on_tool_call),
        ).await?
    };

    Ok(StepResult {
        id: result.id,
        message: result.message,
        usage: result.usage,
        tool_calls,
        tool_result_receivers,
    })
}

3.3 Toolset Handle 方法

src/kimi_cli/soul/toolset.py:

def handle(self, tool_call: ToolCall) -> HandleResult:
    # 设置当前工具调用上下文
    token = current_tool_call.set(tool_call)
    try:
        # 查找工具
        if tool_call.function.name not in self._tool_dict:
            return ToolResult(
                tool_call_id=tool_call.id,
                return_value=ToolNotFoundError(tool_call.function.name),
            )
        
        tool = self._tool_dict[tool_call.function.name]
        
        # 解析参数
        try:
            arguments: JsonType = json.loads(tool_call.function.arguments or "{}")
        except json.JSONDecodeError as e:
            return ToolResult(
                tool_call_id=tool_call.id, 
                return_value=ToolParseError(str(e))
            )
        
        # 创建异步任务
        async def _call():
            try:
                ret = await tool.call(arguments)
                return ToolResult(tool_call_id=tool_call.id, return_value=ret)
            except Exception as e:
                return ToolResult(
                    tool_call_id=tool_call.id, 
                    return_value=ToolRuntimeError(str(e))
                )
        
        return asyncio.create_task(_call())
    finally:
        current_tool_call.reset(token)

3.4 工具调用上下文

# 当前工具调用的上下文变量
current_tool_call = ContextVar[ToolCall | None]("current_tool_call", default=None)

def get_current_tool_call_or_none() -> ToolCall | None:
    """获取当前工具调用或 None"""
    return current_tool_call.get()

# 在工具中使用
class SearchWeb(CallableTool2[Params]):
    async def __call__(self, params: Params) -> ToolReturnValue:
        tool_call = get_current_tool_call_or_none()
        # 使用 tool_call.id 作为追踪 ID
        pass

4. 工具执行

4.1 同步和异步工具

工具可以是同步或异步的:

# 同步工具
class EchoTool(CallableTool2[Params]):
    async def __call__(self, params: Params) -> ToolReturnValue:
        return ToolOk(output=[TextPart(text=params.text)])

# 异步工具
class SearchWeb(CallableTool2[Params]):
    async def __call__(self, params: Params) -> ToolReturnValue:
        async with session.post(url, json=data) as response:
            return ToolOk(output=[TextPart(text=await response.text())])

4.2 批准机制

工具调用需要用户批准:

# 在工具中请求批准
async def __call__(self, params: Params) -> ToolReturnValue:
    description = f"Call MCP tool `{self._mcp_tool.name}`."
    
    if not await self._runtime.approval.request(
        self.name, 
        self._action_name, 
        description
    ):
        return ToolRejectedError()
    
    # 执行实际操作
    pass

4.3 错误处理

try:
    ret = await tool.call(arguments)
    return ToolResult(tool_call_id=tool_call.id, return_value=ret)
except Exception as e:
    return ToolResult(
        tool_call_id=tool_call.id, 
        return_value=ToolRuntimeError(str(e))
    )

5. 工具调度

5.1 LLM 到工具

LLM 根据工具描述决定调用哪个工具:

# 工具列表传递给 LLM
toolset.tools()  # 返回所有工具的列表

# LLM 决定调用
tool_calls = [
    ToolCall(
        id="call_123",
        function=FunctionCall(
            name="SearchWeb",
            arguments='{"query": "Python 3.13", "limit": 5}'
        )
    )
]

5.2 并行工具调用

支持并行调用多个工具:

# LLM 可以生成多个工具调用
tool_calls = [
    ToolCall(
        id="call_1",
        function=FunctionCall(name="ReadFile", arguments='{"path": "file1.txt"}')
    ),
    ToolCall(
        id="call_2",
        function=FunctionCall(name="ReadFile", arguments='{"path": "file2.txt"}')
    ),
]

# 并行执行
for tool_call in tool_calls:
    result = toolset.handle(tool_call)
    results.append(await result)

5.3 工具结果收集

# rust/kosong/src/lib.rs
impl StepResult {
    pub async fn tool_results(&mut self) -> anyhow::Result<Vec<ToolResult>> {
        let mut results = Vec::new();
        for tool_call in &self.tool_calls {
            if let Some(rx) = self.tool_result_receivers.remove(&tool_call.id) {
                let result = rx.await??;
                results.push(result?);
            }
        }
        Ok(results)
    }
}

6. 工具结果处理

6.1 结果转换

工具结果转换为消息:

# src/kimi_cli/soul/message.py
def tool_result_to_message(tr: ToolResult) -> Message:
    if isinstance(tr.return_value, ToolOk):
        return Message(
            role="tool",
            content=[
                TextPart(text=tr.return_value.message or "")
            ] + tr.return_value.output,
            tool_call_id=tr.tool_call_id,
        )
    elif isinstance(tr.return_value, ToolError):
        return Message(
            role="tool",
            content=[
                TextPart(text=tr.return_value.message)
            ],
            tool_call_id=tr.tool_call_id,
        )

6.2 结果追加到上下文

async def _grow_context(self, result: StepResult, tool_results: list[ToolResult]):
    # 添加助手消息
    await self._context.append_message(result.message)
    
    # 添加工具结果消息
    tool_messages = [tool_result_to_message(tr) for tr in tool_results]
    await self._context.append_message(tool_messages)

7. 特殊工具

7.1 SkipThisTool

工具可以决定跳过自己:

# src/kimi_cli/tools/__init__.py
class SkipThisTool(Exception):
    """当工具决定跳过加载时抛出"""
    pass

# 在工具中
class SearchWeb(CallableTool2[Params]):
    def __init__(self, config: Config, runtime: Runtime):
        if config.services.moonshot_search is None:
            raise SkipThisTool()  # 跳过加载
        super().__init__()

7.2 外部工具

通过 Wire 协议调用外部工具:

class WireExternalTool(CallableTool):
    def __init__(self, *, name: str, description: str, parameters: dict[str, Any]) -> None:
        super().__init__(
            name=name,
            description=description or "No description provided.",
            parameters=parameters,
        )
    
    async def __call__(self, *args: Any, **kwargs: Any) -> ToolReturnValue:
        tool_call = get_current_tool_call_or_none()
        if tool_call is None:
            return ToolError(
                message="External tool calls must be invoked from a tool call context.",
                brief="Invalid tool call",
            )
        
        from kimi_cli.soul import get_wire_or_none
        wire = get_wire_or_none()
        if wire is None:
            return ToolError(
                message="Wire is not available for external tool calls.",
                brief="Wire unavailable",
            )
        
        # 发送到 Wire
        external_tool_call = ToolCallRequest.from_tool_call(tool_call)
        wire.soul_side.send(external_tool_call)
        try:
            return await external_tool_call.wait()
        except asyncio.CancelledError:
            raise
        except Exception as e:
            return ToolError(
                message=f"External tool call failed: {e}",
                brief="External tool error",
            )

8. 工具链和工具流

8.1 工具链示例

用户: 分析一个项目的代码

1. Glob("**/*.py")
   → 列出所有 Python 文件

2. ReadFile("main.py")
   → 读取主文件

3. ReadFile("utils.py")
   → 读取工具文件

4. Shell("python -m pytest")
   → 运行测试

5. 总结分析结果

8.2 工具流控制

# 工具结果决定下一步
if result.tool_calls:
    # 有工具调用,继续
    return None
else:
    # 无工具调用,结束
    return StepOutcome(stop_reason="no_tool_calls", assistant_message=result.message)

8.3 工具链优化

  • 并行调用独立的工具
  • 串行调用有依赖的工具
  • 批量处理相似的工具调用

9. 工具监控和调试

9.1 工具调用日志

logger.debug("Loading tool: {tool_path}", tool_path=tool_path)
logger.info("Loaded tools: {good_tools}", good_tools=good_tools)

9.2 工具错误日志

except Exception as e:
    logger.exception("External tool call failed: {tool_name}", tool_name=self.name)
    return ToolError(
        message=f"External tool call failed: {e}",
        brief="External tool error",
    )

9.3 工具性能监控

# 在 Rust 中测量工具执行时间
let start = Instant::now();
let result = tool.call(args).await?;
let duration = start.elapsed();
logger.debug!("Tool {} took {:?}", tool.name(), duration);

10. 工具安全

10.1 参数验证

# Pydantic 自动验证参数
class Params(BaseModel):
    query: str = Field(description="搜索查询文本")
    limit: int = Field(default=5, ge=1, le=20)

10.2 路径验证

# 文件工具中的路径验证
def _normalize_path(path: str) -> str:
    cwd = str(KaosPath.cwd().canonical())
    if path.startswith(cwd):
        path = path[len(cwd):].lstrip("/\\")
    return path

10.3 批准机制

所有工具调用都需要批准:

if not await self._runtime.approval.request(
    self.name, 
    self._action_name, 
    description
):
    return ToolRejectedError()

11. 工具扩展

11.1 添加新工具

  1. 创建工具类
  2. 实现参数模型
  3. 实现 __call__ 方法
  4. 在 agent.yaml 中注册
# my_tool.py
class MyTool(CallableTool2[Params]):
    name: str = "MyTool"
    description: str = "我的自定义工具"
    params: type[Params] = Params
    
    async def __call__(self, params: Params) -> ToolReturnValue:
        return ToolOk(output=[TextPart(text="执行成功")])
# agent.yaml
tools:
  - "my_tools.my_tool:MyTool"

11.2 工具库扩展

可以创建工具库:

my_tool_library/
├── __init__.py
├── file_tools.py
├── web_tools.py
└── database_tools.py

11.3 工具动态加载

运行时动态加载工具:

toolset.load_tools(
    tool_paths=["custom_tools.tool:CustomTool"],
    dependencies=tool_deps
)