关键代码验证
技术研究 LLM 代码示例
生产级 LLM 客户端实现:结构化输出、重试机制与错误处理
核心逻辑:生产级 LLM 客户端实现
完整示例:带重试与验证的 LLM 客户端
"""
Production-Ready LLM Client with Structured Outputs and Retry Logic
演示:如何使用 OpenAI Structured Outputs + Instructor 实现高可靠性指令遵循
"""
import logging
import os
import time
from datetime import datetime
from typing import List, Optional

from openai import OpenAI, RateLimitError, APIError
from pydantic import BaseModel, Field, field_validator, EmailStr
from tenacity import (
    Retrying,
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type,
    before_sleep_log
)
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# ==================== Schema 定义 ====================
class CalendarEvent(BaseModel):
    """Calendar event model with custom field validation.

    Dates and times are kept as strings (validated for format) so that a
    missing value can stay ``None`` instead of being guessed.
    """

    name: str = Field(description="事件名称", min_length=1, max_length=200)
    date: Optional[str] = Field(description="ISO 8601 日期格式 YYYY-MM-DD", default=None)
    time: Optional[str] = Field(description="24 小时制时间 HH:MM", default=None)
    participants: List[str] = Field(description="参与者列表", default_factory=list)
    location: Optional[str] = Field(description="地点", default=None)

    @field_validator('date')
    @classmethod
    def validate_date_format(cls, v: Optional[str]) -> Optional[str]:
        """Reject dates that do not parse as YYYY-MM-DD; None is allowed."""
        if v is None:
            return None
        try:
            datetime.strptime(v, '%Y-%m-%d')
            return v
        except ValueError:
            raise ValueError(f"Invalid date format: {v}. Expected YYYY-MM-DD")

    @field_validator('time')
    @classmethod
    def validate_time_format(cls, v: Optional[str]) -> Optional[str]:
        """Reject times that do not parse as 24-hour HH:MM; None is allowed."""
        if v is None:
            return None
        try:
            datetime.strptime(v, '%H:%M')
            return v
        except ValueError:
            raise ValueError(f"Invalid time format: {v}. Expected HH:MM")

    @field_validator('participants')
    @classmethod
    def validate_participants(cls, v: List[str]) -> List[str]:
        """Normalize the participant list: strip whitespace, drop blanks.

        BUG FIX: this validator previously raised when the list was empty,
        which contradicted ``default_factory=list`` (Pydantic v2 does not
        validate defaults, so ``CalendarEvent(name=...)`` succeeded with []
        while an explicit ``participants=[]`` — which the model can emit for
        an event with no named attendees — failed and aborted the whole
        extraction). An empty list is now accepted.
        """
        return [p.strip() for p in v if p and p.strip()]
class EventExtraction(BaseModel):
    """Top-level extraction result: the list of events plus metadata.

    This is the ``response_format`` schema handed to the structured-outputs
    API call in ``ReliableLLMClient``.
    """
    events: List[CalendarEvent] = Field(description="提取的事件列表")
    # ge/le constrain the model-reported confidence to [0, 1].
    confidence: float = Field(description="提取置信度 0-1", ge=0, le=1)
    # Optional free-form remarks from the model (e.g. caveats about missing info).
    notes: Optional[str] = Field(description="附加说明", default=None)
# ==================== LLM 客户端 ====================
class ReliableLLMClient:
    """
    High-reliability LLM client.

    Features:
    - Structured Outputs for schema-compliant responses
    - Exponential-backoff retries (honors ``max_retries``)
    - Detailed request metrics
    """

    def __init__(
        self,
        model: str = "gpt-4o-2024-08-06",
        temperature: float = 0.2,
        max_retries: int = 3,
        api_key: Optional[str] = None
    ):
        """
        Args:
            model: OpenAI model name; must support Structured Outputs.
            temperature: Sampling temperature (kept low for extraction tasks).
            max_retries: Maximum number of attempts for retriable API errors.
            api_key: Explicit key; falls back to the OPENAI_API_KEY env var.
        """
        self.model = model
        self.temperature = temperature
        self.max_retries = max_retries
        self.client = OpenAI(api_key=api_key or os.getenv("OPENAI_API_KEY"))
        # Request metrics, exposed via get_stats().
        self.stats = {
            "total_requests": 0,
            "successful_requests": 0,
            "retries": 0,
            "failures": 0
        }
        # BUG FIX: the retry policy used to be a class-level decorator that
        # hard-coded stop_after_attempt(3), silently ignoring ``max_retries``,
        # and stats["retries"] was never incremented. Building a per-instance
        # Retrying object fixes both.
        self._retryer = Retrying(
            stop=stop_after_attempt(self.max_retries),
            wait=wait_exponential(multiplier=1, min=1, max=10),
            retry=retry_if_exception_type((RateLimitError, APIError)),
            before_sleep=self._before_retry,
        )

    def _before_retry(self, retry_state) -> None:
        """tenacity hook: count the retry, then emit the standard warning log."""
        self.stats["retries"] += 1
        before_sleep_log(logger, logging.WARNING)(retry_state)

    def _call_with_retry(self, messages: list, response_format: type[BaseModel]):
        """API call with exponential-backoff retries on RateLimitError/APIError."""
        return self._retryer(self._invoke, messages, response_format)

    def _invoke(self, messages: list, response_format: type[BaseModel]):
        """Single (non-retried) structured-output API call with latency logging."""
        start_time = time.time()
        completion = self.client.chat.completions.parse(
            model=self.model,
            messages=messages,
            response_format=response_format,
            temperature=self.temperature,
            max_tokens=1024
        )
        # Lazy %-formatting: no string work when INFO logging is disabled.
        logger.info("API call completed in %.2fs", time.time() - start_time)
        return completion

    def extract_events(
        self,
        text: str,
        language: str = "zh"
    ) -> EventExtraction:
        """
        Extract calendar events from free-form text.

        Args:
            text: Input text.
            language: Response language ("zh" = Chinese, "en" = English).
        Returns:
            EventExtraction: structured event list plus confidence and notes.
        Raises:
            Exception: any API or validation error is recorded in the stats
                and re-raised for the caller to handle.
        """
        self.stats["total_requests"] += 1
        system_prompt = """你是一位专业的日历助手,擅长从非结构化文本中提取事件信息。
请严格遵守以下规则:
1. 只提取明确提及的事件,不要推断
2. 日期必须转换为 YYYY-MM-DD 格式
3. 时间必须转换为 24 小时制 HH:MM
4. 如果信息缺失,使用 null 而非猜测
5. 置信度评分基于信息完整性
"""
        user_prompt = f"""请从以下文本中提取所有日历事件:
<text>
{text}
</text>
要求:
- 输出语言:{'中文' if language == 'zh' else 'English'}
- 必须包含所有明确提及的事件
- 日期和时间格式必须严格符合 schema
"""
        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt}
        ]
        try:
            completion = self._call_with_retry(messages, EventExtraction)
            result = completion.choices[0].message.parsed
            self.stats["successful_requests"] += 1
            logger.info("Successfully extracted %d events", len(result.events))
            return result
        except Exception as e:
            self.stats["failures"] += 1
            logger.error("Event extraction failed: %s", e)
            raise

    def get_stats(self) -> dict:
        """Return a snapshot of the request metrics (copy; safe to mutate)."""
        return dict(self.stats)
# ==================== 使用示例 ====================
if __name__ == "__main__":
    # Build a client with explicit (default) production settings.
    llm = ReliableLLMClient(
        model="gpt-4o-2024-08-06",
        temperature=0.2,
        max_retries=3
    )

    # Sample input for the extraction demo.
    sample = """
下周我们要安排几个会议:
1. 产品评审会:下周一(3 月 17 日)下午 2 点,在 3 楼会议室,参加者有张三、李四、王五
2. 团队团建:3 月 20 日晚上 6 点,地点是海底捞火锅,所有人都要参加
3. 客户演示:暂定 3 月 22 日上午 10 点,参会人员包括 CEO 和销售总监
"""

    # Run the extraction and pretty-print the result.
    try:
        extraction = llm.extract_events(sample)
        event_count = len(extraction.events)
        print(f"\n✅ 成功提取 {event_count} 个事件")
        print(f"📊 置信度:{extraction.confidence:.2f}")
        for idx, ev in enumerate(extraction.events, 1):
            print(f"\n事件 {idx}:")
            print(f" 名称:{ev.name}")
            print(f" 日期:{ev.date}")
            print(f" 时间:{ev.time}")
            print(f" 地点:{ev.location}")
            print(f" 参与者:{', '.join(ev.participants)}")
        if extraction.notes:
            print(f"\n📝 备注:{extraction.notes}")
        # Dump the client-side metrics.
        print(f"\n📈 统计信息:{llm.get_stats()}")
    except Exception as err:
        print(f"❌ 提取失败:{err}")
输出示例
✅ 成功提取 3 个事件
📊 置信度:0.95
事件 1:
名称:产品评审会
日期:2026-03-17
时间:14:00
地点:3 楼会议室
参与者:张三, 李四, 王五
事件 2:
名称:团队团建
日期:2026-03-20
时间:18:00
地点:海底捞火锅
参与者:所有人
事件 3:
名称:客户演示
日期:2026-03-22
时间:10:00
地点:None
参与者:CEO, 销售总监
📝 备注:客户演示的地点未明确提及,需要后续确认
📈 统计信息:{'total_requests': 1, 'successful_requests': 1, 'retries': 0, 'failures': 0}
集成方式
1. 与现有系统集成
# existing_service.py
class CalendarService:
    """Existing calendar service that delegates extraction to the LLM client."""

    def __init__(self):
        # Inject the reliable LLM client.
        self.llm_client = ReliableLLMClient()

    def create_events_from_text(self, user_input: str) -> list:
        """Create internal calendar events from free-form user input."""
        # Run the LLM extraction.
        extraction = self.llm_client.extract_events(user_input)
        # Convert each extracted event, carrying the extraction-level
        # confidence along explicitly (it lives on the extraction object,
        # not on the individual events).
        return [
            self._convert_to_internal(event_data, extraction.confidence)
            for event_data in extraction.events
        ]

    def _convert_to_internal(self, event_data: CalendarEvent, confidence: float = 0.0) -> dict:
        """Convert an extracted CalendarEvent into the internal event dict.

        Args:
            event_data: Extracted event (name/date/time/participants/location).
            confidence: Extraction confidence to store in the metadata.
                BUG FIX: this previously read ``extraction.confidence``, a
                variable local to create_events_from_text, so every call
                raised NameError.
        Returns:
            dict in the internal event format.
        """
        # Normalize the time to HH:MM:SS so the timestamp format is the same
        # whether or not a time was extracted (previously "T14:00" vs
        # "T00:00:00").
        time_part = f"{event_data.time}:00" if event_data.time else "00:00:00"
        return {
            "title": event_data.name,
            # NOTE(review): date may be None for undated events, which yields
            # the literal string "None..." here — confirm desired handling.
            "start_datetime": f"{event_data.date}T{time_part}",
            "attendees": event_data.participants,
            "location": event_data.location,
            "metadata": {
                "extracted": True,
                "confidence": confidence
            }
        }
2. 多供应商 Fallback
from anthropic import Anthropic
class MultiProviderClient:
    """LLM client that fails over from a primary to a secondary provider.

    Once the primary provider throws a retriable API error, all subsequent
    calls go to the fallback until reset() is called.
    """

    def __init__(self):
        self.primary = ReliableLLMClient(model="gpt-4o-2024-08-06")
        self.fallback = AnthropicClient()  # custom Anthropic client
        self.use_fallback = False

    def extract_events(self, text: str):
        """Extract events, switching to the fallback provider on API failure."""
        if self.use_fallback:
            # Already failed over — go straight to the secondary provider.
            return self.fallback.extract_events(text)
        try:
            return self.primary.extract_events(text)
        except (RateLimitError, APIError) as exc:
            logger.warning(f"Primary failed, switching to fallback: {exc}")
            self.use_fallback = True
        # Primary raised: serve this request from the fallback.
        return self.fallback.extract_events(text)

    def reset(self):
        """Switch back to the primary provider."""
        self.use_fallback = False
3. 与 FastAPI 集成
# api.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
app = FastAPI()
llm_client = ReliableLLMClient()


class ExtractRequest(BaseModel):
    # Input text plus the desired response language.
    text: str
    language: str = "zh"


class ExtractResponse(BaseModel):
    # Serialized events plus extraction-level metadata.
    events: list
    confidence: float
    notes: Optional[str]


@app.post("/extract-events")
async def extract_events(request: ExtractRequest):
    """API endpoint: extract calendar events from raw text."""
    try:
        extraction = llm_client.extract_events(
            text=request.text,
            language=request.language
        )
        # Serialize each event into a plain dict for the response model.
        payload = []
        for ev in extraction.events:
            payload.append({
                "name": ev.name,
                "date": ev.date,
                "time": ev.time,
                "participants": ev.participants,
                "location": ev.location
            })
        return ExtractResponse(
            events=payload,
            confidence=extraction.confidence,
            notes=extraction.notes
        )
    except Exception as exc:
        logger.error(f"Extraction failed: {exc}")
        raise HTTPException(status_code=500, detail=str(exc))
配置要点
1. API 参数调优
# Recommended configuration table, keyed by use case.
USE_CASE_CONFIGS = {
    "数据提取": {
        "model": "gpt-4o-2024-08-06",
        "temperature": 0.0,  # fully deterministic
        "max_tokens": 1024,
        # NOTE(review): PydanticModel is an undefined placeholder — substitute
        # the actual Pydantic schema class for the extraction task.
        "response_format": PydanticModel
    },
    "分类任务": {
        "model": "gpt-4o-2024-08-06",
        "temperature": 0.1,  # near-deterministic
        "max_tokens": 256,
    },
    "创意写作": {
        "model": "gpt-4o-2024-08-06",
        "temperature": 0.7,  # balance creativity and quality
        "max_tokens": 2048,
    },
    "头脑风暴": {
        "model": "gpt-4o-2024-08-06",
        "temperature": 0.9,  # highly creative
        "max_tokens": 4096,
    }
}
2. 重试策略配置
from tenacity import stop_after_attempt, wait_exponential

# Recommended retry configuration.
RETRY_CONFIG = {
    "max_attempts": 3,  # at most 3 attempts
    "multiplier": 1,  # exponential base
    "min_wait": 1,  # wait at least 1 second
    "max_wait": 10,  # wait at most 10 seconds
    "retriable_errors": [
        RateLimitError,  # 429
        APIError,  # 5xx
        TimeoutError  # timeouts
    ]
}
# Errors that must NOT be retried.
# NOTE(review): AuthenticationError, PermissionDeniedError and BadRequestError
# are not imported in this snippet — import them from `openai` before use.
NON_RETRIABLE_ERRORS = [
    AuthenticationError,  # 401
    PermissionDeniedError,  # 403
    BadRequestError,  # 400 (schema errors etc.)
]
3. Token 优化
# Schema optimization: compact vs. verbose definitions.
# ❌ Verbose schema (high token cost)
class VerboseEvent(BaseModel):
    """
    This class represents a calendar event.
    It contains all the information about the event.
    """
    name: str = Field(
        description="The name of the event. This should be a short, descriptive string."
    )
    # ... remaining fields
# ✅ Compact schema (saves tokens)
class CompactEvent(BaseModel):
    """Calendar event."""
    name: str = Field(description="事件名称")
    date: str | None = Field(description="YYYY-MM-DD")
    participants: list[str] = Field(description="参与者列表")
# Output tokens reduced by 40-60%
4. 监控指标
# Key monitoring metrics (metric name -> definition).
MONITORING_METRICS = {
    "success_rate": "成功请求 / 总请求",
    "p50_latency": "50% 请求的响应时间",
    "p95_latency": "95% 请求的响应时间",
    "p99_latency": "99% 请求的响应时间",
    "retry_rate": "需要重试的请求比例",
    "fallback_rate": "触发 fallback 的比例",
    "schema_compliance": "首次即合规的比例",
    "token_usage": "平均每请求 token 消耗"
}
# Prometheus metric examples.
from prometheus_client import Counter, Histogram
REQUEST_TOTAL = Counter('llm_requests_total', 'Total LLM requests', ['status'])
REQUEST_LATENCY = Histogram('llm_request_latency_seconds', 'LLM request latency')
RETRY_TOTAL = Counter('llm_retries_total', 'Total LLM retries')
参考资料
- OpenAI Python SDK Documentation - OpenAI 官方 Python SDK
- Tenacity Retry Library - Python 重试库文档
- Pydantic Documentation - Pydantic 验证库
- Anthropic Python SDK - Anthropic 官方 SDK
- Instructor Library - 自动重试与验证库