Logo
热心市民王先生

pi-mono 扩展 API - CustomEntry 与记忆增强

详解 pi-mono 的 CustomEntry、CustomMessageEntry、事件监听和向量检索扩展示例

3.1 CustomEntry 状态存储

API 定义

interface CustomEntry {
  type: "custom"
  id: string           // 8 字符十六进制 ID
  parentId: string | null
  timestamp: string    // ISO 时间戳
  customType: string   // 扩展标识符
  data?: any           // 扩展特定数据
}

存储状态

// 在扩展中
export async function activate(context: ExtensionContext) {
  // 存储计数器
  await context.agent.appendCustomEntry('my-extension', { 
    count: 42,
    lastUpdated: Date.now()
  })
}

关键特性

  • 不参与 LLM 上下文:仅存储,不发送给模型
  • 类型隔离:通过 customType 区分不同扩展
  • 可查询:遍历条目过滤 type === 'custom'

读取状态

export async function restoreState(context: ExtensionContext) {
  const entries = context.agent.getEntries()
  
  const myEntries = entries.filter(e => 
    e.type === 'custom' && e.customType === 'my-extension'
  )
  
  // 获取最新状态
  const latest = myEntries[myEntries.length - 1]
  const state = latest?.data
}

应用场景

场景 1:用户偏好存储

await agent.appendCustomEntry('preferences', {
  theme: 'dark',
  compactMode: true,
  autoSaveInterval: 5000
})

场景 2:扩展配置

await agent.appendCustomEntry('memory-plugin', {
  vectorStore: 'chromadb',
  embeddingModel: 'nomic-embed-text',
  topK: 5
})

场景 3:会话元数据

await agent.appendCustomEntry('session-tags', {
  tags: ['feature-auth', 'priority-high'],
  milestone: 'checkpoint-1'
})

3.2 CustomMessageEntry 消息注入

API 定义

interface CustomMessageEntry {
  type: "custom_message"
  id: string
  parentId: string | null
  timestamp: string
  customType: string
  content: string | (TextContent | ImageContent)[]
  display: boolean     // 在 TUI 显示
  details?: any        // 元数据(不发送给 LLM)
}

注入消息到上下文

// 添加参与 LLM 上下文的消息
await context.agent.appendCustomMessageEntry(
  'memory-retrieval',
  '检索到的相关记忆:用户在上次会话中讨论了 JWT 过期时间设置为 7 天',
  true  // display: 在 TUI 显示
)

效果

  • ✅ 消息发送给 LLM
  • ✅ 在 TUI 中以特殊样式显示
  • ✅ 参与上下文构建

隐藏注入

// 仅发送给 LLM,不在 TUI 显示
await context.agent.appendCustomMessageEntry(
  'system-prompt-enhancement',
  '附加系统指令:优先使用 TypeScript',
  false  // display: false
)

多模态内容

import { ImageContent } from '@mariozechner/pi-ai'

const imageContent: ImageContent = {
  type: 'image',
  data: base64ImageData,
  mimeType: 'image/png'
}

await agent.appendCustomMessageEntry(
  'diagram-retrieval',
  [
    { type: 'text', text: '检索到的架构图:' },
    imageContent
  ],
  true
)

3.3 事件监听机制

消息事件

export async function activate(context: ExtensionContext) {
  // 监听所有消息
  context.agent.on('message', (entry: SessionMessageEntry) => {
    console.log('New message:', entry.message.role)
    
    // 可同步到外部存储
    syncToExternalStore(entry)
  })
  
  // 监听特定角色消息
  context.agent.on('message:user', (entry: UserMessage) => {
    console.log('User said:', entry.content)
  })
  
  context.agent.on('message:assistant', (entry: AssistantMessage) => {
    console.log('Assistant responded')
  })
}

工具调用事件

context.agent.on('tool:call', async (toolCall) => {
  console.log(`Tool ${toolCall.name} called with args:`, toolCall.arguments)
  
  // 可记录工具使用统计
  trackToolUsage(toolCall.name)
})

context.agent.on('tool:result', async (result) => {
  console.log(`Tool ${result.toolName} returned:`, result.content)
})

会话生命周期事件

// 会话创建
context.agent.on('session:created', (session) => {
  console.log('Session created:', session.id)
  initializeExtension(session)
})

// 会话保存前
context.agent.on('session:saving', async (session) => {
  // 可注入额外条目
  await context.agent.appendCustomEntry('backup', {
    timestamp: Date.now(),
    reason: 'pre-save-backup'
  })
})

// 会话加载后
context.agent.on('session:loaded', (session) => {
  restoreExtensionState(session)
})

3.4 向量检索扩展示例

架构设计

┌─────────────────────────────────────────────────────────────┐
│                     pi-mono Agent                           │
│                                                             │
│  ┌───────────────┐    ┌───────────────┐    ┌─────────────┐ │
│  │ Message Event │───>│  Embedding    │───>│  Vector DB  │ │
│  │   Listener    │    │   Generator   │    │  (ChromaDB) │ │
│  └───────────────┘    └───────────────┘    └─────────────┘ │
│                            │                      │         │
│                            │                      │         │
│  ┌───────────────┐    ┌────┴────────┐    ┌───────┴────────┐│
│  │ CustomMessage │<───│   Similarity│<───│  Query Vector  ││
│  │   Injection   │    │    Search   │    │   (from query) ││
│  └───────────────┘    └─────────────┘    └────────────────┘│
└─────────────────────────────────────────────────────────────┘

实现代码

// extensions/memory-plugin/src/index.ts
import { ChromaClient } from 'chromadb'
import { getEmbedding } from './embedding'

const chroma = new ChromaClient({ url: 'http://localhost:8000' })
let collection: any

export async function activate(context: ExtensionContext) {
  // 初始化集合
  collection = await chroma.getOrCreateCollection({
    name: 'pi-mono-memory',
    metadata: { description: 'Session memory for pi-mono' }
  })
  
  // 监听消息,同步到向量数据库
  context.agent.on('message', async (entry) => {
    await indexMessage(entry)
  })
  
  // 注册检索工具
  context.agent.tools.push({
    name: 'search_memory',
    description: 'Search conversation history for relevant context',
    parameters: Type.Object({
      query: Type.String({ description: 'Search query' }),
      topK: Type.Optional(Type.Number({ default: 5 }))
    }),
    execute: async (toolCallId, args) => {
      const results = await searchMemory(args.query, args.topK)
      return {
        output: formatResults(results),
        details: { results }
      }
    }
  })
}

async function indexMessage(entry: SessionMessageEntry) {
  const text = JSON.stringify(entry.message)
  const embedding = await getEmbedding(text)
  
  await collection.add({
    ids: [entry.id],
    embeddings: [embedding],
    documents: [text],
    metadatas: [{
      role: entry.message.role,
      timestamp: entry.timestamp
    }]
  })
}

async function searchMemory(query: string, topK: number = 5) {
  const queryEmbedding = await getEmbedding(query)
  
  const results = await collection.query({
    queryEmbeddings: [queryEmbedding],
    n_results: topK
  })
  
  return results.documents[0].map((doc, i) => ({
    id: results.ids[0][i],
    content: doc,
    similarity: 1 - (results.distances?.[0]?.[i] ?? 0)
  }))
}

function formatResults(results: SearchResult[]) {
  return results.map(r => 
    `[${r.id}] (similarity: ${r.similarity.toFixed(2)})\n${r.content}`
  ).join('\n\n')
}

使用示例

# 用户提问
/pi 如何实现 JWT 认证?需要支持刷新令牌

# Agent 调用 search_memory 工具
tool_call: search_memory({
  query: "JWT authentication refresh token",
  topK: 3
})

# 检索结果
[检索到的相关记忆]
[b2c3d4e5] (similarity: 0.92)
用户在上次会话中讨论了 JWT 刷新令牌存储方案,最终选择 Redis

[c3d4e5f6] (similarity: 0.87)
讨论了过期时间设置:访问令牌 15 分钟,刷新令牌 7

[d4e5f6g7] (similarity: 0.83)
实现了 Redis 存储的 refresh token 验证函数

# Agent 基于检索结果回答
基于之前的讨论,您选择了 Redis 存储刷新令牌,过期时间 7 天...

性能优化

// 批量索引
const BATCH_SIZE = 10
let buffer: SessionMessageEntry[] = []

context.agent.on('message', (entry) => {
  buffer.push(entry)
  
  if (buffer.length >= BATCH_SIZE) {
    batchIndex(buffer)
    buffer = []
  }
})

// 定期刷新
setInterval(() => {
  if (buffer.length > 0) {
    batchIndex(buffer)
    buffer = []
  }
}, 5000)  // 5 秒

3.5 扩展示例:时间衰减记忆

基于上一章研究的艾宾浩斯遗忘曲线:

// extensions/decay-memory/src/index.ts
const DECAY_RATE = 0.0001  // 每小时衰减率

interface MemoryWithWeight {
  entry: SessionMessageEntry
  weight: number
  accessCount: number
  lastAccessed: number
}

async function calculateWeight(memory: MemoryWithWeight): Promise<number> {
  const ageHours = (Date.now() - memory.lastAccessed) / 3600000
  
  // 艾宾浩斯遗忘曲线
  const timeWeight = Math.exp(-DECAY_RATE * ageHours)
  
  // 访问频率增强
  const accessBoost = Math.log(memory.accessCount + 1) / 10
  
  // 近期访问奖励
  const recencyBonus = (ageHours < 24) ? 0.2 : 0
  
  return Math.min(timeWeight + accessBoost + recencyBonus, 1.0)
}

context.agent.tools.push({
  name: 'get_important_memories',
  description: 'Get high-weight memories based on time decay',
  parameters: Type.Object({
    threshold: Type.Optional(Type.Number({ default: 0.7 })),
    limit: Type.Optional(Type.Number({ default: 10 }))
  }),
  execute: async (toolCallId, args) => {
    const entries = context.agent.getEntries()
    const memories: MemoryWithWeight[] = []
    
    // 筛选消息条目
    for (const entry of entries) {
      if (entry.type === 'message') {
        memories.push({
          entry,
          weight: 1.0,
          accessCount: 0,
          lastAccessed: new Date(entry.timestamp).getTime()
        })
      }
    }
    
    // 计算权重
    for (const memory of memories) {
      memory.weight = await calculateWeight(memory)
    }
    
    // 过滤并排序
    const important = memories
      .filter(m => m.weight >= args.threshold)
      .sort((a, b) => b.weight - a.weight)
      .slice(0, args.limit)
    
    return {
      output: important.map(m => 
        `[weight: ${m.weight.toFixed(2)}] ${JSON.stringify(m.entry.message.content)}`
      ).join('\n')
    }
  }
})

本章小结

pi-mono 的扩展 API 提供了灵活的增强点:

  1. CustomEntry:存储扩展状态,不参与上下文
  2. CustomMessageEntry:注入消息到 LLM 上下文
  3. 事件监听:拦截消息、工具调用、会话生命周期
  4. 扩展示例:向量检索、时间衰减等高级功能可自行开发

下一章将对比 pi-mono 与主流框架的记忆系统。


参考资料

  1. pi-mono SDK Documentation. https://github.com/badlogic/pi-mono/blob/main/packages/coding-agent/docs/sdk.md
  2. Extension Examples. examples/sdk/ in pi-mono repository
  3. ChromaDB. https://www.trychroma.com/ - 向量数据库