Logo
热心市民王先生

关键代码验证

技术研究 代码示例 LLM

幻觉检测与验证的核心代码实现示例

核心实现示例

1. 基础 Prompt 约束模板

# system_prompt.py
from typing import List

def create_system_prompt(
    domain: str,
    knowledge_cutoff: str,
    allowed_sources: List[str],
    escalate_threshold: float = 0.85
) -> str:
    """
    生成防幻觉 System Prompt 模板
    """
    return f"""
# 角色定义
你是{domain}领域的专业助手。

# 知识边界
- 知识截止时间:{knowledge_cutoff}
- 仅使用以下来源的信息:{', '.join(allowed_sources)}
- 对于{knowledge_cutoff}之后的事件,明确说明信息可能不完整

# 响应规则
1. **准确性优先**:如果不确定,回答"我没有足够信息回答这个问题"
2. **来源引用**:每个事实性声明必须附带来源
3. **置信度评估**:在回答末尾添加置信度评分 (0-1)
4. **禁止行为**:
   - 绝不编造数据、引用或来源
   - 绝不推测未知信息
   - 绝不提供超出你专业范围的建议

# 升级机制
如果置信度低于{escalate_threshold},明确建议用户咨询人工专家。

# 输出格式
```json
{{
  "answer": "你的回答",
  "sources": ["来源 1", "来源 2"],
  "confidence": 0.0-1.0,
  "reasoning": "简要推理过程"
}}

"""

使用示例

system_prompt = create_system_prompt( domain=“法律咨询”, knowledge_cutoff=“2024 年 12 月”, allowed_sources=[“中国法律法规数据库”, “最高人民法院司法解释”], escalate_threshold=0.9 )


---

### 2. JSON Schema 约束输出

```python
# schema_validation.py
from pydantic import BaseModel, Field, validator
from typing import List, Optional, Literal
import json

class CitedFact(BaseModel):
    """原子事实声明及引用"""
    statement: str = Field(..., description="事实声明")
    source: str = Field(..., description="来源文档/URL")
    confidence: float = Field(..., ge=0, le=1, description="置信度")

class StructuredResponse(BaseModel):
    """结构化响应 Schema"""
    answer: str = Field(..., description="完整回答")
    facts: List[CitedFact] = Field(..., description="关键事实列表")
    overall_confidence: float = Field(..., ge=0, le=1)
    reasoning_trace: List[str] = Field(..., description="推理步骤")
    requires_escalation: bool = Field(..., description="是否需要人工审核")
    
    @validator('facts')
    def validate_facts(cls, v):
        if len(v) == 0:
            raise ValueError("必须包含至少一个事实声明")
        return v
    
    @validator('overall_confidence')
    def validate_confidence(cls, v, values):
        # 如果置信度低,必须标记为需要升级
        if v < 0.7 and not values.get('requires_escalation'):
            raise ValueError("低置信度必须标记为需要升级")
        return v
    
    class Config:
        schema_extra = {
            "example": {
                "answer": "根据...",
                "facts": [
                    {"statement": "...", "source": "...", "confidence": 0.95}
                ],
                "overall_confidence": 0.92,
                "reasoning_trace": ["步骤 1", "步骤 2"],
                "requires_escalation": False
            }
        }

# 验证函数
def validate_llm_response(response_json: dict) -> StructuredResponse:
    """验证 LLM 响应是否符合 Schema"""
    try:
        return StructuredResponse(**response_json)
    except Exception as e:
        raise ValueError(f"Schema 验证失败:{str(e)}")

3. RAG 检索增强实现

# rag_pipeline.py
from langchain.vectorstores import Chroma
from langchain.embeddings import OpenAIEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.chains import RetrievalQA
from typing import List, Tuple
import numpy as np

class HallucinationAwareRAG:
    """防幻觉 RAG 系统"""
    
    def __init__(self, documents: List[str], similarity_threshold: float = 0.7):
        # 文本分块
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=500,
            chunk_overlap=50,
            length_function=len
        )
        
        # 混合检索:BM25 + 向量
        self.chunks = self.text_splitter.split_documents(documents)
        self.embeddings = OpenAIEmbeddings()
        
        # 向量检索
        self.vectorstore = Chroma.from_documents(
            documents=self.chunks,
            embedding=self.embeddings
        )
        
        self.similarity_threshold = similarity_threshold
    
    def retrieve_with_confidence(self, query: str, k: int = 3) -> Tuple[List[str], float]:
        """
        检索相关文档并返回置信度
        """
        results = self.vectorstore.similarity_search_with_score(query, k=k)
        
        if not results:
            return [], 0.0
        
        # 计算平均相似度作为检索置信度
        scores = [score for _, score in results]
        avg_similarity = 1 - np.mean(scores)  # Chroma 返回距离,转换为相似度
        
        # 过滤低相似度结果
        filtered_results = [
            doc for doc, score in results 
            if (1 - score) >= self.similarity_threshold
        ]
        
        return [doc.page_content for doc in filtered_results], avg_similarity
    
    def generate_with_rag(self, query: str, llm) -> dict:
        """
        完整的 RAG 生成流程
        """
        # 1. 检索
        context, retrieval_confidence = self.retrieve_with_confidence(query)
        
        if not context or retrieval_confidence < 0.6:
            return {
                "answer": "我没有找到足够的相关信息来回答这个问题。",
                "retrieval_confidence": retrieval_confidence,
                "requires_escalation": True
            }
        
        # 2. 构建 Prompt(强制仅使用检索内容)
        prompt = f"""仅使用以下信息回答问题。如果信息不足,说"信息不足"。

检索到的信息:
{' '.join(context)}

问题:{query}

回答(JSON 格式):
"""
        
        # 3. 生成
        response = llm.generate(prompt)
        
        # 4. 验证(检查是否引用了检索内容)
        sources_cited = self._verify_citations(response, context)
        
        return {
            "answer": response,
            "context_used": context,
            "retrieval_confidence": retrieval_confidence,
            "sources_cited": sources_cited,
            "requires_escalation": not sources_cited
        }
    
    def _verify_citations(self, response: str, context: List[str]) -> bool:
        """验证回答是否引用了检索内容"""
        # 简单实现:检查是否有重叠的关键词
        context_text = ' '.join(context).lower()
        response_lower = response.lower()
        
        # 提取 context 中的关键短语
        key_phrases = [word for word in context_text.split() if len(word) > 5]
        
        # 检查是否有足够多的短语出现在回答中
        matches = sum(1 for phrase in key_phrases if phrase in response_lower)
        return matches >= len(key_phrases) * 0.3  # 至少 30% 覆盖

4. Self-Consistency 交叉验证

# self_consistency.py
from typing import List, Dict
from collections import Counter
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
from sentence_transformers import SentenceTransformer

class SelfConsistencyChecker:
    """自洽性验证器"""
    
    def __init__(self, llm_client, model: str = "gpt-4", num_samples: int = 5):
        self.llm = llm_client
        self.model = model
        self.num_samples = num_samples
        self.embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
    
    def generate_multiple_responses(self, prompt: str) -> List[str]:
        """生成多个独立响应"""
        responses = []
        for _ in range(self.num_samples):
            response = self.llm.chat.completions.create(
                model=self.model,
                messages=[{"role": "user", "content": prompt}],
                temperature=1.0  # 高温度增加多样性
            )
            responses.append(response.choices[0].message.content)
        return responses
    
    def cluster_by_semantic_similarity(self, responses: List[str]) -> Dict:
        """
        按语义相似度聚类响应
        返回最一致的集群
        """
        if len(responses) < 2:
            return {"cluster": responses, "confidence": 1.0}
        
        # 生成嵌入
        embeddings = self.embedding_model.encode(responses)
        
        # 计算相似度矩阵
        similarity_matrix = cosine_similarity(embeddings)
        
        # 简单聚类:找到最相似的响应组
        threshold = 0.7
        clusters = []
        visited = set()
        
        for i in range(len(responses)):
            if i in visited:
                continue
            
            cluster = [i]
            visited.add(i)
            
            for j in range(i + 1, len(responses)):
                if j not in visited and similarity_matrix[i][j] > threshold:
                    cluster.append(j)
                    visited.add(j)
            
            clusters.append(cluster)
        
        # 选择最大的集群
        largest_cluster = max(clusters, key=len)
        cluster_responses = [responses[i] for i in largest_cluster]
        
        # 计算置信度(集群大小 / 总样本数)
        confidence = len(largest_cluster) / len(responses)
        
        return {
            "cluster": cluster_responses,
            "confidence": confidence,
            "all_responses": responses,
            "num_clusters": len(clusters)
        }
    
    def verify(self, prompt: str) -> Dict:
        """
        完整的自洽性验证流程
        """
        # 1. 生成多个响应
        responses = self.generate_multiple_responses(prompt)
        
        # 2. 聚类
        clustering_result = self.cluster_by_semantic_similarity(responses)
        
        # 3. 决策
        if clustering_result['confidence'] >= 0.8:
            verdict = "HIGH_CONSISTENCY"
        elif clustering_result['confidence'] >= 0.5:
            verdict = "MEDIUM_CONSISTENCY"
        else:
            verdict = "LOW_CONSISTENCY_POTENTIAL_HALLUCINATION"
        
        return {
            "verdict": verdict,
            "final_answer": clustering_result['cluster'][0],  # 选择集群代表
            "confidence": clustering_result['confidence'],
            "consistency_details": clustering_result
        }

# 使用示例
checker = SelfConsistencyChecker(llm_client, num_samples=5)
result = checker.verify("量子纠缠的原理是什么?")

print(f"验证结果:{result['verdict']}")
print(f"置信度:{result['confidence']:.2f}")
print(f"最终答案:{result['final_answer']}")

5. UQLM 置信度评分集成

# confidence_scoring.py
import asyncio
from uqlm import BlackBoxUQ, WhiteBoxUQ
from uqlm.scorers import (
    SemanticNegentropy,
    EntailmentProbability,
    LengthNormalizedSequenceProbability
)

class UQLMConfidenceScorer:
    """使用 UQLM 进行置信度评分"""
    
    def __init__(self, llm_client, model: str):
        self.llm = llm_client
        self.model = model
        
        # 黑盒评分器(基于一致性)
        self.black_box_uq = BlackBoxUQ(
            llm=llm_client,
            scorers=[
                SemanticNegentropy(),
                EntailmentProbability()
            ],
            use_best=True,
            num_responses=5  # 生成 5 个样本
        )
        
        # 白盒评分器(基于 token 概率,需 logprobs 支持)
        self.white_box_uq = WhiteBoxUQ(
            llm=llm_client,
            scorers=[
                LengthNormalizedSequenceProbability()
            ]
        )
    
    async def score_response(self, prompt: str) -> Dict:
        """
        对单个 prompt 生成响应并评分
        """
        # 黑盒评分
        bb_results = await self.black_box_uq.generate_and_score([prompt])
        
        # 提取最佳评分
        best_response = bb_results['responses'][0]
        confidence_score = bb_results['scores'][0]['semantic_negentropy']
        
        return {
            "response": best_response,
            "confidence": confidence_score,
            "verdict": "RELIABLE" if confidence_score > 0.7 else "UNRELIABLE",
            "all_scores": bb_results['scores'][0]
        }
    
    def should_escalate(self, score_result: Dict, threshold: float = 0.7) -> bool:
        """判断是否需要人工审核"""
        return score_result['confidence'] < threshold

# 使用示例
async def main():
    scorer = UQLMConfidenceScorer(llm_client, model="gpt-4")
    result = await scorer.score_response("法国的首都是哪里?")
    
    print(f"回答:{result['response']}")
    print(f"置信度:{result['confidence']:.3f}")
    print(f"需要升级:{scorer.should_escalate(result)}")

# asyncio.run(main())

6. 文件操作验证(针对 Agent 任务)

# file_operation_verifier.py
import os
import hashlib
import shutil
from pathlib import Path
from typing import Dict, List, Tuple
from datetime import datetime

class FileOperationVerifier:
    """文件操作验证器"""
    
    def __init__(self, working_dir: str):
        self.working_dir = Path(working_dir)
        self.snapshots = {}  # 保存操作前快照
    
    def create_snapshot(self, label: str = "pre_operation") -> str:
        """创建当前目录状态的快照"""
        snapshot = {}
        for root, dirs, files in os.walk(self.working_dir):
            for file in files:
                file_path = Path(root) / file
                rel_path = file_path.relative_to(self.working_dir)
                snapshot[str(rel_path)] = {
                    "hash": self._file_hash(file_path),
                    "size": file_path.stat().st_size,
                    "mtime": file_path.stat().st_mtime
                }
        
        self.snapshots[label] = snapshot
        return label
    
    def _file_hash(self, file_path: Path) -> str:
        """计算文件哈希"""
        hasher = hashlib.sha256()
        with open(file_path, 'rb') as f:
            for chunk in iter(lambda: f.read(4096), b''):
                hasher.update(chunk)
        return hasher.hexdigest()
    
    def verify_file_created(self, expected_path: str) -> Tuple[bool, str]:
        """验证文件是否被创建"""
        file_path = self.working_dir / expected_path
        
        if not file_path.exists():
            return False, f"文件未创建:{expected_path}"
        
        if file_path.stat().st_size == 0:
            return False, f"文件为空:{expected_path}"
        
        return True, "文件创建成功"
    
    def verify_file_modified(self, file_path: str, snapshot_label: str = "pre_operation") -> Tuple[bool, str]:
        """验证文件是否被修改"""
        if snapshot_label not in self.snapshots:
            return False, "快照不存在"
        
        snapshot = self.snapshots[snapshot_label]
        rel_path = file_path
        
        if rel_path not in snapshot:
            return True, "文件是新增的"  # 新增也算修改
        
        old_hash = snapshot[rel_path]["hash"]
        new_hash = self._file_hash(self.working_dir / rel_path)
        
        if old_hash != new_hash:
            return True, "文件内容已修改"
        else:
            return False, "文件未发生变化"
    
    def verify_file_content(self, file_path: str, expected_content: str) -> Tuple[bool, str]:
        """验证文件内容是否符合预期"""
        full_path = self.working_dir / file_path
        
        if not full_path.exists():
            return False, "文件不存在"
        
        actual_content = full_path.read_text()
        
        if actual_content.strip() != expected_content.strip():
            return False, "文件内容不匹配"
        
        return True, "文件内容验证通过"
    
    def detect_unexpected_changes(self, snapshot_label: str = "pre_operation") -> List[str]:
        """检测意外的文件变更"""
        if snapshot_label not in self.snapshots:
            return []
        
        snapshot = self.snapshots[snapshot_label]
        changes = []
        
        # 检查现有文件
        for rel_path, old_info in snapshot.items():
            file_path = self.working_dir / rel_path
            if not file_path.exists():
                changes.append(f"文件被删除:{rel_path}")
            elif self._file_hash(file_path) != old_info["hash"]:
                changes.append(f"文件被修改:{rel_path}")
        
        # 检查新增文件
        current_files = set()
        for root, dirs, files in os.walk(self.working_dir):
            for file in files:
                file_path = Path(root) / file
                rel_path = str(file_path.relative_to(self.working_dir))
                current_files.add(rel_path)
        
        for rel_path in current_files:
            if rel_path not in snapshot:
                changes.append(f"文件被新增:{rel_path}")
        
        return changes
    
    def rollback_to_snapshot(self, snapshot_label: str) -> bool:
        """回滚到快照状态"""
        if snapshot_label not in self.snapshots:
            return False
        
        snapshot = self.snapshots[snapshot_label]
        
        # 删除当前所有文件
        for root, dirs, files in os.walk(self.working_dir, topdown=False):
            for file in files:
                (Path(root) / file).unlink()
            for dir in dirs:
                (Path(root) / dir).rmdir()
        
        # 从快照恢复(需要保存文件内容,此处简化)
        # 实际实现需保存文件内容而非仅哈希
        
        return True

# 使用示例
def verify_agent_file_operations(agent_result: Dict, working_dir: str) -> Dict:
    """验证 Agent 文件操作任务"""
    verifier = FileOperationVerifier(working_dir)
    
    # 操作前快照
    verifier.create_snapshot("pre_operation")
    
    # 执行 Agent 操作(由 Agent 框架处理)
    # agent_result = agent.execute(task)
    
    # 验证
    results = {
        "expected_files_created": [],
        "unexpected_changes": [],
        "verification_passed": True
    }
    
    # 验证预期文件
    for expected_file in agent_result.get('expected_files', []):
        success, msg = verifier.verify_file_created(expected_file)
        results["expected_files_created"].append({
            "file": expected_file,
            "success": success,
            "message": msg
        })
        if not success:
            results["verification_passed"] = False
    
    # 检测意外变更
    unexpected = verifier.detect_unexpected_changes("pre_operation")
    results["unexpected_changes"] = unexpected
    if unexpected:
        results["verification_passed"] = False
    
    return results

7. 完整验证流水线

# verification_pipeline.py
from typing import Dict, Any, Optional
from dataclasses import dataclass
from enum import Enum

class VerificationStatus(Enum):
    PASS = "pass"
    FAIL = "fail"
    NEEDS_REVIEW = "needs_review"

@dataclass
class VerificationResult:
    status: VerificationStatus
    confidence: float
    details: Dict[str, Any]
    requires_escalation: bool

class ComprehensiveVerificationPipeline:
    """综合验证流水线"""
    
    def __init__(self, config: Dict):
        self.config = config
        # 初始化各个验证器
        # self.schema_validator = SchemaValidator(config['schema'])
        # self.rag_checker = RAGChecker(config['vectorstore'])
        # self.confidence_scorer = ConfidenceScorer(config['llm'])
        # self.consistency_checker = ConsistencyChecker(config['llm'])
    
    def verify(self, task_input: str, llm_output: Dict, context: Optional[Dict] = None) -> VerificationResult:
        """
        完整验证流程
        """
        results = {
            "schema_valid": False,
            "facts_verified": False,
            "confidence_score": 0.0,
            "consistency_check": None
        }
        
        # 1. Schema 验证
        try:
            # validated = self.schema_validator.validate(llm_output)
            results["schema_valid"] = True
        except Exception as e:
            return VerificationResult(
                status=VerificationStatus.FAIL,
                confidence=0.0,
                details={"error": f"Schema 验证失败:{str(e)}"},
                requires_escalation=True
            )
        
        # 2. 事实核查(如有 RAG 上下文)
        if context:
            # facts_verified = self.rag_checker.verify(llm_output, context['retrieved_docs'])
            results["facts_verified"] = True  # 简化示例
        
        # 3. 置信度评分
        # confidence = await self.confidence_scorer.score(task_input, llm_output)
        results["confidence_score"] = 0.92  # 简化示例
        
        # 4. 自洽性检查(可选,高成本)
        if self.config.get('enable_consistency_check', False):
            # consistency = await self.consistency_checker.verify(task_input)
            results["consistency_check"] = "high_consistency"  # 简化示例
        
        # 综合决策
        overall_confidence = results["confidence_score"]
        requires_escalation = (
            overall_confidence < self.config.get('escalation_threshold', 0.7) or
            not results["facts_verified"]
        )
        
        if requires_escalation:
            status = VerificationStatus.NEEDS_REVIEW
        elif overall_confidence >= 0.85:
            status = VerificationStatus.PASS
        else:
            status = VerificationStatus.NEEDS_REVIEW
        
        return VerificationResult(
            status=status,
            confidence=overall_confidence,
            details=results,
            requires_escalation=requires_escalation
        )
    
    def generate_report(self, result: VerificationResult) -> str:
        """生成验证报告"""
        report = []
        report.append(f"验证状态:{result.status.value}")
        report.append(f"置信度:{result.confidence:.2f}")
        report.append(f"需要升级:{result.requires_escalation}")
        report.append("")
        report.append("详细结果:")
        for key, value in result.details.items():
            report.append(f"  - {key}: {value}")
        
        return "\n".join(report)

# 使用示例
config = {
    'escalation_threshold': 0.7,
    'enable_consistency_check': True,
    'schema': {...}  # JSON Schema
}

pipeline = ComprehensiveVerificationPipeline(config)
result = pipeline.verify(
    task_input="分析这份销售报告",
    llm_output={...},
    context={'retrieved_docs': [...]}
)

print(pipeline.generate_report(result))

部署配置示例

Docker Compose 部署

# docker-compose.yml
version: '3.8'

services:
  llm-gateway:
    image: llm-hallucination-mitigation:latest
    ports:
      - "8000:8000"
    environment:
      - LLM_API_KEY=${LLM_API_KEY}
      - ESCALATION_THRESHOLD=0.7
      - ENABLE_CONSISTENCY_CHECK=true
    volumes:
      - ./config:/app/config
      - ./logs:/app/logs
  
  vector-db:
    image: chromadb/chroma:latest
    ports:
      - "8001:8000"
    volumes:
      - chroma-data:/chroma/chroma
  
  monitoring:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    volumes:
      - grafana-data:/var/lib/grafana
      - ./monitoring/dashboards:/etc/grafana/provisioning/dashboards
  
volumes:
  chroma-data:
  grafana-data:

测试用例

# test_verification.py
import pytest
from verification_pipeline import ComprehensiveVerificationPipeline

class TestHallucinationMitigation:
    
    @pytest.fixture
    def pipeline(self):
        config = {
            'escalation_threshold': 0.7,
            'enable_consistency_check': False
        }
        return ComprehensiveVerificationPipeline(config)
    
    def test_high_confidence_pass(self, pipeline):
        """高置信度应直接通过"""
        result = pipeline.verify(
            task_input="法国首都是哪里?",
            llm_output={
                "answer": "巴黎",
                "confidence": 0.99
            }
        )
        assert result.status == VerificationStatus.PASS
        assert result.confidence > 0.85
        assert not result.requires_escalation
    
    def test_low_confidence_escalation(self, pipeline):
        """低置信度应升级"""
        result = pipeline.verify(
            task_input="2050 年世界杯在哪里举办?",
            llm_output={
                "answer": "尚未公布",
                "confidence": 0.3
            }
        )
        assert result.status == VerificationStatus.NEEDS_REVIEW
        assert result.requires_escalation
    
    def test_schema_validation_fail(self, pipeline):
        """Schema 验证失败应拒绝"""
        result = pipeline.verify(
            task_input="测试",
            llm_output={"invalid_field": "value"}  # 不符合 schema
        )
        assert result.status == VerificationStatus.FAIL
        assert result.requires_escalation