Skip to content

实施指南

环境准备

系统要求

  • 操作系统:Linux/macOS/Windows(Windows 需要 WSL)
  • Bun:v1.0 或更高版本
  • Node.js:可选(如需使用某些仅支持 Node.js 的库)
  • OpenCode:已安装并可在 PATH 中访问

安装依赖

创建项目并安装必要的包:

bash
# 初始化项目
bun init -y

# 安装 Telegram Bot 库
bun add grammy

# 安装进程持久化库(必需:process-manager.ts 直接导入 fs-extra)
bun add fs-extra

# 安装类型定义
bun add -d @types/node

项目结构

telegram-opencode-bot/
├── src/
│   ├── index.ts          # 入口文件
│   ├── process-manager.ts    # 进程管理
│   ├── stream-listener.ts    # 流监听
│   ├── output-parser.ts      # 输出解析
│   ├── notify-strategy.ts    # 通知策略
│   ├── telegram-notifier.ts  # Telegram 通知
│   ├── config.ts             # 配置管理
│   └── utils.ts              # 工具函数
├── config/
│   └── bot.config.json      # 机器人配置
├── data/
│   └── processes.json       # 进程存储(运行时生成)
├── package.json
├── tsconfig.json
└── .env.example             # 环境变量示例

配置说明

环境变量 (.env)

bash
# Telegram Bot Token(必填)
TELEGRAM_BOT_TOKEN=your_bot_token_here

# 允许的用户 ID(逗号分隔,必填;代码按 chat ID 比对,私聊时 chat ID 与用户 ID 相同)
ALLOWED_USER_IDS=123456789,987654321

# OpenCode 可执行文件路径(可选,默认使用 PATH 中的 opencode)
OPENCODE_PATH=/usr/local/bin/opencode

# 进程超时时间(毫秒,默认 30 分钟)
PROCESS_TIMEOUT_MS=1800000

# 最大并发进程数(默认 3)
MAX_CONCURRENT_PROCESSES=3

机器人配置 (config/bot.config.json)

json
{
  "notificationRules": [
    {
      "eventType": "error",
      "throttleMs": 0,
      "includeFullOutput": true,
      "priority": "high"
    },
    {
      "eventType": "completion",
      "throttleMs": 0,
      "includeFullOutput": false,
      "priority": "medium"
    },
    {
      "eventType": "tool_call",
      "throttleMs": 5000,
      "includeFullOutput": false,
      "priority": "medium"
    },
    {
      "eventType": "progress",
      "throttleMs": 30000,
      "includeFullOutput": false,
      "priority": "low"
    }
  ],
  "messageTemplates": {
    "start": "🚀 OpenCode 已启动,PID: {pid}",
    "error": "❌ 错误:{error}",
    "progress": "⏳ {message}",
    "completion": "✅ 任务完成!耗时:{duration}",
    "killed": "🛑 进程已终止"
  }
}

核心代码实现

1. 进程管理器 (process-manager.ts)

typescript
import { spawn, type ChildProcess } from 'child_process';
import fs from 'fs-extra';
import path from 'path';

/** Bookkeeping record for one OpenCode child process started from a chat. */
interface ProcessInfo {
  pid: number;
  chatId: number;
  messageId: number;
  startTime: number;
  prompt: string;
  status: 'running' | 'stopped' | 'failed';
}

/**
 * Spawns OpenCode child processes (one tracked record per chat), keeps their
 * handles in memory and mirrors the tracked records to `data/processes.json`
 * under the current working directory.
 */
export class ProcessManager {
  /** Grace period between SIGTERM and the SIGKILL fallback used by stop(). */
  private static readonly FORCE_KILL_DELAY_MS = 5000;

  /** Tracked records, keyed by chat id. */
  private processes: Map<number, ProcessInfo> = new Map();
  /** Live child handles, keyed by pid. */
  private processMap: Map<number, ChildProcess> = new Map();
  private storePath: string;

  constructor() {
    this.storePath = path.join(process.cwd(), 'data', 'processes.json');
    this.loadProcesses();
  }

  /**
   * Launches OpenCode with the given prompt and tracks it under `chatId`.
   *
   * @param prompt    Passed to the executable as `--prompt=<prompt>`.
   * @param chatId    Chat that owns the process.
   * @param messageId Message that triggered the launch.
   * @returns The pid of the spawned child.
   * @throws The spawn error when the executable could not be started.
   */
  async start(prompt: string, chatId: number, messageId: number): Promise<number> {
    // `||` (not `??`) on purpose: an empty OPENCODE_PATH also falls back to PATH lookup.
    const opencodePath = process.env.OPENCODE_PATH || 'opencode';
    const args = ['--prompt=' + prompt];

    const child = spawn(opencodePath, args, {
      stdio: ['ignore', 'pipe', 'pipe'],
      detached: false,
    });

    // An 'error' event without a listener is thrown as an uncaught exception,
    // which would take the whole bot down; always keep one attached.
    child.on('error', (error) => {
      console.error(`[ProcessManager] child process error (chat ${chatId}):`, error);
    });

    const pid = child.pid;
    if (pid === undefined) {
      // No pid means the spawn itself failed; the reason is delivered through
      // the 'error' event, so surface it to the caller instead of storing an
      // undefined pid.
      const reason = await new Promise<Error>((resolve) => {
        child.once('error', resolve);
      });
      throw reason;
    }

    const processInfo: ProcessInfo = {
      pid,
      chatId,
      messageId,
      startTime: Date.now(),
      prompt,
      status: 'running',
    };

    this.processes.set(chatId, processInfo);
    this.processMap.set(pid, child);

    // Attach the exit handler BEFORE the first await: a child that exits while
    // the store is being written would otherwise never be untracked.
    child.once('exit', (code, signal) => {
      processInfo.status = signal === 'SIGKILL' || signal === 'SIGTERM'
        ? 'stopped'
        : code === 0 ? 'stopped' : 'failed';
      // Only drop the chat entry if it still refers to this very process.
      if (this.processes.get(chatId) === processInfo) {
        this.processes.delete(chatId);
      }
      this.processMap.delete(pid);
      this.saveProcesses().catch((error: unknown) => {
        console.error('[ProcessManager] failed to persist process store:', error);
      });
    });

    await this.saveProcesses();

    return pid;
  }

  /**
   * Asks the process tracked for `chatId` to terminate: SIGTERM first, then
   * SIGKILL if it has not exited after FORCE_KILL_DELAY_MS.
   *
   * @returns `false` when no live process is tracked for the chat, `true`
   *          once the termination request has been issued.
   */
  async stop(chatId: number): Promise<boolean> {
    const processInfo = this.processes.get(chatId);
    if (!processInfo) return false;

    const child = this.processMap.get(processInfo.pid);
    if (!child) return false;

    // Try a graceful shutdown first.
    child.kill('SIGTERM');

    // `child.killed` only records that a signal was sent, not that the process
    // is gone, so the fallback has to look at exitCode / signalCode instead.
    const forceKillTimer = setTimeout(() => {
      if (child.exitCode === null && child.signalCode === null) {
        child.kill('SIGKILL');
      }
    }, ProcessManager.FORCE_KILL_DELAY_MS);
    child.once('exit', () => clearTimeout(forceKillTimer));

    return true;
  }

  /** Returns the tracked record for `chatId`, or `null` when there is none. */
  async getStatus(chatId: number): Promise<ProcessInfo | null> {
    return this.processes.get(chatId) || null;
  }

  /** Returns the live child handle for `pid`, if this instance spawned it and it has not exited. */
  getProcess(pid: number): ChildProcess | undefined {
    return this.processMap.get(pid);
  }

  /** Writes all tracked records to the store file, creating its directory if needed. */
  private async saveProcesses(): Promise<void> {
    await fs.ensureDir(path.dirname(this.storePath));
    const data = Array.from(this.processes.values());
    await fs.writeJson(this.storePath, data, { spaces: 2 });
  }

  /**
   * Restores records from the store file. Child handles do not survive a
   * restart, so records still marked 'running' are kept but flagged 'failed'.
   * An unreadable or malformed store is logged and ignored rather than
   * aborting construction.
   */
  private loadProcesses(): void {
    if (!fs.existsSync(this.storePath)) return;

    let data: unknown;
    try {
      data = fs.readJsonSync(this.storePath);
    } catch (error) {
      console.error('[ProcessManager] ignoring unreadable process store:', error);
      return;
    }
    if (!Array.isArray(data)) return;

    for (const info of data as ProcessInfo[]) {
      if (info.status === 'running') {
        this.processes.set(info.chatId, { ...info, status: 'failed' });
      }
    }
  }
}

2. 流监听器 (stream-listener.ts)

typescript
import type { ChildProcess } from 'child_process';

type StreamCallback = (chunk: string, type: 'stdout' | 'stderr') => void;

/**
 * Subscribes to a child process's stdout/stderr, strips ANSI colour sequences
 * from every chunk, forwards the cleaned chunk to registered callbacks and
 * keeps a bounded per-stream text buffer.
 */
export class StreamListener {
  private callbacks: StreamCallback[] = [];
  private buffer: Map<'stdout' | 'stderr', string> = new Map([
    ['stdout', ''],
    ['stderr', ''],
  ]);

  /** Registers a callback invoked with each cleaned chunk and its stream name. */
  onChunk(callback: StreamCallback) {
    this.callbacks.push(callback);
  }

  /** Attaches UTF-8 'data' handlers to whichever of stdout/stderr the process exposes. */
  listen(process: ChildProcess): void {
    for (const channel of ['stdout', 'stderr'] as const) {
      const stream = process[channel];
      if (!stream) continue;

      stream.setEncoding('utf-8');
      stream.on('data', (data: string) => {
        this.handleChunk(data, channel);
      });
    }
  }

  /** Cleans one chunk, appends it to the stream's buffer, notifies callbacks, then trims the buffer. */
  private handleChunk(chunk: string, type: 'stdout' | 'stderr'): void {
    // Drop ANSI colour (SGR) escape sequences.
    const stripped = chunk.replace(/\x1b\[[0-9;]*m/g, '');

    const accumulated = (this.buffer.get(type) ?? '') + stripped;
    this.buffer.set(type, accumulated);

    this.callbacks.forEach((notify) => notify(stripped, type));

    // Past 100000 characters, keep only the most recent 50000.
    if (accumulated.length > 100000) {
      this.buffer.set(type, accumulated.slice(-50000));
    }
  }

  /** Returns the buffered text for a stream ('' when nothing is stored). */
  getBuffer(type: 'stdout' | 'stderr'): string {
    return this.buffer.get(type) ?? '';
  }

  /** Empties one stream's buffer, or every buffer when no stream is given. */
  clearBuffer(type?: 'stdout' | 'stderr'): void {
    if (!type) {
      this.buffer.clear();
      return;
    }
    this.buffer.set(type, '');
  }
}

3. 输出解析器 (output-parser.ts)

typescript
/** One classified line of OpenCode output. */
export interface ParsedEvent {
  type: 'tool_call' | 'file_edit' | 'error' | 'progress' | 'completion' | 'other';
  content: string;
  timestamp: number;
  rawOutput: string;
}

/**
 * Turns raw OpenCode output into events, one per non-blank line. Each line is
 * checked against the patterns in a fixed order (tool call, file edit, error,
 * completion) and falls back to a progress event when none matches.
 */
export class OutputParser {
  private lastNotifyTime: Map<string, number> = new Map();

  /**
   * @param output Text that may span several lines.
   * @returns Events in line order; all events of one call share a timestamp.
   */
  parse(output: string): ParsedEvent[] {
    const stamp = Date.now();
    const collected: ParsedEvent[] = [];

    for (const rawLine of output.split('\n')) {
      const text = rawLine.trim();
      if (text.length === 0) continue;
      collected.push(this.classify(text, stamp));
    }

    return collected;
  }

  /** Classifies a single trimmed, non-empty line; the first matching pattern wins. */
  private classify(text: string, stamp: number): ParsedEvent {
    const build = (type: ParsedEvent['type'], content: string): ParsedEvent => ({
      type,
      content,
      timestamp: stamp,
      rawOutput: text,
    });

    // Tool invocation, e.g. "Running <tool>".
    const toolHit = /(?:Using|Running|Executing)\s+([a-z_-]+)/i.exec(text);
    if (toolHit !== null) {
      return build('tool_call', `正在执行工具: ${toolHit[1]}`);
    }

    // File operation, e.g. "Wrote <path>" (case-sensitive).
    const fileHit = /(?:Wrote|Modified|Created|Deleted)\s+([^\s]+)/.exec(text);
    if (fileHit !== null) {
      return build('file_edit', `文件已修改: ${fileHit[1]}`);
    }

    // Error keywords anywhere in the line.
    if (/(?:Error|Failed|Exception)/i.test(text)) {
      return build('error', text);
    }

    // Completion keywords anywhere in the line.
    if (/(?:Done|Complete|Finished)/i.test(text)) {
      return build('completion', '任务完成');
    }

    // Anything else is reported as progress.
    return build('progress', text);
  }
}

4. Telegram Bot 主入口 (index.ts)

typescript
import { Bot } from 'grammy';
import { ProcessManager } from './process-manager.js';
import { StreamListener } from './stream-listener.js';
import { OutputParser } from './output-parser.js';
import { TelegramNotifier } from './telegram-notifier.js';

// Fail fast with an actionable message when the token is missing, instead of
// hiding the undefined case behind a non-null assertion.
const botToken = process.env.TELEGRAM_BOT_TOKEN;
if (!botToken) {
  throw new Error('TELEGRAM_BOT_TOKEN is not set; see .env.example');
}

// Process-wide singletons shared by all command handlers.
const bot = new Bot(botToken);
const processManager = new ProcessManager();
const outputParser = new OutputParser();
const telegramNotifier = new TelegramNotifier(bot);

/**
 * Access check: reports whether `chatId` appears in the comma-separated
 * ALLOWED_USER_IDS environment variable.
 *
 * Entries are trimmed and must be plain integers. Blank or non-numeric
 * entries are ignored, so an empty variable or a trailing comma can no
 * longer whitelist id 0 (`Number('')` is 0).
 *
 * @param chatId Telegram chat id to check.
 * @returns `true` only when the id is explicitly listed.
 */
function isAuthorized(chatId: number): boolean {
  const rawList = process.env.ALLOWED_USER_IDS ?? '';
  return rawList
    .split(',')
    .map((entry) => entry.trim())
    .filter((entry) => /^-?\d+$/.test(entry))
    .some((entry) => Number(entry) === chatId);
}

// Main command: `/opencode <prompt>` launches OpenCode for the calling chat and
// relays parsed stdout events back through the notifier.
bot.command('opencode', async (ctx) => {
  const chatId = ctx.chat.id;
  const messageId = ctx.msg.message_id;

  if (!isAuthorized(chatId)) {
    await ctx.reply('❌ 未授权访问');
    return;
  }

  // For command handlers grammY exposes the text after the command as
  // `ctx.match`, which also copes with the `/opencode@botname` form that a
  // plain string replace would leave in the prompt.
  const prompt = ctx.match.trim();
  if (!prompt) {
    await ctx.reply('请提供提示词:/opencode <prompt>');
    return;
  }

  // Refuse to start a second process while one is still running for this chat.
  const existingProcess = await processManager.getStatus(chatId);
  if (existingProcess && existingProcess.status === 'running') {
    await ctx.reply('⚠️ 已有进程在运行,请先使用 /kill 终止');
    return;
  }

  try {
    const pid = await processManager.start(prompt, chatId, messageId);
    // Named `child` so the Node.js global `process` is not shadowed.
    const child = processManager.getProcess(pid);

    if (!child) {
      // The handle is already gone, so there is nothing to listen to; tell the
      // user instead of staying silent.
      await ctx.reply(`⚠️ OpenCode 进程(PID: ${pid})已退出,未能获取输出`);
      return;
    }

    const streamListener = new StreamListener();

    // NOTE(review): each chunk is parsed as it arrives, so a line split across
    // two chunks is classified as two separate fragments.
    streamListener.onChunk(async (chunk, type) => {
      if (type !== 'stdout') return;

      // The listener does not await this callback, so a rejected notification
      // must be caught here or it becomes an unhandled rejection.
      try {
        for (const event of outputParser.parse(chunk)) {
          if (telegramNotifier.shouldNotify(event)) {
            await telegramNotifier.notify(chatId, messageId, event, chunk);
          }
        }
      } catch (error) {
        console.error(`[opencode] failed to forward output to chat ${chatId}:`, error);
      }
    });
    streamListener.listen(child);

    await ctx.reply(`🚀 OpenCode 已启动,PID: ${pid}`);
  } catch (error) {
    const reason = error instanceof Error ? error.message : String(error);
    await ctx.reply(`❌ 启动失败:${reason}`);
  }
});

// `/kill`: asks the process manager to stop the process tracked for this chat
// and reports whether there was one to stop.
bot.command('kill', async (ctx) => {
  const { id: chatId } = ctx.chat;

  if (!isAuthorized(chatId)) {
    await ctx.reply('❌ 未授权访问');
    return;
  }

  const stopped = await processManager.stop(chatId);
  const replyText = stopped ? '🛑 进程已终止' : '❌ 没有运行中的进程';

  await ctx.reply(replyText);
});

// Start long polling. The startup message is logged from `onStart`, i.e. once
// the bot is actually running, and a failure of the polling loop is reported
// instead of surfacing as an unhandled promise rejection.
bot
  .start({
    onStart: () => {
      console.log('Telegram Bot 已启动');
    },
  })
  .catch((error: unknown) => {
    console.error('Telegram Bot 运行失败:', error);
    process.exitCode = 1;
  });

部署指南

使用 PM2 守护进程

bash
# 安装 PM2
bun install -g pm2

# Start the bot. PM2 does not run scripts with Bun unless told to, so pass Bun
# as the interpreter for the TypeScript entry file (use the full path, e.g.
# ~/.bun/bin/bun, if `bun` is not on PM2's PATH).
pm2 start src/index.ts --interpreter bun --name telegram-opencode-bot

# 设置开机自启
pm2 startup
pm2 save

使用 Systemd(Linux)

创建 /etc/systemd/system/telegram-opencode-bot.service

ini
[Unit]
Description=Telegram OpenCode Bot
After=network.target

[Service]
Type=simple
User=your_username
WorkingDirectory=/path/to/telegram-opencode-bot
Environment=PATH=/usr/bin:/home/your_username/.bun/bin
Environment=TELEGRAM_BOT_TOKEN=your_token
Environment=ALLOWED_USER_IDS=123456789
ExecStart=/home/your_username/.bun/bin/bun run src/index.ts
Restart=always
RestartSec=10

[Install]
WantedBy=multi-user.target

启动服务:

bash
sudo systemctl enable telegram-opencode-bot
sudo systemctl start telegram-opencode-bot
sudo systemctl status telegram-opencode-bot

测试

本地测试

bash
# 启动机器人
bun run src/index.ts

# 在 Telegram 中发送测试消息
/opencode write a hello world function in python

测试清单

  • [ ] 机器人能够启动并响应指令
  • [ ] OpenCode 进程正常启动并执行任务
  • [ ] 输出能够实时发送到 Telegram
  • [ ] /kill 指令能够终止进程
  • [ ] 权限控制正常工作
  • [ ] 进程崩溃后能够清理状态
  • [ ] 长时间运行无内存泄漏

故障排查

常见问题

| 问题 | 可能原因 | 解决方案 |
| --- | --- | --- |
| 机器人无响应 | Token 错误 | 检查环境变量中的 TELEGRAM_BOT_TOKEN |
| OpenCode 无法启动 | PATH 配置问题 | 设置 OPENCODE_PATH 环境变量 |
| 输出未显示 | 流监听失败 | 检查 OpenCode 的输出是否发送到 stdout |
| 进程无法终止 | PID 错误 | 检查进程存储文件是否正确 |
| 消息发送失败 | API 限流 | 增加节流时间 |

调试模式

设置环境变量启用详细日志:

bash
DEBUG=1 bun run src/index.ts

参考资料