Logo
热心市民王先生

关键代码验证

技术研究 代码实现 算法示例

记忆管理核心算法的概念性代码实现和配置示例

时间衰减算法实现

核心权重计算类

"""
AI Agent Memory Management - Time Decay Implementation
基于艾宾浩斯遗忘曲线的记忆权重计算系统
"""

import math
import time
from dataclasses import dataclass, field
from typing import List, Optional, Dict
from enum import Enum


class MemoryLayer(Enum):
    """Storage tiers for memories, ordered from most to least retained.

    Tier boundaries (applied by the manager's get_layer):
        CORE:      weight > 0.8
        ACTIVE:    weight in 0.5-0.8
        LONG_TERM: weight in 0.2-0.5
        ARCHIVE:   weight < 0.2
    """
    CORE = "core"
    ACTIVE = "active"
    LONG_TERM = "long_term"
    ARCHIVE = "archive"


@dataclass
class MemoryTrace:
    """A single tracked memory item with its access statistics."""
    id: str
    content: str
    created_at: float = field(default_factory=time.time)
    last_accessed: float = field(default_factory=time.time)
    access_count: int = 0
    base_importance: float = 1.0  # baseline importance in [0, 1]
    token_count: int = 0
    metadata: Dict = field(default_factory=dict)

    def age_hours(self) -> float:
        """Hours elapsed since this memory was created."""
        elapsed_seconds = time.time() - self.created_at
        return elapsed_seconds / 3600

    def hours_since_access(self) -> float:
        """Hours elapsed since this memory was last accessed."""
        elapsed_seconds = time.time() - self.last_accessed
        return elapsed_seconds / 3600


class TimeDecayMemoryManager:
    """Time-decay memory manager.

    Implements an Ebbinghaus-style forgetting model: each memory's
    effective weight combines exponential age decay, a logarithmic
    access-frequency boost, and a flat recency bonus.  The weight then
    determines which storage layer the memory occupies.
    """

    # Access count at which the log-scaled access boost saturates.
    _ACCESS_SATURATION = 100

    def __init__(self, config: Optional[Dict] = None):
        """
        Initialize the memory manager.

        Config keys (user values are merged shallowly over the defaults,
        so passing a partial 'layer_thresholds' dict replaces the whole
        sub-dict):
        - decay_rate: base decay rate per hour, default 0.0001
        - access_boost_max: maximum access boost, default 0.5
        - recency_window: recency window in hours, default 168 (7 days)
        - recency_bonus: bonus inside the recency window, default 0.2
        - layer_thresholds: weight thresholds for layer assignment
        """
        self.config = {
            'decay_rate': 0.0001,
            'access_boost_max': 0.5,
            'recency_window': 168,  # 7 days
            'recency_bonus': 0.2,
            'layer_thresholds': {
                'core_min': 0.8,
                'active_min': 0.5,
                'long_term_min': 0.2
            }
        }
        if config:
            self.config.update(config)

        # In-memory store keyed by memory id.
        self.memories: Dict[str, MemoryTrace] = {}

    def calculate_weight(self, memory: MemoryTrace) -> float:
        """
        Compute the memory's current weight, clamped to [0, 1].

        Formula:
            W = B × (T + A + R)
        where:
        - B: base importance
        - T: time weight = exp(-λt), t = age in hours
        - A: access boost = min(log(n+1)/log(100), 1) × access_boost_max
        - R: recency bonus (+recency_bonus if accessed within the window)
        """
        # 1. Time weight: exponential decay with age.
        time_weight = math.exp(-self.config['decay_rate'] * memory.age_hours())

        # 2. Access boost: logarithmic growth, saturating at
        #    _ACCESS_SATURATION accesses.
        access_boost = (
            math.log(memory.access_count + 1) /
            math.log(self._ACCESS_SATURATION)
        ) * self.config['access_boost_max']
        access_boost = min(access_boost, self.config['access_boost_max'])

        # 3. Recency bonus: flat bonus for recently-accessed memories.
        recency_bonus = (
            self.config['recency_bonus']
            if memory.hours_since_access() < self.config['recency_window']
            else 0.0
        )

        # 4. Combine and clamp to [0, 1].
        weight = memory.base_importance * (time_weight + access_boost + recency_bonus)
        return min(max(weight, 0.0), 1.0)

    def get_layer(self, weight: float) -> MemoryLayer:
        """Map a weight to its storage layer via the configured thresholds."""
        thresholds = self.config['layer_thresholds']
        if weight >= thresholds['core_min']:
            return MemoryLayer.CORE
        elif weight >= thresholds['active_min']:
            return MemoryLayer.ACTIVE
        elif weight >= thresholds['long_term_min']:
            return MemoryLayer.LONG_TERM
        else:
            return MemoryLayer.ARCHIVE

    def access_memory(self, memory_id: str) -> Optional[MemoryTrace]:
        """Record an access: bump the count and refresh the timestamp.

        The weight itself is derived lazily by calculate_weight(), so no
        explicit recomputation happens here.  Returns the memory, or
        None when the id is unknown.
        """
        memory = self.memories.get(memory_id)
        if memory is None:
            return None

        memory.access_count += 1
        memory.last_accessed = time.time()

        return memory

    def add_memory(self, memory: MemoryTrace) -> str:
        """Register a new memory and return its id."""
        self.memories[memory.id] = memory
        return memory.id

    def retrieve(self, token_budget: int = 50000) -> List[MemoryTrace]:
        """
        Priority-based retrieval under a token budget.

        Strategy:
        1. Visit layers core-first, each with a fixed share of the budget.
        2. Within a layer, take memories in descending weight order.
        3. Stop a layer greedily at the first memory that would overflow
           either the layer budget or the global budget.
        """
        results: List[MemoryTrace] = []
        tokens_used = 0

        # Budget share per layer.
        budget_ratio = {
            MemoryLayer.CORE: 0.4,
            MemoryLayer.ACTIVE: 0.35,
            MemoryLayer.LONG_TERM: 0.2,
            MemoryLayer.ARCHIVE: 0.05
        }

        # Compute every weight exactly once.  The previous version
        # recomputed calculate_weight() per layer and again inside the
        # sort key — O(layers × N) evaluations — and, since weights are
        # time-dependent, repeated evaluation could even flip a memory
        # between layers mid-pass.
        scored = [
            (self.calculate_weight(memory), memory)
            for memory in self.memories.values()
        ]

        # Bucket memories by layer using the frozen weights.
        buckets: Dict[MemoryLayer, List] = {layer: [] for layer in MemoryLayer}
        for weight, memory in scored:
            buckets[self.get_layer(weight)].append((weight, memory))

        for layer in (MemoryLayer.CORE, MemoryLayer.ACTIVE,
                      MemoryLayer.LONG_TERM, MemoryLayer.ARCHIVE):
            # Descending weight; stable sort preserves insertion order on ties.
            ranked = sorted(buckets[layer], key=lambda pair: pair[0], reverse=True)

            layer_budget = int(token_budget * budget_ratio[layer])
            layer_tokens = 0

            for weight, memory in ranked:
                # Greedy cutoff, matching the original behaviour: stop this
                # layer at the first memory that would overflow a budget.
                if layer_tokens + memory.token_count > layer_budget:
                    break
                if tokens_used + memory.token_count > token_budget:
                    break

                results.append(memory)
                tokens_used += memory.token_count
                layer_tokens += memory.token_count

        return results

    def update_all_weights(self) -> Dict[str, float]:
        """Recompute and return the weight of every memory (cron-style job)."""
        return {
            memory_id: self.calculate_weight(memory)
            for memory_id, memory in self.memories.items()
        }

使用示例

# Set up the memory manager with explicit decay parameters.
mem_manager = TimeDecayMemoryManager(config={
    'decay_rate': 0.0001,  # 0.01% decay per hour
    'recency_window': 168,  # 7 days
})

# Register a new memory trace.
trace = MemoryTrace(
    id="mem_001",
    content="项目使用 JWT 进行认证,token 有效期 24 小时",
    base_importance=0.9,  # core design decision
    token_count=50,
    metadata={"type": "design_decision", "module": "auth"}
)
mem_manager.add_memory(trace)

# Touch the memory to reinforce it (bumps access count and timestamp).
mem_manager.access_memory("mem_001")

# Pull the highest-priority memories that fit within the token budget.
retrieved = mem_manager.retrieve(token_budget=50000)

# Recompute every memory's weight (intended to run as an hourly job).
weights = mem_manager.update_all_weights()

记忆存储结构设计

SQLite + 向量数据库混合存储

-- SQLite relational store (memory metadata).
CREATE TABLE memories (
    id TEXT PRIMARY KEY,
    content_hash TEXT UNIQUE,
    layer TEXT NOT NULL,  -- one of: core, active, long_term, archive
    base_importance REAL DEFAULT 1.0,
    created_at REAL NOT NULL,
    last_accessed REAL NOT NULL,
    access_count INTEGER DEFAULT 0,
    token_count INTEGER NOT NULL,
    content_preview TEXT,  -- first 100 characters, for quick inspection
    metadata_json TEXT     -- metadata serialized as JSON
);

-- Index optimization.
CREATE INDEX idx_layer ON memories(layer);
CREATE INDEX idx_weight ON memories(base_importance, last_accessed);
CREATE INDEX idx_created ON memories(created_at);
CREATE INDEX idx_hash ON memories(content_hash);

Python 数据模型

from dataclasses import dataclass, asdict
import json
import hashlib


@dataclass
class StoredMemory:
    """Serializable memory record as persisted to storage."""
    id: str
    content: str
    embedding: List[float]  # dense vector embedding of `content`
    layer: str
    base_importance: float
    created_at: float
    last_accessed: float
    access_count: int
    token_count: int
    metadata: Dict

    def to_dict(self) -> Dict:
        """Serialize to a plain dict (suitable for JSON storage)."""
        return asdict(self)

    @classmethod
    def from_dict(cls, data: Dict) -> 'StoredMemory':
        """Inverse of to_dict: rebuild an instance from a plain dict."""
        return cls(**data)

    def content_hash(self) -> str:
        """Short SHA-256 digest (16 hex chars) of the content, used for dedup."""
        digest = hashlib.sha256(self.content.encode())
        return digest.hexdigest()[:16]


class MemoryStorage:
    """Persists memories in SQLite (metadata) plus a vector store (embeddings)."""

    def __init__(self, sqlite_path: str, vector_db_url: str):
        """Open the SQLite database and connect to the Qdrant vector store.

        Imports are kept local so the module can be imported without the
        optional qdrant_client dependency installed.
        """
        import sqlite3
        from qdrant_client import QdrantClient

        self.sqlite = sqlite3.connect(sqlite_path)
        self.vector_db = QdrantClient(url=vector_db_url)
        self._init_schema()

    def _init_schema(self):
        """Create the metadata table if it does not exist yet."""
        cursor = self.sqlite.cursor()
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS memories (
                id TEXT PRIMARY KEY,
                content_hash TEXT UNIQUE,
                layer TEXT,
                base_importance REAL,
                created_at REAL,
                last_accessed REAL,
                access_count INTEGER,
                token_count INTEGER,
                content_preview TEXT,
                metadata_json TEXT
            )
        """)
        self.sqlite.commit()

    def store(self, memory: StoredMemory):
        """Write the memory's metadata to SQLite and its embedding to Qdrant."""
        # 1. SQLite: metadata row (upsert keyed on id).
        cursor = self.sqlite.cursor()
        cursor.execute("""
            INSERT OR REPLACE INTO memories 
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            memory.id,
            memory.content_hash(),
            memory.layer,
            memory.base_importance,
            memory.created_at,
            memory.last_accessed,
            memory.access_count,
            memory.token_count,
            memory.content[:100],  # preview column holds the first 100 chars
            json.dumps(memory.metadata)
        ))
        self.sqlite.commit()

        # 2. Vector store: embedding plus a small payload for filtering.
        self.vector_db.upsert(
            collection_name="memories",
            points=[{
                "id": memory.id,
                "vector": memory.embedding,
                "payload": {
                    "layer": memory.layer,
                    "importance": memory.base_importance
                }
            }]
        )

    def retrieve_by_similarity(
        self, 
        query_embedding: List[float],
        layer: Optional[str] = None,
        top_k: int = 20
    ) -> List[StoredMemory]:
        """Return up to top_k memories nearest to the query embedding,
        optionally restricted to one layer.
        """
        # Vector search; layer filter is applied server-side when given.
        # NOTE(review): recent qdrant-client versions name this parameter
        # `query_filter` and expect a models.Filter object — confirm
        # against the installed client version.
        results = self.vector_db.search(
            collection_name="memories",
            query_vector=query_embedding,
            limit=top_k,
            filter={"layer": layer} if layer else None
        )

        # Hydrate full rows from SQLite (one cursor reused for all lookups).
        memories = []
        cursor = self.sqlite.cursor()
        for result in results:
            cursor.execute(
                "SELECT * FROM memories WHERE id = ?",
                (result.id,)
            )
            row = cursor.fetchone()
            if row:
                memories.append(self._row_to_memory(row, result.vector))

        return memories

    def _row_to_memory(self, row, vector: List[float]) -> StoredMemory:
        """Convert a SQLite row into a StoredMemory.

        Row layout matches _init_schema:
        (id, content_hash, layer, base_importance, created_at,
         last_accessed, access_count, token_count, content_preview,
         metadata_json)
        """
        return StoredMemory(
            id=row[0],
            # BUG FIX: row[1] is the content *hash*, not the content.
            # Use the stored preview; the full text must be recovered
            # from the vector-store payload if it is needed.
            content=row[8],
            embedding=vector,
            layer=row[2],
            base_importance=row[3],
            created_at=row[4],
            last_accessed=row[5],
            access_count=row[6],
            token_count=row[7],
            metadata=json.loads(row[9])
        )

记忆提取逻辑

分层检索策略

from typing import Tuple


class HierarchicalRetriever:
    """Layer-aware retrieval on top of TimeDecayMemoryManager."""

    def __init__(self, memory_manager: TimeDecayMemoryManager):
        """Bind the retriever to an existing memory manager."""
        self.manager = memory_manager

    def retrieve_with_context(
        self,
        query: str,
        query_embedding: Optional[List[float]] = None,
        token_budget: int = 50000
    ) -> Tuple[List[MemoryTrace], Dict]:
        """
        Retrieve memories together with retrieval statistics.

        Returns:
        - the retrieved memory list
        - a stats dict: query, total_memories, per-layer count/token
          breakdown, tokens_used, retrieval_time_ms
        """
        stats = {
            'query': query,
            'total_memories': len(self.manager.memories),
            'layers': {},
            'tokens_used': 0,
            'retrieval_time_ms': 0
        }

        # Monotonic clock for elapsed-time measurement; the previous
        # version used time.time(), which can jump on wall-clock
        # adjustments, and shadowed the module-level `time` import with
        # a redundant local one.
        start = time.perf_counter()

        # 1. Weight-based retrieval under the token budget.
        base_results = self.manager.retrieve(token_budget)

        # 2. Optional semantic re-ranking when a query vector is given.
        if query_embedding:
            base_results = self._rerank_by_similarity(
                base_results,
                query_embedding
            )

        # 3. Per-layer distribution statistics.
        for memory in base_results:
            layer = self.manager.get_layer(
                self.manager.calculate_weight(memory)
            ).value
            layer_stats = stats['layers'].setdefault(
                layer, {'count': 0, 'tokens': 0}
            )
            layer_stats['count'] += 1
            layer_stats['tokens'] += memory.token_count
            stats['tokens_used'] += memory.token_count

        stats['retrieval_time_ms'] = (time.perf_counter() - start) * 1000

        return base_results, stats

    def _rerank_by_similarity(
        self,
        memories: List[MemoryTrace],
        query_embedding: List[float]
    ) -> List[MemoryTrace]:
        """
        Re-rank memories by relevance to the query.

        NOTE: a real implementation would consult the vector store and
        compute cosine similarity; this simplified version uses a
        metadata-level heuristic (see _compute_relevance_score).
        """
        scored = [
            (self._compute_relevance_score(memory, query_embedding), memory)
            for memory in memories
        ]
        scored.sort(key=lambda pair: pair[0], reverse=True)
        return [memory for _, memory in scored]

    def _compute_relevance_score(
        self,
        memory: MemoryTrace,
        query_embedding: List[float]
    ) -> float:
        """Heuristic relevance: 70% decayed weight + 30% access frequency.

        `query_embedding` is accepted for interface compatibility but is
        not used by this simplified scorer.
        """
        weight = self.manager.calculate_weight(memory)
        freq_score = min(memory.access_count / 10, 1.0)
        return 0.7 * weight + 0.3 * freq_score

配置文件示例

YAML 配置

# config/memory_config.yaml

memory:
  # Decay settings
  decay:
    rate: 0.0001          # decay rate per hour
    # NOTE(review): with rate 0.0001/h the exponential half-life is
    # ln(2)/0.0001 ≈ 6931 h, not 168 h — these two settings disagree;
    # confirm which is authoritative.
    half_life: 168        # half-life (hours) ≈ 7 days
    min_weight: 0.01      # weight floor
    
  # Layer thresholds
  layers:
    core_min: 0.8         # core layer lower bound
    active_min: 0.5       # active layer lower bound
    long_term_min: 0.2    # long-term layer lower bound
    # weight < 0.2 is archived automatically
    
  # Token budget
  budget:
    total: 50000          # total token budget
    core_ratio: 0.4       # core layer 40%
    active_ratio: 0.35    # active layer 35%
    long_term_ratio: 0.2  # long-term layer 20%
    archive_ratio: 0.05   # archive layer 5%
    
  # Reinforcement settings
  reinforcement:
    access_boost_max: 0.5     # access boost capped at +0.5
    recency_window: 168       # 7 days (in hours)
    recency_bonus: 0.2        # +0.2 for access within the window
    
  # Storage settings
  storage:
    sqlite_path: ./data/memories.db
    vector_db_url: http://localhost:6333
    vector_dimension: 768     # UniXcoder embedding dimension
    collection_name: memories
    
  # Background tasks
  background_tasks:
    weight_update_interval: 3600    # refresh weights hourly
    archival_check_interval: 86400  # daily archive sweep
    cleanup_threshold: 0.01         # weight < 0.01 may be physically deleted
大型代码仓库特殊配置

# config/large_codebase_config.yaml

# Tuned for repositories with 100K+ lines of code.
memory:
  # Faster decay (information goes stale more quickly)
  decay:
    rate: 0.0002          # 2x the default decay rate
    # NOTE(review): ln(2)/0.0002 ≈ 3466 h, not 84 h — same rate/half-life
    # inconsistency as the base config; confirm intended semantics.
    half_life: 84         # 3.5-day half-life
    
  # More aggressive layering
  layers:
    core_min: 0.85        # keep only the most important memories in core
    active_min: 0.6
    long_term_min: 0.3
    
  # Code-specific metadata
  code_metadata:
    track_file_changes: true
    track_function_calls: true
    ast_summarization: true  # enable AST-based summarization
    
  # Code-graph integration
  code_graph:
    enabled: true
    parser: tree-sitter
    languages:
      - python
      - typescript
      - javascript
      - go
    incremental_index: true
本节要点

  • 时间衰减算法核心公式:W = B × (exp(-λt) + A + R),其中访问增强 A = min(log(n+1)/log(100), 1) × 0.5,R 为近期加成(7 天内访问过 +0.2),结果截断到 [0, 1]
  • 分层存储使用 SQLite (元数据) + 向量数据库 (嵌入)
  • 检索策略按层分配 token 预算,核心层占 40%
  • 配置参数可根据场景调整,大型代码仓库需更快衰减