Logo
热心市民王先生

关键代码验证

技术研究 LLM 代码示例

生产级 LLM 客户端实现:结构化输出、重试机制与错误处理

核心逻辑:生产级 LLM 客户端实现

完整示例:带重试与验证的 LLM 客户端

"""
Production-Ready LLM Client with Structured Outputs and Retry Logic
演示:如何使用 OpenAI Structured Outputs + Instructor 实现高可靠性指令遵循
"""

import logging
import os
import time
from datetime import datetime
from typing import List, Optional

from openai import APIError, OpenAI, RateLimitError
from pydantic import BaseModel, EmailStr, Field, field_validator
from tenacity import (
    Retrying,
    before_sleep_log,
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_exponential,
)

# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


# ==================== Schema 定义 ====================

class CalendarEvent(BaseModel):
    """日历事件模型,带自定义验证"""
    # NOTE: the class docstring and the Field descriptions below are emitted
    # into the JSON schema sent to the model, so they are kept verbatim.
    name: str = Field(min_length=1, max_length=200, description="事件名称")
    date: Optional[str] = Field(default=None, description="ISO 8601 日期格式 YYYY-MM-DD")
    time: Optional[str] = Field(default=None, description="24 小时制时间 HH:MM")
    participants: List[str] = Field(default_factory=list, description="参与者列表")
    location: Optional[str] = Field(default=None, description="地点")

    @field_validator('date')
    @classmethod
    def validate_date_format(cls, value: Optional[str]) -> Optional[str]:
        # None passes through untouched; anything else must parse strictly
        # as YYYY-MM-DD.
        if value is None:
            return None
        try:
            datetime.strptime(value, '%Y-%m-%d')
        except ValueError:
            raise ValueError(f"Invalid date format: {value}. Expected YYYY-MM-DD")
        return value

    @field_validator('time')
    @classmethod
    def validate_time_format(cls, value: Optional[str]) -> Optional[str]:
        # None passes through untouched; anything else must parse strictly
        # as 24-hour HH:MM.
        if value is None:
            return None
        try:
            datetime.strptime(value, '%H:%M')
        except ValueError:
            raise ValueError(f"Invalid time format: {value}. Expected HH:MM")
        return value

    @field_validator('participants')
    @classmethod
    def validate_participants(cls, value: List[str]) -> List[str]:
        # Reject an explicitly supplied empty list.
        # NOTE(review): the default ([]) presumably bypasses this check, since
        # pydantic v2 does not validate defaults unless validate_default=True
        # is set — confirm this asymmetry is intended.
        if not value:
            raise ValueError("At least one participant is required")
        return value


class EventExtraction(BaseModel):
    """事件提取结果模型"""
    # Docstring and descriptions feed the runtime schema; kept verbatim.
    events: List[CalendarEvent] = Field(description="提取的事件列表")
    confidence: float = Field(ge=0, le=1, description="提取置信度 0-1")
    notes: Optional[str] = Field(default=None, description="附加说明")


# ==================== LLM 客户端 ====================

class ReliableLLMClient:
    """
    High-reliability LLM client.

    Features:
    - Structured Outputs for schema-compliant responses
    - Exponential-backoff retries on transient errors (honors ``max_retries``)
    - Per-instance request metrics

    NOTE(review): the original docstring also advertised a "Circuit Breaker
    mode" that was never implemented; the claim is dropped here.
    """

    def __init__(
        self,
        model: str = "gpt-4o-2024-08-06",
        temperature: float = 0.2,
        max_retries: int = 3,
        api_key: Optional[str] = None
    ):
        """
        Args:
            model: OpenAI model name (must support Structured Outputs).
            temperature: Sampling temperature; low values favor determinism.
            max_retries: Maximum total attempts for transient API errors.
            api_key: Explicit API key; falls back to the OPENAI_API_KEY env var.
        """
        self.model = model
        self.temperature = temperature
        self.max_retries = max_retries
        self.client = OpenAI(api_key=api_key or os.getenv("OPENAI_API_KEY"))

        # Request metrics.
        self.stats = {
            "total_requests": 0,
            "successful_requests": 0,
            "retries": 0,
            "failures": 0
        }

        # Build the retry policy per instance so that max_retries is actually
        # honored: the original @retry decorator hard-coded 3 attempts at
        # class-definition time and silently ignored the constructor argument.
        self._retryer = Retrying(
            stop=stop_after_attempt(self.max_retries),
            wait=wait_exponential(multiplier=1, min=1, max=10),
            retry=retry_if_exception_type((RateLimitError, APIError)),
            before_sleep=self._before_sleep
        )

    def _before_sleep(self, retry_state) -> None:
        """tenacity before_sleep hook: record the retry, then log it."""
        # The original code declared the "retries" metric but never updated it.
        self.stats["retries"] += 1
        before_sleep_log(logger, logging.WARNING)(retry_state)

    def _call_api(self, messages: list, response_format: "type[BaseModel]"):
        """Perform one (non-retried) structured-output API call, logging latency."""
        start_time = time.time()

        completion = self.client.chat.completions.parse(
            model=self.model,
            messages=messages,
            response_format=response_format,
            temperature=self.temperature,
            max_tokens=1024
        )

        elapsed = time.time() - start_time
        logger.info(f"API call completed in {elapsed:.2f}s")

        return completion

    def _call_with_retry(self, messages: list, response_format: "type[BaseModel]"):
        """API call wrapped in the per-instance retry policy."""
        return self._retryer(self._call_api, messages, response_format)

    def extract_events(
        self,
        text: str,
        language: str = "zh"
    ) -> "EventExtraction":
        """
        Extract calendar events from free-form text.

        Args:
            text: Input text to scan for events.
            language: Response language ("zh" = Chinese, "en" = English).

        Returns:
            EventExtraction: Structured list of extracted events.

        Raises:
            Exception: Re-raised after retries are exhausted, or immediately
                for non-retriable API/validation errors.
        """
        self.stats["total_requests"] += 1

        system_prompt = """你是一位专业的日历助手,擅长从非结构化文本中提取事件信息。

请严格遵守以下规则:
1. 只提取明确提及的事件,不要推断
2. 日期必须转换为 YYYY-MM-DD 格式
3. 时间必须转换为 24 小时制 HH:MM
4. 如果信息缺失,使用 null 而非猜测
5. 置信度评分基于信息完整性
"""

        user_prompt = f"""请从以下文本中提取所有日历事件:

<text>
{text}
</text>

要求:
- 输出语言:{'中文' if language == 'zh' else 'English'}
- 必须包含所有明确提及的事件
- 日期和时间格式必须严格符合 schema
"""

        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt}
        ]

        try:
            completion = self._call_with_retry(messages, EventExtraction)
            result = completion.choices[0].message.parsed

            self.stats["successful_requests"] += 1
            logger.info(f"Successfully extracted {len(result.events)} events")

            return result

        except Exception as e:
            self.stats["failures"] += 1
            logger.error(f"Event extraction failed: {e}")
            raise

    def get_stats(self) -> dict:
        """Return the cumulative request-metrics dict (live reference, not a copy)."""
        return self.stats


# ==================== 使用示例 ====================

if __name__ == "__main__":
    # 初始化客户端
    client = ReliableLLMClient(
        model="gpt-4o-2024-08-06",
        temperature=0.2,
        max_retries=3
    )
    
    # 测试输入
    test_text = """
    下周我们要安排几个会议:
    
    1. 产品评审会:下周一(3 月 17 日)下午 2 点,在 3 楼会议室,参加者有张三、李四、王五
    2. 团队团建:3 月 20 日晚上 6 点,地点是海底捞火锅,所有人都要参加
    3. 客户演示:暂定 3 月 22 日上午 10 点,参会人员包括 CEO 和销售总监
    """
    
    # 提取事件
    try:
        result = client.extract_events(test_text)
        
        print(f"\n✅ 成功提取 {len(result.events)} 个事件")
        print(f"📊 置信度:{result.confidence:.2f}")
        
        for i, event in enumerate(result.events, 1):
            print(f"\n事件 {i}:")
            print(f"  名称:{event.name}")
            print(f"  日期:{event.date}")
            print(f"  时间:{event.time}")
            print(f"  地点:{event.location}")
            print(f"  参与者:{', '.join(event.participants)}")
        
        if result.notes:
            print(f"\n📝 备注:{result.notes}")
        
        # 打印统计
        print(f"\n📈 统计信息:{client.get_stats()}")
        
    except Exception as e:
        print(f"❌ 提取失败:{e}")

输出示例

✅ 成功提取 3 个事件
📊 置信度:0.95

事件 1:
  名称:产品评审会
  日期:2026-03-17
  时间:14:00
  地点:3 楼会议室
  参与者:张三, 李四, 王五

事件 2:
  名称:团队团建
  日期:2026-03-20
  时间:18:00
  地点:海底捞火锅
  参与者:所有人

事件 3:
  名称:客户演示
  日期:2026-03-22
  时间:10:00
  地点:null
  参与者:CEO, 销售总监

📝 备注:客户演示的地点未明确提及,需要后续确认

📈 统计信息:{'total_requests': 1, 'successful_requests': 1, 'retries': 0, 'failures': 0}

集成方式

1. 与现有系统集成

# existing_service.py

class CalendarService:
    """Existing calendar service, extended with LLM-based text extraction."""
    
    def __init__(self):
        # Inject the reliable LLM client.
        self.llm_client = ReliableLLMClient()
    
    def create_events_from_text(self, user_input: str) -> list:
        """Create internal-format calendar events from free-form user input."""
        # LLM extraction step.
        extraction = self.llm_client.extract_events(user_input)
        
        # Convert to the internal format, passing the confidence down
        # explicitly: the original helper referenced `extraction`, a local
        # of this method that is not in the helper's scope (NameError).
        events = []
        for event_data in extraction.events:
            event = self._convert_to_internal(event_data, extraction.confidence)
            events.append(event)
        
        return events
    
    def _convert_to_internal(self, event_data: "CalendarEvent", confidence: float = 0.0) -> dict:
        """Map one extracted CalendarEvent onto the internal event dict.

        Args:
            event_data: Extracted event (schema-validated fields).
            confidence: Extraction confidence to record in metadata.
        """
        return {
            "title": event_data.name,
            # Fall back to midnight in the same HH:MM shape the schema
            # promises (the original '00:00:00' mixed HH:MM with HH:MM:SS).
            # NOTE(review): if event_data.date is None this still yields
            # "NoneT..." — confirm upstream guarantees a date.
            "start_datetime": f"{event_data.date}T{event_data.time or '00:00'}",
            "attendees": event_data.participants,
            "location": event_data.location,
            "metadata": {
                "extracted": True,
                "confidence": confidence
            }
        }

2. 多供应商 Fallback

from anthropic import Anthropic

class MultiProviderClient:
    """Multi-provider LLM client: OpenAI primary, Anthropic fallback."""
    
    def __init__(self):
        self.primary = ReliableLLMClient(model="gpt-4o-2024-08-06")
        self.fallback = AnthropicClient()  # custom Anthropic client
        self.use_fallback = False
    
    def extract_events(self, text: str):
        """Extract events, switching to the fallback provider on transient errors.

        Once a transient failure occurs the fallback stays active until
        reset() is called.
        """
        # Guard clause: already demoted to fallback.
        if self.use_fallback:
            return self.fallback.extract_events(text)
        try:
            return self.primary.extract_events(text)
        except (RateLimitError, APIError) as exc:
            logger.warning(f"Primary failed, switching to fallback: {exc}")
            self.use_fallback = True
        return self.fallback.extract_events(text)
    
    def reset(self):
        """Route subsequent requests back to the primary provider."""
        self.use_fallback = False

3. 与 FastAPI 集成

# api.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI()
llm_client = ReliableLLMClient()

class ExtractRequest(BaseModel):
    text: str
    language: str = "zh"

class ExtractResponse(BaseModel):
    # Response payload for /extract-events.
    events: list
    confidence: float
    # Pydantic v2 treats `Optional[str]` *without* a default as a required
    # field; give it an explicit None default so notes may be omitted.
    notes: Optional[str] = None

@app.post("/extract-events")
async def extract_events(request: ExtractRequest):
    """HTTP endpoint: extract calendar events from request.text."""
    try:
        result = llm_client.extract_events(
            text=request.text,
            language=request.language
        )
        # Serialize each extracted event into a plain dict.
        serialized = []
        for event in result.events:
            serialized.append({
                "name": event.name,
                "date": event.date,
                "time": event.time,
                "participants": event.participants,
                "location": event.location
            })
        return ExtractResponse(
            events=serialized,
            confidence=result.confidence,
            notes=result.notes
        )
    except Exception as exc:
        logger.error(f"Extraction failed: {exc}")
        raise HTTPException(status_code=500, detail=str(exc))

配置要点

1. API 参数调优

# 推荐配置表

USE_CASE_CONFIGS = {
    "数据提取": {
        "model": "gpt-4o-2024-08-06",
        "temperature": 0.0,  # 完全确定性
        "max_tokens": 1024,
        "response_format": PydanticModel
    },
    "分类任务": {
        "model": "gpt-4o-2024-08-06",
        "temperature": 0.1,  # 几乎确定性
        "max_tokens": 256,
    },
    "创意写作": {
        "model": "gpt-4o-2024-08-06",
        "temperature": 0.7,  # 平衡创意与质量
        "max_tokens": 2048,
    },
    "头脑风暴": {
        "model": "gpt-4o-2024-08-06",
        "temperature": 0.9,  # 高创造性
        "max_tokens": 4096,
    }
}

2. 重试策略配置

from tenacity import stop_after_attempt, wait_exponential

# 推荐重试配置
RETRY_CONFIG = {
    "max_attempts": 3,  # 最多重试 3 次
    "multiplier": 1,    # 指数基数
    "min_wait": 1,      # 最少等待 1 秒
    "max_wait": 10,     # 最多等待 10 秒
    "retriable_errors": [
        RateLimitError,  # 429
        APIError,        # 5xx
        TimeoutError     # 超时
    ]
}

# 不要重试的错误
NON_RETRIABLE_ERRORS = [
    AuthenticationError,  # 401
    PermissionDeniedError, # 403
    BadRequestError,      # 400(schema 错误等)
]

3. Token 优化

# Schema 优化:精简 vs 详细

# ❌ 冗余 Schema(Token 消耗大)
class VerboseEvent(BaseModel):
    """
    This class represents a calendar event.
    It contains all the information about the event.
    """
    name: str = Field(
        description="The name of the event. This should be a short, descriptive string."
    )
    # ... 其他字段

# ✅ 精简 Schema(节省 Token)
class CompactEvent(BaseModel):
    """日历事件"""
    name: str = Field(description="事件名称")
    date: str | None = Field(description="YYYY-MM-DD")
    participants: list[str] = Field(description="参与者列表")
    # 输出 token 减少 40-60%

4. 监控指标

# 关键监控指标
MONITORING_METRICS = {
    "success_rate": "成功请求 / 总请求",
    "p50_latency": "50% 请求的响应时间",
    "p95_latency": "95% 请求的响应时间",
    "p99_latency": "99% 请求的响应时间",
    "retry_rate": "需要重试的请求比例",
    "fallback_rate": "触发 fallback 的比例",
    "schema_compliance": "首次即合规的比例",
    "token_usage": "平均每请求 token 消耗"
}

# Prometheus 指标示例
from prometheus_client import Counter, Histogram

REQUEST_TOTAL = Counter('llm_requests_total', 'Total LLM requests', ['status'])
REQUEST_LATENCY = Histogram('llm_request_latency_seconds', 'LLM request latency')
RETRY_TOTAL = Counter('llm_retries_total', 'Total LLM retries')

参考资料