实施指南
技术研究 人工智能 API
步骤 1 登录 Azure Portal - 访问 https//portal.azure.com/ - 使用 Microsoft 账户登录 步骤 2 创建 Bing Search v7 资源 1. 点击创建资源 2. 搜索 Bing Search v7 3. 选择 Bing Search v7 → 点击创建 4. 填写基本信息: - 订阅:选择你的订阅 -...
本指南提供从零开始实现类 Kimi CLI web_search 功能的完整步骤,包含配置说明和可直接使用的代码示例。
前置准备清单
方案 B 所需准备(第三方搜索 API)
| 准备项 | 必要性 | 获取方式 | 预计时间 |
|---|---|---|---|
| Azure 账户 | 必需 | Azure Portal | 10 分钟 |
| Bing Search v7 资源 | 必需 | Azure Marketplace | 5 分钟 |
| API Key | 必需 | Azure 资源管理页面 | 2 分钟 |
| Python 3.9+ | 必需 | 已安装或 python.org | - |
| aiohttp 库 | 必需 | pip install aiohttp | 1 分钟 |
可选准备项
- 环境变量管理工具: python-dotenv (
pip install python-dotenv) - API 测试工具: curl 或 Postman
- 密钥管理器: 系统 keyring 或 1Password CLI
环境配置
1. 获取 Bing Search API Key
步骤 1: 登录 Azure Portal
- 访问 https://portal.azure.com/
- 使用 Microsoft 账户登录
步骤 2: 创建 Bing Search v7 资源
- 点击”创建资源”
- 搜索 “Bing Search v7”
- 选择 “Bing Search v7” → 点击”创建”
- 填写基本信息:
- 订阅:选择你的订阅
- 资源组:创建新的(如
ai-search-rg) - 名称:
bing-search-api - 定价层:选择 S1 (1,000 transactions/month 免费)
- 点击”审阅并创建” → “创建”
步骤 3: 获取 API Key
- 资源创建完成后,点击”转到资源”
- 左侧菜单 → “密钥和终结点”
- 复制 Key 1(格式类似
a1b2c3d4e5f6...)
2. 配置环境变量
创建 .env 文件:
# .env
BING_SEARCH_API_KEY=your_api_key_here
BING_SEARCH_ENDPOINT=https://api.bing.microsoft.com/v7.0
DEFAULT_SEARCH_LIMIT=5
SEARCH_TIMEOUT=30
加载环境变量:
from dotenv import load_dotenv
import os
load_dotenv()
API_KEY = os.getenv("BING_SEARCH_API_KEY")
ENDPOINT = os.getenv("BING_SEARCH_ENDPOINT", "https://api.bing.microsoft.com/v7.0")
核心代码实现
1. 配置模型(Kimi CLI 风格)
# config.py
from pydantic import BaseModel, SecretStr, Field
from typing import Literal, Optional
class SearchConfig(BaseModel):
"""
搜索服务配置
借鉴 Kimi CLI 的配置设计,支持多服务商和灵活参数
"""
provider: Literal["bing", "google", "exa"] = Field(
default="bing",
description="搜索服务提供商"
)
api_key: SecretStr = Field(
description="API 密钥"
)
base_url: str = Field(
default="https://api.bing.microsoft.com/v7.0",
description="API 基础 URL"
)
custom_headers: Optional[dict[str, str]] = Field(
default=None,
description="自定义请求头"
)
default_limit: int = Field(
default=5,
ge=1,
le=20,
description="默认返回结果数量"
)
timeout_seconds: int = Field(
default=30,
description="请求超时时间"
)
class Config:
# 允许从环境变量加载
env_prefix = "SEARCH_"
2. 响应数据模型
# models.py
from pydantic import BaseModel, Field
from typing import List, Optional
from datetime import datetime
class SearchResult(BaseModel):
"""
搜索结果项 - 兼容 Kimi CLI 格式
字段命名与 Kimi CLI 的 SearchResult 保持一致,
便于未来迁移到官方 API
"""
site_name: str = Field(
default="",
description="网站名称"
)
title: str = Field(
description="页面标题"
)
url: str = Field(
description="页面 URL"
)
snippet: str = Field(
description="内容摘要"
)
content: str = Field(
default="",
description="页面完整内容(需额外抓取)"
)
date: str = Field(
default="",
description="发布/更新日期"
)
icon: str = Field(
default="",
description="网站图标 URL"
)
mime: str = Field(
default="text/html",
description="MIME 类型"
)
class SearchResponse(BaseModel):
"""搜索响应包装器"""
search_results: List[SearchResult] = Field(
default_factory=list,
description="搜索结果列表"
)
total_results: int = Field(
default=0,
description="总结果数"
)
query: str = Field(
description="原始查询"
)
3. 搜索工具实现
# search_tool.py
import aiohttp
import asyncio
from typing import List, Dict, Any, Optional
from urllib.parse import urlparse
from config import SearchConfig
from models import SearchResult, SearchResponse
class WebSearchTool:
"""
Web 搜索工具 - 兼容 Kimi CLI 风格
支持多服务商(Bing、Exa.ai),结果格式与 Kimi CLI 保持一致
"""
def __init__(self, config: SearchConfig):
self.config = config
self._session: Optional[aiohttp.ClientSession] = None
async def __aenter__(self):
"""异步上下文管理器入口"""
self._session = aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=self.config.timeout_seconds)
)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""异步上下文管理器出口"""
if self._session:
await self._session.close()
self._session = None
async def search(
self,
query: str,
limit: Optional[int] = None,
include_content: bool = False,
**kwargs
) -> SearchResponse:
"""
执行网页搜索
Args:
query: 搜索关键词
limit: 返回结果数量 (1-20),默认使用配置值
include_content: 是否抓取页面全文(注意:会增加耗时)
**kwargs: 额外参数(透传给服务商)
Returns:
SearchResponse: 格式化的搜索结果
Raises:
SearchAPIError: API 调用失败
RateLimitError: 频率限制
"""
if not self._session:
raise RuntimeError("SearchTool must be used within async context manager")
limit = limit or self.config.default_limit
limit = max(1, min(limit, 20)) # 限制范围 1-20
# 根据服务商路由
if self.config.provider == "bing":
results = await self._search_bing(query, limit, **kwargs)
elif self.config.provider == "exa":
results = await self._search_exa(query, limit, **kwargs)
else:
raise ValueError(f"Unsupported provider: {self.config.provider}")
# 如需抓取全文内容
if include_content:
results = await self._fetch_content_batch(results)
return SearchResponse(
search_results=results,
total_results=len(results),
query=query
)
async def _search_bing(self, query: str, limit: int, **kwargs) -> List[SearchResult]:
"""Bing Search API 实现"""
headers = {
"Ocp-Apim-Subscription-Key": self.config.api_key.get_secret_value(),
**(self.config.custom_headers or {})
}
params = {
"q": query,
"count": limit,
"responseFilter": "Webpages",
"mkt": kwargs.get("market", "zh-CN"), # 市场区域
"setLang": kwargs.get("language", "zh"), # 语言
}
async with self._session.get(
f"{self.config.base_url}/search",
headers=headers,
params=params
) as response:
if response.status == 401:
raise SearchAPIError("Invalid API key")
elif response.status == 429:
raise RateLimitError("Rate limit exceeded")
elif response.status != 200:
raise SearchAPIError(f"API error: {response.status}")
data = await response.json()
# 解析 Bing 响应
results = []
for item in data.get("webPages", {}).get("value", []):
results.append(SearchResult(
site_name=item.get("siteName", self._extract_domain(item.get("url", ""))),
title=item.get("name", ""),
url=item.get("url", ""),
snippet=item.get("snippet", ""),
date=item.get("dateLastCrawled", ""),
))
return results
async def _search_exa(self, query: str, limit: int, **kwargs) -> List[SearchResult]:
"""Exa.ai API 实现"""
headers = {
"Authorization": f"Bearer {self.config.api_key.get_secret_value()}",
"Content-Type": "application/json",
**(self.config.custom_headers or {})
}
payload = {
"query": query,
"numResults": limit,
"useAutoprompt": True,
"type": "auto",
"contents": {
"text": True # 返回文本内容
}
}
async with self._session.post(
"https://api.exa.ai/search",
headers=headers,
json=payload
) as response:
if response.status == 401:
raise SearchAPIError("Invalid API key")
elif response.status == 429:
raise RateLimitError("Rate limit exceeded")
data = await response.json()
results = []
for item in data.get("results", []):
results.append(SearchResult(
site_name=self._extract_domain(item.get("url", "")),
title=item.get("title", ""),
url=item.get("url", ""),
snippet=item.get("snippet", ""),
content=item.get("text", ""), # Exa 可以直接返回内容
))
return results
async def _fetch_content_batch(self, results: List[SearchResult]) -> List[SearchResult]:
"""批量抓取页面内容(简化版,生产环境建议使用专用爬虫)"""
# 注意:这是一个简化实现,实际生产环境应使用更健壮的爬虫方案
# 如:Playwright、Scrapy、或专门的网页抓取服务
async def fetch_single(result: SearchResult) -> SearchResult:
try:
async with self._session.get(
result.url,
timeout=aiohttp.ClientTimeout(total=10)
) as response:
if response.status == 200:
html = await response.text()
# 简单的 HTML 到文本转换(生产环境建议使用 BeautifulSoup)
text = self._html_to_text(html)
result.content = text[:5000] # 限制长度
except Exception:
pass # 抓取失败不影响主结果
return result
# 并发抓取(限制并发数)
semaphore = asyncio.Semaphore(3)
async def fetch_with_limit(result: SearchResult) -> SearchResult:
async with semaphore:
return await fetch_single(result)
tasks = [fetch_with_limit(r) for r in results]
return await asyncio.gather(*tasks)
def _extract_domain(self, url: str) -> str:
"""从 URL 提取域名"""
try:
parsed = urlparse(url)
return parsed.netloc.replace("www.", "")
except:
return ""
def _html_to_text(self, html: str) -> str:
"""简单 HTML 到文本转换"""
# 生产环境建议使用 BeautifulSoup 或 html2text
import re
# 移除 script 和 style
html = re.sub(r'<script[^>]*>.*?</script>', '', html, flags=re.DOTALL)
html = re.sub(r'<style[^>]*>.*?</style>', '', html, flags=re.DOTALL)
# 移除 HTML 标签
text = re.sub(r'<[^>]+>', ' ', html)
# 规范化空白
text = ' '.join(text.split())
return text.strip()
class SearchAPIError(Exception):
"""搜索 API 错误"""
pass
class RateLimitError(Exception):
"""频率限制错误"""
pass
4. 结果格式化器(Kimi CLI 兼容)
# formatter.py
from typing import List
from models import SearchResult
class ResultFormatter:
"""
搜索结果格式化器
将搜索结果格式化为 Kimi CLI 风格的 Markdown 文本,
便于 AI 模型消费
"""
@staticmethod
def to_markdown(results: List[SearchResult]) -> str:
"""
转换为 Kimi CLI 风格的 Markdown 格式
格式示例:
Title: xxx
Date: xxx
URL: xxx
Summary: xxx
---
Title: xxx
...
"""
if not results:
return "未找到相关结果。"
lines = []
for i, result in enumerate(results):
# 分隔线(除第一条外)
if i > 0:
lines.append("---\n")
# 基本信息
lines.append(f"Title: {result.title}")
lines.append(f"Date: {result.date}")
lines.append(f"URL: {result.url}")
lines.append(f"Summary: {result.snippet}\n")
# 全文内容(如果有)
if result.content:
lines.append(f"{result.content}\n")
return "\n".join(lines)
@staticmethod
def to_json(results: List[SearchResult]) -> str:
"""转换为 JSON 格式(便于程序处理)"""
from pydantic import TypeAdapter
adapter = TypeAdapter(List[SearchResult])
return adapter.dump_json(results, indent=2).decode('utf-8')
@staticmethod
def to_simple_text(results: List[SearchResult]) -> str:
"""转换为简化文本格式"""
lines = []
for i, result in enumerate(results, 1):
lines.append(f"{i}. {result.title}")
lines.append(f" {result.url}")
lines.append(f" {result.snippet[:100]}...")
lines.append("")
return "\n".join(lines)
5. 完整使用示例
# example.py
import asyncio
import os
from dotenv import load_dotenv
from config import SearchConfig
from models import SearchResponse
from search_tool import WebSearchTool
from formatter import ResultFormatter
async def main():
"""使用示例"""
# 加载环境变量
load_dotenv()
# 初始化配置
config = SearchConfig(
provider="bing",
api_key=os.getenv("BING_SEARCH_API_KEY"),
base_url="https://api.bing.microsoft.com/v7.0",
default_limit=5
)
# 使用搜索工具
async with WebSearchTool(config) as search_tool:
try:
# 执行搜索
response: SearchResponse = await search_tool.search(
query="OpenAI GPT-4 最新发布",
limit=5,
include_content=False
)
print(f"查询: {response.query}")
print(f"找到 {response.total_results} 条结果\n")
print("=" * 50)
# 格式化为 Kimi CLI 风格输出
markdown_output = ResultFormatter.to_markdown(response.search_results)
print(markdown_output)
except Exception as e:
print(f"搜索失败: {e}")
if __name__ == "__main__":
asyncio.run(main())
项目结构
推荐的项目文件结构:
your-search-project/
├── .env # 环境变量(不要提交到 Git)
├── .env.example # 环境变量示例
├── .gitignore
├── requirements.txt # 依赖列表
│
├── src/
│ ├── __init__.py
│ ├── config.py # 配置模型
│ ├── models.py # 数据模型
│ ├── search_tool.py # 核心搜索工具
│ ├── formatter.py # 结果格式化
│ └── utils.py # 工具函数
│
├── tests/
│ └── test_search.py # 测试用例
│
└── example.py # 使用示例
requirements.txt
aiohttp>=3.8.0
pydantic>=2.0.0
python-dotenv>=1.0.0
.gitignore
.env
__pycache__/
*.pyc
.DS_Store
.vscode/
.idea/
.env.example
# 搜索 API 配置
BING_SEARCH_API_KEY=your_api_key_here
BING_SEARCH_ENDPOINT=https://api.bing.microsoft.com/v7.0
# 可选:其他服务商
EXA_API_KEY=your_exa_key_here
# 默认参数
DEFAULT_SEARCH_LIMIT=5
SEARCH_TIMEOUT=30
测试验证
单元测试示例
# tests/test_search.py
import pytest
import asyncio
from unittest.mock import AsyncMock, patch
from src.config import SearchConfig
from src.search_tool import WebSearchTool
from src.models import SearchResult
@pytest.fixture
def mock_config():
return SearchConfig(
provider="bing",
api_key="test_key",
base_url="https://test.api.com"
)
@pytest.mark.asyncio
async def test_search_basic(mock_config):
"""测试基本搜索功能"""
mock_response = {
"webPages": {
"value": [
{
"name": "Test Title",
"url": "https://example.com",
"snippet": "Test snippet"
}
]
}
}
with patch('aiohttp.ClientSession.get') as mock_get:
mock_get.return_value.__aenter__.return_value.status = 200
mock_get.return_value.__aenter__.return_value.json = AsyncMock(
return_value=mock_response
)
async with WebSearchTool(mock_config) as tool:
response = await tool.search("test query")
assert response.total_results == 1
assert response.search_results[0].title == "Test Title"
手动测试命令
# 1. 设置环境变量
export BING_SEARCH_API_KEY="your_actual_api_key"
# 2. 运行示例
python example.py
# 3. 或使用 curl 直接测试 API
curl -X GET "https://api.bing.microsoft.com/v7.0/search?q=OpenAI&count=5" \
-H "Ocp-Apim-Subscription-Key: your_api_key"
故障排除
常见问题
| 问题 | 可能原因 | 解决方案 |
|---|---|---|
| 401 Unauthorized | API Key 无效 | 检查 Key 是否复制完整,是否在正确的资源下 |
| 429 Too Many Requests | 频率限制 | 降低请求频率,或升级定价层 |
| 空结果 | 查询词问题 | 尝试更通用的关键词,检查 market 参数 |
| 超时 | 网络问题 | 增加 timeout_seconds,检查网络连接 |
| SSL 错误 | 证书问题 | 更新 Python,或临时禁用 SSL 验证(不推荐) |
调试技巧
# 启用详细日志
import logging
logging.basicConfig(level=logging.DEBUG)
# 在搜索工具中添加日志
import logging
logger = logging.getLogger(__name__)
# 在 _search_bing 方法中
logger.debug(f"Request URL: {self.config.base_url}/search")
logger.debug(f"Request params: {params}")
生产环境建议
1. 错误重试机制
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=10),
retry=retry_if_exception_type((SearchAPIError, asyncio.TimeoutError))
)
async def search_with_retry(tool, query):
return await tool.search(query)
2. 缓存机制
from functools import lru_cache
import hashlib
def _make_cache_key(query: str, limit: int) -> str:
return hashlib.md5(f"{query}:{limit}".encode()).hexdigest()
# 使用 Redis 或内存缓存
results_cache = {}
async def cached_search(tool, query: str, limit: int = 5):
cache_key = _make_cache_key(query, limit)
if cache_key in results_cache:
return results_cache[cache_key]
result = await tool.search(query, limit)
results_cache[cache_key] = result
return result
3. 监控埋点
import time
from typing import Callable
async def search_with_metrics(
tool: WebSearchTool,
query: str,
metrics_callback: Callable
) -> SearchResponse:
start_time = time.time()
try:
response = await tool.search(query)
status = "success"
result_count = response.total_results
except Exception as e:
status = "error"
result_count = 0
raise
finally:
duration = time.time() - start_time
metrics_callback({
"operation": "web_search",
"duration": duration,
"status": status,
"result_count": result_count,
"query_length": len(query)
})