Logo
热心市民王先生

关键代码验证

代码示例 JSONL 实现 最佳实践

JSONL 格式的读写实现和典型用例代码示例

4.1 基础读写操作

4.1.1 Python 实现

写入 JSONL

import json
from datetime import datetime

def write_message(filepath: str, role: str, content: str, **metadata):
    """
    Append one message record to a JSONL file.

    The record is written as a single JSON object followed by a newline.
    Non-ASCII text is written as-is rather than escaped
    (``ensure_ascii=False``).

    Args:
        filepath: Path of the JSONL file. It is opened in append mode, so it
            is created when it does not exist yet.
        role: Message role (user/assistant/system).
        content: Message text.
        **metadata: Extra fields merged into the record. They are applied
            last, so a ``timestamp`` keyword replaces the generated one.
    """
    # Local import: the surrounding snippet only imports ``datetime``.
    from datetime import timezone

    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # time and drop tzinfo so the serialized form keeps its "...Z" suffix
    # instead of "+00:00".
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)

    record = {
        "timestamp": now_utc.isoformat() + "Z",
        "role": role,
        "content": content,
        **metadata
    }

    with open(filepath, 'a', encoding='utf-8') as f:
        f.write(json.dumps(record, ensure_ascii=False) + '\n')

# Usage example: append one user message with session metadata attached.
write_message(
    'session.jsonl',
    'user',
    '帮我实现一个排序函数',
    session_id='abc123',
    message_id='msg_001',
)

读取 JSONL(流式)

import json

def read_messages_streaming(filepath: str):
    """
    Lazily read a JSONL file, one record at a time.

    Blank lines are skipped. A line that is not valid JSON is reported with
    its 1-based line number via ``print`` and then skipped, so one bad line
    does not abort the whole read.

    Yields:
        The parsed JSON value of each non-empty, valid line.
    """
    with open(filepath, 'r', encoding='utf-8') as handle:
        for lineno, raw in enumerate(handle, start=1):
            text = raw.strip()
            if text:
                try:
                    record = json.loads(text)
                except json.JSONDecodeError as err:
                    print(f"警告:第{lineno}行解析失败:{err}")
                else:
                    yield record

# Usage example: print every stored message as "[role] content".
for record in read_messages_streaming('session.jsonl'):
    print("[{}] {}".format(record['role'], record['content']))

批量读取(带过滤)

def filter_messages(filepath: str, role: str = None, session_id: str = None):
    """
    Collect the messages of a JSONL file that match the given criteria.

    A criterion that is falsy (``None`` or an empty string) is not applied,
    so calling with no criteria returns every record.

    Args:
        filepath: Path of the JSONL file.
        role: Keep only records whose ``role`` equals this value (optional).
        session_id: Keep only records whose ``session_id`` equals this value
            (optional).

    Returns:
        list[dict]: The matching records, in file order.
    """
    def _matches(record):
        # A record is rejected as soon as one active criterion disagrees.
        if role and record.get('role') != role:
            return False
        if session_id and record.get('session_id') != session_id:
            return False
        return True

    return [
        record
        for record in read_messages_streaming(filepath)
        if _matches(record)
    ]

4.1.2 TypeScript/Node.js 实现

写入 JSONL

import * as fs from 'fs';
import { appendFile } from 'fs/promises';

/**
 * Shape of one chat message record, i.e. one line of the JSONL file.
 *
 * NOTE(review): the Python and Rust examples in this document use the keys
 * `session_id` / `message_id`, while this interface uses camelCase. Files
 * written by one example are therefore not key-compatible with the others;
 * confirm which naming is intended.
 */
interface Message {
  // Filled in by writeMessage() with the output of Date.prototype.toISOString().
  timestamp: string;
  // Who produced the message.
  role: 'user' | 'assistant' | 'system';
  // Message text.
  content: string;
  // Optional identifier of the conversation the message belongs to.
  sessionId?: string;
  // Optional identifier of this individual message.
  messageId?: string;
  // Optional free-form extra fields.
  metadata?: Record<string, any>;
}

/**
 * Append one message to a JSONL file, stamping it with the current time.
 *
 * @param filepath Path of the JSONL file to append to.
 * @param message  Message fields without `timestamp`; the timestamp is
 *                 generated here as an ISO 8601 string.
 * @returns A promise that resolves once the line has been appended.
 */
async function writeMessage(
  filepath: string,
  message: Omit<Message, 'timestamp'>
): Promise<void> {
  const stamped: Message = { timestamp: new Date().toISOString(), ...message };

  // One JSON object per line: serialize, then terminate with a newline.
  await appendFile(filepath, `${JSON.stringify(stamped)}\n`, { encoding: 'utf-8' });
}

// Usage example: append a single user message to the session log.
const userMessage: Omit<Message, 'timestamp'> = {
  role: 'user',
  content: '帮我实现一个排序函数',
  sessionId: 'abc123',
  messageId: 'msg_001',
};
await writeMessage('session.jsonl', userMessage);

流式读取 JSONL

import { createInterface } from 'readline';
import { createReadStream } from 'fs';

/**
 * Lazily read a JSONL file, yielding one parsed message per non-empty line.
 *
 * Lines that fail to parse are reported with `console.warn` and skipped, so
 * a single corrupt line does not stop the iteration.
 *
 * @param filepath Path of the JSONL file to read.
 */
async function* readMessagesStream(filepath: string): AsyncGenerator<Message> {
  const lines = createInterface({
    input: createReadStream(filepath, { encoding: 'utf-8' }),
    crlfDelay: Infinity, // always treat "\r\n" as a single line break
  });

  for await (const rawLine of lines) {
    // Skip blank / whitespace-only lines.
    if (rawLine.trim().length === 0) continue;

    let parsed: Message;
    try {
      parsed = JSON.parse(rawLine) as Message;
    } catch (error) {
      console.warn(`解析失败:${error}`);
      continue;
    }
    yield parsed;
  }
}

// Usage example: print every stored message as "[role] content".
for await (const { role, content } of readMessagesStream('session.jsonl')) {
  console.log(`[${role}] ${content}`);
}

4.1.3 Rust 实现

写入 JSONL

use serde::Serialize;
use std::fs::OpenOptions;
use std::io::{BufWriter, Write};

/// Borrowed view of one chat message, used when writing a JSONL record.
///
/// Every field is required here, including `timestamp`: `write_message`
/// serializes the struct as given and does not generate a timestamp itself.
#[derive(Serialize)]
struct Message<'a> {
    /// Time of the message, supplied by the caller as an owned string.
    timestamp: String,
    /// Who produced the message.
    role: &'a str,
    /// Message text.
    content: &'a str,
    /// Identifier of the conversation the message belongs to.
    session_id: &'a str,
    /// Identifier of this individual message.
    message_id: &'a str,
}

/// Append `message` to the JSONL file at `filepath` as one JSON line.
///
/// The file is created if it does not exist and is always opened in append
/// mode. The buffered writer is flushed before returning, so the record and
/// its trailing newline have been handed to the OS on success.
///
/// # Errors
/// Returns any I/O error from opening, writing or flushing the file, as well
/// as serialization errors (converted into `std::io::Error` by `?`).
fn write_message(filepath: &str, message: &Message) -> std::io::Result<()> {
    let mut out = BufWriter::new(
        OpenOptions::new()
            .create(true)
            .append(true)
            .open(filepath)?,
    );

    serde_json::to_writer(&mut out, message)?;
    out.write_all(b"\n")?;
    out.flush()
}

逐行读取 JSONL(结果收集到 Vec)

use serde::Deserialize;
use std::fs::File;
use std::io::{BufRead, BufReader};

/// Owned form of one chat message, used when reading JSONL records back.
///
/// `session_id` and `message_id` are `Option`s, so lines that omit them can
/// still be deserialized; the remaining fields are plain `String`s.
#[derive(Deserialize)]
struct Message {
    /// Time of the message, kept as the raw string found in the file.
    timestamp: String,
    /// Who produced the message.
    role: String,
    /// Message text.
    content: String,
    /// Identifier of the conversation, when present.
    session_id: Option<String>,
    /// Identifier of this individual message, when present.
    message_id: Option<String>,
}

/// Read the JSONL file at `filepath` line by line and return all messages.
///
/// Blank lines are ignored. A line that cannot be deserialized into
/// `Message` is reported on stderr and skipped. Note that the records are
/// collected into a `Vec`, so the whole result is held in memory.
///
/// # Errors
/// Returns the I/O error if the file cannot be opened or a line cannot be
/// read.
fn read_messages(filepath: &str) -> std::io::Result<Vec<Message>> {
    let mut parsed = Vec::new();

    for raw in BufReader::new(File::open(filepath)?).lines() {
        let text = raw?;
        if !text.trim().is_empty() {
            match serde_json::from_str::<Message>(&text) {
                Ok(record) => parsed.push(record),
                Err(err) => eprintln!("解析失败:{}", err),
            }
        }
    }

    Ok(parsed)
}

4.2 高级用例

4.2.1 会话恢复

def resume_session(filepath: str, session_id: str):
    """
    Rebuild the conversation history of one session from a JSONL file.

    Only records whose ``session_id`` equals the requested one are kept, and
    each is reduced to its ``role`` and ``content`` keys; all other fields
    (timestamp, ids, ...) are dropped.

    NOTE(review): records with role ``system`` are passed through unchanged;
    confirm that the target chat API accepts them inside ``messages``.

    Args:
        filepath: Path of the JSONL file.
        session_id: Session ID to restore.

    Returns:
        list[dict]: ``{'role': ..., 'content': ...}`` entries in file order.
    """
    return [
        {'role': record['role'], 'content': record['content']}
        for record in read_messages_streaming(filepath)
        if record.get('session_id') == session_id
    ]

# Usage example (Claude API).
# NOTE(review): `claude_client` is not defined in this snippet; presumably an
# already-initialized Anthropic SDK client -- confirm before running.
conversation_history = resume_session('history.jsonl', 'abc123')
response = claude_client.messages.create(
    messages=conversation_history,
    model='claude-sonnet-4-5-20250929',
    max_tokens=1024,
)

4.2.2 对话导出

def export_session_to_markdown(filepath: str, session_id: str, output_path: str):
    """
    Export one session of a JSONL file as a Markdown document.

    The output starts with a title and an export time (UTC), followed by one
    ``## ROLE (timestamp)`` section per message, separated by horizontal
    rules. A message without a ``timestamp`` field is labelled ``Unknown``.

    Args:
        filepath: Path of the JSONL file.
        session_id: Session ID to export.
        output_path: Path of the Markdown file to write (overwritten).
    """
    # Local import: the surrounding snippet only imports ``datetime``.
    from datetime import timezone

    # Read the raw records instead of going through resume_session(): that
    # helper keeps only role/content, which made every exported timestamp
    # fall back to "Unknown".
    messages = [
        msg
        for msg in read_messages_streaming(filepath)
        if msg.get('session_id') == session_id
    ]

    # datetime.utcnow() is deprecated since Python 3.12; drop tzinfo from an
    # aware UTC time so the "...Z" output format is unchanged.
    exported_at = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    with open(output_path, 'w', encoding='utf-8') as f:
        f.write(f"# Session: {session_id}\n\n")
        f.write(f"Exported at: {exported_at}Z\n\n")
        f.write("---\n\n")

        for msg in messages:
            role = msg['role'].upper()
            content = msg['content']
            timestamp = msg.get('timestamp', 'Unknown')

            f.write(f"## {role} ({timestamp})\n\n")
            f.write(f"{content}\n\n")
            f.write("---\n\n")

    print(f"会话已导出到:{output_path}")

4.2.3 使用统计

from collections import Counter
from datetime import datetime, timedelta

def analyze_usage(filepath: str, days: int = 7):
    """
    Summarize the messages of the last ``days`` days.

    Timestamps are compared in UTC: a timestamp with an offset is converted
    to UTC, and one without any offset is assumed to already be UTC. Records
    whose timestamp is missing or cannot be parsed are skipped.

    Args:
        filepath: Path of the JSONL file.
        days: Size of the look-back window in days.

    Returns:
        dict: ``total_messages``, one ``<role>_messages`` counter per role
        seen (``user_messages`` and ``assistant_messages`` always present),
        ``sessions`` (number of distinct session IDs, records without one
        counted as ``'unknown'``) and ``daily_counts`` (a ``Counter`` keyed
        by UTC date ``YYYY-MM-DD``).
    """
    # Local import: the surrounding snippet imports only datetime/timedelta.
    from datetime import timezone

    # Aware "now" instead of the deprecated datetime.utcnow().
    cutoff = datetime.now(timezone.utc) - timedelta(days=days)

    stats = {
        'total_messages': 0,
        'user_messages': 0,
        'assistant_messages': 0,
        'sessions': set(),
        'daily_counts': Counter()
    }

    for msg in read_messages_streaming(filepath):
        try:
            # 'Z' is rewritten because datetime.fromisoformat() only accepts
            # it from Python 3.11 on.
            timestamp = datetime.fromisoformat(msg['timestamp'].replace('Z', '+00:00'))
            if timestamp.tzinfo is None:
                timestamp = timestamp.replace(tzinfo=timezone.utc)
            else:
                # Convert rather than strip the offset, so non-UTC
                # timestamps are compared at their true instant.
                timestamp = timestamp.astimezone(timezone.utc)
        except (KeyError, TypeError, ValueError, AttributeError, OverflowError):
            # Missing, non-string or malformed timestamp: ignore the record.
            continue

        if timestamp < cutoff:
            continue

        stats['total_messages'] += 1

        # A record without a usable role still counts towards the total but
        # must not crash the per-role tally.
        role = msg.get('role')
        if isinstance(role, str):
            role_key = role + '_messages'
            stats[role_key] = stats.get(role_key, 0) + 1

        stats['sessions'].add(msg.get('session_id', 'unknown'))

        day_key = timestamp.strftime('%Y-%m-%d')
        stats['daily_counts'][day_key] += 1

    # Callers get the number of distinct sessions, not the set itself.
    stats['sessions'] = len(stats['sessions'])
    return stats

# Usage example: report the activity of the last 30 days.
usage = analyze_usage('history.jsonl', days=30)
for summary_line in (
    "过去 30 天:",
    f"  总消息数:{usage['total_messages']}",
    f"  会话数:{usage['sessions']}",
    f"  用户消息:{usage['user_messages']}",
    f"  AI 回复:{usage['assistant_messages']}",
):
    print(summary_line)

4.3 性能基准测试

4.3.1 写入性能对比

import time
import json

def benchmark_write_jsonl(num_records: int = 10000, filepath: str = 'test.jsonl'):
    """
    Time how long it takes to write ``num_records`` JSONL lines.

    The target file is truncated first, then one JSON object per line is
    written through a single open file handle.

    Args:
        num_records: Number of records to write.
        filepath: File to write to (overwritten).

    Returns:
        float: Elapsed time in seconds.
    """
    # perf_counter() is monotonic and high-resolution, unlike time.time(),
    # which can jump when the system clock is adjusted.
    start = time.perf_counter()

    with open(filepath, 'w', encoding='utf-8') as f:
        for i in range(num_records):
            record = {"id": i, "message": f"Test message {i}"}
            f.write(json.dumps(record) + '\n')

    elapsed = time.perf_counter() - start
    print(f"JSONL 写入 {num_records} 条记录:{elapsed:.3f}秒")
    return elapsed

def benchmark_write_json_array(num_records: int = 10000, filepath: str = 'test.json'):
    """
    Time appending ``num_records`` items to a JSON array file, one at a time.

    This simulates the inefficient pattern where every append re-reads and
    rewrites the whole file. The file is reset to an empty array first, so
    each run is independent and does not require the file to exist.

    Args:
        num_records: Number of records to append.
        filepath: File to write to (overwritten).

    Returns:
        float: Elapsed time in seconds.
    """
    start = time.perf_counter()

    # Start from an empty array. Previously the file had to exist already,
    # and json.load() was called after f.read() had consumed the stream,
    # which always failed on non-empty content.
    with open(filepath, 'w', encoding='utf-8') as f:
        json.dump([], f)

    for i in range(num_records):
        with open(filepath, 'r', encoding='utf-8') as f:
            data = json.load(f)
        data.append({"id": i, "message": f"Test message {i}"})
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(data, f)

    elapsed = time.perf_counter() - start
    print(f"JSON 数组写入 {num_records} 条记录:{elapsed:.3f}秒")
    # Note: this is deliberately very slow (the whole file is rewritten on
    # every append); do not use this pattern in practice.
    return elapsed

# Run the benchmark.
jsonl_time = benchmark_write_jsonl(num_records=10000)
# json_array_time = benchmark_write_json_array(10000)  # not run: far too slow

典型结果

  • JSONL:10,000 条记录 ≈ 0.5 秒
  • JSON 数组:10,000 条记录 ≈ 30+ 秒(60 倍性能差距)

4.3.2 内存占用对比

import tracemalloc

def measure_memory_jsonl(filepath: str):
    """
    Measure the peak traced memory of streaming through a JSONL file.

    Every record is read and discarded; only a counter is kept.

    Args:
        filepath: Path of the JSONL file.

    Returns:
        int: Peak traced memory in bytes, as reported by ``tracemalloc``.
    """
    tracemalloc.start()
    try:
        count = 0
        for _ in read_messages_streaming(filepath):
            count += 1

        _, peak = tracemalloc.get_traced_memory()
    finally:
        # Always stop tracing, even if reading fails; otherwise tracemalloc
        # stays enabled and keeps slowing down the rest of the process.
        tracemalloc.stop()

    print(f"JSONL: 处理 {count} 条记录,峰值内存:{peak / 1024 / 1024:.2f} MB")
    return peak

def measure_memory_json_array(filepath: str):
    """
    Measure the peak traced memory of loading a whole JSON file at once.

    Args:
        filepath: Path of the JSON file (expected to contain an array).

    Returns:
        int: Peak traced memory in bytes, as reported by ``tracemalloc``.
    """
    tracemalloc.start()
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            data = json.load(f)
            count = len(data)

        # Sample while ``data`` is still referenced, so the fully loaded
        # document is part of the measurement.
        _, peak = tracemalloc.get_traced_memory()
    finally:
        # Always stop tracing, even if opening or parsing fails; otherwise
        # tracemalloc stays enabled for the rest of the process.
        tracemalloc.stop()

    print(f"JSON: 处理 {count} 条记录,峰值内存:{peak / 1024 / 1024:.2f} MB")
    return peak

# 测试结果(1GB 文件):
# JSONL: ~2 MB 峰值内存
# JSON: ~1000+ MB 峰值内存

参考资料