Logo
热心市民王先生

实施指南

技术研究 人工智能 API

步骤 1 登录 Azure Portal - 访问 https://portal.azure.com/ - 使用 Microsoft 账户登录 步骤 2 创建 Bing Search v7 资源 1. 点击创建资源 2. 搜索 Bing Search v7 3. 选择 Bing Search v7 → 点击创建 4. 填写基本信息: - 订阅:选择你的订阅 -...

本指南提供从零开始实现类 Kimi CLI web_search 功能的完整步骤,包含配置说明和可直接使用的代码示例。

前置准备清单

方案 B 所需准备(第三方搜索 API)

| 准备项 | 必要性 | 获取方式 | 预计时间 |
| --- | --- | --- | --- |
| Azure 账户 | 必需 | Azure Portal | 10 分钟 |
| Bing Search v7 资源 | 必需 | Azure Marketplace | 5 分钟 |
| API Key | 必需 | Azure 资源管理页面 | 2 分钟 |
| Python 3.9+ | 必需 | 已安装或 python.org | - |
| aiohttp 库 | 必需 | pip install aiohttp | 1 分钟 |

可选准备项

  • 环境变量管理工具: python-dotenv (pip install python-dotenv)
  • API 测试工具: curl 或 Postman
  • 密钥管理器: 系统 keyring 或 1Password CLI

环境配置

1. 获取 Bing Search API Key

步骤 1: 登录 Azure Portal

步骤 2: 创建 Bing Search v7 资源

  1. 点击“创建资源”
  2. 搜索 “Bing Search v7”
  3. 选择 “Bing Search v7” → 点击“创建”
  4. 填写基本信息:
    • 订阅:选择你的订阅
    • 资源组:创建新的(如 ai-search-rg)
    • 名称:bing-search-api
    • 定价层:选择 S1 (1,000 transactions/month 免费)
  5. 点击“审阅并创建” → “创建”

步骤 3: 获取 API Key

  1. 资源创建完成后,点击“转到资源”
  2. 左侧菜单 → “密钥和终结点”
  3. 复制 Key 1(格式类似 a1b2c3d4e5f6...)

2. 配置环境变量

创建 .env 文件:

# .env
BING_SEARCH_API_KEY=your_api_key_here
BING_SEARCH_ENDPOINT=https://api.bing.microsoft.com/v7.0
DEFAULT_SEARCH_LIMIT=5
SEARCH_TIMEOUT=30

加载环境变量:

from dotenv import load_dotenv
import os

load_dotenv()

API_KEY = os.getenv("BING_SEARCH_API_KEY")
ENDPOINT = os.getenv("BING_SEARCH_ENDPOINT", "https://api.bing.microsoft.com/v7.0")

核心代码实现

1. 配置模型(Kimi CLI 风格)

# config.py
from pydantic import BaseModel, SecretStr, Field
from typing import Literal, Optional


class SearchConfig(BaseModel):
    """Search-service configuration.

    Modeled on Kimi CLI's config design: one model covers multiple
    providers and their shared tuning knobs. WebSearchTool routes on
    `provider` and reads the remaining fields directly.
    """
    # Routing key used by WebSearchTool.search(); "google" is declared
    # here but has no implementation yet (search() raises ValueError).
    provider: Literal["bing", "google", "exa"] = Field(
        default="bing",
        description="搜索服务提供商"
    )
    # SecretStr keeps the key out of repr()/logs; consumers call
    # .get_secret_value() to read it.
    api_key: SecretStr = Field(
        description="API 密钥"
    )
    base_url: str = Field(
        default="https://api.bing.microsoft.com/v7.0",
        description="API 基础 URL"
    )
    # Extra headers merged into every provider request.
    custom_headers: Optional[dict[str, str]] = Field(
        default=None,
        description="自定义请求头"
    )
    default_limit: int = Field(
        default=5,
        ge=1,
        le=20,
        description="默认返回结果数量"
    )
    timeout_seconds: int = Field(
        default=30,
        ge=1,  # a non-positive timeout would make aiohttp's ClientTimeout meaningless
        description="请求超时时间"
    )

    class Config:
        # NOTE(review): env_prefix has no effect on a plain pydantic
        # BaseModel -- environment loading requires BaseSettings (the
        # pydantic-settings package in pydantic v2). With the pinned
        # pydantic>=2.0.0 the class-based Config is also deprecated in
        # favor of model_config. Kept here as documentation of intent;
        # confirm before relying on env loading.
        env_prefix = "SEARCH_"

2. 响应数据模型

# models.py
from pydantic import BaseModel, Field
from typing import List, Optional
from datetime import datetime


class SearchResult(BaseModel):
    """
    Single search hit -- Kimi CLI compatible.

    Field names match Kimi CLI's SearchResult so results can later be
    migrated onto the official API without reshaping.
    """
    # Display name of the source site; providers may omit it, in which
    # case WebSearchTool falls back to the URL's domain.
    site_name: str = Field(
        default="",
        description="网站名称"
    )
    # Page title (required field -- no default).
    title: str = Field(
        description="页面标题"
    )
    # Page URL (required).
    url: str = Field(
        description="页面 URL"
    )
    # Short text summary returned by the provider (required).
    snippet: str = Field(
        description="内容摘要"
    )
    # Full page text; filled by Exa's "text" field or by
    # WebSearchTool._fetch_content_batch, otherwise empty.
    content: str = Field(
        default="",
        description="页面完整内容(需额外抓取)"
    )
    # For Bing this holds dateLastCrawled, i.e. crawl time rather than
    # a true publish date.
    date: str = Field(
        default="",
        description="发布/更新日期"
    )
    # Favicon URL; never populated by the current providers.
    icon: str = Field(
        default="",
        description="网站图标 URL"
    )
    # MIME type; current providers always leave the default.
    mime: str = Field(
        default="text/html",
        description="MIME 类型"
    )


class SearchResponse(BaseModel):
    """Wrapper around the results of one search call."""
    search_results: List[SearchResult] = Field(
        default_factory=list,
        description="搜索结果列表"
    )
    # NOTE: WebSearchTool sets this to len(search_results), i.e. the
    # number of results returned, not the provider's total match count.
    total_results: int = Field(
        default=0,
        description="总结果数"
    )
    # The original query string (required).
    query: str = Field(
        description="原始查询"
    )

3. 搜索工具实现

# search_tool.py
import aiohttp
import asyncio
from typing import List, Dict, Any, Optional
from urllib.parse import urlparse

from config import SearchConfig
from models import SearchResult, SearchResponse


class WebSearchTool:
    """Web search tool, Kimi CLI style.

    Supports multiple providers (Bing, Exa.ai) and normalizes their
    payloads into the Kimi-CLI-compatible SearchResult shape.

    Must be used as an async context manager::

        async with WebSearchTool(config) as tool:
            response = await tool.search("query")
    """

    def __init__(self, config: SearchConfig):
        self.config = config
        # Created in __aenter__ and cleared in __aexit__; search()
        # refuses to run without it so the session's lifetime is always
        # bounded by the context manager.
        self._session: Optional[aiohttp.ClientSession] = None

    async def __aenter__(self):
        """Open the shared HTTP session with the configured timeout."""
        self._session = aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=self.config.timeout_seconds)
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the HTTP session on exit."""
        if self._session:
            await self._session.close()
            self._session = None

    async def search(
        self,
        query: str,
        limit: Optional[int] = None,
        include_content: bool = False,
        **kwargs
    ) -> SearchResponse:
        """Run a web search.

        Args:
            query: Search keywords.
            limit: Number of results (clamped to 1-20); defaults to the
                configured default_limit.
            include_content: Also fetch full page text (adds latency).
            **kwargs: Extra provider-specific parameters (e.g. market,
                language for Bing).

        Returns:
            SearchResponse with normalized results.

        Raises:
            SearchAPIError: API call failed.
            RateLimitError: Provider rate limit hit.
            RuntimeError: Used outside the async context manager.
            ValueError: Unsupported provider.
        """
        if not self._session:
            raise RuntimeError("SearchTool must be used within async context manager")

        limit = limit or self.config.default_limit
        limit = max(1, min(limit, 20))  # clamp to the providers' 1-20 range

        # Route by provider ("google" is declared in SearchConfig but
        # not implemented here).
        if self.config.provider == "bing":
            results = await self._search_bing(query, limit, **kwargs)
        elif self.config.provider == "exa":
            results = await self._search_exa(query, limit, **kwargs)
        else:
            raise ValueError(f"Unsupported provider: {self.config.provider}")

        # Optionally enrich each hit with the page's full text.
        if include_content:
            results = await self._fetch_content_batch(results)

        return SearchResponse(
            search_results=results,
            total_results=len(results),
            query=query
        )

    async def _search_bing(self, query: str, limit: int, **kwargs) -> List[SearchResult]:
        """Query the Bing Web Search v7 API and normalize its payload."""
        headers = {
            "Ocp-Apim-Subscription-Key": self.config.api_key.get_secret_value(),
            **(self.config.custom_headers or {})
        }

        params = {
            "q": query,
            # Stringified: some aiohttp/yarl versions reject non-str
            # query values.
            "count": str(limit),
            "responseFilter": "Webpages",
            "mkt": kwargs.get("market", "zh-CN"),  # market / region
            "setLang": kwargs.get("language", "zh"),  # UI language
        }

        async with self._session.get(
            f"{self.config.base_url}/search",
            headers=headers,
            params=params
        ) as response:
            if response.status == 401:
                raise SearchAPIError("Invalid API key")
            elif response.status == 429:
                raise RateLimitError("Rate limit exceeded")
            elif response.status != 200:
                raise SearchAPIError(f"API error: {response.status}")

            data = await response.json()

            # Bing nests hits under webPages.value; both levels may be
            # absent when there are no results.
            results = []
            for item in data.get("webPages", {}).get("value", []):
                results.append(SearchResult(
                    site_name=item.get("siteName", self._extract_domain(item.get("url", ""))),
                    title=item.get("name", ""),
                    url=item.get("url", ""),
                    snippet=item.get("snippet", ""),
                    # Bing reports crawl time, not publish date.
                    date=item.get("dateLastCrawled", ""),
                ))

            return results

    async def _search_exa(self, query: str, limit: int, **kwargs) -> List[SearchResult]:
        """Query the Exa.ai search API and normalize its payload."""
        headers = {
            "Authorization": f"Bearer {self.config.api_key.get_secret_value()}",
            "Content-Type": "application/json",
            **(self.config.custom_headers or {})
        }

        payload = {
            "query": query,
            "numResults": limit,
            "useAutoprompt": True,
            "type": "auto",
            "contents": {
                "text": True  # ask Exa to include page text inline
            }
        }

        async with self._session.post(
            "https://api.exa.ai/search",
            headers=headers,
            json=payload
        ) as response:
            if response.status == 401:
                raise SearchAPIError("Invalid API key")
            elif response.status == 429:
                raise RateLimitError("Rate limit exceeded")
            elif response.status != 200:
                # Fail loudly on any other error instead of trying to
                # parse an error body as results (mirrors _search_bing).
                raise SearchAPIError(f"API error: {response.status}")

            data = await response.json()

            results = []
            for item in data.get("results", []):
                results.append(SearchResult(
                    site_name=self._extract_domain(item.get("url", "")),
                    title=item.get("title", ""),
                    url=item.get("url", ""),
                    snippet=item.get("snippet", ""),
                    content=item.get("text", ""),  # Exa returns page text directly
                ))

            return results

    async def _fetch_content_batch(self, results: List[SearchResult]) -> List[SearchResult]:
        """Fetch page bodies for all results, at most 3 at a time.

        Simplified implementation: production code should use a robust
        crawler (Playwright, Scrapy, or a dedicated scraping service).
        """

        async def fetch_single(result: SearchResult) -> SearchResult:
            if not result.url:  # nothing to fetch
                return result
            try:
                async with self._session.get(
                    result.url,
                    timeout=aiohttp.ClientTimeout(total=10)
                ) as response:
                    if response.status == 200:
                        html = await response.text()
                        # Naive HTML-to-text; BeautifulSoup is the
                        # production-grade option.
                        text = self._html_to_text(html)
                        result.content = text[:5000]  # cap stored length
            except Exception:
                # Best-effort by design: a failed fetch must not sink
                # the whole search response.
                pass
            return result

        # Bound concurrency so we don't hammer target sites.
        semaphore = asyncio.Semaphore(3)

        async def fetch_with_limit(result: SearchResult) -> SearchResult:
            async with semaphore:
                return await fetch_single(result)

        tasks = [fetch_with_limit(r) for r in results]
        return await asyncio.gather(*tasks)

    def _extract_domain(self, url: str) -> str:
        """Extract a bare domain ("example.com") from a URL."""
        try:
            parsed = urlparse(url)
        except ValueError:  # urlparse only raises on malformed ports/brackets
            return ""
        # removeprefix strips only a *leading* "www.", unlike replace()
        # which would also corrupt hosts containing "www." mid-string.
        return parsed.netloc.removeprefix("www.")

    def _html_to_text(self, html: str) -> str:
        """Very rough HTML-to-plain-text conversion.

        Production code should prefer BeautifulSoup or html2text.
        """
        import re
        # Drop script/style bodies entirely (case-insensitive so
        # <SCRIPT> is caught too).
        html = re.sub(r'<script[^>]*>.*?</script>', '', html,
                      flags=re.DOTALL | re.IGNORECASE)
        html = re.sub(r'<style[^>]*>.*?</style>', '', html,
                      flags=re.DOTALL | re.IGNORECASE)
        # Strip remaining tags, then collapse whitespace runs.
        text = re.sub(r'<[^>]+>', ' ', html)
        text = ' '.join(text.split())
        return text.strip()


class SearchAPIError(Exception):
    """Search API call failed (bad key, non-200 response, ...).

    Root of the search exception hierarchy: catching this also catches
    RateLimitError.
    """
    pass


class RateLimitError(SearchAPIError):
    """Provider rate limit exceeded (HTTP 429).

    Subclasses SearchAPIError so callers can handle all search
    failures with a single `except SearchAPIError`, while still being
    able to special-case throttling.
    """
    pass

4. 结果格式化器(Kimi CLI 兼容)

# formatter.py
from typing import List
from models import SearchResult


class ResultFormatter:
    """Format search results for different consumers.

    Kimi-CLI-style Markdown for LLM consumption, JSON for programs,
    and a compact numbered list for humans. Annotations use quoted
    forward references since SearchResult lives in the models module.
    """

    @staticmethod
    def to_markdown(results: List["SearchResult"]) -> str:
        """Render results as Kimi-CLI-style Markdown.

        Example output::

            Title: xxx
            Date: xxx
            URL: xxx
            Summary: xxx

            ---

            Title: xxx
            ...
        """
        if not results:
            return "未找到相关结果。"

        lines = []

        for i, result in enumerate(results):
            # Separator between entries (not before the first one).
            if i > 0:
                lines.append("---\n")

            lines.append(f"Title: {result.title}")
            lines.append(f"Date: {result.date}")
            lines.append(f"URL: {result.url}")
            lines.append(f"Summary: {result.snippet}\n")

            # Full page text, when it was fetched.
            if result.content:
                lines.append(f"{result.content}\n")

        return "\n".join(lines)

    @staticmethod
    def to_json(results: List["SearchResult"]) -> str:
        """Render results as pretty-printed JSON (for programs)."""
        from pydantic import TypeAdapter
        adapter = TypeAdapter(List[SearchResult])
        return adapter.dump_json(results, indent=2).decode('utf-8')

    @staticmethod
    def to_simple_text(results: List["SearchResult"]) -> str:
        """Render a compact numbered list (for terminals/humans)."""
        lines = []
        for i, result in enumerate(results, 1):
            # Only add an ellipsis when the snippet was actually
            # truncated (the original appended "..." unconditionally).
            snippet = result.snippet
            if len(snippet) > 100:
                snippet = snippet[:100] + "..."
            lines.append(f"{i}. {result.title}")
            lines.append(f"   {result.url}")
            lines.append(f"   {snippet}")
            lines.append("")
        return "\n".join(lines)

5. 完整使用示例

# example.py
import asyncio
import os
from dotenv import load_dotenv

from config import SearchConfig
from models import SearchResponse
from search_tool import WebSearchTool
from formatter import ResultFormatter


async def main():
    """Example: run one Bing search and print Kimi-CLI-style output."""

    # Pull BING_SEARCH_API_KEY etc. from the local .env file.
    load_dotenv()

    # Build the tool configuration.
    search_config = SearchConfig(
        provider="bing",
        api_key=os.getenv("BING_SEARCH_API_KEY"),
        base_url="https://api.bing.microsoft.com/v7.0",
        default_limit=5
    )

    # The context manager owns the HTTP session's lifetime.
    async with WebSearchTool(search_config) as tool:
        try:
            result: SearchResponse = await tool.search(
                query="OpenAI GPT-4 最新发布",
                limit=5,
                include_content=False
            )

            print(f"查询: {result.query}")
            print(f"找到 {result.total_results} 条结果\n")
            print("=" * 50)

            # Render in Kimi-CLI-style Markdown.
            print(ResultFormatter.to_markdown(result.search_results))

        except Exception as err:
            print(f"搜索失败: {err}")


if __name__ == "__main__":
    asyncio.run(main())

项目结构

推荐的项目文件结构:

your-search-project/
├── .env                      # 环境变量(不要提交到 Git)
├── .env.example              # 环境变量示例
├── .gitignore
├── requirements.txt          # 依赖列表

├── src/
│   ├── __init__.py
│   ├── config.py            # 配置模型
│   ├── models.py            # 数据模型
│   ├── search_tool.py       # 核心搜索工具
│   ├── formatter.py         # 结果格式化
│   └── utils.py             # 工具函数

├── tests/
│   └── test_search.py       # 测试用例

└── example.py               # 使用示例

requirements.txt

aiohttp>=3.8.0
pydantic>=2.0.0
python-dotenv>=1.0.0

.gitignore

.env
__pycache__/
*.pyc
.DS_Store
.vscode/
.idea/

.env.example

# 搜索 API 配置
BING_SEARCH_API_KEY=your_api_key_here
BING_SEARCH_ENDPOINT=https://api.bing.microsoft.com/v7.0

# 可选:其他服务商
EXA_API_KEY=your_exa_key_here

# 默认参数
DEFAULT_SEARCH_LIMIT=5
SEARCH_TIMEOUT=30

测试验证

单元测试示例

# tests/test_search.py
import pytest
import asyncio
from unittest.mock import AsyncMock, patch

from src.config import SearchConfig
from src.search_tool import WebSearchTool
from src.models import SearchResult


@pytest.fixture
def mock_config():
    # Minimal config for unit tests; pydantic coerces the plain strings
    # (e.g. api_key -> SecretStr) during validation.
    return SearchConfig(
        provider="bing",
        api_key="test_key",
        base_url="https://test.api.com"
    )


@pytest.mark.asyncio
async def test_search_basic(mock_config):
    """Basic search: a single Bing hit is parsed into the response."""
    # Shaped like the Bing /search payload that _search_bing consumes
    # (hits nested under webPages.value).
    mock_response = {
        "webPages": {
            "value": [
                {
                    "name": "Test Title",
                    "url": "https://example.com",
                    "snippet": "Test snippet"
                }
            ]
        }
    }

    # Patch ClientSession.get at the class level so the real session
    # created in __aenter__ returns our mock response object.
    # NOTE(review): the `async with` over the mock relies on MagicMock's
    # async-magic-method support (Python 3.8+) -- confirm on the target
    # interpreter.
    with patch('aiohttp.ClientSession.get') as mock_get:
        mock_get.return_value.__aenter__.return_value.status = 200
        mock_get.return_value.__aenter__.return_value.json = AsyncMock(
            return_value=mock_response
        )

        async with WebSearchTool(mock_config) as tool:
            response = await tool.search("test query")

            assert response.total_results == 1
            assert response.search_results[0].title == "Test Title"

手动测试命令

# 1. 设置环境变量
export BING_SEARCH_API_KEY="your_actual_api_key"

# 2. 运行示例
python example.py

# 3. 或使用 curl 直接测试 API
curl -X GET "https://api.bing.microsoft.com/v7.0/search?q=OpenAI&count=5" \
  -H "Ocp-Apim-Subscription-Key: your_api_key"

故障排除

常见问题

| 问题 | 可能原因 | 解决方案 |
| --- | --- | --- |
| 401 Unauthorized | API Key 无效 | 检查 Key 是否复制完整,是否在正确的资源下 |
| 429 Too Many Requests | 频率限制 | 降低请求频率,或升级定价层 |
| 空结果 | 查询词问题 | 尝试更通用的关键词,检查 market 参数 |
| 超时 | 网络问题 | 增加 timeout_seconds,检查网络连接 |
| SSL 错误 | 证书问题 | 更新 Python,或临时禁用 SSL 验证(不推荐) |

调试技巧

# 启用详细日志
import logging
logging.basicConfig(level=logging.DEBUG)

# 在搜索工具中添加日志
import logging
logger = logging.getLogger(__name__)

# 在 _search_bing 方法中
logger.debug(f"Request URL: {self.config.base_url}/search")
logger.debug(f"Request params: {params}")

生产环境建议

1. 错误重试机制

import asyncio

# The original snippet used retry_if_exception_type and asyncio below
# without importing either -- both would NameError at import time.
from tenacity import (
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_exponential,
)


@retry(
    stop=stop_after_attempt(3),  # at most 3 attempts total
    wait=wait_exponential(multiplier=1, min=4, max=10),  # 4s..10s backoff
    retry=retry_if_exception_type((SearchAPIError, asyncio.TimeoutError))
)
async def search_with_retry(tool, query):
    """Search with automatic retry on transient API/timeout failures."""
    return await tool.search(query)

2. 缓存机制

from functools import lru_cache
import hashlib

def _make_cache_key(query: str, limit: int) -> str:
    return hashlib.md5(f"{query}:{limit}".encode()).hexdigest()

# 使用 Redis 或内存缓存
results_cache = {}

async def cached_search(tool, query: str, limit: int = 5):
    cache_key = _make_cache_key(query, limit)
    if cache_key in results_cache:
        return results_cache[cache_key]
    
    result = await tool.search(query, limit)
    results_cache[cache_key] = result
    return result

3. 监控埋点

import time
from typing import Callable


async def search_with_metrics(
    tool: "WebSearchTool",
    query: str,
    metrics_callback: Callable
) -> "SearchResponse":
    """Run a search and emit timing/outcome metrics.

    The callback always fires (via finally) whether the search
    succeeded or raised; on failure the original exception propagates
    after the metrics are reported.

    Type names are quoted forward references: this snippet does not
    import WebSearchTool/SearchResponse itself, and unquoted
    annotations are evaluated at def time.
    """
    start_time = time.time()

    try:
        response = await tool.search(query)
        status = "success"
        result_count = response.total_results
    except Exception:
        status = "error"
        result_count = 0
        raise
    finally:
        duration = time.time() - start_time
        metrics_callback({
            "operation": "web_search",
            "duration": duration,
            "status": status,
            "result_count": result_count,
            "query_length": len(query)
        })

    # Bug fix: the original computed `response` but never returned it,
    # so every caller received None despite the annotated return type.
    return response

扩展阅读