关键代码验证
技术研究 代码示例 LLM
幻觉检测与验证的核心代码实现示例
核心实现示例
1. 基础 Prompt 约束模板
# system_prompt.py
from typing import List
def create_system_prompt(
    domain: str,
    knowledge_cutoff: str,
    allowed_sources: List[str],
    escalate_threshold: float = 0.85,
) -> str:
    """Build an anti-hallucination system prompt.

    Args:
        domain: Subject domain the assistant is specialized in.
        knowledge_cutoff: Human-readable knowledge cutoff date.
        allowed_sources: Whitelisted information sources the model may cite.
        escalate_threshold: Confidence below which the model must advise the
            user to consult a human expert.

    Returns:
        The rendered system prompt (Chinese), ending with the required JSON
        output format.

    Note:
        BUGFIX: the JSON example previously lacked its closing ``}}`` brace
        and the closing ``` fence, so the model was shown malformed output
        instructions; both are restored.
    """
    return f"""
# 角色定义
你是{domain}领域的专业助手。
# 知识边界
- 知识截止时间:{knowledge_cutoff}
- 仅使用以下来源的信息:{', '.join(allowed_sources)}
- 对于{knowledge_cutoff}之后的事件,明确说明信息可能不完整
# 响应规则
1. **准确性优先**:如果不确定,回答"我没有足够信息回答这个问题"
2. **来源引用**:每个事实性声明必须附带来源
3. **置信度评估**:在回答末尾添加置信度评分 (0-1)
4. **禁止行为**:
- 绝不编造数据、引用或来源
- 绝不推测未知信息
- 绝不提供超出你专业范围的建议
# 升级机制
如果置信度低于{escalate_threshold},明确建议用户咨询人工专家。
# 输出格式
```json
{{
"answer": "你的回答",
"sources": ["来源 1", "来源 2"],
"confidence": 0.0-1.0,
"reasoning": "简要推理过程"
}}
```
"""
使用示例
system_prompt = create_system_prompt( domain=“法律咨询”, knowledge_cutoff=“2024 年 12 月”, allowed_sources=[“中国法律法规数据库”, “最高人民法院司法解释”], escalate_threshold=0.9 )
2. JSON Schema 约束输出
# schema_validation.py
from pydantic import BaseModel, Field, validator
from typing import List, Optional, Literal
import json
class CitedFact(BaseModel):
    """Atomic factual claim together with its citation and confidence."""

    # Field descriptions stay in Chinese: they surface in the JSON Schema
    # shown to the (Chinese-prompted) LLM.
    statement: str = Field(..., description="事实声明")
    source: str = Field(..., description="来源文档/URL")
    confidence: float = Field(..., ge=0, le=1, description="置信度")
class StructuredResponse(BaseModel):
    """Schema for a structured LLM response with confidence and provenance."""

    answer: str = Field(..., description="完整回答")
    facts: List[CitedFact] = Field(..., description="关键事实列表")
    overall_confidence: float = Field(..., ge=0, le=1)
    reasoning_trace: List[str] = Field(..., description="推理步骤")
    requires_escalation: bool = Field(..., description="是否需要人工审核")

    @validator('facts')
    def validate_facts(cls, v):
        # At least one cited fact is required to back the answer.
        if len(v) == 0:
            raise ValueError("必须包含至少一个事实声明")
        return v

    @validator('requires_escalation')
    def validate_confidence(cls, v, values):
        # BUGFIX: this check used to live on `overall_confidence`, but in
        # pydantic v1 `values` only contains fields declared BEFORE the one
        # being validated, so `values.get('requires_escalation')` was always
        # None and every response below 0.7 was rejected even when correctly
        # flagged. Validating the later-declared field lets us see both.
        confidence = values.get('overall_confidence')
        if confidence is not None and confidence < 0.7 and not v:
            raise ValueError("低置信度必须标记为需要升级")
        return v

    class Config:
        # Example payload surfaced in the generated JSON Schema.
        schema_extra = {
            "example": {
                "answer": "根据...",
                "facts": [
                    {"statement": "...", "source": "...", "confidence": 0.95}
                ],
                "overall_confidence": 0.92,
                "reasoning_trace": ["步骤 1", "步骤 2"],
                "requires_escalation": False
            }
        }
# Validation helper
def validate_llm_response(response_json: dict) -> StructuredResponse:
    """Validate that a raw LLM response dict conforms to the schema.

    Args:
        response_json: Parsed JSON payload produced by the LLM.

    Returns:
        The validated ``StructuredResponse``.

    Raises:
        ValueError: If validation fails; chained (``from e``) to the original
            pydantic error so the root cause is preserved in tracebacks.
    """
    try:
        return StructuredResponse(**response_json)
    except Exception as e:
        raise ValueError(f"Schema 验证失败:{str(e)}") from e
3. RAG 检索增强实现
# rag_pipeline.py
from langchain.vectorstores import Chroma
from langchain.embeddings import OpenAIEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.chains import RetrievalQA
from typing import List, Tuple
import numpy as np
class HallucinationAwareRAG:
    """RAG pipeline that refuses to answer when retrieval support is weak.

    NOTE(review): Chroma scores are treated as distances and converted to
    similarities as ``1 - score`` — confirm this matches the configured
    distance metric of the collection.
    """

    def __init__(self, documents: List[str], similarity_threshold: float = 0.7):
        """
        Args:
            documents: Raw document texts to index.
            similarity_threshold: Minimum similarity a chunk must reach to be
                used as answer context.
        """
        # Split raw texts into overlapping chunks.
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=500,
            chunk_overlap=50,
            length_function=len
        )
        # BUGFIX: split_documents() expects Document objects, but `documents`
        # is a list of strings; create_documents() wraps raw strings into
        # Documents before splitting.
        self.chunks = self.text_splitter.create_documents(documents)
        self.embeddings = OpenAIEmbeddings()
        # Vector index over the chunks (vector-only retrieval; no BM25 here).
        self.vectorstore = Chroma.from_documents(
            documents=self.chunks,
            embedding=self.embeddings
        )
        self.similarity_threshold = similarity_threshold

    def retrieve_with_confidence(self, query: str, k: int = 3) -> Tuple[List[str], float]:
        """Retrieve up to k chunks plus an aggregate retrieval confidence.

        Returns:
            (texts of chunks above the similarity threshold, mean similarity
            over all k retrieved chunks).
        """
        results = self.vectorstore.similarity_search_with_score(query, k=k)
        if not results:
            return [], 0.0
        # Chroma returns distances; convert the mean to a similarity.
        scores = [score for _, score in results]
        avg_similarity = 1 - np.mean(scores)
        # Keep only chunks whose similarity clears the threshold.
        filtered_results = [
            doc for doc, score in results
            if (1 - score) >= self.similarity_threshold
        ]
        return [doc.page_content for doc in filtered_results], avg_similarity

    def generate_with_rag(self, query: str, llm) -> dict:
        """Retrieve, generate strictly from retrieved context, then verify.

        Returns a dict with the answer, the context used, the retrieval
        confidence, and whether human escalation is required.
        """
        # 1. Retrieval with confidence gating.
        context, retrieval_confidence = self.retrieve_with_confidence(query)
        if not context or retrieval_confidence < 0.6:
            # Refuse rather than answer without grounding.
            return {
                "answer": "我没有找到足够的相关信息来回答这个问题。",
                "retrieval_confidence": retrieval_confidence,
                "requires_escalation": True
            }
        # 2. Prompt constrained to the retrieved context only.
        prompt = f"""仅使用以下信息回答问题。如果信息不足,说"信息不足"。
检索到的信息:
{' '.join(context)}
问题:{query}
回答(JSON 格式):
"""
        # 3. Generation.
        response = llm.generate(prompt)
        # 4. Verification: did the answer actually draw on the context?
        sources_cited = self._verify_citations(response, context)
        return {
            "answer": response,
            "context_used": context,
            "retrieval_confidence": retrieval_confidence,
            "sources_cited": sources_cited,
            "requires_escalation": not sources_cited
        }

    def _verify_citations(self, response: str, context: List[str]) -> bool:
        """Heuristic grounding check via keyword overlap.

        NOTE(review): splits on whitespace with a length>5 filter, which is
        only meaningful for space-delimited languages; unsegmented Chinese
        context yields few or zero key phrases, in which case the check
        trivially passes — confirm acceptability for the target corpus.
        """
        context_text = ' '.join(context).lower()
        response_lower = response.lower()
        # Candidate key phrases drawn from the retrieved context.
        key_phrases = [word for word in context_text.split() if len(word) > 5]
        # Require a minimum fraction of the phrases to reappear in the answer.
        matches = sum(1 for phrase in key_phrases if phrase in response_lower)
        return matches >= len(key_phrases) * 0.3  # at least 30% coverage
4. Self-Consistency 交叉验证
# self_consistency.py
from typing import List, Dict
from collections import Counter
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
from sentence_transformers import SentenceTransformer
class SelfConsistencyChecker:
    """Self-consistency verifier: samples multiple answers and clusters them."""

    def __init__(self, llm_client, model: str = "gpt-4", num_samples: int = 5):
        # llm_client: OpenAI-style client exposing chat.completions.create.
        self.llm = llm_client
        self.model = model
        self.num_samples = num_samples
        self.embedding_model = SentenceTransformer('all-MiniLM-L6-v2')

    def generate_multiple_responses(self, prompt: str) -> List[str]:
        """Sample `num_samples` independent responses for the same prompt."""
        responses = []
        for _ in range(self.num_samples):
            response = self.llm.chat.completions.create(
                model=self.model,
                messages=[{"role": "user", "content": prompt}],
                temperature=1.0  # high temperature increases sample diversity
            )
            responses.append(response.choices[0].message.content)
        return responses

    def cluster_by_semantic_similarity(self, responses: List[str]) -> Dict:
        """Greedily cluster responses by cosine similarity of their embeddings.

        Returns the largest cluster plus a confidence equal to its share of
        all samples.
        """
        if len(responses) < 2:
            return {"cluster": responses, "confidence": 1.0}
        # Sentence embeddings for every response.
        embeddings = self.embedding_model.encode(responses)
        # Pairwise cosine-similarity matrix.
        similarity_matrix = cosine_similarity(embeddings)
        # Greedy single-pass clustering: each unvisited response seeds a
        # cluster and absorbs later responses above the threshold.
        threshold = 0.7
        clusters = []
        visited = set()
        for i in range(len(responses)):
            if i in visited:
                continue
            cluster = [i]
            visited.add(i)
            for j in range(i + 1, len(responses)):
                if j not in visited and similarity_matrix[i][j] > threshold:
                    cluster.append(j)
                    visited.add(j)
            clusters.append(cluster)
        # The largest cluster represents the consensus answer.
        largest_cluster = max(clusters, key=len)
        cluster_responses = [responses[i] for i in largest_cluster]
        # Confidence = cluster size / total sample count.
        confidence = len(largest_cluster) / len(responses)
        return {
            "cluster": cluster_responses,
            "confidence": confidence,
            "all_responses": responses,
            "num_clusters": len(clusters)
        }

    def verify(self, prompt: str) -> Dict:
        """Full self-consistency check: sample, cluster, then classify."""
        # 1. Sample multiple independent responses.
        responses = self.generate_multiple_responses(prompt)
        # 2. Cluster them by semantic similarity.
        clustering_result = self.cluster_by_semantic_similarity(responses)
        # 3. Map the consensus share to a verdict.
        if clustering_result['confidence'] >= 0.8:
            verdict = "HIGH_CONSISTENCY"
        elif clustering_result['confidence'] >= 0.5:
            verdict = "MEDIUM_CONSISTENCY"
        else:
            verdict = "LOW_CONSISTENCY_POTENTIAL_HALLUCINATION"
        return {
            "verdict": verdict,
            "final_answer": clustering_result['cluster'][0],  # cluster representative
            "confidence": clustering_result['confidence'],
            "consistency_details": clustering_result
        }
# Usage example (requires an OpenAI-compatible `llm_client` to be defined).
checker = SelfConsistencyChecker(llm_client, num_samples=5)
result = checker.verify("量子纠缠的原理是什么?")
print(f"验证结果:{result['verdict']}")
print(f"置信度:{result['confidence']:.2f}")
print(f"最终答案:{result['final_answer']}")
5. UQLM 置信度评分集成
# confidence_scoring.py
import asyncio
from uqlm import BlackBoxUQ, WhiteBoxUQ
from uqlm.scorers import (
SemanticNegentropy,
EntailmentProbability,
LengthNormalizedSequenceProbability
)
class UQLMConfidenceScorer:
    """Confidence scoring via the UQLM uncertainty-quantification library.

    BUGFIX: the original annotated return types with ``Dict`` although the
    module never imports it from ``typing`` (only ``asyncio`` and ``uqlm``
    are imported), raising NameError when the class body is evaluated; the
    builtin ``dict`` is used instead.
    """

    def __init__(self, llm_client, model: str):
        self.llm = llm_client
        self.model = model
        # Black-box scorers: consistency-based, no logprobs required.
        self.black_box_uq = BlackBoxUQ(
            llm=llm_client,
            scorers=[
                SemanticNegentropy(),
                EntailmentProbability()
            ],
            use_best=True,
            num_responses=5  # number of samples generated per prompt
        )
        # White-box scorer: token-probability based, requires logprobs support.
        self.white_box_uq = WhiteBoxUQ(
            llm=llm_client,
            scorers=[
                LengthNormalizedSequenceProbability()
            ]
        )

    async def score_response(self, prompt: str) -> dict:
        """Generate a response for `prompt` and score its reliability."""
        # Black-box generate-and-score over a single prompt.
        bb_results = await self.black_box_uq.generate_and_score([prompt])
        # Best response plus its semantic-negentropy confidence.
        best_response = bb_results['responses'][0]
        confidence_score = bb_results['scores'][0]['semantic_negentropy']
        return {
            "response": best_response,
            "confidence": confidence_score,
            "verdict": "RELIABLE" if confidence_score > 0.7 else "UNRELIABLE",
            "all_scores": bb_results['scores'][0]
        }

    def should_escalate(self, score_result: dict, threshold: float = 0.7) -> bool:
        """Whether the scored response should be routed to a human reviewer."""
        return score_result['confidence'] < threshold
# Usage example (requires an initialized `llm_client`).
async def main():
    scorer = UQLMConfidenceScorer(llm_client, model="gpt-4")
    result = await scorer.score_response("法国的首都是哪里?")
    print(f"回答:{result['response']}")
    print(f"置信度:{result['confidence']:.3f}")
    print(f"需要升级:{scorer.should_escalate(result)}")
# asyncio.run(main())
6. 文件操作验证(针对 Agent 任务)
# file_operation_verifier.py
import os
import hashlib
import shutil
from pathlib import Path
from typing import Dict, List, Tuple
from datetime import datetime
class FileOperationVerifier:
    """Verifies file-system effects of agent tasks against snapshots.

    A snapshot records, for every file under ``working_dir``, its SHA-256
    hash, size, mtime and raw bytes, so that unexpected changes can be
    detected and the directory can be fully rolled back.
    """

    def __init__(self, working_dir: str):
        self.working_dir = Path(working_dir)
        self.snapshots = {}  # label -> {relative path -> file metadata}

    def create_snapshot(self, label: str = "pre_operation") -> str:
        """Record hash/size/mtime/content of every file; return the label."""
        snapshot = {}
        for root, dirs, files in os.walk(self.working_dir):
            for file in files:
                file_path = Path(root) / file
                rel_path = file_path.relative_to(self.working_dir)
                stat = file_path.stat()
                snapshot[str(rel_path)] = {
                    "hash": self._file_hash(file_path),
                    "size": stat.st_size,
                    "mtime": stat.st_mtime,
                    # BUGFIX: raw bytes are stored so rollback_to_snapshot
                    # can actually restore files (previously only the hash
                    # was kept, so rollback deleted everything and restored
                    # nothing).
                    "content": file_path.read_bytes()
                }
        self.snapshots[label] = snapshot
        return label

    def _file_hash(self, file_path: Path) -> str:
        """SHA-256 of the file, streamed in 4 KiB chunks."""
        hasher = hashlib.sha256()
        with open(file_path, 'rb') as f:
            for chunk in iter(lambda: f.read(4096), b''):
                hasher.update(chunk)
        return hasher.hexdigest()

    def verify_file_created(self, expected_path: str) -> Tuple[bool, str]:
        """Check that `expected_path` exists under working_dir and is non-empty."""
        file_path = self.working_dir / expected_path
        if not file_path.exists():
            return False, f"文件未创建:{expected_path}"
        if file_path.stat().st_size == 0:
            return False, f"文件为空:{expected_path}"
        return True, "文件创建成功"

    def verify_file_modified(self, file_path: str, snapshot_label: str = "pre_operation") -> Tuple[bool, str]:
        """Check whether `file_path` changed relative to the named snapshot."""
        if snapshot_label not in self.snapshots:
            return False, "快照不存在"
        snapshot = self.snapshots[snapshot_label]
        rel_path = file_path
        if rel_path not in snapshot:
            return True, "文件是新增的"  # a newly added file counts as modified
        old_hash = snapshot[rel_path]["hash"]
        new_hash = self._file_hash(self.working_dir / rel_path)
        if old_hash != new_hash:
            return True, "文件内容已修改"
        else:
            return False, "文件未发生变化"

    def verify_file_content(self, file_path: str, expected_content: str) -> Tuple[bool, str]:
        """Check that the file's text matches `expected_content` (whitespace-trimmed)."""
        full_path = self.working_dir / file_path
        if not full_path.exists():
            return False, "文件不存在"
        actual_content = full_path.read_text()
        if actual_content.strip() != expected_content.strip():
            return False, "文件内容不匹配"
        return True, "文件内容验证通过"

    def detect_unexpected_changes(self, snapshot_label: str = "pre_operation") -> List[str]:
        """List deletions, modifications and additions relative to the snapshot."""
        if snapshot_label not in self.snapshots:
            return []
        snapshot = self.snapshots[snapshot_label]
        changes = []
        # Check files known to the snapshot.
        for rel_path, old_info in snapshot.items():
            file_path = self.working_dir / rel_path
            if not file_path.exists():
                changes.append(f"文件被删除:{rel_path}")
            elif self._file_hash(file_path) != old_info["hash"]:
                changes.append(f"文件被修改:{rel_path}")
        # Check for files added after the snapshot.
        current_files = set()
        for root, dirs, files in os.walk(self.working_dir):
            for file in files:
                file_path = Path(root) / file
                rel_path = str(file_path.relative_to(self.working_dir))
                current_files.add(rel_path)
        for rel_path in current_files:
            if rel_path not in snapshot:
                changes.append(f"文件被新增:{rel_path}")
        return changes

    def rollback_to_snapshot(self, snapshot_label: str) -> bool:
        """Restore working_dir to the snapshot state; return success.

        Deletes everything currently present, then rewrites each snapshotted
        file from its stored bytes (BUGFIX: the original only deleted).
        """
        if snapshot_label not in self.snapshots:
            return False
        snapshot = self.snapshots[snapshot_label]
        # Remove all current files and directories, deepest first.
        for root, dirs, files in os.walk(self.working_dir, topdown=False):
            for file in files:
                (Path(root) / file).unlink()
            for dir in dirs:
                (Path(root) / dir).rmdir()
        # Recreate every snapshotted file from the stored content.
        for rel_path, info in snapshot.items():
            target = self.working_dir / rel_path
            target.parent.mkdir(parents=True, exist_ok=True)
            target.write_bytes(info["content"])
        return True
# 使用示例
def verify_agent_file_operations(agent_result: Dict, working_dir: str) -> Dict:
    """Verify an agent's file-operation task against its expectations.

    NOTE(review): the snapshot is created inside this function, before the
    commented-out agent.execute() call. If the agent has already run before
    this function is invoked (as the `agent_result` parameter suggests),
    detect_unexpected_changes() compares against a post-run state and will
    report nothing — confirm the intended call order.
    """
    verifier = FileOperationVerifier(working_dir)
    # Snapshot of the directory before the operation.
    verifier.create_snapshot("pre_operation")
    # The agent operation itself is driven by the agent framework:
    # agent_result = agent.execute(task)
    # Accumulator for all verification outcomes.
    results = {
        "expected_files_created": [],
        "unexpected_changes": [],
        "verification_passed": True
    }
    # Each expected file must exist and be non-empty.
    for expected_file in agent_result.get('expected_files', []):
        success, msg = verifier.verify_file_created(expected_file)
        results["expected_files_created"].append({
            "file": expected_file,
            "success": success,
            "message": msg
        })
        if not success:
            results["verification_passed"] = False
    # Any change not covered by the expectations fails the verification.
    unexpected = verifier.detect_unexpected_changes("pre_operation")
    results["unexpected_changes"] = unexpected
    if unexpected:
        results["verification_passed"] = False
    return results
7. 完整验证流水线
# verification_pipeline.py
from typing import Dict, Any, Optional
from dataclasses import dataclass
from enum import Enum
class VerificationStatus(Enum):
    """Tri-state outcome of the verification pipeline."""
    PASS = "pass"
    FAIL = "fail"
    NEEDS_REVIEW = "needs_review"
@dataclass
class VerificationResult:
    """Outcome of a single pipeline run."""
    status: VerificationStatus
    confidence: float  # aggregate confidence in [0, 1]
    details: Dict[str, Any]  # per-check results keyed by check name
    requires_escalation: bool  # True when a human should review the output
class ComprehensiveVerificationPipeline:
    """End-to-end verification: schema, facts, confidence, consistency."""

    def __init__(self, config: Dict):
        self.config = config
        # Individual verifiers would be wired up here:
        # self.schema_validator = SchemaValidator(config['schema'])
        # self.rag_checker = RAGChecker(config['vectorstore'])
        # self.confidence_scorer = ConfidenceScorer(config['llm'])
        # self.consistency_checker = ConsistencyChecker(config['llm'])

    def verify(self, task_input: str, llm_output: Dict, context: Optional[Dict] = None) -> VerificationResult:
        """Run all enabled checks on one LLM output and aggregate a verdict."""
        checks = {
            "schema_valid": False,
            "facts_verified": False,
            "confidence_score": 0.0,
            "consistency_check": None
        }

        # Stage 1: structural (schema) validation — hard failure short-circuits.
        try:
            # validated = self.schema_validator.validate(llm_output)
            checks["schema_valid"] = True
        except Exception as e:
            return VerificationResult(
                status=VerificationStatus.FAIL,
                confidence=0.0,
                details={"error": f"Schema 验证失败:{str(e)}"},
                requires_escalation=True
            )

        # Stage 2: fact check against retrieved context, when available.
        if context:
            # facts_verified = self.rag_checker.verify(llm_output, context['retrieved_docs'])
            checks["facts_verified"] = True  # simplified example

        # Stage 3: confidence scoring.
        # confidence = await self.confidence_scorer.score(task_input, llm_output)
        checks["confidence_score"] = 0.92  # simplified example

        # Stage 4: optional (expensive) self-consistency check.
        if self.config.get('enable_consistency_check', False):
            # consistency = await self.consistency_checker.verify(task_input)
            checks["consistency_check"] = "high_consistency"  # simplified example

        # Aggregate decision: escalate on low confidence or unverified facts;
        # PASS only when confident enough and no escalation is needed.
        score = checks["confidence_score"]
        escalate = (
            score < self.config.get('escalation_threshold', 0.7)
            or not checks["facts_verified"]
        )
        status = (
            VerificationStatus.PASS
            if not escalate and score >= 0.85
            else VerificationStatus.NEEDS_REVIEW
        )
        return VerificationResult(
            status=status,
            confidence=score,
            details=checks,
            requires_escalation=escalate
        )

    def generate_report(self, result: VerificationResult) -> str:
        """Render a human-readable verification report."""
        lines = [
            f"验证状态:{result.status.value}",
            f"置信度:{result.confidence:.2f}",
            f"需要升级:{result.requires_escalation}",
            "",
            "详细结果:",
        ]
        lines.extend(f"  - {key}: {value}" for key, value in result.details.items())
        return "\n".join(lines)
# Usage example — the `{...}` / `[...]` literals are placeholders to be
# replaced with real data (as written they evaluate to containers holding
# the Ellipsis object).
config = {
    'escalation_threshold': 0.7,
    'enable_consistency_check': True,
    'schema': {...}  # JSON Schema
}
pipeline = ComprehensiveVerificationPipeline(config)
result = pipeline.verify(
    task_input="分析这份销售报告",
    llm_output={...},
    context={'retrieved_docs': [...]}
)
print(pipeline.generate_report(result))
部署配置示例
Docker Compose 部署
# docker-compose.yml
# Stack: mitigation gateway, Chroma vector store, Grafana monitoring.
version: '3.8'  # NOTE(review): `version` is obsolete in Compose v2 — confirm before removing
services:
  # API gateway exposing the hallucination-mitigation pipeline.
  llm-gateway:
    image: llm-hallucination-mitigation:latest
    ports:
      - "8000:8000"
    environment:
      - LLM_API_KEY=${LLM_API_KEY}
      - ESCALATION_THRESHOLD=0.7
      - ENABLE_CONSISTENCY_CHECK=true
    volumes:
      - ./config:/app/config
      - ./logs:/app/logs
  # Vector store backing the RAG retrieval step.
  vector-db:
    image: chromadb/chroma:latest
    ports:
      - "8001:8000"  # host 8001 -> container 8000
    volumes:
      - chroma-data:/chroma/chroma
  # Dashboards for pipeline metrics.
  monitoring:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    volumes:
      - grafana-data:/var/lib/grafana
      - ./monitoring/dashboards:/etc/grafana/provisioning/dashboards
volumes:
  chroma-data:
  grafana-data:
测试用例
# test_verification.py
import pytest

from verification_pipeline import (
    ComprehensiveVerificationPipeline,
    VerificationStatus,
)
class TestHallucinationMitigation:
    """Pipeline-level tests for the hallucination-mitigation flow.

    NOTE(review): `VerificationStatus` is referenced in the assertions below
    but is not imported by this module's import block — confirm the import.
    NOTE(review): the pipeline's verify() appears to hard-code its confidence
    score, so assertions driven by `llm_output["confidence"]` may not exercise
    what these docstrings suggest — confirm against the implementation.
    """

    @pytest.fixture
    def pipeline(self):
        # Consistency checking disabled to keep the tests cheap.
        config = {
            'escalation_threshold': 0.7,
            'enable_consistency_check': False
        }
        return ComprehensiveVerificationPipeline(config)

    def test_high_confidence_pass(self, pipeline):
        """A high-confidence answer should pass without escalation."""
        result = pipeline.verify(
            task_input="法国首都是哪里?",
            llm_output={
                "answer": "巴黎",
                "confidence": 0.99
            }
        )
        assert result.status == VerificationStatus.PASS
        assert result.confidence > 0.85
        assert not result.requires_escalation

    def test_low_confidence_escalation(self, pipeline):
        """A low-confidence answer must be escalated for human review."""
        result = pipeline.verify(
            task_input="2050 年世界杯在哪里举办?",
            llm_output={
                "answer": "尚未公布",
                "confidence": 0.3
            }
        )
        assert result.status == VerificationStatus.NEEDS_REVIEW
        assert result.requires_escalation

    def test_schema_validation_fail(self, pipeline):
        """Schema violations should be rejected outright."""
        result = pipeline.verify(
            task_input="测试",
            llm_output={"invalid_field": "value"}  # does not conform to the schema
        )
        assert result.status == VerificationStatus.FAIL
        assert result.requires_escalation