Kimi-CLI 工具注册和调用机制
技术研究 人工智能 LLM
Kimi-CLI 的工具系统采用依赖注入和异步调用的架构,支持动态工具加载和灵活的工具调度。核心组件包括: - 工具定义 工具接口和实现 - 工具注册 工具发现和注册机制 - 工具调用 工具执行和结果处理 - 工具调度 LLM 到工具的桥接
概述
Kimi-CLI 的工具系统采用依赖注入和异步调用的架构,支持动态工具加载和灵活的工具调度。核心组件包括:
- 工具定义: 工具接口和实现
- 工具注册: 工具发现和注册机制
- 工具调用: 工具执行和结果处理
- 工具调度: LLM 到工具的桥接
1. 工具定义
1.1 工具基类
工具继承自 CallableTool 或 CallableTool2:
# rust/kosong/src/tooling/mod.rs
pub trait CallableTool: Send + Sync {
fn name(&self) -> &str;
fn description(&self) -> &str;
fn parameters(&self) -> Value;
async fn call(&self, args: Value) -> Result<ToolReturnValue>;
}
pub trait CallableTool2<T>: Send + Sync {
fn name(&self) -> &str;
fn description(&self) -> &str;
fn parameters(&self) -> Value;
async fn call(&self, params: T) -> Result<ToolReturnValue>;
}
1.2 工具返回值
from kosong.tooling import ToolOk, ToolError
# 成功返回
return ToolOk(
output=[TextPart(text="操作成功的结果")]
)
# 错误返回
return ToolError(
message="操作失败的描述",
brief="简短错误信息",
output=[TextPart(text="错误详情")]
)
# 拒绝返回(用户拒绝批准)
return ToolRejectedError()
1.3 工具参数模型
使用 Pydantic 定义参数模型:
from pydantic import BaseModel, Field
class Params(BaseModel):
query: str = Field(description="搜索查询文本")
limit: int = Field(
default=5,
ge=1,
le=20,
description="返回结果数量"
)
include_content: bool = Field(
default=False,
description="是否包含网页内容"
)
1.4 工具描述
工具描述从 Markdown 文件加载:
from kimi_cli.tools.utils import load_desc
class SearchWeb(CallableTool2[Params]):
name: str = "SearchWeb"
description: str = load_desc(
Path(__file__).parent / "search.md",
{}
)
search.md:
WebSearch tool allows you to search on the internet to get latest information, including news, documents, release notes, blog posts, papers, etc.
2. 工具注册
2.1 工具集初始化
KimiToolset 管理所有工具:
# src/kimi_cli/soul/toolset.py
class KimiToolset:
def __init__(self) -> None:
self._tool_dict: dict[str, ToolType] = {}
self._mcp_servers: dict[str, MCPServerInfo] = {}
self._mcp_loading_task: asyncio.Task[None] | None = None
def add(self, tool: ToolType) -> None:
"""添加工具到工具集"""
self._tool_dict[tool.name] = tool
2.2 工具发现和加载
从配置文件加载工具:
# 在 agent.yaml 中配置
tools:
- "kimi_cli.tools.multiagent:Task"
- "kimi_cli.tools.shell:Shell"
- "kimi_cli.tools.file:ReadFile"
- "kimi_cli.tools.web:SearchWeb"
加载实现:
def load_tools(self, tool_paths: list[str], dependencies: dict[type[Any], Any]) -> None:
good_tools: list[str] = []
bad_tools: list[str] = []
for tool_path in tool_paths:
try:
tool = self._load_tool(tool_path, dependencies)
except SkipThisTool:
logger.info("Skipping tool: {tool_path}", tool_path=tool_path)
continue
if tool:
self.add(tool)
good_tools.append(tool_path)
else:
bad_tools.append(tool_path)
logger.info("Loaded tools: {good_tools}", good_tools=good_tools)
if bad_tools:
raise InvalidToolError(f"Invalid tools: {bad_tools}")
2.3 依赖注入机制
通过类型注解自动注入依赖:
@staticmethod
def _load_tool(tool_path: str, dependencies: dict[type[Any], Any]) -> ToolType | None:
logger.debug("Loading tool: {tool_path}", tool_path=tool_path)
# 1. 解析模块和类名
module_name, class_name = tool_path.rsplit(":", 1)
# 2. 导入模块
try:
module = importlib.import_module(module_name)
except ImportError:
return None
# 3. 获取工具类
tool_cls = getattr(module, class_name, None)
if tool_cls is None:
return None
# 4. 注入依赖
args: list[Any] = []
if "__init__" in tool_cls.__dict__:
for param in inspect.signature(tool_cls).parameters.values():
if param.kind == inspect.Parameter.KEYWORD_ONLY:
break
if param.annotation not in dependencies:
raise ValueError(f"Tool dependency not found: {param.annotation}")
args.append(dependencies[param.annotation])
return tool_cls(*args)
2.4 依赖字典
运行时构建依赖字典:
# src/kimi_cli/soul/agent.py
tool_deps = {
KimiToolset: toolset,
Runtime: runtime,
Config: runtime.config,
BuiltinSystemPromptArgs: runtime.builtin_args,
Session: runtime.session,
DenwaRenji: runtime.denwa_renji,
Approval: runtime.approval,
LaborMarket: runtime.labor_market,
Environment: runtime.environment,
}
toolset.load_tools(tools, tool_deps)
2.5 工具查找
@overload
def find(self, tool_name_or_type: str) -> ToolType | None: ...
@overload
def find[T: ToolType](self, tool_name_or_type: type[T]) -> T | None: ...
def find(self, tool_name_or_type: str | type[ToolType]) -> ToolType | None:
if isinstance(tool_name_or_type, str):
return self._tool_dict.get(tool_name_or_type)
else:
for tool in self._tool_dict.values():
if isinstance(tool, tool_name_or_type):
return tool
return None
3. 工具调用流程
3.1 调用链路
用户请求 → Agent → LLM 生成 → 工具调用 → Toolset.handle()
→ 找到工具 → 解析参数 → 调用工具 → 返回结果
→ Agent → LLM → 响应 → 用户
3.2 Kosong Step 函数
rust/kosong/src/lib.rs:
pub async fn step(
chat_provider: &dyn ChatProvider,
system_prompt: &str,
toolset: &dyn Toolset,
history: &[Message],
message_part_tx: Option<mpsc::UnboundedSender<StreamedMessagePart>>,
tool_result_tx: Option<mpsc::UnboundedSender<ToolResult>>,
) -> Result<StepResult, ChatProviderError> {
let mut tool_calls = Vec::new();
let mut tool_result_receivers: HashMap<String, oneshot::Receiver<anyhow::Result<ToolResult>>> =
HashMap::new();
// 生成 LLM 响应
let result = {
let tool_calls_ref = &mut tool_calls;
let tool_results_ref = &mut tool_result_receivers;
let tool_result_tx = tool_result_tx.clone();
let mut on_tool_call = move |tool_call: ToolCall| {
tool_calls_ref.push(tool_call.clone());
let (result_tx, result_rx) = oneshot::channel();
tool_results_ref.insert(tool_call.id.clone(), result_rx);
let result = toolset.handle(tool_call.clone());
match result {
ToolResultFuture::Immediate(res) => {
if let Some(tx) = tool_result_tx.as_ref() {
let _ = tx.send(res.clone());
}
let _ = result_tx.send(Ok(res));
}
ToolResultFuture::Pending(fut) => {
let tool_result_tx = tool_result_tx.clone();
let tool_call_id = tool_call.id.clone();
tokio::spawn(async move {
let result = match fut.await {
Ok(res) => res,
Err(err) => ToolResult {
tool_call_id,
return_value: tool_runtime_error(&err.to_string()),
},
};
if let Some(tx) = tool_result_tx {
let _ = tx.send(result.clone());
}
let _ = result_tx.send(Ok(result));
});
}
}
};
generate::generate(
chat_provider,
system_prompt,
toolset.tools(),
history,
message_part_tx,
Some(&mut on_tool_call),
).await?
};
Ok(StepResult {
id: result.id,
message: result.message,
usage: result.usage,
tool_calls,
tool_result_receivers,
})
}
3.3 Toolset Handle 方法
src/kimi_cli/soul/toolset.py:
def handle(self, tool_call: ToolCall) -> HandleResult:
# 设置当前工具调用上下文
token = current_tool_call.set(tool_call)
try:
# 查找工具
if tool_call.function.name not in self._tool_dict:
return ToolResult(
tool_call_id=tool_call.id,
return_value=ToolNotFoundError(tool_call.function.name),
)
tool = self._tool_dict[tool_call.function.name]
# 解析参数
try:
arguments: JsonType = json.loads(tool_call.function.arguments or "{}")
except json.JSONDecodeError as e:
return ToolResult(
tool_call_id=tool_call.id,
return_value=ToolParseError(str(e))
)
# 创建异步任务
async def _call():
try:
ret = await tool.call(arguments)
return ToolResult(tool_call_id=tool_call.id, return_value=ret)
except Exception as e:
return ToolResult(
tool_call_id=tool_call.id,
return_value=ToolRuntimeError(str(e))
)
return asyncio.create_task(_call())
finally:
current_tool_call.reset(token)
3.4 工具调用上下文
# 当前工具调用的上下文变量
current_tool_call = ContextVar[ToolCall | None]("current_tool_call", default=None)
def get_current_tool_call_or_none() -> ToolCall | None:
"""获取当前工具调用或 None"""
return current_tool_call.get()
# 在工具中使用
class SearchWeb(CallableTool2[Params]):
async def __call__(self, params: Params) -> ToolReturnValue:
tool_call = get_current_tool_call_or_none()
# 使用 tool_call.id 作为追踪 ID
pass
4. 工具执行
4.1 同步和异步工具
工具可以是同步或异步的:
# 同步工具
class EchoTool(CallableTool2[Params]):
async def __call__(self, params: Params) -> ToolReturnValue:
return ToolOk(output=[TextPart(text=params.text)])
# 异步工具
class SearchWeb(CallableTool2[Params]):
async def __call__(self, params: Params) -> ToolReturnValue:
async with session.post(url, json=data) as response:
return ToolOk(output=[TextPart(text=await response.text())])
4.2 批准机制
工具调用需要用户批准:
# 在工具中请求批准
async def __call__(self, params: Params) -> ToolReturnValue:
description = f"Call MCP tool `{self._mcp_tool.name}`."
if not await self._runtime.approval.request(
self.name,
self._action_name,
description
):
return ToolRejectedError()
# 执行实际操作
pass
4.3 错误处理
try:
ret = await tool.call(arguments)
return ToolResult(tool_call_id=tool_call.id, return_value=ret)
except Exception as e:
return ToolResult(
tool_call_id=tool_call.id,
return_value=ToolRuntimeError(str(e))
)
5. 工具调度
5.1 LLM 到工具
LLM 根据工具描述决定调用哪个工具:
# 工具列表传递给 LLM
toolset.tools() # 返回所有工具的列表
# LLM 决定调用
tool_calls = [
ToolCall(
id="call_123",
function=FunctionCall(
name="SearchWeb",
arguments='{"query": "Python 3.13", "limit": 5}'
)
)
]
5.2 并行工具调用
支持并行调用多个工具:
# LLM 可以生成多个工具调用
tool_calls = [
ToolCall(
id="call_1",
function=FunctionCall(name="ReadFile", arguments='{"path": "file1.txt"}')
),
ToolCall(
id="call_2",
function=FunctionCall(name="ReadFile", arguments='{"path": "file2.txt"}')
),
]
# 并行执行
for tool_call in tool_calls:
result = toolset.handle(tool_call)
results.append(await result)
5.3 工具结果收集
# rust/kosong/src/lib.rs
impl StepResult {
pub async fn tool_results(&mut self) -> anyhow::Result<Vec<ToolResult>> {
let mut results = Vec::new();
for tool_call in &self.tool_calls {
if let Some(rx) = self.tool_result_receivers.remove(&tool_call.id) {
let result = rx.await??;
results.push(result?);
}
}
Ok(results)
}
}
6. 工具结果处理
6.1 结果转换
工具结果转换为消息:
# src/kimi_cli/soul/message.py
def tool_result_to_message(tr: ToolResult) -> Message:
if isinstance(tr.return_value, ToolOk):
return Message(
role="tool",
content=[
TextPart(text=tr.return_value.message or "")
] + tr.return_value.output,
tool_call_id=tr.tool_call_id,
)
elif isinstance(tr.return_value, ToolError):
return Message(
role="tool",
content=[
TextPart(text=tr.return_value.message)
],
tool_call_id=tr.tool_call_id,
)
6.2 结果追加到上下文
async def _grow_context(self, result: StepResult, tool_results: list[ToolResult]):
# 添加助手消息
await self._context.append_message(result.message)
# 添加工具结果消息
tool_messages = [tool_result_to_message(tr) for tr in tool_results]
await self._context.append_message(tool_messages)
7. 特殊工具
7.1 SkipThisTool
工具可以决定跳过自己:
# src/kimi_cli/tools/__init__.py
class SkipThisTool(Exception):
"""当工具决定跳过加载时抛出"""
pass
# 在工具中
class SearchWeb(CallableTool2[Params]):
def __init__(self, config: Config, runtime: Runtime):
if config.services.moonshot_search is None:
raise SkipThisTool() # 跳过加载
super().__init__()
7.2 外部工具
通过 Wire 协议调用外部工具:
class WireExternalTool(CallableTool):
def __init__(self, *, name: str, description: str, parameters: dict[str, Any]) -> None:
super().__init__(
name=name,
description=description or "No description provided.",
parameters=parameters,
)
async def __call__(self, *args: Any, **kwargs: Any) -> ToolReturnValue:
tool_call = get_current_tool_call_or_none()
if tool_call is None:
return ToolError(
message="External tool calls must be invoked from a tool call context.",
brief="Invalid tool call",
)
from kimi_cli.soul import get_wire_or_none
wire = get_wire_or_none()
if wire is None:
return ToolError(
message="Wire is not available for external tool calls.",
brief="Wire unavailable",
)
# 发送到 Wire
external_tool_call = ToolCallRequest.from_tool_call(tool_call)
wire.soul_side.send(external_tool_call)
try:
return await external_tool_call.wait()
except asyncio.CancelledError:
raise
except Exception as e:
return ToolError(
message=f"External tool call failed: {e}",
brief="External tool error",
)
8. 工具链和工具流
8.1 工具链示例
用户: 分析一个项目的代码
1. Glob("**/*.py")
→ 列出所有 Python 文件
2. ReadFile("main.py")
→ 读取主文件
3. ReadFile("utils.py")
→ 读取工具文件
4. Shell("python -m pytest")
→ 运行测试
5. 总结分析结果
8.2 工具流控制
# 工具结果决定下一步
if result.tool_calls:
# 有工具调用,继续
return None
else:
# 无工具调用,结束
return StepOutcome(stop_reason="no_tool_calls", assistant_message=result.message)
8.3 工具链优化
- 并行调用独立的工具
- 串行调用有依赖的工具
- 批量处理相似的工具调用
9. 工具监控和调试
9.1 工具调用日志
logger.debug("Loading tool: {tool_path}", tool_path=tool_path)
logger.info("Loaded tools: {good_tools}", good_tools=good_tools)
9.2 工具错误日志
except Exception as e:
logger.exception("External tool call failed: {tool_name}", tool_name=self.name)
return ToolError(
message=f"External tool call failed: {e}",
brief="External tool error",
)
9.3 工具性能监控
# 在 Rust 中测量工具执行时间
let start = Instant::now();
let result = tool.call(args).await?;
let duration = start.elapsed();
logger.debug!("Tool {} took {:?}", tool.name(), duration);
10. 工具安全
10.1 参数验证
# Pydantic 自动验证参数
class Params(BaseModel):
query: str = Field(description="搜索查询文本")
limit: int = Field(default=5, ge=1, le=20)
10.2 路径验证
# 文件工具中的路径验证
def _normalize_path(path: str) -> str:
cwd = str(KaosPath.cwd().canonical())
if path.startswith(cwd):
path = path[len(cwd):].lstrip("/\\")
return path
10.3 批准机制
所有工具调用都需要批准:
if not await self._runtime.approval.request(
self.name,
self._action_name,
description
):
return ToolRejectedError()
11. 工具扩展
11.1 添加新工具
- 创建工具类
- 实现参数模型
- 实现
__call__方法 - 在 agent.yaml 中注册
# my_tool.py
class MyTool(CallableTool2[Params]):
name: str = "MyTool"
description: str = "我的自定义工具"
params: type[Params] = Params
async def __call__(self, params: Params) -> ToolReturnValue:
return ToolOk(output=[TextPart(text="执行成功")])
# agent.yaml
tools:
- "my_tools.my_tool:MyTool"
11.2 工具库扩展
可以创建工具库:
my_tool_library/
├── __init__.py
├── file_tools.py
├── web_tools.py
└── database_tools.py
11.3 工具动态加载
运行时动态加载工具:
toolset.load_tools(
tool_paths=["custom_tools.tool:CustomTool"],
dependencies=tool_deps
)