Appearance
实施指南
环境准备
系统要求
- 操作系统:Linux/macOS/Windows(Windows 需要 WSL)
- Bun:v1.0 或更高版本
- Node.js:可选(如需使用某些仅支持 Node.js 的库)
- OpenCode:已安装并可在 PATH 中访问
安装依赖
创建项目并安装必要的包:
bash
# 初始化项目
bun init -y
# 安装 Telegram Bot 库
bun add grammy
# 安装 fs-extra(必需:process-manager.ts 用它读写 data/processes.json)
bun add fs-extra
# 安装类型定义
bun add -d @types/node
项目结构
telegram-opencode-bot/
├── src/
│ ├── index.ts # 入口文件
│ ├── process-manager.ts # 进程管理
│ ├── stream-listener.ts # 流监听
│ ├── output-parser.ts # 输出解析
│ ├── notify-strategy.ts # 通知策略
│ ├── telegram-notifier.ts # Telegram 通知
│ ├── config.ts # 配置管理
│ └── utils.ts # 工具函数
├── config/
│ └── bot.config.json # 机器人配置
├── data/
│ └── processes.json # 进程存储(运行时生成)
├── package.json
├── tsconfig.json
└── .env.example # 环境变量示例
配置说明
环境变量 (.env)
bash
# Telegram Bot Token(必填)
TELEGRAM_BOT_TOKEN=your_bot_token_here
# 允许的用户 ID(逗号分隔,必填)
ALLOWED_USER_IDS=123456789,987654321
# OpenCode 可执行文件路径(可选,默认使用 PATH 中的 opencode)
OPENCODE_PATH=/usr/local/bin/opencode
# 进程超时时间(毫秒,默认 30 分钟)
PROCESS_TIMEOUT_MS=1800000
# 最大并发进程数(默认 3)
MAX_CONCURRENT_PROCESSES=3
机器人配置 (config/bot.config.json)
json
{
"notificationRules": [
{
"eventType": "error",
"throttleMs": 0,
"includeFullOutput": true,
"priority": "high"
},
{
"eventType": "completion",
"throttleMs": 0,
"includeFullOutput": false,
"priority": "medium"
},
{
"eventType": "tool_call",
"throttleMs": 5000,
"includeFullOutput": false,
"priority": "medium"
},
{
"eventType": "progress",
"throttleMs": 30000,
"includeFullOutput": false,
"priority": "low"
}
],
"messageTemplates": {
"start": "🚀 OpenCode 已启动,PID: {pid}",
"error": "❌ 错误:{error}",
"progress": "⏳ {message}",
"completion": "✅ 任务完成!耗时:{duration}",
"killed": "🛑 进程已终止"
}
}
核心代码实现
1. 进程管理器 (process-manager.ts)
typescript
import { spawn, type ChildProcess } from 'child_process';
import fs from 'fs-extra';
import path from 'path';
/** Bookkeeping record for one spawned OpenCode child process; this is what gets persisted to the store file. */
interface ProcessInfo {
/** PID of the spawned child process. */
pid: number;
/** Chat the process was started for; the process table is keyed by this value. */
chatId: number;
/** Id of the message that triggered the run. */
messageId: number;
/** Start time as returned by `Date.now()` (epoch milliseconds). */
startTime: number;
/** Prompt text that was passed to OpenCode. */
prompt: string;
/** Lifecycle state: 'running' while alive, 'stopped' after a clean or signalled exit, 'failed' otherwise. */
status: 'running' | 'stopped' | 'failed';
}
export class ProcessManager {
private processes: Map<number, ProcessInfo> = new Map();
private processMap: Map<number, ChildProcess> = new Map();
private storePath: string;
/**
 * Points the store at `<cwd>/data/processes.json` and loads whatever a
 * previous run persisted there.
 */
constructor() {
  const dataDir = path.join(process.cwd(), 'data');
  this.storePath = path.join(dataDir, 'processes.json');
  this.loadProcesses();
}
/**
 * Spawns an OpenCode child process for a chat and records it in the process table.
 *
 * @param prompt - Prompt text, forwarded as a single `--prompt=` argument (no shell involved).
 * @param chatId - Chat the process belongs to; used as the key of the process table.
 * @param messageId - Id of the message that triggered the run, stored with the record.
 * @returns The PID of the spawned child.
 * @throws Error when the child could not be spawned (no PID was assigned).
 */
async start(prompt: string, chatId: number, messageId: number): Promise<number> {
  const opencodePath = process.env.OPENCODE_PATH || 'opencode';
  const child = spawn(opencodePath, ['--prompt=' + prompt], {
    stdio: ['ignore', 'pipe', 'pipe'],
    detached: false,
  });

  // Without a listener, an 'error' event (e.g. executable not found) is
  // thrown as an uncaught exception and takes the whole bot down.
  child.on('error', (err) => {
    console.error('OpenCode 进程错误:', err);
  });

  // A failed spawn leaves `pid` undefined; report it to the caller instead
  // of storing a record with a bogus PID.
  const pid = child.pid;
  if (pid === undefined) {
    throw new Error(`无法启动 OpenCode(${opencodePath}),请检查 OPENCODE_PATH 配置`);
  }

  const processInfo: ProcessInfo = {
    pid,
    chatId,
    messageId,
    startTime: Date.now(),
    prompt,
    status: 'running',
  };
  this.processes.set(chatId, processInfo);
  this.processMap.set(pid, child);

  // Subscribe before the first await so an exit that happens while the
  // store file is being written is not missed.
  child.on('exit', (code, signal) => {
    const stoppedBySignal = signal === 'SIGKILL' || signal === 'SIGTERM';
    processInfo.status = stoppedBySignal || code === 0 ? 'stopped' : 'failed';
    // Only drop the chat entry if it still refers to this child.
    if (this.processes.get(chatId) === processInfo) {
      this.processes.delete(chatId);
    }
    this.processMap.delete(pid);
    // Event handlers cannot await; catch explicitly to avoid an unhandled rejection.
    this.saveProcesses().catch((err: unknown) => {
      console.error('保存进程记录失败:', err);
    });
  });

  await this.saveProcesses();
  return pid;
}
/**
 * Asks the process of a chat to terminate, escalating to SIGKILL after 5 seconds.
 *
 * @param chatId - Chat whose process should be stopped.
 * @returns `true` when a termination signal was issued, `false` when the chat
 *          has no tracked process.
 */
async stop(chatId: number): Promise<boolean> {
  const processInfo = this.processes.get(chatId);
  if (!processInfo) return false;
  const child = this.processMap.get(processInfo.pid);
  if (!child) return false;

  // Try a graceful shutdown first.
  child.kill('SIGTERM');

  // `child.killed` only reports that a signal was sent, not that the process
  // has exited, so it cannot be used to decide whether to escalate. A child
  // that is still alive has neither an exit code nor a terminating signal.
  const forceKillTimer = setTimeout(() => {
    if (child.exitCode === null && child.signalCode === null) {
      child.kill('SIGKILL');
    }
  }, 5000);
  // Cancel the pending escalation once the child is gone.
  child.once('exit', () => clearTimeout(forceKillTimer));

  return true;
}
/**
 * Looks up the record tracked for a chat.
 *
 * @param chatId - Chat to look up.
 * @returns The stored record, or `null` when the chat has none.
 */
async getStatus(chatId: number): Promise<ProcessInfo | null> {
  const record = this.processes.get(chatId);
  return record ? record : null;
}
/**
 * Returns the live child-process handle registered under a PID.
 *
 * @param pid - PID returned by `start`.
 * @returns The handle, or `undefined` when no child is tracked under that PID.
 */
getProcess(pid: number): ChildProcess | undefined {
  const handle = this.processMap.get(pid);
  return handle;
}
/**
 * Writes all current records to the store file as pretty-printed JSON,
 * creating the containing directory first if needed.
 */
private async saveProcesses(): Promise<void> {
  const storeDir = path.dirname(this.storePath);
  await fs.ensureDir(storeDir);

  const records = Array.from(this.processes.values());
  await fs.writeJson(this.storePath, records, { spaces: 2 });
}
/**
 * Restores records from the store file, if one exists.
 *
 * Records that were still marked 'running' are re-registered as 'failed':
 * this instance holds no child handle for them, so they cannot be managed.
 * An unreadable or malformed store file is ignored with a warning rather
 * than aborting construction.
 */
private loadProcesses(): void {
  if (!fs.existsSync(this.storePath)) return;

  let data: unknown;
  try {
    data = fs.readJsonSync(this.storePath);
  } catch (err) {
    // A truncated or hand-edited file must not prevent the bot from starting.
    console.warn('进程记录文件无法解析,已忽略:', err);
    return;
  }
  if (!Array.isArray(data)) return;

  for (const info of data as ProcessInfo[]) {
    if (info?.status === 'running') {
      // Stale record left over from a previous run.
      this.processes.set(info.chatId, { ...info, status: 'failed' });
    }
  }
}
}
2. 流监听器 (stream-listener.ts)
typescript
import type { ChildProcess } from 'child_process';
/** Listener invoked for each chunk of child output, together with the stream it came from. */
type StreamCallback = (chunk: string, type: 'stdout' | 'stderr') => void;
export class StreamListener {
private callbacks: StreamCallback[] = [];
private buffer: Map<'stdout' | 'stderr', string> = new Map([
['stdout', ''],
['stderr', ''],
]);
/**
 * Registers a listener that is called for every chunk received afterwards.
 *
 * @param callback - Receives the ANSI-stripped chunk and the stream it came from.
 */
onChunk(callback: StreamCallback): void {
  this.callbacks.push(callback);
}
/**
 * Subscribes to the stdout and stderr pipes of a child process, decoding
 * both as UTF-8 and routing every chunk through `handleChunk`.
 *
 * @param process - Child process to observe; a pipe that is absent is skipped.
 */
listen(process: ChildProcess): void {
  const sources = [
    ['stdout', process.stdout],
    ['stderr', process.stderr],
  ] as const;

  for (const [kind, stream] of sources) {
    if (!stream) continue;
    stream.setEncoding('utf-8');
    stream.on('data', (data: string) => {
      this.handleChunk(data, kind);
    });
  }
}
/**
 * Processes one raw chunk: strips ANSI colour sequences, appends the result
 * to the per-stream buffer, notifies all listeners, then trims the buffer.
 *
 * @param chunk - Raw text as received from the pipe.
 * @param type - Stream the chunk came from.
 */
private handleChunk(chunk: string, type: 'stdout' | 'stderr'): void {
  // Remove colour escape sequences (ESC [ ... m).
  const stripped = chunk.replace(/\x1b\[[0-9;]*m/g, '');

  // Append to what has been collected so far for this stream.
  const accumulated = (this.buffer.get(type) || '') + stripped;
  this.buffer.set(type, accumulated);

  // Listeners see the cleaned chunk, not the whole buffer.
  this.callbacks.forEach((notify) => notify(stripped, type));

  // Cap memory use: past 100000 characters keep only the newest 50000.
  if (accumulated.length > 100000) {
    const tail = accumulated.slice(-50000);
    this.buffer.set(type, tail);
  }
}
/**
 * Returns the text currently buffered for a stream.
 *
 * @param type - Stream to read.
 * @returns The buffered text, or an empty string when nothing is stored.
 */
getBuffer(type: 'stdout' | 'stderr'): string {
  const buffered = this.buffer.get(type);
  return buffered || '';
}
/**
 * Empties the buffer of one stream, or of all streams when no stream is given.
 *
 * @param type - Stream to reset; omit to drop every buffer.
 */
clearBuffer(type?: 'stdout' | 'stderr'): void {
  if (!type) {
    this.buffer.clear();
    return;
  }
  this.buffer.set(type, '');
}
}
3. 输出解析器 (output-parser.ts)
typescript
/** One classified line of OpenCode output. */
export interface ParsedEvent {
/** Category assigned to the line. */
type: 'tool_call' | 'file_edit' | 'error' | 'progress' | 'completion' | 'other';
/** Human-readable text describing the event. */
content: string;
/** Time the event was produced, as returned by `Date.now()` (epoch milliseconds). */
timestamp: number;
/** The trimmed source line the event was derived from. */
rawOutput: string;
}
export class OutputParser {
private lastNotifyTime: Map<string, number> = new Map();
/**
 * Splits a piece of output into lines and classifies every non-blank line.
 *
 * @param output - Text to parse; may contain several lines.
 * @returns One event per non-blank line, in input order. All events of one
 *          call share the same timestamp.
 */
parse(output: string): ParsedEvent[] {
  const events: ParsedEvent[] = [];
  const now = Date.now();

  for (const line of output.split('\n')) {
    const trimmed = line.trim();
    if (!trimmed) continue;
    events.push(this.classifyLine(trimmed, now));
  }

  return events;
}

/**
 * Classifies a single trimmed, non-empty line. Rules are tried in order:
 * tool call, file edit, error, completion; anything else is progress.
 */
private classifyLine(trimmed: string, now: number): ParsedEvent {
  // Tool call. The leading \b keeps words such as "reusing" from matching.
  const toolMatch = trimmed.match(/\b(?:Using|Running|Executing)\s+([a-z_-]+)/i);
  if (toolMatch) {
    return {
      type: 'tool_call',
      content: `正在执行工具: ${toolMatch[1]}`,
      timestamp: now,
      rawOutput: trimmed,
    };
  }

  // File edit (case-sensitive verbs followed by a path).
  const fileMatch = trimmed.match(/(?:Wrote|Modified|Created|Deleted)\s+([^\s]+)/);
  if (fileMatch) {
    return {
      type: 'file_edit',
      content: `文件已修改: ${fileMatch[1]}`,
      timestamp: now,
      rawOutput: trimmed,
    };
  }

  // Error. Left unanchored on purpose so names like "TypeError" still match.
  if (/(?:Error|Failed|Exception)/i.test(trimmed)) {
    return { type: 'error', content: trimmed, timestamp: now, rawOutput: trimmed };
  }

  // Completion. Whole words only, so "abandoned", "incomplete" or
  // "unfinished" are not reported as a finished task.
  if (/\b(?:Done|Complete|Completed|Finished)\b/i.test(trimmed)) {
    return { type: 'completion', content: '任务完成', timestamp: now, rawOutput: trimmed };
  }

  // Everything else counts as progress output.
  return { type: 'progress', content: trimmed, timestamp: now, rawOutput: trimmed };
}
}
4. Telegram Bot 主入口 (index.ts)
typescript
import { Bot } from 'grammy';
import { ProcessManager } from './process-manager.js';
import { StreamListener } from './stream-listener.js';
import { OutputParser } from './output-parser.js';
import { TelegramNotifier } from './telegram-notifier.js';
// Shared singletons used by every command handler in this module.
const botToken = process.env.TELEGRAM_BOT_TOKEN!;
const bot = new Bot(botToken);
const processManager = new ProcessManager();
const outputParser = new OutputParser();
const telegramNotifier = new TelegramNotifier(bot);
/**
 * Access check: tells whether an id is listed in the comma-separated
 * ALLOWED_USER_IDS environment variable.
 *
 * Entries are trimmed, and anything that is not a plain integer (including
 * empty entries from a blank value or a trailing comma) is ignored, so such
 * entries can never turn into an allowed id of 0.
 *
 * @param chatId - Id to check against the allow-list.
 * @returns `true` when the id is listed, `false` otherwise or when the
 *          variable is unset.
 */
function isAuthorized(chatId: number): boolean {
  const raw = process.env.ALLOWED_USER_IDS ?? '';
  const allowedIds = raw
    .split(',')
    .map((entry) => entry.trim())
    .filter((entry) => /^-?\d+$/.test(entry))
    .map(Number);
  return allowedIds.includes(chatId);
}
// Main command: `/opencode <prompt>` starts an OpenCode run for the current
// chat and streams notifications derived from its stdout back to that chat.
bot.command('opencode', async (ctx) => {
  const chatId = ctx.chat.id;
  // `ctx.msg` is used instead of `ctx.message` because command updates can
  // also arrive as channel posts, where `ctx.message` is undefined.
  const messageId = ctx.msg?.message_id;
  if (messageId === undefined) return;

  if (!isAuthorized(chatId)) {
    await ctx.reply('❌ 未授权访问');
    return;
  }

  // `ctx.match` holds the text after the command, also when the command is
  // addressed as `/opencode@bot_name`, which a plain string replace would
  // leave inside the prompt.
  const prompt = (typeof ctx.match === 'string' ? ctx.match : '').trim();
  if (!prompt) {
    await ctx.reply('请提供提示词:/opencode <prompt>');
    return;
  }

  // Only one running process per chat.
  const existingProcess = await processManager.getStatus(chatId);
  if (existingProcess && existingProcess.status === 'running') {
    await ctx.reply('⚠️ 已有进程在运行,请先使用 /kill 终止');
    return;
  }

  try {
    const pid = await processManager.start(prompt, chatId, messageId);
    // Named `child` so the global `process` object is not shadowed.
    const child = processManager.getProcess(pid);
    if (child) {
      const streamListener = new StreamListener();
      streamListener.listen(child);
      streamListener.onChunk(async (chunk, type) => {
        if (type !== 'stdout') return;
        // The listener does not await this callback, so a rejection here
        // would be unhandled; catch and log it instead.
        try {
          for (const event of outputParser.parse(chunk)) {
            if (telegramNotifier.shouldNotify(event)) {
              await telegramNotifier.notify(chatId, messageId, event, chunk);
            }
          }
        } catch (error) {
          console.error('发送通知失败:', error);
        }
      });
      await ctx.reply(`🚀 OpenCode 已启动,PID: ${pid}`);
    }
  } catch (error) {
    const reason = error instanceof Error ? error.message : String(error);
    await ctx.reply(`❌ 启动失败:${reason}`);
  }
});
// `/kill` command: asks the process manager to stop the process that
// belongs to the current chat and reports the outcome.
bot.command('kill', async (ctx) => {
  const chatId = ctx.chat.id;

  if (!isAuthorized(chatId)) {
    await ctx.reply('❌ 未授权访问');
    return;
  }

  const stopped = await processManager.stop(chatId);
  const answer = stopped ? '🛑 进程已终止' : '❌ 没有运行中的进程';
  await ctx.reply(answer);
});
// Log errors thrown by handlers. NOTE(review): without a handler registered
// via bot.catch, grammY's default behaviour is expected to stop the bot on
// the first handler error — confirm against the installed grammY version.
bot.catch((err) => {
  console.error('处理更新时出错:', err.error);
});

// Start long polling. The returned promise rejects when polling cannot run
// (for example with an invalid token); fail loudly with a non-zero exit code
// so a supervisor can restart the service.
bot.start().catch((error: unknown) => {
  console.error('机器人启动失败:', error);
  process.exit(1);
});
console.log('Telegram Bot 已启动');
部署指南
使用 PM2 守护进程
bash
# 安装 PM2
bun install -g pm2
# 启动机器人
pm2 start src/index.ts --name telegram-opencode-bot --interpreter bun
# 设置开机自启
pm2 startup
pm2 save
使用 Systemd(Linux)
创建 /etc/systemd/system/telegram-opencode-bot.service:
ini
[Unit]
Description=Telegram OpenCode Bot
After=network.target
[Service]
Type=simple
User=your_username
WorkingDirectory=/path/to/telegram-opencode-bot
Environment=PATH=/usr/bin:/home/your_username/.bun/bin
Environment=TELEGRAM_BOT_TOKEN=your_token
Environment=ALLOWED_USER_IDS=123456789
ExecStart=/home/your_username/.bun/bin/bun run src/index.ts
Restart=always
RestartSec=10
[Install]
WantedBy=multi-user.target
启动服务:
bash
sudo systemctl enable telegram-opencode-bot
sudo systemctl start telegram-opencode-bot
sudo systemctl status telegram-opencode-bot
测试
本地测试
bash
# 启动机器人
bun run src/index.ts
# 在 Telegram 中发送测试消息
/opencode write a hello world function in python
测试清单
- [ ] 机器人能够启动并响应指令
- [ ] OpenCode 进程正常启动并执行任务
- [ ] 输出能够实时发送到 Telegram
- [ ] /kill 指令能够终止进程
- [ ] 权限控制正常工作
- [ ] 进程崩溃后能够清理状态
- [ ] 长时间运行无内存泄漏
故障排查
常见问题
| 问题 | 可能原因 | 解决方案 |
|---|---|---|
| 机器人无响应 | Token 错误 | 检查环境变量中的 TELEGRAM_BOT_TOKEN |
| OpenCode 无法启动 | PATH 配置问题 | 设置 OPENCODE_PATH 环境变量 |
| 输出未显示 | 流监听失败 | 检查 OpenCode 的输出是否发送到 stdout |
| 进程无法终止 | PID 错误 | 检查进程存储文件是否正确 |
| 消息发送失败 | API 限流 | 增加节流时间 |
调试模式
设置环境变量启用详细日志:
bash
DEBUG=1 bun run src/index.ts
参考资料
- grammY Documentation - Telegram Bot 库文档
- Bun Documentation - Bun 运行时文档
- OpenCode GitHub Repository - OpenCode 开源代码仓库
- Telegram Bot API Documentation - Telegram API 文档