Harness模式开发最佳实践 - 关键代码验证
AI代理 代码示例 Node.js BFF
提供Node.js BFF实现示例、沙箱自动化脚本和状态持久化方案的实际代码
Node.js BFF实现示例
场景:用户认证BFF服务
以下是一个完整的BFF服务实现,展示Harness Agent如何将下游用户服务封装为前端友好的API。
项目结构:
user-bff/
├── src/
│ ├── controllers/
│ │ └── authController.ts
│ ├── services/
│ │ └── userService.ts
│ ├── middleware/
│ │ ├── auth.ts
│ │ └── errorHandler.ts
│ ├── types/
│ │ └── index.ts
│ └── app.ts
├── tests/
│ ├── unit/
│ └── integration/
├── Dockerfile
└── package.json
核心实现代码:
// src/types/index.ts
/**
 * User record in the camelCase shape the BFF hands to the frontend.
 * Instances are built by `UserService.transformUser` from the downstream payload.
 */
export interface User {
id: string;
email: string;
name: string;
role: 'user' | 'admin';
// Built with `new Date(...)` from the downstream `created_at` field.
createdAt: Date;
}
/**
 * Result of a login or token refresh, mapped from the downstream
 * `access_token` / `refresh_token` / `expires_in` fields.
 */
export interface AuthResponse {
user: User;
accessToken: string;
// AuthController stores this in an HTTP-only cookie and omits it from the JSON body.
refreshToken: string;
// NOTE(review): unit is not visible here — presumably seconds; confirm against the user service.
expiresIn: number;
}
/**
 * Error body shape: a machine-readable `code`, a human-readable `message`,
 * and optional structured `details`.
 */
export interface ApiError {
code: string;
message: string;
details?: Record<string, unknown>;
}
// src/services/userService.ts
import axios, { AxiosInstance } from 'axios';
import { User, AuthResponse } from '../types';
/**
 * Snake_case user payload read from the downstream user service.
 * NOTE(review): field types are inferred from how the fields are used here —
 * confirm against the downstream API contract.
 */
interface RawUserPayload {
  id: string;
  email: string;
  name: string;
  role: User['role'];
  created_at: string | number;
}

/** Snake_case auth payload read from the downstream login/refresh endpoints. */
interface RawAuthPayload {
  user: RawUserPayload;
  access_token: string;
  refresh_token: string;
  expires_in: number;
}

/**
 * Thin client for the downstream user service.
 *
 * Wraps an axios instance configured from `USER_SERVICE_URL` and
 * `USER_SERVICE_API_KEY`, converts every failed request into a
 * `ServiceError`, and maps snake_case payloads to the BFF's camelCase types.
 */
export class UserService {
  private readonly client: AxiosInstance;

  constructor() {
    this.client = axios.create({
      baseURL: process.env.USER_SERVICE_URL,
      timeout: 5000,
      headers: {
        'Content-Type': 'application/json',
        'X-API-Key': process.env.USER_SERVICE_API_KEY
      }
    });

    // Response interceptor: normalise every failure into a ServiceError.
    this.client.interceptors.response.use(
      (response) => response,
      (error) => {
        if (error.response) {
          // The downstream answered with a non-2xx status: keep its status/code.
          throw new ServiceError(
            error.response.data?.message || 'Service error',
            error.response.status,
            error.response.data?.code
          );
        }
        // No response at all (timeout, DNS failure, connection refused, ...).
        throw new ServiceError('Network error', 503);
      }
    );
  }

  /**
   * Exchanges credentials for tokens via `POST /api/v1/auth/login`.
   *
   * @throws ServiceError when the downstream call fails.
   */
  async authenticate(email: string, password: string): Promise<AuthResponse> {
    const response = await this.client.post('/api/v1/auth/login', {
      email,
      password
    });
    return this.transformAuthResponse(response.data);
  }

  /**
   * Exchanges a refresh token for new tokens via `POST /api/v1/auth/refresh`.
   *
   * @throws ServiceError when the downstream call fails.
   */
  async refreshToken(refreshToken: string): Promise<AuthResponse> {
    const response = await this.client.post('/api/v1/auth/refresh', {
      refreshToken
    });
    return this.transformAuthResponse(response.data);
  }

  /**
   * Fetches one user via `GET /api/v1/users/:id`.
   *
   * The id is percent-encoded so that values containing `/`, `?`, `#` or `..`
   * cannot change which downstream path is requested.
   *
   * @throws ServiceError when the downstream call fails.
   */
  async getUserById(userId: string): Promise<User> {
    const response = await this.client.get(
      `/api/v1/users/${encodeURIComponent(userId)}`
    );
    return this.transformUser(response.data);
  }

  /** Maps the downstream auth payload to the camelCase `AuthResponse`. */
  private transformAuthResponse(data: RawAuthPayload): AuthResponse {
    return {
      user: this.transformUser(data.user),
      accessToken: data.access_token,
      refreshToken: data.refresh_token,
      expiresIn: data.expires_in
    };
  }

  /** Maps the downstream user payload to `User`, parsing `created_at` into a Date. */
  private transformUser(data: RawUserPayload): User {
    return {
      id: data.id,
      email: data.email,
      name: data.name,
      role: data.role,
      createdAt: new Date(data.created_at)
    };
  }
}
/**
 * Error raised for failed calls to the downstream service.
 *
 * Carries the HTTP status to answer with and an optional machine-readable
 * code; `name` is set to 'ServiceError' so handlers can recognise it by name.
 */
class ServiceError extends Error {
  public statusCode: number;
  public code?: string;

  constructor(message: string, statusCode: number, code?: string) {
    super(message);
    this.statusCode = statusCode;
    this.code = code;
    this.name = 'ServiceError';
  }
}
// src/controllers/authController.ts
import { Request, Response, NextFunction } from 'express';
import { UserService } from '../services/userService';
import { body, validationResult } from 'express-validator';
/**
 * HTTP handlers for login and token refresh.
 *
 * The refresh token never reaches frontend JavaScript: it is stored in an
 * HTTP-only cookie, while the JSON body carries only the user, the access
 * token and its lifetime.
 */
export class AuthController {
  /** Lifetime of the refresh-token cookie: 7 days, in milliseconds. */
  private static readonly REFRESH_COOKIE_MAX_AGE_MS = 7 * 24 * 60 * 60 * 1000;

  private readonly userService: UserService;

  constructor() {
    this.userService = new UserService();
  }

  /** Validation chain to mount before `login`. */
  loginValidation = [
    body('email').isEmail().normalizeEmail(),
    body('password').isLength({ min: 6 })
  ];

  /**
   * Attributes shared by setting and clearing the refresh cookie.
   * `maxAge` is deliberately excluded: Express documents that clearCookie
   * options must match those given to res.cookie except expires/maxAge.
   */
  private refreshCookieAttributes(): {
    httpOnly: boolean;
    secure: boolean;
    sameSite: 'strict';
  } {
    return {
      httpOnly: true,
      secure: process.env.NODE_ENV === 'production',
      sameSite: 'strict'
    };
  }

  /** Stores the refresh token in the HTTP-only cookie. */
  private setRefreshCookie(res: Response, refreshToken: string): void {
    res.cookie('refreshToken', refreshToken, {
      ...this.refreshCookieAttributes(),
      maxAge: AuthController.REFRESH_COOKIE_MAX_AGE_MS
    });
  }

  /**
   * True when the downstream service rejected the refresh token itself
   * (401/403), as opposed to an outage or an unexpected failure.
   */
  private static isTokenRejection(error: unknown): boolean {
    if (typeof error !== 'object' || error === null) {
      return false;
    }
    const status = (error as { statusCode?: unknown }).statusCode;
    return status === 401 || status === 403;
  }

  /**
   * POST login handler.
   *
   * Responds 400 with the validation details when the input is invalid;
   * otherwise authenticates downstream, sets the refresh cookie and returns
   * the user plus access token. Failures are forwarded to `next`.
   */
  login = async (
    req: Request,
    res: Response,
    next: NextFunction
  ): Promise<void> => {
    try {
      const errors = validationResult(req);
      if (!errors.isEmpty()) {
        res.status(400).json({
          code: 'VALIDATION_ERROR',
          message: 'Invalid input',
          details: errors.array()
        });
        return;
      }

      const { email, password } = req.body;
      const authResponse = await this.userService.authenticate(
        email,
        password
      );

      this.setRefreshCookie(res, authResponse.refreshToken);

      // Only the access token and the user go back in the body.
      res.json({
        success: true,
        data: {
          user: authResponse.user,
          accessToken: authResponse.accessToken,
          expiresIn: authResponse.expiresIn
        }
      });
    } catch (error) {
      next(error);
    }
  };

  /**
   * POST refresh handler.
   *
   * Responds 401 when no refresh cookie is present or when the downstream
   * service rejects the token (the cookie is cleared in that case). Any other
   * failure — e.g. a downstream outage — is forwarded to `next` and leaves the
   * cookie untouched, so a transient error does not log the user out.
   */
  refresh = async (
    req: Request,
    res: Response,
    next: NextFunction
  ): Promise<void> => {
    try {
      const refreshToken = req.cookies.refreshToken;
      if (!refreshToken) {
        res.status(401).json({
          code: 'UNAUTHORIZED',
          message: 'No refresh token provided'
        });
        return;
      }

      const authResponse = await this.userService.refreshToken(refreshToken);

      // Store the new refresh token returned by the downstream service.
      this.setRefreshCookie(res, authResponse.refreshToken);

      res.json({
        success: true,
        data: {
          user: authResponse.user,
          accessToken: authResponse.accessToken,
          expiresIn: authResponse.expiresIn
        }
      });
    } catch (error: unknown) {
      if (AuthController.isTokenRejection(error)) {
        // Token invalid or expired: drop the cookie so the client re-authenticates.
        res.clearCookie('refreshToken', this.refreshCookieAttributes());
        res.status(401).json({
          code: 'UNAUTHORIZED',
          message: 'Invalid or expired refresh token'
        });
        return;
      }
      next(error);
    }
  };
}
// src/middleware/errorHandler.ts
import { Request, Response, NextFunction } from 'express';
/**
 * Express error-handling middleware that turns thrown errors into JSON bodies.
 *
 * - Errors named 'ServiceError' use their own `statusCode` (default 500) and
 *   `code` (default 'SERVICE_ERROR').
 * - Errors named 'ValidationError' become 400 / 'VALIDATION_ERROR'.
 * - Anything else becomes 500 / 'INTERNAL_ERROR'; the message is replaced by a
 *   generic one when NODE_ENV is 'production'.
 *
 * If the response has already started, the error is delegated to `next` so
 * Express's default handler can close the connection instead of this handler
 * attempting to write headers a second time.
 */
export const errorHandler = (
  error: Error,
  req: Request,
  res: Response,
  next: NextFunction
): void => {
  console.error('Error:', error);

  if (res.headersSent) {
    next(error);
    return;
  }

  // Errors are recognised by name, so no import of the error classes is needed.
  if (error.name === 'ServiceError') {
    const serviceError = error as Error & {
      statusCode?: number;
      code?: string;
    };
    res.status(serviceError.statusCode || 500).json({
      code: serviceError.code || 'SERVICE_ERROR',
      message: serviceError.message
    });
    return;
  }

  if (error.name === 'ValidationError') {
    res.status(400).json({
      code: 'VALIDATION_ERROR',
      message: error.message
    });
    return;
  }

  // Fallback: hide the message in production.
  res.status(500).json({
    code: 'INTERNAL_ERROR',
    message:
      process.env.NODE_ENV === 'production'
        ? 'Internal server error'
        : error.message
  });
};
// src/app.ts
import express from 'express';
import cookieParser from 'cookie-parser';
import helmet from 'helmet';
import cors from 'cors';
import { AuthController } from './controllers/authController';
import { errorHandler } from './middleware/errorHandler';
const app = express();
const authController = new AuthController();

// Comma-separated origin list; entries are trimmed and blanks dropped so that
// "a.com, b.com" or an empty variable do not produce unusable origins.
const configuredOrigins = (process.env.ALLOWED_ORIGINS ?? '')
  .split(',')
  .map((origin) => origin.trim())
  .filter((origin) => origin.length > 0);

// Security middleware
app.use(helmet());
app.use(cors({
  origin: configuredOrigins.length > 0 ? configuredOrigins : ['http://localhost:3000'],
  credentials: true // required for the refresh-token cookie to be sent cross-origin
}));
app.use(express.json());
app.use(cookieParser());

// Health check
app.get('/health', (_req, res) => {
  res.json({ status: 'ok', timestamp: new Date().toISOString() });
});

// Auth routes
app.post('/api/v1/auth/login',
  authController.loginValidation,
  authController.login
);
app.post('/api/v1/auth/refresh', authController.refresh);

// Error handling — registered last so it sees errors from every route above.
app.use(errorHandler);

const PORT = process.env.PORT || 3001;

// Do not bind a port when the module is merely imported by the test suite
// (Jest sets NODE_ENV to 'test'); supertest drives `app` directly.
if (process.env.NODE_ENV !== 'test') {
  app.listen(PORT, () => {
    console.log(`User BFF service running on port ${PORT}`);
  });
}

export default app;
测试示例:
// tests/integration/auth.test.ts
import request from 'supertest';
import app from '../../src/app';
import { UserService } from '../../src/services/userService';
// Mock下游服务
jest.mock('../../src/services/userService');
// Integration tests for the auth routes; UserService is auto-mocked, so no
// downstream call is made and each test scripts the service's outcome.
describe('Auth API Integration Tests', () => {
  const mockUserService = UserService as jest.MockedClass<typeof UserService>;

  // Builds an error the error handler recognises as a ServiceError. The handler
  // matches on `name`, so the (non-exported) class itself is not needed.
  const serviceError = (message: string, statusCode: number, code: string) =>
    Object.assign(new Error(message), { name: 'ServiceError', statusCode, code });

  beforeEach(() => {
    jest.clearAllMocks();
  });

  describe('POST /api/v1/auth/login', () => {
    it('should authenticate user successfully', async () => {
      const mockResponse = {
        user: {
          id: 'user-123',
          email: 'test@example.com',
          name: 'Test User',
          // `as const` keeps the literal type so the object satisfies User['role'].
          role: 'user' as const,
          createdAt: new Date()
        },
        accessToken: 'mock-access-token',
        refreshToken: 'mock-refresh-token',
        expiresIn: 3600
      };
      mockUserService.prototype.authenticate.mockResolvedValue(mockResponse);

      const response = await request(app)
        .post('/api/v1/auth/login')
        .send({
          email: 'test@example.com',
          password: 'password123'
        });

      expect(response.status).toBe(200);
      expect(response.body.success).toBe(true);
      expect(response.body.data.user.email).toBe('test@example.com');
      expect(response.body.data.accessToken).toBe('mock-access-token');
      // The refresh token must travel only in the HTTP-only cookie.
      expect(response.body.data.refreshToken).toBeUndefined();
      expect(response.headers['set-cookie']).toBeDefined();
      expect(response.headers['set-cookie'][0]).toContain('HttpOnly');
    });

    it('should return 400 for invalid email', async () => {
      const response = await request(app)
        .post('/api/v1/auth/login')
        .send({
          email: 'invalid-email',
          password: 'password123'
        });

      expect(response.status).toBe(400);
      expect(response.body.code).toBe('VALIDATION_ERROR');
    });

    it('should handle service errors', async () => {
      // A plain Error would be mapped to 500 by the error handler; a 401 is
      // only produced for a ServiceError carrying that status.
      mockUserService.prototype.authenticate.mockRejectedValue(
        serviceError('Invalid credentials', 401, 'INVALID_CREDENTIALS')
      );

      const response = await request(app)
        .post('/api/v1/auth/login')
        .send({
          email: 'test@example.com',
          password: 'wrong-password'
        });

      expect(response.status).toBe(401);
      expect(response.body.code).toBe('INVALID_CREDENTIALS');
    });
  });

  describe('POST /api/v1/auth/refresh', () => {
    it('should refresh token successfully', async () => {
      const mockResponse = {
        user: {
          id: 'user-123',
          email: 'test@example.com',
          name: 'Test',
          role: 'user' as const,
          createdAt: new Date()
        },
        accessToken: 'new-access-token',
        refreshToken: 'new-refresh-token',
        expiresIn: 3600
      };
      mockUserService.prototype.refreshToken.mockResolvedValue(mockResponse);

      const response = await request(app)
        .post('/api/v1/auth/refresh')
        .set('Cookie', ['refreshToken=valid-refresh-token']);

      expect(response.status).toBe(200);
      expect(response.body.data.accessToken).toBe('new-access-token');
    });

    it('should return 401 without refresh token', async () => {
      const response = await request(app)
        .post('/api/v1/auth/refresh');

      expect(response.status).toBe(401);
      expect(response.body.code).toBe('UNAUTHORIZED');
    });

    it('should return 401 when the refresh token is rejected', async () => {
      mockUserService.prototype.refreshToken.mockRejectedValue(
        serviceError('Token expired', 401, 'TOKEN_EXPIRED')
      );

      const response = await request(app)
        .post('/api/v1/auth/refresh')
        .set('Cookie', ['refreshToken=expired-refresh-token']);

      expect(response.status).toBe(401);
      expect(response.body.code).toBe('UNAUTHORIZED');
    });
  });
});
沙箱自动化脚本
Firecracker自动化管理
以下脚本实现了Firecracker MicroVM的自动创建、启动、状态监控、检查点创建和销毁(从检查点恢复的逻辑在示例中省略)。
#!/usr/bin/env python3
"""
Firecracker MicroVM 自动化管理脚本
支持:创建、启动、监控、销毁、检查点
"""
import json
import socket
import subprocess
import time
import os
import signal
from pathlib import Path
from typing import Optional, Dict, Any
import requests
class FirecrackerManager:
    """In-memory registry and lifecycle manager for Firecracker MicroVMs.

    Instances are tracked in ``self.instances`` (keyed by VM id) for the
    lifetime of this object only; nothing is persisted between processes.
    """

    # Upper bound, in seconds, for each call to the Firecracker API socket.
    API_TIMEOUT_SECONDS = 10

    def __init__(self, config_dir: str = "/etc/firecracker"):
        self.config_dir = Path(config_dir)
        self.instances: Dict[str, dict] = {}

    @staticmethod
    def _validate_vm_id(vm_id: str) -> None:
        """Reject ids that are unsafe to embed in file-system paths.

        The id ends up in the socket path, the log path and the jailer
        directory that ``destroy_vm`` deletes recursively, so only ASCII
        letters, digits and hyphens (1-64 characters) are accepted.

        Raises:
            ValueError: if ``vm_id`` does not match that format.
        """
        valid = (
            isinstance(vm_id, str)
            and 0 < len(vm_id) <= 64
            and vm_id.isascii()
            and vm_id.replace("-", "").isalnum()
        )
        if not valid:
            raise ValueError(
                f"Invalid VM id {vm_id!r}: expected 1-64 ASCII letters, digits or hyphens"
            )

    def create_vm(
        self,
        vm_id: str,
        vcpu_count: int = 2,
        mem_size_mib: int = 512,
        rootfs_path: Optional[str] = None,
        kernel_path: Optional[str] = None,
        network_config: Optional[dict] = None
    ) -> dict:
        """Register a new MicroVM definition without starting it.

        Args:
            vm_id: Identifier used for the socket path and jailer id.
            vcpu_count: Number of vCPUs.
            mem_size_mib: Guest memory in MiB.
            rootfs_path: Host path of the root file system image; a default
                image path is used when omitted.
            kernel_path: Host path of the kernel image; a default is used
                when omitted.
            network_config: Optional dict with ``guest_mac`` / ``tap_device``;
                when given (and non-empty) one ``eth0`` interface is added.

        Returns:
            The instance record stored in ``self.instances``.

        Raises:
            ValueError: if ``vm_id`` is not a safe identifier.
        """
        self._validate_vm_id(vm_id)

        api_socket = f"/tmp/firecracker-{vm_id}.sock"

        # A stale socket from a previous run would make the readiness check
        # in start_vm succeed immediately.
        if os.path.exists(api_socket):
            os.remove(api_socket)

        vm_config = {
            "boot-source": {
                "kernel_image_path": kernel_path or "/opt/firecracker/vmlinux-5.10",
                "boot_args": "console=ttyS0 reboot=k panic=1 pci=off"
            },
            "drives": [
                {
                    "drive_id": "rootfs",
                    "path_on_host": rootfs_path or "/opt/firecracker/node18-rootfs.ext4",
                    "is_root_device": True,
                    "is_read_only": False
                }
            ],
            "machine-config": {
                "vcpu_count": vcpu_count,
                "mem_size_mib": mem_size_mib,
                "smt": False
            },
            "network-interfaces": []
        }

        if network_config:
            # NOTE(review): the default MAC is built from the last four
            # characters of vm_id, which is only a valid MAC when those
            # characters are hex digits - confirm the id scheme.
            vm_config["network-interfaces"].append({
                "iface_id": "eth0",
                "guest_mac": network_config.get(
                    "guest_mac", f"AA:FC:00:00:{vm_id[-4:-2]}:{vm_id[-2:]}"
                ),
                "host_dev_name": network_config.get("tap_device", f"tap{vm_id[-3:]}")
            })

        self.instances[vm_id] = {
            "api_socket": api_socket,
            "config": vm_config,
            "process": None,
            "status": "created",
            "created_at": time.time()
        }
        return self.instances[vm_id]

    def start_vm(self, vm_id: str) -> bool:
        """Launch Firecracker through the jailer and boot the VM.

        Returns:
            True once the VM has been configured and started.

        Raises:
            ValueError: if ``vm_id`` was never created.
            RuntimeError: if the VM is already running, the API socket does
                not appear within ~10 seconds, or configuration fails. The
                instance is destroyed before the last two are raised.
        """
        if vm_id not in self.instances:
            raise ValueError(f"VM {vm_id} not found")

        instance = self.instances[vm_id]
        api_socket = instance["api_socket"]

        running = instance.get("process")
        if running is not None and running.poll() is None:
            # Starting twice would orphan the first process.
            raise RuntimeError(f"VM {vm_id} is already running")

        # NOTE(review): the jailer chroots Firecracker, so the socket given to
        # --api-sock may be created under the chroot rather than at this host
        # path - confirm the path layout for your deployment.
        jailer_cmd = [
            "/usr/bin/jailer",
            "--id", vm_id,
            "--uid", "1000",
            "--gid", "1000",
            "--chroot-base-dir", "/srv/jailer",
            "--exec-file", "/usr/bin/firecracker",
            "--", "--api-sock", api_socket
        ]

        # Send output to a file instead of pipes nobody reads: an unread pipe
        # blocks the child once its buffer fills (the guest serial console is
        # written to Firecracker's stdout).
        log_path = f"/tmp/firecracker-{vm_id}.log"
        with open(log_path, "ab") as log_file:
            process = subprocess.Popen(
                jailer_cmd,
                stdout=log_file,
                stderr=subprocess.STDOUT
            )
        instance["process"] = process
        instance["log_path"] = log_path

        # Wait up to ~10 seconds for the API socket, giving up early if the
        # process has already exited.
        for _ in range(100):
            if os.path.exists(api_socket) or process.poll() is not None:
                break
            time.sleep(0.1)

        if not os.path.exists(api_socket):
            self.destroy_vm(vm_id)
            raise RuntimeError(f"VM {vm_id} failed to start: API socket not created")

        if not self._configure_vm(vm_id):
            self.destroy_vm(vm_id)
            raise RuntimeError(f"VM {vm_id} configuration failed")

        instance["status"] = "running"
        instance["started_at"] = time.time()
        print(f"VM {vm_id} started successfully (PID: {process.pid})")
        return True

    def _configure_vm(self, vm_id: str) -> bool:
        """Push the stored configuration over the API socket and start the guest.

        Sends, in order: boot source, drives, machine config, network
        interfaces, then the ``InstanceStart`` action.

        Returns:
            True on success; False (after printing the error) on any failure.
        """
        instance = self.instances[vm_id]
        config = instance["config"]

        # Ordered list of (API path, JSON body); InstanceStart must come last.
        steps = [("boot-source", config["boot-source"])]
        steps += [(f"drives/{drive['drive_id']}", drive) for drive in config["drives"]]
        steps.append(("machine-config", config["machine-config"]))
        steps += [
            (f"network-interfaces/{iface['iface_id']}", iface)
            for iface in config["network-interfaces"]
        ]
        steps.append(("actions", {"action_type": "InstanceStart"}))

        try:
            with requests.Session() as session:
                session.mount("http://", UnixSocketAdapter(instance["api_socket"]))
                for path, payload in steps:
                    response = session.put(
                        f"http://localhost/{path}",
                        json=payload,
                        timeout=self.API_TIMEOUT_SECONDS
                    )
                    response.raise_for_status()
            return True
        except Exception as e:
            print(f"Failed to configure VM {vm_id}: {e}")
            return False

    def create_checkpoint(self, vm_id: str, checkpoint_id: str) -> bool:
        """Write a full snapshot of the VM under ``/var/checkpoints``.

        Firecracker only snapshots a paused microVM, so the VM is paused
        first and resumed afterwards, whether or not the snapshot succeeded.

        Returns:
            True if the snapshot was created; False if the VM is unknown or
            any API call failed.
        """
        if vm_id not in self.instances:
            return False

        api_socket = self.instances[vm_id]["api_socket"]

        with requests.Session() as session:
            session.mount("http://", UnixSocketAdapter(api_socket))
            paused = False
            try:
                session.patch(
                    "http://localhost/vm",
                    json={"state": "Paused"},
                    timeout=self.API_TIMEOUT_SECONDS
                ).raise_for_status()
                paused = True

                session.put(
                    "http://localhost/snapshot/create",
                    json={
                        "snapshot_type": "Full",
                        "snapshot_path": f"/var/checkpoints/{vm_id}-{checkpoint_id}.snap",
                        "mem_file_path": f"/var/checkpoints/{vm_id}-{checkpoint_id}.mem"
                    },
                    timeout=self.API_TIMEOUT_SECONDS
                ).raise_for_status()

                print(f"Checkpoint {checkpoint_id} created for VM {vm_id}")
                return True
            except Exception as e:
                print(f"Failed to create checkpoint: {e}")
                return False
            finally:
                if paused:
                    try:
                        session.patch(
                            "http://localhost/vm",
                            json={"state": "Resumed"},
                            timeout=self.API_TIMEOUT_SECONDS
                        ).raise_for_status()
                    except Exception as e:
                        print(f"Failed to resume VM {vm_id} after checkpoint: {e}")

    def restore_from_checkpoint(self, vm_id: str, checkpoint_id: str) -> bool:
        """Restore a VM from a checkpoint (not implemented yet).

        A running instance is destroyed first; the actual re-creation and
        snapshot load are still to be written, so this always returns False.
        """
        if vm_id in self.instances and self.instances[vm_id]["status"] == "running":
            self.destroy_vm(vm_id)

        # TODO: re-create the instance and load the snapshot.
        return False

    def get_vm_status(self, vm_id: str) -> dict:
        """Return a status summary for one VM.

        Returns:
            ``{"status": "not_found"}`` for an unknown id; otherwise a dict
            with ``vm_id``, ``status``, ``created_at``, ``started_at`` and
            ``uptime`` (seconds since start, or None if never started), plus
            ``exit_code`` when a VM marked running has exited on its own.
        """
        if vm_id not in self.instances:
            return {"status": "not_found"}

        instance = self.instances[vm_id]
        started_at = instance.get("started_at")
        status = {
            "vm_id": vm_id,
            "status": instance["status"],
            "created_at": instance.get("created_at"),
            "started_at": started_at,
            "uptime": time.time() - started_at if started_at else None
        }

        # Report an unexpected exit, but do not overwrite an explicit state
        # such as "destroyed" (whose process has, by definition, exited).
        process = instance.get("process")
        if (
            process is not None
            and instance["status"] == "running"
            and process.poll() is not None
        ):
            status["status"] = "exited"
            status["exit_code"] = process.returncode

        return status

    def destroy_vm(self, vm_id: str) -> bool:
        """Stop the VM process and remove its socket and jailer directory.

        The process gets five seconds to exit after SIGTERM and is then
        killed.

        Returns:
            False if the VM is unknown, True otherwise.
        """
        import shutil

        if vm_id not in self.instances:
            return False

        instance = self.instances[vm_id]

        process = instance.get("process")
        if process:
            try:
                process.terminate()
                process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                process.kill()
                process.wait()

        api_socket = instance["api_socket"]
        if os.path.exists(api_socket):
            os.remove(api_socket)

        # vm_id was validated in create_vm, so this path cannot escape the
        # jailer base directory.
        jailer_dir = f"/srv/jailer/firecracker/{vm_id}"
        if os.path.exists(jailer_dir):
            shutil.rmtree(jailer_dir, ignore_errors=True)

        instance["status"] = "destroyed"
        print(f"VM {vm_id} destroyed")
        return True

    def list_vms(self) -> list:
        """Return the status summary of every known VM."""
        return [self.get_vm_status(vm_id) for vm_id in self.instances]
class UnixSocketAdapter(requests.adapters.HTTPAdapter):
    """requests transport adapter that sends HTTP over a Unix domain socket.

    Every request routed through this adapter is sent to ``socket_path``;
    the host part of the URL (e.g. ``http://localhost/...``) is only used
    for the ``Host`` header.

    Args:
        socket_path: File-system path of the Unix socket to connect to.
        timeout: Seconds allowed for establishing the socket connection.
    """

    def __init__(self, socket_path: str, timeout: float = 10.0):
        self.socket_path = socket_path
        self.connect_timeout = timeout
        self._pool = None
        super().__init__()

    def _unix_pool(self):
        """Return the (lazily created) urllib3 pool bound to the Unix socket."""
        if self._pool is None:
            # Imported here because the module imports only `requests` at the
            # top; urllib3 is installed as a dependency of requests.
            from urllib3.connection import HTTPConnection
            from urllib3.connectionpool import HTTPConnectionPool

            socket_path = self.socket_path
            connect_timeout = self.connect_timeout

            class _UnixConnection(HTTPConnection):
                """HTTP connection whose transport is an AF_UNIX stream socket."""

                def __init__(self):
                    super().__init__("localhost", timeout=connect_timeout)

                def connect(self):
                    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
                    sock.settimeout(connect_timeout)
                    try:
                        sock.connect(socket_path)
                    except OSError:
                        sock.close()
                        raise
                    self.sock = sock

            class _UnixConnectionPool(HTTPConnectionPool):
                """Connection pool that only ever hands out Unix-socket connections."""

                def __init__(self):
                    super().__init__("localhost", timeout=connect_timeout)

                def _new_conn(self):
                    return _UnixConnection()

            self._pool = _UnixConnectionPool()
        return self._pool

    def get_connection(self, url, proxies=None):
        """Connection hook used by older requests releases."""
        return self._unix_pool()

    def get_connection_with_tls_context(self, request, verify, proxies=None, cert=None):
        """Connection hook used by requests >= 2.32."""
        return self._unix_pool()

    def request_url(self, request, proxies):
        """Always send the path only: proxy settings do not apply to a local socket."""
        return request.path_url

    def close(self):
        """Close the Unix-socket pool together with the adapter's own resources."""
        if self._pool is not None:
            self._pool.close()
            self._pool = None
        super().close()
# CLI接口
# CLI entry point.
# NOTE: FirecrackerManager keeps its registry in memory only, so VMs created
# by one invocation are not known to the next.
if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="Firecracker VM Manager")
    parser.add_argument("action", choices=["create", "start", "stop", "status", "list", "checkpoint"])
    parser.add_argument("--vm-id", help="VM identifier")
    parser.add_argument("--vcpu", type=int, default=2)
    parser.add_argument("--memory", type=int, default=512)
    parser.add_argument("--rootfs", help="Path to rootfs image")
    parser.add_argument("--checkpoint-id", help="Checkpoint identifier")
    args = parser.parse_args()

    # Fail with a usage error instead of silently operating on the id "None".
    if args.action != "list" and not args.vm_id:
        parser.error(f"--vm-id is required for action '{args.action}'")
    if args.action == "checkpoint" and not args.checkpoint_id:
        parser.error("--checkpoint-id is required for action 'checkpoint'")

    manager = FirecrackerManager()

    if args.action == "create":
        manager.create_vm(
            vm_id=args.vm_id,
            vcpu_count=args.vcpu,
            mem_size_mib=args.memory,
            rootfs_path=args.rootfs
        )
        print(f"VM {args.vm_id} created")
    elif args.action == "start":
        manager.start_vm(args.vm_id)
    elif args.action == "stop":
        manager.destroy_vm(args.vm_id)
    elif args.action == "status":
        print(json.dumps(manager.get_vm_status(args.vm_id), indent=2))
    elif args.action == "list":
        print(json.dumps(manager.list_vms(), indent=2))
    elif args.action == "checkpoint":
        manager.create_checkpoint(args.vm_id, args.checkpoint_id)
状态持久化方案
Git-based状态管理
以下实现展示了如何使用Git作为状态持久化后端,支持断点续传。
// lib/checkpoint/gitCheckpoint.ts
import { execFileSync, execSync } from 'child_process';
import { existsSync, mkdirSync, readFileSync, writeFileSync } from 'fs';
import { join } from 'path';
/**
 * Metadata stored in `.harness/checkpoints.json` for one checkpoint.
 */
interface CheckpointMetadata {
// Generated by createCheckpoint as `cp-<Date.now()>-<step>`.
checkpointId: string;
taskId: string;
step: number;
// ISO-8601 string produced with `new Date().toISOString()`.
timestamp: string;
description: string;
// Caller-supplied data stored verbatim alongside the checkpoint.
externalState: Record<string, any>;
}
/**
 * Persists task checkpoints as Git commits inside a workspace directory.
 *
 * Each checkpoint writes its metadata to `.harness/checkpoints.json`, stages
 * everything and commits with a `[checkpoint] <id>: <description>` message.
 * Restoring hard-resets the workspace to that commit.
 *
 * Git commands that contain caller-supplied text (description, checkpoint id,
 * commit hash) are run without a shell so that quotes, `$(...)` or `;` in
 * those values are passed to Git literally.
 */
export class GitCheckpointManager {
  private workspacePath: string;
  private metadataPath: string;

  /** @param workspacePath directory to manage; a repository is initialised if none exists. */
  constructor(workspacePath: string) {
    this.workspacePath = workspacePath;
    this.metadataPath = join(workspacePath, '.harness', 'checkpoints.json');
    this.ensureGitRepo();
  }

  /** Runs `git init` and sets a commit identity when the workspace has no `.git` entry. */
  private ensureGitRepo(): void {
    if (existsSync(join(this.workspacePath, '.git'))) {
      return;
    }
    this.execGitArgs(['init']);
    this.execGitArgs(['config', 'user.email', 'harness@agent.ai']);
    this.execGitArgs(['config', 'user.name', 'Harness Agent']);
  }

  /**
   * Creates a checkpoint: writes the metadata file, commits the whole
   * workspace and attempts a best-effort push.
   *
   * @returns the generated checkpoint id (`cp-<timestamp>-<step>`).
   * @throws Error when staging or committing fails.
   */
  async createCheckpoint(
    taskId: string,
    step: number,
    description: string,
    externalState: Record<string, any> = {}
  ): Promise<string> {
    const checkpointId = `cp-${Date.now()}-${step}`;

    const metadata: CheckpointMetadata = {
      checkpointId,
      taskId,
      step,
      timestamp: new Date().toISOString(),
      description,
      externalState
    };
    this.saveMetadata(metadata);

    this.execGitArgs(['add', '-A']);
    this.execGitArgs([
      'commit',
      '-m',
      `[checkpoint] ${checkpointId}: ${description}`,
      '--allow-empty'
    ]);

    // Push is best-effort: a remote may not be configured.
    try {
      this.execGitArgs(['push', 'origin', 'checkpoint-branch']);
    } catch {
      // Deliberately ignored.
    }

    console.log(`Checkpoint created: ${checkpointId}`);
    return checkpointId;
  }

  /**
   * Hard-resets the workspace to the commit of the given checkpoint and
   * removes untracked files.
   *
   * @returns the checkpoint's metadata, or null when the checkpoint is not
   *   found, the metadata does not match, or any Git command fails.
   */
  async restoreCheckpoint(checkpointId: string): Promise<CheckpointMetadata | null> {
    try {
      const commitHash = this.findCheckpointCommits(checkpointId, 1)[0];
      if (!commitHash) {
        console.error(`Checkpoint ${checkpointId} not found`);
        return null;
      }

      this.execGitArgs(['reset', '--hard', commitHash]);
      this.execGitArgs(['clean', '-fd']);

      const metadata = this.loadMetadata();
      if (metadata?.checkpointId === checkpointId) {
        console.log(`Restored to checkpoint: ${checkpointId} (step ${metadata.step})`);
        return metadata;
      }
      return null;
    } catch (error) {
      console.error(`Failed to restore checkpoint: ${error}`);
      return null;
    }
  }

  /**
   * Returns the metadata of the most recent checkpoint commit.
   *
   * The metadata is read from the commit itself, so it is still available
   * when the working tree copy was modified or deleted; the working-tree file
   * is used as a fallback when the commit does not contain it.
   */
  async getLatestCheckpoint(): Promise<CheckpointMetadata | null> {
    try {
      const commitHash = this.findCheckpointCommits('[checkpoint]', 1)[0];
      if (!commitHash) return null;
      return this.readMetadataAt(commitHash) ?? this.loadMetadata();
    } catch {
      return null;
    }
  }

  /**
   * Lists the metadata of every checkpoint commit, newest first.
   * Falls back to the working-tree metadata when no commit carries a
   * readable metadata file.
   */
  async listCheckpoints(): Promise<CheckpointMetadata[]> {
    try {
      const commitHashes = this.findCheckpointCommits('[checkpoint]');
      if (commitHashes.length === 0) return [];

      const fromCommits: CheckpointMetadata[] = [];
      for (const hash of commitHashes) {
        const metadata = this.readMetadataAt(hash);
        if (metadata) fromCommits.push(metadata);
      }
      return fromCommits.length > 0 ? fromCommits : this.loadAllMetadata();
    } catch {
      return [];
    }
  }

  /**
   * Checks that the workspace has no uncommitted changes and, when
   * `.harness/manifest.sha256` exists, that it equals the `sha256sum` of the
   * current `git ls-files -s` output.
   *
   * The comparison is done in memory: writing a scratch file into the
   * workspace would itself make the next status check report changes.
   */
  async verifyConsistency(): Promise<boolean> {
    try {
      const status = this.execGit('status --porcelain');
      if (status) {
        console.warn('Uncommitted changes detected');
        return false;
      }

      const manifestPath = join(this.workspacePath, '.harness', 'manifest.sha256');
      if (existsSync(manifestPath)) {
        const expected = readFileSync(manifestPath, 'utf-8').trim();
        const current = this.execGit('ls-files -s | sha256sum');
        if (current !== expected) {
          console.warn('File manifest mismatch');
          return false;
        }
      }

      return true;
    } catch (error) {
      console.error(`Consistency check failed: ${error}`);
      return false;
    }
  }

  /**
   * Returns hashes of commits (across all refs, newest first) whose message
   * contains `needle` as a literal string.
   */
  private findCheckpointCommits(needle: string, limit?: number): string[] {
    const args = ['log', '--all', '--format=%H', '--fixed-strings', `--grep=${needle}`];
    if (limit !== undefined) {
      args.push(`-${limit}`);
    }
    const output = this.execGitArgs(args);
    return output ? output.split('\n').filter((line) => line.length > 0) : [];
  }

  /** Reads the metadata file as stored in the given commit; null when absent or invalid. */
  private readMetadataAt(commitHash: string): CheckpointMetadata | null {
    try {
      const raw = this.execGitArgs(['show', `${commitHash}:.harness/checkpoints.json`]);
      return this.parseMetadata(raw);
    } catch {
      return null;
    }
  }

  /** Parses metadata JSON; returns null unless it is an object with a string `checkpointId`. */
  private parseMetadata(raw: string): CheckpointMetadata | null {
    const parsed: unknown = JSON.parse(raw);
    if (
      typeof parsed === 'object' &&
      parsed !== null &&
      typeof (parsed as { checkpointId?: unknown }).checkpointId === 'string'
    ) {
      return parsed as CheckpointMetadata;
    }
    return null;
  }

  /** Writes the metadata file, creating `.harness` if needed. */
  private saveMetadata(metadata: CheckpointMetadata): void {
    // mkdirSync handles paths containing spaces or shell metacharacters.
    mkdirSync(join(this.workspacePath, '.harness'), { recursive: true });
    writeFileSync(this.metadataPath, JSON.stringify(metadata, null, 2));
  }

  /** Reads the working-tree metadata file; null when missing or unreadable. */
  private loadMetadata(): CheckpointMetadata | null {
    try {
      if (!existsSync(this.metadataPath)) return null;
      return this.parseMetadata(readFileSync(this.metadataPath, 'utf-8'));
    } catch {
      return null;
    }
  }

  /** Working-tree metadata as a list (empty when there is none). */
  private loadAllMetadata(): CheckpointMetadata[] {
    const metadata = this.loadMetadata();
    return metadata ? [metadata] : [];
  }

  /**
   * Runs `git <command>` through the shell. Only for fixed command strings
   * (e.g. pipelines); never interpolate external input here.
   */
  private execGit(command: string): string {
    return this.captureGit(() =>
      execSync(`git ${command}`, {
        cwd: this.workspacePath,
        encoding: 'utf-8',
        stdio: ['pipe', 'pipe', 'pipe']
      })
    );
  }

  /** Runs git with an argument vector and no shell; safe for external input. */
  private execGitArgs(args: string[]): string {
    return this.captureGit(() =>
      execFileSync('git', args, {
        cwd: this.workspacePath,
        encoding: 'utf-8',
        stdio: ['pipe', 'pipe', 'pipe']
      })
    );
  }

  /** Trims stdout and rethrows failures with Git's stderr in the message. */
  private captureGit(run: () => string): string {
    try {
      return run().trim();
    } catch (error: unknown) {
      const stderr = (error as { stderr?: unknown } | null)?.stderr;
      if (stderr) {
        throw new Error(`Git command failed: ${String(stderr)}`);
      }
      throw error;
    }
  }
}
心跳与健康检查服务
// lib/health/heartbeat.ts
import EventEmitter from 'events';
/**
 * Payload emitted with the 'heartbeat' event and posted to the monitoring service.
 */
interface HealthStatus {
taskId: string;
// `Date.now()` at the moment the heartbeat was built (epoch milliseconds).
timestamp: number;
currentStep: number;
totalSteps: number;
// CPU time (user + system) used by the process during a ~100 ms sample, in seconds.
cpuUsage: number;
// V8 heap in use, in MB.
memoryUsage: number;
// Currently always 0: the disk probe is a stub.
diskUsage: number;
// Epoch milliseconds of the last `start()` / `updateStep()` call.
lastActivity: number;
}
/**
 * Periodically reports task health and detects stalled tasks.
 *
 * Events:
 * - 'heartbeat' (HealthStatus): emitted every `interval` milliseconds.
 * - 'timeout' ({ taskId, timeSinceLastActivity, currentStep }): emitted by a
 *   check that runs every `timeout` milliseconds when no `updateStep()` call
 *   happened for longer than `timeout`.
 */
export class HeartbeatService extends EventEmitter {
  private taskId: string;
  private interval: number;
  private timeout: number;
  private timer: ReturnType<typeof setInterval> | null = null;
  private timeoutTimer: ReturnType<typeof setInterval> | null = null;
  private lastHeartbeat: number = 0;
  private currentStep: number = 0;
  private totalSteps: number = 0;

  /**
   * @param taskId identifier included in every event and log line.
   * @param options.interval heartbeat period in ms (default 30000).
   * @param options.timeout inactivity threshold in ms (default 90000).
   */
  constructor(
    taskId: string,
    options: {
      interval?: number;
      timeout?: number;
    } = {}
  ) {
    super();
    this.taskId = taskId;
    // `||` on purpose: 0 is not a usable period and falls back to the default.
    this.interval = options.interval || 30000;
    this.timeout = options.timeout || 90000;
  }

  /**
   * Starts the heartbeat and the inactivity check. Calling it again restarts
   * both timers instead of stacking new ones on top of the old.
   */
  start(totalSteps: number) {
    this.clearTimers();

    this.totalSteps = totalSteps;
    this.lastHeartbeat = Date.now();

    this.timer = setInterval(() => {
      // sendHeartbeat is async; surface failures (e.g. a throwing listener)
      // instead of leaving an unhandled rejection.
      void this.sendHeartbeat().catch((error: unknown) => {
        console.error('Failed to send heartbeat:', error);
      });
    }, this.interval);

    this.startTimeoutCheck();

    console.log(`Heartbeat service started for task ${this.taskId}`);
  }

  /** Records progress and resets the inactivity clock. */
  updateStep(step: number) {
    this.currentStep = step;
    this.lastHeartbeat = Date.now();
  }

  /** Builds the status, emits 'heartbeat' and posts it to the monitoring service. */
  private async sendHeartbeat() {
    const status: HealthStatus = {
      taskId: this.taskId,
      timestamp: Date.now(),
      currentStep: this.currentStep,
      totalSteps: this.totalSteps,
      cpuUsage: await this.getCPUUsage(),
      memoryUsage: await this.getMemoryUsage(),
      diskUsage: await this.getDiskUsage(),
      lastActivity: this.lastHeartbeat
    };

    this.emit('heartbeat', status);

    // Delivery is best-effort: a monitoring outage must not stop the task.
    try {
      await fetch('http://monitoring.internal/heartbeat', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify(status)
      });
    } catch (error) {
      console.error('Failed to send heartbeat:', error);
    }
  }

  /** Starts the periodic inactivity check; the handle is kept so stop() can clear it. */
  private startTimeoutCheck() {
    this.timeoutTimer = setInterval(() => {
      const timeSinceLastActivity = Date.now() - this.lastHeartbeat;

      if (timeSinceLastActivity > this.timeout) {
        this.emit('timeout', {
          taskId: this.taskId,
          timeSinceLastActivity,
          currentStep: this.currentStep
        });
      }
    }, this.timeout);
  }

  /** CPU time (user + system) consumed during a ~100 ms sample, in seconds. */
  private async getCPUUsage(): Promise<number> {
    const startUsage = process.cpuUsage();
    await new Promise(resolve => setTimeout(resolve, 100));
    const endUsage = process.cpuUsage(startUsage);
    return (endUsage.user + endUsage.system) / 1000000; // microseconds -> seconds
  }

  /** Heap currently in use, in MB. */
  private async getMemoryUsage(): Promise<number> {
    const usage = process.memoryUsage();
    return usage.heapUsed / 1024 / 1024;
  }

  /** Placeholder: disk usage is not measured yet and is reported as 0. */
  private async getDiskUsage(): Promise<number> {
    return 0;
  }

  /** Clears the heartbeat and inactivity timers if they are running. */
  private clearTimers() {
    if (this.timer) {
      clearInterval(this.timer);
      this.timer = null;
    }
    if (this.timeoutTimer) {
      clearInterval(this.timeoutTimer);
      this.timeoutTimer = null;
    }
  }

  /** Stops both timers; safe to call more than once. */
  stop() {
    this.clearTimers();
    console.log(`Heartbeat service stopped for task ${this.taskId}`);
  }
}
故障恢复协调器
// lib/recovery/recoveryCoordinator.ts
import { GitCheckpointManager } from '../checkpoint/gitCheckpoint';
import { HeartbeatService } from '../health/heartbeat';
/**
 * Inputs and callbacks supplied by the business layer to the recovery coordinator.
 */
interface RecoveryContext {
taskId: string;
// Directory handed to GitCheckpointManager as the workspace.
workspacePath: string;
// Called with the restored checkpoint's step once recovery has succeeded.
onRecover: (step: number) => Promise<void>;
// Called when no checkpoint exists, restoring fails, or recovery throws.
onFailure: (error: Error) => Promise<void>;
}
/**
 * Ties heartbeat monitoring to checkpoint-based recovery.
 *
 * When the heartbeat service reports a timeout, the coordinator restores the
 * latest Git checkpoint and asks the business layer (via `onRecover`) to
 * resume from the restored step.
 */
export class RecoveryCoordinator {
  private context: RecoveryContext;
  private checkpointManager: GitCheckpointManager;
  private heartbeatService: HeartbeatService;
  private isRecovering: boolean = false;
  private isMonitoring: boolean = false;

  constructor(context: RecoveryContext) {
    this.context = context;
    this.checkpointManager = new GitCheckpointManager(context.workspacePath);
    this.heartbeatService = new HeartbeatService(context.taskId);
  }

  /**
   * 'timeout' listener. Kept as a single stable function so it is registered
   * at most once and can be removed again in `stop()`.
   */
  private readonly handleTimeout = (data: { taskId: string; currentStep: number }): void => {
    console.warn(`Task ${data.taskId} timed out at step ${data.currentStep}`);
    // attemptRecovery can still reject (e.g. when onFailure itself throws);
    // log that instead of leaving an unhandled rejection.
    void this.attemptRecovery().catch((error: unknown) => {
      console.error('Recovery failed:', error);
    });
  };

  /** Starts the heartbeat and subscribes to its timeout events. */
  async startMonitoring(totalSteps: number) {
    this.heartbeatService.start(totalSteps);

    if (!this.isMonitoring) {
      this.isMonitoring = true;
      this.heartbeatService.on('timeout', this.handleTimeout);
    }
  }

  /**
   * Restores the latest checkpoint and resumes the task.
   *
   * @returns true when the workspace was restored and `onRecover` completed;
   *   false when a recovery is already running or any step failed (in which
   *   case `onFailure` has been called).
   */
  async attemptRecovery(): Promise<boolean> {
    if (this.isRecovering) {
      console.log('Recovery already in progress');
      return false;
    }

    this.isRecovering = true;
    console.log('Starting recovery process...');

    try {
      // 1. Find the latest checkpoint.
      const checkpoint = await this.checkpointManager.getLatestCheckpoint();
      if (!checkpoint) {
        console.error('No checkpoint found, cannot recover');
        await this.context.onFailure(new Error('No checkpoint available'));
        return false;
      }

      // 2. Reset the workspace to it.
      const restored = await this.checkpointManager.restoreCheckpoint(
        checkpoint.checkpointId
      );
      if (!restored) {
        console.error('Failed to restore checkpoint');
        await this.context.onFailure(new Error('Checkpoint restore failed'));
        return false;
      }

      // 3. Verify consistency; a failure is reported but does not abort.
      const consistent = await this.checkpointManager.verifyConsistency();
      if (!consistent) {
        console.warn('State consistency check failed, proceeding with caution');
      }

      // 4. Restore external state recorded with the checkpoint.
      await this.restoreExternalState(restored.externalState);

      // Reset the inactivity clock: otherwise the next timeout check still
      // sees the stale timestamp and triggers another recovery immediately.
      this.heartbeatService.updateStep(restored.step);

      // 5. Let the business layer continue from the restored step.
      console.log(`Recovery successful, resuming from step ${restored.step}`);
      await this.context.onRecover(restored.step);

      return true;
    } catch (error: unknown) {
      console.error('Recovery failed:', error);
      const failure = error instanceof Error ? error : new Error(String(error));
      await this.context.onFailure(failure);
      return false;
    } finally {
      this.isRecovering = false;
    }
  }

  /**
   * Restores external-system state. Currently only logs what was recorded;
   * the real work depends on the external systems involved.
   */
  private async restoreExternalState(state: Record<string, any>) {
    if (state.issueId) {
      console.log(`Restoring context for issue: ${state.issueId}`);
    }

    if (state.apiCalls) {
      console.log(`Noting ${state.apiCalls.length} previous API calls`);
    }
  }

  /** Records progress with the heartbeat service and creates a Git checkpoint. */
  async createCheckpoint(
    step: number,
    description: string,
    externalState: Record<string, any> = {}
  ) {
    this.heartbeatService.updateStep(step);

    await this.checkpointManager.createCheckpoint(
      this.context.taskId,
      step,
      description,
      externalState
    );
  }

  /** Stops monitoring and detaches the timeout listener. */
  stop() {
    if (this.isMonitoring) {
      this.heartbeatService.off('timeout', this.handleTimeout);
      this.isMonitoring = false;
    }
    this.heartbeatService.stop();
  }
}