Logo
热心市民王先生

实施指南

技术研究 实施指南 代码示例

memU集成的具体实施步骤和代码示例

环境准备

系统要求

依赖版本要求说明
Python3.13+核心运行环境
PostgreSQL16+持久化存储(可选)
pgvectorlatest向量扩展(PostgreSQL)
Docker20+容器化部署(可选)

安装步骤

方式一:pip安装(推荐)

# 克隆仓库
git clone https://github.com/NevaMind-AI/memU.git
cd memU

# 安装依赖
pip install -e .

方式二:使用uv(开发模式)

# 安装uv
pip install uv

# 安装项目
make install

环境变量配置

# .env文件
OPENAI_API_KEY=sk-your-openai-key
OPENROUTER_API_KEY=sk-or-your-key  # 如果使用OpenRouter
DASHSCOPE_API_KEY=sk-your-key       # 如果使用阿里云

基础配置

内存模式(测试/开发)

最简单的配置,无需数据库:

from memu import MemUService

service = MemUService(
    llm_profiles={
        "default": {
            "base_url": "https://api.openai.com/v1",
            "api_key": "your_api_key",
            "chat_model": "gpt-4o",
            "embed_model": "text-embedding-3-small"
        }
    },
    database_config={
        "metadata_store": {"provider": "inmemory"}
    }
)

持久化模式(生产)

from memu import MemUService

service = MemUService(
    llm_profiles={
        "default": {
            "base_url": "https://api.openai.com/v1",
            "api_key": "your_api_key",
            "chat_model": "gpt-4o",
            "embed_model": "text-embedding-3-small"
        }
    },
    database_config={
        "metadata_store": {
            "provider": "postgres",
            "url": "postgresql://user:pass@localhost:5432/memu"
        }
    }
)

核心操作示例

示例1:对话记忆管理

import asyncio
from memu import MemUService

async def conversation_memory_example():
    """对话记忆示例"""
    
    # 初始化服务
    service = MemUService(
        llm_profiles={
            "default": {
                "api_key": "your_api_key",
                "chat_model": "gpt-4o"
            }
        },
        database_config={"metadata_store": {"provider": "inmemory"}}
    )
    
    # 模拟对话数据
    conversation = [
        {"role": "user", "content": "我喜欢喝咖啡,尤其是美式"},
        {"role": "assistant", "content": "好的,我记住了你喜欢美式咖啡"},
        {"role": "user", "content": "我是软件工程师,主要用Python开发"},
        {"role": "assistant", "content": "了解了,你是Python开发者"},
        {"role": "user", "content": "周末我喜欢去爬山"}
    ]
    
    # 存储记忆
    result = await service.memorize(
        resource_url=conversation,
        modality="conversation",
        user={"user_id": "user_001"}
    )
    
    print("存储的记忆条目:")
    for item in result.get("items", []):
        print(f"  - {item}")
    
    # 检索记忆
    retrieval = await service.retrieve(
        queries=[
            {"role": "user", "content": {"text": "这个用户喜欢什么?"}}
        ],
        where={"user_id": "user_001"},
        method="rag"
    )
    
    print("\n检索结果:")
    for item in retrieval.get("items", []):
        print(f"  - {item}")

# 运行示例
asyncio.run(conversation_memory_example())

示例2:技能提取

import asyncio
from memu import MemUService

async def skill_extraction_example():
    """从执行日志中提取技能"""
    
    service = MemUService(
        llm_profiles={
            "default": {
                "api_key": "your_api_key",
                "chat_model": "gpt-4o"
            }
        },
        database_config={"metadata_store": {"provider": "inmemory"}}
    )
    
    # 模拟执行日志
    execution_log = """
    任务:部署Flask应用到Docker
    
    步骤1:创建Dockerfile
    FROM python:3.11-slim
    WORKDIR /app
    COPY requirements.txt .
    RUN pip install -r requirements.txt
    COPY . .
    CMD ["python", "app.py"]
    
    步骤2:构建镜像
    docker build -t my-flask-app .
    
    步骤3:运行容器
    docker run -d -p 5000:5000 my-flask-app
    
    结果:成功部署
    """
    
    # 提取技能
    result = await service.memorize(
        resource_url=execution_log,
        modality="document",
        user={"agent_id": "dev_agent_001"}
    )
    
    print("提取的技能/知识:")
    for item in result.get("items", []):
        print(f"  - {item}")

asyncio.run(skill_extraction_example())

示例3:多模态记忆

import asyncio
from memu import MemUService

async def multimodal_example():
    """多模态记忆示例"""
    
    service = MemUService(
        llm_profiles={
            "default": {
                "api_key": "your_api_key",
                "chat_model": "gpt-4o"
            }
        },
        database_config={"metadata_store": {"provider": "inmemory"}}
    )
    
    # 存储文档
    doc_result = await service.memorize(
        resource_url="path/to/document.pdf",
        modality="document",
        user={"user_id": "user_001"}
    )
    
    # 存储图片(需要LLM支持Vision)
    image_result = await service.memorize(
        resource_url="path/to/image.png",
        modality="image",
        user={"user_id": "user_001"}
    )
    
    # 跨模态检索
    retrieval = await service.retrieve(
        queries=[
            {"role": "user", "content": {"text": "找出所有与项目相关的内容"}}
        ],
        where={"user_id": "user_001"},
        method="rag"
    )
    
    print("跨模态检索结果:")
    for resource in retrieval.get("resources", []):
        print(f"  - {resource.get('type')}: {resource.get('url')}")

asyncio.run(multimodal_example())

高级配置

自定义提示词

from memu import MemUService

service = MemUService(
    llm_profiles={...},
    pipeline_config={
        "memory_item_extraction": {
            "prompt_template": """
            你是一个专业的记忆提取助手。
            请从以下对话中提取关键信息:
            
            对话内容:
            {conversation}
            
            请提取:
            1. 用户偏好
            2. 重要事实
            3. 待办事项
            
            以JSON格式返回。
            """
        }
    }
)

批量处理

import asyncio
from memu import MemUService

async def batch_memorize(service: MemUService, resources: list):
    """批量记忆处理"""
    
    tasks = [
        service.memorize(
            resource_url=resource["url"],
            modality=resource["type"],
            user={"user_id": resource["user_id"]}
        )
        for resource in resources
    ]
    
    results = await asyncio.gather(*tasks)
    
    total_items = sum(len(r.get("items", [])) for r in results)
    print(f"批量处理完成,共提取 {total_items} 条记忆")
    
    return results

测试验证

运行官方测试

# 设置API Key
export OPENAI_API_KEY=your_api_key

# 内存模式测试
cd tests
python test_inmemory.py

# PostgreSQL模式测试
python test_postgres.py

# OpenRouter测试
export OPENROUTER_API_KEY=your_key
python test_openrouter.py

质量检查

# 运行代码质量检查
make check

# 包含:
# - Ruff lint
# - Black format
# - mypy type check
# - deptry dependency analysis

常见问题

Q1: 如何处理大量历史数据?

# 使用异步批量处理
async def migrate_historical_data(service, data_source):
    batch_size = 100
    for i in range(0, len(data_source), batch_size):
        batch = data_source[i:i+batch_size]
        await batch_memorize(service, batch)
        await asyncio.sleep(1)  # 避免API限流

Q2: 如何优化Token消耗?

# 1. 使用RAG模式进行初步筛选
initial = await service.retrieve(query, method="rag")

# 2. 只在必要时使用LLM模式
if needs_deep_reasoning:
    detailed = await service.retrieve(query, method="llm")

# 3. 使用缓存
service = MemUService(
    ...,
    cache_config={"enabled": True, "ttl": 3600}
)

Q3: 多用户场景如何隔离?

# 始终在查询中指定user_id
result = await service.retrieve(
    queries=queries,
    where={"user_id": "specific_user_id"}  # 确保隔离
)

参考资料