关键代码验证
技术研究 代码实现 算法示例
记忆管理核心算法的概念性代码实现和配置示例
时间衰减算法实现
核心权重计算类
"""
AI Agent Memory Management - Time Decay Implementation
基于艾宾浩斯遗忘曲线的记忆权重计算系统
"""
import math
import time
from dataclasses import dataclass, field
from typing import List, Optional, Dict
from enum import Enum
class MemoryLayer(Enum):
    """Memory tiers, bucketed by the computed weight."""
    CORE = "core"            # core tier: weight > 0.8
    ACTIVE = "active"        # active tier: 0.5 - 0.8
    LONG_TERM = "long_term"  # long-term tier: 0.2 - 0.5
    ARCHIVE = "archive"      # archive tier: < 0.2
@dataclass
class MemoryTrace:
    """A single tracked memory with its access statistics."""
    id: str
    content: str
    created_at: float = field(default_factory=time.time)
    last_accessed: float = field(default_factory=time.time)
    access_count: int = 0
    base_importance: float = 1.0  # base importance in [0, 1]
    token_count: int = 0
    metadata: Dict = field(default_factory=dict)

    @staticmethod
    def _hours_since(timestamp: float) -> float:
        # Shared seconds-to-hours elapsed-time conversion.
        return (time.time() - timestamp) / 3600

    def age_hours(self) -> float:
        """Return the age of this memory in hours."""
        return self._hours_since(self.created_at)

    def hours_since_access(self) -> float:
        """Return the hours elapsed since the last access."""
        return self._hours_since(self.last_accessed)
class TimeDecayMemoryManager:
    """Time-decay memory manager (Ebbinghaus-style forgetting curve).

    Each memory is scored as W = B * (T + A + R), clamped to [0, 1]:
      B  base importance,
      T  exponential time decay exp(-decay_rate * age_hours),
      A  log-scaled access boost (saturates at 100 accesses),
      R  flat bonus when accessed within the recency window.
    """

    def __init__(self, config: Optional[Dict] = None):
        """Initialize the manager.

        Config keys (all optional):
          - decay_rate: base decay per hour (default 0.0001)
          - access_boost_max: cap on the access boost (default 0.5)
          - recency_window: recency window in hours (default 168 = 7 days)
          - recency_bonus: bonus for recent access (default 0.2)
          - layer_thresholds: per-layer minimum weights (merged key-by-key)
        """
        self.config = {
            'decay_rate': 0.0001,
            'access_boost_max': 0.5,
            'recency_window': 168,  # 7 days
            'recency_bonus': 0.2,
            'layer_thresholds': {
                'core_min': 0.8,
                'active_min': 0.5,
                'long_term_min': 0.2
            }
        }
        if config:
            # Merge nested thresholds key-by-key: a plain update() would let a
            # partial 'layer_thresholds' override silently drop the remaining
            # default thresholds (and break get_layer later with a KeyError).
            overrides = dict(config)
            thresholds = overrides.pop('layer_thresholds', None)
            self.config.update(overrides)
            if thresholds:
                self.config['layer_thresholds'].update(thresholds)
        self.memories: Dict[str, MemoryTrace] = {}

    def calculate_weight(self, memory: MemoryTrace) -> float:
        """Return the memory's current weight in [0, 1].

        W = B * (T + A + R), where
          T = exp(-decay_rate * age_hours)            (exponential decay)
          A = log(n + 1) / log(100) * max_boost       (saturating access boost)
          R = recency_bonus if accessed within the recency window, else 0
        """
        # 1. Time decay.
        time_weight = math.exp(-self.config['decay_rate'] * memory.age_hours())
        # 2. Access boost: logarithmic growth, saturating at 100 accesses.
        access_boost = (
            math.log(memory.access_count + 1) / math.log(100)
        ) * self.config['access_boost_max']
        access_boost = min(access_boost, self.config['access_boost_max'])
        # 3. Recency bonus.
        recency_bonus = (
            self.config['recency_bonus']
            if memory.hours_since_access() < self.config['recency_window']
            else 0.0
        )
        # 4. Combine and clamp to [0, 1].
        weight = memory.base_importance * (time_weight + access_boost + recency_bonus)
        return min(max(weight, 0.0), 1.0)

    def get_layer(self, weight: float) -> MemoryLayer:
        """Map a weight to the memory layer it belongs to."""
        thresholds = self.config['layer_thresholds']
        if weight >= thresholds['core_min']:
            return MemoryLayer.CORE
        elif weight >= thresholds['active_min']:
            return MemoryLayer.ACTIVE
        elif weight >= thresholds['long_term_min']:
            return MemoryLayer.LONG_TERM
        else:
            return MemoryLayer.ARCHIVE

    def access_memory(self, memory_id: str) -> Optional[MemoryTrace]:
        """Record an access (reinforcement) and return the memory, or None."""
        if memory_id not in self.memories:
            return None
        memory = self.memories[memory_id]
        memory.access_count += 1
        memory.last_accessed = time.time()
        return memory

    def add_memory(self, memory: MemoryTrace) -> str:
        """Register a new memory and return its id."""
        self.memories[memory.id] = memory
        return memory.id

    def retrieve(self, token_budget: int = 50000) -> List[MemoryTrace]:
        """Priority-based retrieval within a token budget.

        Strategy:
          1. Visit layers core-first.
          2. Within a layer, take memories in descending weight order.
          3. Stop a layer when its share of the budget (or the global
             budget) would be exceeded.
        """
        budget_ratio = {
            MemoryLayer.CORE: 0.4,
            MemoryLayer.ACTIVE: 0.35,
            MemoryLayer.LONG_TERM: 0.2,
            MemoryLayer.ARCHIVE: 0.05
        }
        # Compute each memory's weight exactly once; the original recomputed
        # the exp/log formula per layer scan and per sort comparison.
        weighted = [(self.calculate_weight(m), m) for m in self.memories.values()]
        results = []
        tokens_used = 0
        for layer in (MemoryLayer.CORE, MemoryLayer.ACTIVE,
                      MemoryLayer.LONG_TERM, MemoryLayer.ARCHIVE):
            layer_items = sorted(
                ((w, m) for w, m in weighted if self.get_layer(w) == layer),
                key=lambda wm: wm[0],
                reverse=True
            )
            layer_budget = int(token_budget * budget_ratio[layer])
            layer_tokens = 0
            for _weight, memory in layer_items:
                if layer_tokens + memory.token_count > layer_budget:
                    break
                if tokens_used + memory.token_count > token_budget:
                    break
                results.append(memory)
                tokens_used += memory.token_count
                layer_tokens += memory.token_count
        return results

    def update_all_weights(self) -> Dict[str, float]:
        """Recompute and return {memory_id: weight} for every memory.

        NOTE: this only computes weights; it does not mutate any memory.
        Intended to be run as a periodic (e.g. hourly) background task.
        """
        return {
            memory_id: self.calculate_weight(memory)
            for memory_id, memory in self.memories.items()
        }
使用示例
# Build a manager with explicit decay settings.
manager = TimeDecayMemoryManager(config={
    'decay_rate': 0.0001,   # decays 0.01% per hour
    'recency_window': 168,  # 7 days
})

# Register a memory.
memory = MemoryTrace(
    id="mem_001",
    content="项目使用 JWT 进行认证,token 有效期 24 小时",
    base_importance=0.9,  # core design decision
    token_count=50,
    metadata={"type": "design_decision", "module": "auth"}
)
manager.add_memory(memory)

# Accessing a memory reinforces it.
manager.access_memory("mem_001")

# Retrieve the highest-priority memories within the token budget.
retrieved = manager.retrieve(token_budget=50000)

# Recompute all weights in bulk (run e.g. hourly).
weights = manager.update_all_weights()
记忆存储结构设计
SQLite + 向量数据库混合存储
-- SQLite relational store (memory metadata)
CREATE TABLE memories (
    id TEXT PRIMARY KEY,
    content_hash TEXT UNIQUE,
    layer TEXT NOT NULL, -- core, active, long_term, archive
    base_importance REAL DEFAULT 1.0,
    created_at REAL NOT NULL,
    last_accessed REAL NOT NULL,
    access_count INTEGER DEFAULT 0,
    token_count INTEGER NOT NULL,
    content_preview TEXT, -- first 100 characters, for quick inspection
    metadata_json TEXT -- metadata serialized as JSON
);
-- Index tuning
CREATE INDEX idx_layer ON memories(layer);
CREATE INDEX idx_weight ON memories(base_importance, last_accessed);
CREATE INDEX idx_created ON memories(created_at);
CREATE INDEX idx_hash ON memories(content_hash);
Python 数据模型
from dataclasses import dataclass, asdict
import json
import hashlib
@dataclass
class StoredMemory:
    """A persisted memory record: metadata plus its vector embedding."""
    id: str
    content: str
    embedding: List[float]  # vector embedding
    layer: str
    base_importance: float
    created_at: float
    last_accessed: float
    access_count: int
    token_count: int
    metadata: Dict

    def to_dict(self) -> Dict:
        """Serialize to a plain dict."""
        return asdict(self)

    @classmethod
    def from_dict(cls, data: Dict) -> 'StoredMemory':
        """Rebuild an instance from a dict produced by to_dict()."""
        return cls(**data)

    def content_hash(self) -> str:
        """Short SHA-256 digest of the content, used for deduplication."""
        digest = hashlib.sha256(self.content.encode())
        return digest.hexdigest()[:16]
class MemoryStorage:
    """Persists memories to SQLite (metadata) plus a Qdrant vector store (embeddings)."""

    def __init__(self, sqlite_path: str, vector_db_url: str):
        """Open both backing stores and ensure the SQLite schema exists."""
        import sqlite3
        from qdrant_client import QdrantClient
        self.sqlite = sqlite3.connect(sqlite_path)
        self.vector_db = QdrantClient(url=vector_db_url)
        self._init_schema()

    def _init_schema(self):
        """Create the memories table if it does not exist yet."""
        cursor = self.sqlite.cursor()
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS memories (
                id TEXT PRIMARY KEY,
                content_hash TEXT UNIQUE,
                layer TEXT,
                base_importance REAL,
                created_at REAL,
                last_accessed REAL,
                access_count INTEGER,
                token_count INTEGER,
                content_preview TEXT,
                metadata_json TEXT
            )
        """)
        self.sqlite.commit()

    def store(self, memory: StoredMemory):
        """Store one memory: metadata into SQLite, embedding into Qdrant."""
        # 1. SQLite: metadata row (positional VALUES match the 10 schema columns).
        cursor = self.sqlite.cursor()
        cursor.execute("""
            INSERT OR REPLACE INTO memories
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            memory.id,
            memory.content_hash(),
            memory.layer,
            memory.base_importance,
            memory.created_at,
            memory.last_accessed,
            memory.access_count,
            memory.token_count,
            memory.content[:100],  # content_preview column
            json.dumps(memory.metadata)
        ))
        self.sqlite.commit()
        # 2. Vector DB: embedding plus a small filterable payload.
        self.vector_db.upsert(
            collection_name="memories",
            points=[{
                "id": memory.id,
                "vector": memory.embedding,
                "payload": {
                    "layer": memory.layer,
                    "importance": memory.base_importance
                }
            }]
        )

    def retrieve_by_similarity(
        self,
        query_embedding: List[float],
        layer: Optional[str] = None,
        top_k: int = 20
    ) -> List[StoredMemory]:
        """Return up to top_k memories ranked by vector similarity.

        Optionally restricts results to a single layer.
        """
        # NOTE(review): qdrant-client's search() keyword is `query_filter=`
        # (the original passed `filter=`, a TypeError), and `with_vectors=True`
        # is required because _row_to_memory reads result.vector. Confirm the
        # filter payload shape against the installed qdrant-client version.
        results = self.vector_db.search(
            collection_name="memories",
            query_vector=query_embedding,
            limit=top_k,
            query_filter={"layer": layer} if layer else None,
            with_vectors=True
        )
        # Hydrate metadata from SQLite; one cursor reused across lookups
        # (the original created a fresh cursor on every iteration).
        cursor = self.sqlite.cursor()
        memories = []
        for result in results:
            cursor.execute(
                "SELECT * FROM memories WHERE id = ?",
                (result.id,)
            )
            row = cursor.fetchone()
            if row:
                memories.append(self._row_to_memory(row, result.vector))
        return memories

    def _row_to_memory(self, row, vector: List[float]) -> StoredMemory:
        """Convert a SQLite row into a StoredMemory.

        SQLite only persists the 100-char preview; the full content must be
        fetched from elsewhere if needed.
        """
        return StoredMemory(
            id=row[0],
            # Bug fix: row[1] is the content_hash column, not content. The
            # closest content we persist in SQLite is the preview (row[8]).
            content=row[8],
            embedding=vector,
            layer=row[2],
            base_importance=row[3],
            created_at=row[4],
            last_accessed=row[5],
            access_count=row[6],
            token_count=row[7],
            metadata=json.loads(row[9])
        )
记忆提取逻辑
分层检索策略
from typing import Tuple
class HierarchicalRetriever:
    """Layer-aware retriever built on top of a TimeDecayMemoryManager."""

    def __init__(self, memory_manager: TimeDecayMemoryManager):
        self.manager = memory_manager

    def retrieve_with_context(
        self,
        query: str,
        query_embedding: Optional[List[float]] = None,
        token_budget: int = 50000
    ) -> Tuple[List[MemoryTrace], Dict]:
        """Retrieve memories for a query and report retrieval statistics.

        Returns:
            A tuple of (retrieved memories, stats dict).
        """
        stats = {
            'query': query,
            'total_memories': len(self.manager.memories),
            'layers': {},
            'tokens_used': 0,
            'retrieval_time_ms': 0
        }
        import time
        started = time.time()
        # 1. Weight-based retrieval within the token budget.
        hits = self.manager.retrieve(token_budget)
        # 2. Optionally rerank by semantic similarity.
        if query_embedding:
            hits = self._rerank_by_similarity(hits, query_embedding)
        # 3. Tally per-layer counts and token usage.
        for hit in hits:
            layer_name = self.manager.get_layer(
                self.manager.calculate_weight(hit)
            ).value
            bucket = stats['layers'].setdefault(
                layer_name, {'count': 0, 'tokens': 0}
            )
            bucket['count'] += 1
            bucket['tokens'] += hit.token_count
            stats['tokens_used'] += hit.token_count
        stats['retrieval_time_ms'] = (time.time() - started) * 1000
        return hits, stats

    def _rerank_by_similarity(
        self,
        memories: List[MemoryTrace],
        query_embedding: List[float]
    ) -> List[MemoryTrace]:
        """Rerank memories by relevance score (descending, stable on ties).

        NOTE: a real implementation would hit the vector database; this is a
        metadata-based heuristic stand-in for cosine similarity.
        """
        return sorted(
            memories,
            key=lambda mem: self._compute_relevance_score(mem, query_embedding),
            reverse=True
        )

    def _compute_relevance_score(
        self,
        memory: MemoryTrace,
        query_embedding: List[float]
    ) -> float:
        """Heuristic relevance score: 70% decay weight + 30% access frequency.

        The query embedding is accepted for interface parity but unused by
        this simplified scorer (a full version would compute cosine similarity).
        """
        decay_weight = self.manager.calculate_weight(memory)
        frequency = min(memory.access_count / 10, 1.0)
        return 0.7 * decay_weight + 0.3 * frequency
配置文件示例
YAML 配置
# config/memory_config.yaml
memory:
  # Decay settings
  decay:
    rate: 0.0001       # decay rate per hour
    half_life: 168     # half-life (hours) ~= 7 days
                       # NOTE(review): ln(2)/rate ~= 6931 h, not 168 — these
                       # two values disagree; confirm which is authoritative.
    min_weight: 0.01   # weight floor
  # Layer thresholds
  layers:
    core_min: 0.8        # lower bound of the core layer
    active_min: 0.5      # lower bound of the active layer
    long_term_min: 0.2   # lower bound of the long-term layer
    # weight < 0.2 is archived automatically
  # Token budget
  budget:
    total: 50000          # total token budget
    core_ratio: 0.4       # core layer 40%
    active_ratio: 0.35    # active layer 35%
    long_term_ratio: 0.2  # long-term layer 20%
    archive_ratio: 0.05   # archive layer 5%
  # Reinforcement settings
  reinforcement:
    access_boost_max: 0.5  # access boost capped at +0.5
    recency_window: 168    # 7 days (in hours)
    recency_bonus: 0.2     # +0.2 for recent access
  # Storage settings
  storage:
    sqlite_path: ./data/memories.db
    vector_db_url: http://localhost:6333
    vector_dimension: 768  # UniXcoder embedding dimension
    collection_name: memories
  # Background tasks
  background_tasks:
    weight_update_interval: 3600    # refresh weights hourly
    archival_check_interval: 86400  # check for archival daily
    cleanup_threshold: 0.01         # weight < 0.01 may be physically deleted
大型代码仓库特殊配置
# config/large_codebase_config.yaml
# Tuned for codebases of 100K+ lines
memory:
  # Faster decay (information changes more frequently)
  decay:
    rate: 0.0002  # 2x the default decay rate
    half_life: 84 # 3.5-day half-life
                  # NOTE(review): ln(2)/rate ~= 3466 h, not 84 — these two
                  # values disagree; confirm which is authoritative.
  # More aggressive layering
  layers:
    core_min: 0.85  # keep only the most important memories in core
    active_min: 0.6
    long_term_min: 0.3
  # Code-specific metadata
  code_metadata:
    track_file_changes: true
    track_function_calls: true
    ast_summarization: true  # enable AST-based summaries
  # Code-graph integration
  code_graph:
    enabled: true
    parser: tree-sitter
    languages:
      - python
      - typescript
      - javascript
      - go
    incremental_index: true
本节要点
- 时间衰减算法核心公式:W = B × (exp(-λt) + A + R),其中 A = log(n+1)/log(100) × max_boost 为访问增强,R 为近期加成
- 分层存储使用 SQLite (元数据) + 向量数据库 (嵌入)
- 检索策略按层分配 token 预算,核心层占 40%
- 配置参数可根据场景调整,大型代码仓库需更快衰减