Logo
热心市民王先生

Harness模式开发最佳实践 - 关键代码验证

AI代理 代码示例 Node.js BFF

提供Node.js BFF实现示例、沙箱自动化脚本和状态持久化方案的实际代码

Node.js BFF实现示例

场景:用户认证BFF服务

以下是一个完整的BFF服务实现,展示Harness Agent如何将下游用户服务封装为前端友好的API。

项目结构:

user-bff/
├── src/
│   ├── controllers/
│   │   └── authController.ts
│   ├── services/
│   │   └── userService.ts
│   ├── middleware/
│   │   ├── auth.ts
│   │   └── errorHandler.ts
│   ├── types/
│   │   └── index.ts
│   └── app.ts
├── tests/
│   ├── unit/
│   └── integration/
├── Dockerfile
└── package.json

核心实现代码:

// src/types/index.ts
/**
 * A user as exposed by this BFF to the frontend: camelCase fields, built by
 * UserService from the downstream service's snake_case payload.
 */
export interface User {
  id: string;
  email: string;
  name: string;
  /** Authorization role. */
  role: 'user' | 'admin';
  /** Account creation time. JSON serialization (e.g. `res.json`) turns a Date into an ISO-8601 string. */
  createdAt: Date;
}

/** Result of a successful login or token refresh. */
export interface AuthResponse {
  user: User;
  /** Token the frontend sends on subsequent requests. */
  accessToken: string;
  /** Long-lived token; the auth controller stores it in an HTTP-only cookie instead of the response body. */
  refreshToken: string;
  /** Access-token lifetime as reported by the user service (presumably seconds — confirm against that service). */
  expiresIn: number;
}

/** Shape of error bodies returned to clients. */
export interface ApiError {
  /** Machine-readable error code, e.g. `VALIDATION_ERROR`. */
  code: string;
  /** Human-readable description. */
  message: string;
  /** Optional structured context, e.g. per-field validation failures. */
  details?: Record<string, unknown>;
}
// src/services/userService.ts
import axios, { AxiosInstance } from 'axios';
import { User, AuthResponse } from '../types';

/**
 * Thin client for the downstream user service.
 *
 * Converts the service's snake_case payloads into the BFF's camelCase types and
 * normalizes every transport or HTTP failure into a `ServiceError` carrying the
 * status code the BFF should answer with.
 */
export class UserService {
  private client: AxiosInstance;

  constructor() {
    this.client = axios.create({
      baseURL: process.env.USER_SERVICE_URL,
      timeout: 5000,
      headers: {
        'Content-Type': 'application/json',
        'X-API-Key': process.env.USER_SERVICE_API_KEY
      }
    });

    // Response interceptor: turn every failure into a ServiceError.
    this.client.interceptors.response.use(
      (response) => response,
      (error) => {
        if (error.response) {
          // The service answered: forward its status, message and error code.
          throw new ServiceError(
            error.response.data?.message || 'Service error',
            error.response.status,
            error.response.data?.code
          );
        }
        // No response at all. Axios flags its own `timeout` as ECONNABORTED,
        // or ETIMEDOUT with `transitional.clarifyTimeoutError`. For a gateway,
        // that is a 504, not an outage.
        if (error.code === 'ECONNABORTED' || error.code === 'ETIMEDOUT') {
          throw new ServiceError('Upstream timeout', 504, 'UPSTREAM_TIMEOUT');
        }
        throw new ServiceError('Network error', 503);
      }
    );
  }

  /** Exchange credentials for tokens and the user's profile. */
  async authenticate(email: string, password: string): Promise<AuthResponse> {
    const response = await this.client.post('/api/v1/auth/login', {
      email,
      password
    });

    return this.transformAuthResponse(response.data);
  }

  /** Trade a refresh token for a new token pair. */
  async refreshToken(refreshToken: string): Promise<AuthResponse> {
    const response = await this.client.post('/api/v1/auth/refresh', {
      refreshToken
    });

    return this.transformAuthResponse(response.data);
  }

  /**
   * Fetch a single user.
   *
   * `userId` is percent-encoded so that characters such as `/`, `?` or `..`
   * cannot rewrite the request path. Unencoded, a value like `../admin` would
   * let a caller reach other downstream endpoints using this service's API key.
   */
  async getUserById(userId: string): Promise<User> {
    const response = await this.client.get(
      `/api/v1/users/${encodeURIComponent(userId)}`
    );
    return this.transformUser(response.data);
  }

  /** Map the downstream snake_case auth payload to `AuthResponse`. */
  private transformAuthResponse(data: any): AuthResponse {
    return {
      user: this.transformUser(data.user),
      accessToken: data.access_token,
      refreshToken: data.refresh_token,
      expiresIn: data.expires_in
    };
  }

  /** Map the downstream snake_case user payload to `User`. */
  private transformUser(data: any): User {
    return {
      id: data.id,
      email: data.email,
      name: data.name,
      role: data.role,
      createdAt: new Date(data.created_at)
    };
  }
}

/**
 * Failure of a call to the downstream user service.
 *
 * `statusCode` is the HTTP status the BFF should answer with; `code` is the
 * downstream error code, when the service supplied one. The error middleware
 * recognizes this error through its `name`.
 */
class ServiceError extends Error {
  public statusCode: number;
  public code?: string;

  constructor(message: string, statusCode: number, code?: string) {
    super(message);
    this.statusCode = statusCode;
    this.code = code;
    this.name = 'ServiceError';
  }
}
// src/controllers/authController.ts
import { Request, Response, NextFunction } from 'express';
import { UserService } from '../services/userService';
import { body, validationResult } from 'express-validator';

/** Refresh-token cookie lifetime: 7 days, in milliseconds. */
const REFRESH_TOKEN_MAX_AGE_MS = 7 * 24 * 60 * 60 * 1000;

/**
 * HTTP handlers for login and token refresh.
 *
 * The refresh token is kept in an HTTP-only cookie and never exposed to
 * frontend JavaScript. Only the access token, its lifetime and the user
 * profile are returned in the JSON body.
 */
export class AuthController {
  private userService: UserService;

  constructor() {
    this.userService = new UserService();
  }

  /** express-validator chain; mount it before `login`. */
  loginValidation = [
    body('email').isEmail().normalizeEmail(),
    body('password').isLength({ min: 6 })
  ];

  /** POST login: validate input, authenticate downstream, set the refresh cookie. */
  login = async (
    req: Request,
    res: Response,
    next: NextFunction
  ): Promise<void> => {
    try {
      const errors = validationResult(req);
      if (!errors.isEmpty()) {
        res.status(400).json({
          code: 'VALIDATION_ERROR',
          message: 'Invalid input',
          details: errors.array()
        });
        return;
      }

      const { email, password } = req.body;
      const authResponse = await this.userService.authenticate(email, password);

      this.setRefreshCookie(res, authResponse.refreshToken);
      this.sendSession(res, authResponse);
    } catch (error) {
      next(error);
    }
  };

  /**
   * POST refresh: rotate tokens using the refresh-token cookie.
   *
   * Answers 401 and clears the cookie only when the user service rejects the
   * token (a 4xx). Outages and internal failures go to the error middleware
   * instead: the token may still be valid, and clearing it would force a
   * re-login for every user during a brief downstream blip.
   */
  refresh = async (
    req: Request,
    res: Response,
    next: NextFunction
  ): Promise<void> => {
    // `req.cookies` only exists when cookie-parser is mounted. cookie-parser may
    // also decode `j:`-prefixed cookies into objects, so require a string.
    const refreshToken: unknown = req.cookies?.refreshToken;

    if (typeof refreshToken !== 'string' || refreshToken === '') {
      res.status(401).json({
        code: 'UNAUTHORIZED',
        message: 'No refresh token provided'
      });
      return;
    }

    try {
      const authResponse = await this.userService.refreshToken(refreshToken);
      this.setRefreshCookie(res, authResponse.refreshToken);
      this.sendSession(res, authResponse);
    } catch (error) {
      if (!AuthController.isTokenRejection(error)) {
        next(error);
        return;
      }
      this.clearRefreshCookie(res);
      res.status(401).json({
        code: 'UNAUTHORIZED',
        message: 'Invalid or expired refresh token'
      });
    }
  };

  /**
   * Cookie attributes shared by set and clear. Browsers only delete a cookie
   * when these match the ones it was set with. NODE_ENV is read per call, as
   * before.
   */
  private refreshCookieBase() {
    return {
      httpOnly: true,
      secure: process.env.NODE_ENV === 'production',
      sameSite: 'strict' as const
    };
  }

  /** Store (or rotate) the refresh token in its HTTP-only cookie. */
  private setRefreshCookie(res: Response, token: string): void {
    res.cookie('refreshToken', token, {
      ...this.refreshCookieBase(),
      maxAge: REFRESH_TOKEN_MAX_AGE_MS
    });
  }

  /**
   * Delete the refresh-token cookie. `maxAge` is deliberately omitted: in
   * Express 4 it would override the expiry that `clearCookie` sets, and the
   * cookie would not be deleted.
   */
  private clearRefreshCookie(res: Response): void {
    res.clearCookie('refreshToken', this.refreshCookieBase());
  }

  /** Send the frontend-visible part of an auth result; the refresh token is excluded. */
  private sendSession(
    res: Response,
    auth: { user: unknown; accessToken: string; expiresIn: number }
  ): void {
    res.json({
      success: true,
      data: {
        user: auth.user,
        accessToken: auth.accessToken,
        expiresIn: auth.expiresIn
      }
    });
  }

  /**
   * True when the user service rejected the refresh token itself: a 4xx
   * `statusCode`, as set by UserService's ServiceError. This is distinct from
   * being unreachable (503/504) or failing internally.
   */
  private static isTokenRejection(error: unknown): boolean {
    const statusCode = (error as { statusCode?: unknown } | null | undefined)?.statusCode;
    return typeof statusCode === 'number' && statusCode >= 400 && statusCode < 500;
  }
}
// src/middleware/errorHandler.ts
import { Request, Response, NextFunction } from 'express';

/**
 * Final Express error middleware: maps errors to `{ code, message }` JSON.
 *
 * - ServiceError (from the user service client): its status/code and message.
 * - ValidationError: 400.
 * - Anything else: 500, with the message hidden in production.
 *
 * Express identifies error handlers by their four-parameter arity, so `next`
 * must remain in the signature. It is also used to hand off when a response
 * has already started.
 */
export const errorHandler = (
  error: Error,
  req: Request,
  res: Response,
  next: NextFunction
): void => {
  console.error('Error:', error);

  // Once headers are out, a new status/body cannot be sent. Express's default
  // handler then closes the connection.
  if (res.headersSent) {
    next(error);
    return;
  }

  // Matched by name because the ServiceError class is not exported.
  if (error.name === 'ServiceError') {
    const { statusCode, code } = error as Error & { statusCode?: number; code?: string };
    res.status(statusCode || 500).json({
      code: code || 'SERVICE_ERROR',
      message: error.message
    });
    return;
  }

  if (error.name === 'ValidationError') {
    res.status(400).json({
      code: 'VALIDATION_ERROR',
      message: error.message
    });
    return;
  }

  // Fallback: never leak internal error text in production.
  res.status(500).json({
    code: 'INTERNAL_ERROR',
    message: process.env.NODE_ENV === 'production'
      ? 'Internal server error'
      : error.message
  });
};
// src/app.ts
import express from 'express';
import cookieParser from 'cookie-parser';
import helmet from 'helmet';
import cors from 'cors';
import { AuthController } from './controllers/authController';
import { errorHandler } from './middleware/errorHandler';

const app = express();
const authController = new AuthController();

// Security middleware.
app.use(helmet());

// ALLOWED_ORIGINS is a comma-separated list. Entries are trimmed and blanks
// dropped, so "a.com, b.com" works and an empty value falls back to the local
// dev origin instead of producing [''] (which matches nothing).
const allowedOrigins = process.env.ALLOWED_ORIGINS
  ?.split(',')
  .map((origin) => origin.trim())
  .filter((origin) => origin.length > 0);

app.use(cors({
  origin: allowedOrigins && allowedOrigins.length > 0
    ? allowedOrigins
    : ['http://localhost:3000'],
  credentials: true
}));

app.use(express.json());
app.use(cookieParser());

// Liveness probe.
app.get('/health', (req, res) => {
  res.json({ status: 'ok', timestamp: new Date().toISOString() });
});

// Authentication routes.
app.post('/api/v1/auth/login',
  authController.loginValidation,
  authController.login
);
app.post('/api/v1/auth/refresh', authController.refresh);

// Error middleware must be registered after all routes.
app.use(errorHandler);

const PORT = Number(process.env.PORT) || 3001;

// Only bind a port when running as a server. Test suites (Jest sets
// NODE_ENV=test) import `app` and drive it through supertest. Listening there
// would leave an open handle and can collide with a dev server on the same port.
if (process.env.NODE_ENV !== 'test') {
  app.listen(PORT, () => {
    console.log(`User BFF service running on port ${PORT}`);
  });
}

export default app;

测试示例:

// tests/integration/auth.test.ts
import request from 'supertest';
import app from '../../src/app';
import { UserService } from '../../src/services/userService';
import type { AuthResponse } from '../../src/types';

// Mock the downstream service: jest automocks every UserService method.
jest.mock('../../src/services/userService');

/**
 * Build an error shaped like the ServiceError that UserService throws. That
 * class is not exported, and errorHandler / AuthController.refresh recognize
 * it by `name` and `statusCode`.
 */
const serviceError = (message: string, statusCode: number, code?: string): Error =>
  Object.assign(new Error(message), { name: 'ServiceError', statusCode, code });

describe('Auth API Integration Tests', () => {
  const mockUserService = UserService as jest.MockedClass<typeof UserService>;

  beforeEach(() => {
    jest.clearAllMocks();
  });

  describe('POST /api/v1/auth/login', () => {
    it('should authenticate user successfully', async () => {
      // Annotated so `role: 'user'` keeps its literal type and matches AuthResponse.
      const mockResponse: AuthResponse = {
        user: {
          id: 'user-123',
          email: 'test@example.com',
          name: 'Test User',
          role: 'user',
          createdAt: new Date()
        },
        accessToken: 'mock-access-token',
        refreshToken: 'mock-refresh-token',
        expiresIn: 3600
      };

      mockUserService.prototype.authenticate.mockResolvedValue(mockResponse);

      const response = await request(app)
        .post('/api/v1/auth/login')
        .send({
          email: 'test@example.com',
          password: 'password123'
        });

      expect(response.status).toBe(200);
      expect(response.body.success).toBe(true);
      expect(response.body.data.user.email).toBe('test@example.com');
      expect(response.body.data.accessToken).toBe('mock-access-token');
      // The refresh token must only travel in the cookie.
      expect(response.body.data.refreshToken).toBeUndefined();

      // The refresh cookie is set and HTTP-only.
      expect(response.headers['set-cookie']).toBeDefined();
      expect(response.headers['set-cookie'][0]).toContain('HttpOnly');
    });

    it('should return 400 for invalid email', async () => {
      const response = await request(app)
        .post('/api/v1/auth/login')
        .send({
          email: 'invalid-email',
          password: 'password123'
        });

      expect(response.status).toBe(400);
      expect(response.body.code).toBe('VALIDATION_ERROR');
    });

    it('should map a downstream 401 to a 401 response', async () => {
      // A plain Error would reach errorHandler's generic branch and become a 500.
      mockUserService.prototype.authenticate.mockRejectedValue(
        serviceError('Invalid credentials', 401, 'INVALID_CREDENTIALS')
      );

      const response = await request(app)
        .post('/api/v1/auth/login')
        .send({
          email: 'test@example.com',
          password: 'wrong-password'
        });

      expect(response.status).toBe(401);
      expect(response.body.code).toBe('INVALID_CREDENTIALS');
    });
  });

  describe('POST /api/v1/auth/refresh', () => {
    it('should refresh token successfully', async () => {
      const mockResponse: AuthResponse = {
        user: { id: 'user-123', email: 'test@example.com', name: 'Test', role: 'user', createdAt: new Date() },
        accessToken: 'new-access-token',
        refreshToken: 'new-refresh-token',
        expiresIn: 3600
      };

      mockUserService.prototype.refreshToken.mockResolvedValue(mockResponse);

      const response = await request(app)
        .post('/api/v1/auth/refresh')
        .set('Cookie', ['refreshToken=valid-refresh-token']);

      expect(response.status).toBe(200);
      expect(response.body.data.accessToken).toBe('new-access-token');
    });

    it('should return 401 without refresh token', async () => {
      const response = await request(app)
        .post('/api/v1/auth/refresh');

      expect(response.status).toBe(401);
      expect(response.body.code).toBe('UNAUTHORIZED');
    });

    it('should return 401 and clear the cookie when the token is rejected', async () => {
      mockUserService.prototype.refreshToken.mockRejectedValue(
        serviceError('Token expired', 401)
      );

      const response = await request(app)
        .post('/api/v1/auth/refresh')
        .set('Cookie', ['refreshToken=expired-token']);

      expect(response.status).toBe(401);
      expect(response.body.code).toBe('UNAUTHORIZED');
      expect(response.headers['set-cookie']?.[0]).toMatch(/^refreshToken=;/);
    });
  });
});

沙箱自动化脚本

Firecracker自动化管理

以下脚本通过Jailer实现Firecracker MicroVM的自动创建、启动、状态监控、快照(检查点)和销毁。Jailer需要以root权限运行,宿主机还需提供/dev/kvm。

#!/usr/bin/env python3
"""
Firecracker MicroVM 自动化管理脚本
支持:创建、启动、监控、销毁、检查点
"""

import copy
import hashlib
import json
import os
import shutil
import signal
import socket
import subprocess
import time
from pathlib import Path
from typing import Optional, Dict, Any

import requests
import urllib3.connection
import urllib3.connectionpool

class FirecrackerManager:
    """Create, start, monitor, snapshot and destroy Firecracker microVMs.

    Every VM is launched through the jailer, which chroots Firecracker into
    ``<chroot_base_dir>/<firecracker binary name>/<vm_id>/root``. Every path
    handed to the Firecracker API (kernel, drives, API socket, snapshot files)
    is resolved *inside* that jail. This class therefore keeps the jail-side
    and host-side view of each path apart.

    Instance bookkeeping lives only in ``self.instances`` (process memory), so
    a VM can only be managed by the process that created it.
    """

    def __init__(
        self,
        config_dir: str = "/etc/firecracker",
        chroot_base_dir: str = "/srv/jailer",
        jailer_bin: str = "/usr/bin/jailer",
        firecracker_bin: str = "/usr/bin/firecracker",
        uid: int = 1000,
        gid: int = 1000,
        checkpoint_dir: str = "/var/checkpoints",
        api_timeout: float = 10.0,
    ):
        """
        Args:
            config_dir: Kept as ``self.config_dir``; not otherwise read here.
            chroot_base_dir: Value passed to ``jailer --chroot-base-dir``.
            jailer_bin: Path of the jailer executable.
            firecracker_bin: Path passed to ``jailer --exec-file``. Its basename
                is part of the jail directory name.
            uid: Unprivileged user ID the jailer runs Firecracker as.
            gid: Unprivileged group ID the jailer runs Firecracker as.
            checkpoint_dir: Host directory that finished snapshots are moved to.
            api_timeout: Per-request timeout (seconds) for Firecracker API calls.
        """
        self.config_dir = Path(config_dir)
        self.chroot_base_dir = chroot_base_dir
        self.jailer_bin = jailer_bin
        self.firecracker_bin = firecracker_bin
        self.uid = uid
        self.gid = gid
        self.checkpoint_dir = checkpoint_dir
        self.api_timeout = api_timeout
        self.instances: Dict[str, dict] = {}

    # ------------------------------------------------------------------ paths

    def _jail_dir(self, vm_id: str) -> str:
        """Host directory the jailer creates for ``vm_id``."""
        return os.path.join(
            self.chroot_base_dir, os.path.basename(self.firecracker_bin), vm_id
        )

    def _jail_root(self, vm_id: str) -> str:
        """Host path of the chroot that Firecracker sees as ``/``."""
        return os.path.join(self._jail_dir(vm_id), "root")

    def _host_path(self, vm_id: str, jail_path: str) -> str:
        """Translate an absolute in-jail path to the corresponding host path."""
        return os.path.join(self._jail_root(vm_id), jail_path.lstrip("/"))

    @staticmethod
    def _default_mac(vm_id: str) -> str:
        """Derive a stable, syntactically valid MAC address from ``vm_id``.

        Slicing characters out of the ID (the former approach) produced an
        invalid MAC for short IDs or IDs containing non-hex characters. The
        first octet 0xAA has the locally-administered bit set and the
        multicast bit clear.
        """
        digest = hashlib.sha256(vm_id.encode("utf-8")).digest()
        return "AA:FC:" + ":".join(f"{b:02X}" for b in digest[:4])

    # -------------------------------------------------------------- lifecycle

    def create_vm(
        self,
        vm_id: str,
        vcpu_count: int = 2,
        mem_size_mib: int = 512,
        rootfs_path: str = None,
        kernel_path: str = None,
        network_config: dict = None
    ) -> dict:
        """Register a new microVM definition; nothing is launched yet.

        Args:
            vm_id: Identifier, also used as the jailer ``--id``.
            vcpu_count: Number of vCPUs.
            mem_size_mib: Guest memory in MiB.
            rootfs_path: Host path of the root filesystem image.
            kernel_path: Host path of the guest kernel.
            network_config: Optional dict with ``guest_mac``/``tap_device``
                overrides. When truthy, an ``eth0`` interface is configured.

        Returns:
            The instance record stored in ``self.instances[vm_id]``.
        """
        # The socket path as Firecracker sees it inside the jail, and the same
        # file as seen from the host (where this process has to look for it).
        api_socket = f"/tmp/firecracker-{vm_id}.sock"
        host_api_socket = self._host_path(vm_id, api_socket)

        # Never reuse a stale socket left behind by an earlier run.
        if os.path.exists(host_api_socket):
            os.remove(host_api_socket)

        # Firecracker configuration, expressed with host paths; start_vm stages
        # the files into the jail and rewrites the paths.
        vm_config = {
            "boot-source": {
                "kernel_image_path": kernel_path or "/opt/firecracker/vmlinux-5.10",
                "boot_args": "console=ttyS0 reboot=k panic=1 pci=off"
            },
            "drives": [
                {
                    "drive_id": "rootfs",
                    "path_on_host": rootfs_path or "/opt/firecracker/node18-rootfs.ext4",
                    "is_root_device": True,
                    "is_read_only": False
                }
            ],
            "machine-config": {
                "vcpu_count": vcpu_count,
                "mem_size_mib": mem_size_mib,
                "smt": False
            },
            "network-interfaces": []
        }

        if network_config:
            vm_config["network-interfaces"].append({
                "iface_id": "eth0",
                "guest_mac": network_config.get("guest_mac", self._default_mac(vm_id)),
                "host_dev_name": network_config.get("tap_device", f"tap{vm_id[-3:]}")
            })

        self.instances[vm_id] = {
            "api_socket": api_socket,
            "host_api_socket": host_api_socket,
            "config": vm_config,
            "process": None,
            "status": "created",
            "created_at": time.time()
        }

        return self.instances[vm_id]

    def start_vm(self, vm_id: str) -> bool:
        """Launch the VM under the jailer and boot it.

        Returns:
            True on success.

        Raises:
            ValueError: ``vm_id`` is unknown.
            RuntimeError: The jail could not be prepared, the jailer exited,
                the API socket never appeared, or configuration failed. The
                VM is torn down first.
            OSError: The jailer executable could not be spawned (re-raised
                after teardown).
        """
        if vm_id not in self.instances:
            raise ValueError(f"VM {vm_id} not found")

        instance = self.instances[vm_id]
        host_socket = instance["host_api_socket"]

        try:
            self._prepare_jail(vm_id)
            # Guest console/stdout goes to a host log file outside the jail, so
            # it survives destroy_vm. A PIPE nobody reads would fill up
            # (~64 KiB) and block Firecracker, and the guest with it.
            log_path = os.path.join(self.chroot_base_dir, f"{vm_id}.console.log")
            instance["log_path"] = log_path
            instance["log_file"] = open(log_path, "ab")
        except OSError as e:
            self.destroy_vm(vm_id)
            raise RuntimeError(f"VM {vm_id} failed to start: cannot prepare jail: {e}") from e

        # Launch through the jailer (chroot + privilege drop) for isolation.
        jailer_cmd = [
            self.jailer_bin,
            "--id", vm_id,
            "--uid", str(self.uid),
            "--gid", str(self.gid),
            "--chroot-base-dir", self.chroot_base_dir,
            "--exec-file", self.firecracker_bin,
            "--", "--api-sock", instance["api_socket"]
        ]

        try:
            process = subprocess.Popen(
                jailer_cmd,
                stdin=subprocess.DEVNULL,
                stdout=instance["log_file"],
                stderr=subprocess.STDOUT
            )
        except OSError:
            self.destroy_vm(vm_id)
            raise

        instance["process"] = process

        # Wait up to 10 s for the API socket. Give up early if the jailer dies.
        for _ in range(100):
            if os.path.exists(host_socket):
                break
            if process.poll() is not None:
                code = process.returncode
                self.destroy_vm(vm_id)
                raise RuntimeError(
                    f"VM {vm_id} failed to start: jailer exited with code {code} "
                    f"(see {log_path})"
                )
            time.sleep(0.1)
        else:
            self.destroy_vm(vm_id)
            raise RuntimeError(f"VM {vm_id} failed to start: API socket not created")

        if not self._configure_vm(vm_id):
            self.destroy_vm(vm_id)
            raise RuntimeError(f"VM {vm_id} configuration failed")

        instance["status"] = "running"
        instance["started_at"] = time.time()

        print(f"VM {vm_id} started successfully (PID: {process.pid})")
        return True

    def _prepare_jail(self, vm_id: str) -> None:
        """Create the jail root and stage everything Firecracker must reach.

        Firecracker runs chrooted and cannot see host paths. This method:
        creates the API socket's parent directory (owned by ``uid``/``gid`` so
        Firecracker can bind there), places the kernel and drive images inside
        the jail, and stores a copy of the config with jail-relative paths as
        ``instance["jail_config"]``.

        NOTE(review): assumes the jailer tolerates a pre-created chroot
        directory (it creates it with create_dir_all) — confirm for the
        jailer version in use.
        """
        instance = self.instances[vm_id]
        os.makedirs(self._jail_root(vm_id), exist_ok=True)

        socket_dir = os.path.dirname(instance["host_api_socket"])
        os.makedirs(socket_dir, exist_ok=True)
        os.chown(socket_dir, self.uid, self.gid)

        jail_config = copy.deepcopy(instance["config"])
        boot = jail_config["boot-source"]
        boot["kernel_image_path"] = self._stage_file(
            vm_id, boot["kernel_image_path"], "kernel", writable=False
        )
        for drive in jail_config["drives"]:
            drive["path_on_host"] = self._stage_file(
                vm_id,
                drive["path_on_host"],
                drive["drive_id"],
                writable=not drive.get("is_read_only", False),
            )
        instance["jail_config"] = jail_config

    def _stage_file(self, vm_id: str, host_path: str, label: str, writable: bool) -> str:
        """Make ``host_path`` available inside the jail and return its jail path.

        Writable images are always copied, so VMs sharing one base image never
        write to the same file. Read-only files are hard-linked when on the
        same filesystem and copied otherwise. Hard links keep the source's
        owner and mode, so the source must already be readable by
        ``uid``/``gid``.
        """
        jail_path = f"/{label}-{os.path.basename(host_path)}"
        dest = self._host_path(vm_id, jail_path)
        if os.path.exists(dest):
            os.remove(dest)

        if not writable:
            try:
                os.link(host_path, dest)
                return jail_path
            except OSError:
                pass  # e.g. cross-device link: fall back to a private copy

        shutil.copyfile(host_path, dest)
        os.chown(dest, self.uid, self.gid)
        return jail_path

    # -------------------------------------------------------------------- API

    def _api_session(self, vm_id: str) -> "requests.Session":
        """Build a requests session that talks to this VM's API socket."""
        session = requests.Session()
        # The target is a local socket: ignore proxy/netrc settings from the env.
        session.trust_env = False
        session.mount(
            "http://", UnixSocketAdapter(self.instances[vm_id]["host_api_socket"])
        )
        return session

    def _api_call(self, session, method: str, path: str, body: dict) -> None:
        """Send one API request; raise RuntimeError carrying Firecracker's error body."""
        response = session.request(
            method, f"http://localhost{path}", json=body, timeout=self.api_timeout
        )
        if not response.ok:
            raise RuntimeError(
                f"{method} {path} failed with HTTP {response.status_code}: {response.text}"
            )

    def _configure_vm(self, vm_id: str) -> bool:
        """Apply the (jail-relative) config through the API and boot the guest.

        Returns True on success. Any failure is printed and reported as False.
        """
        instance = self.instances[vm_id]
        config = instance.get("jail_config") or instance["config"]

        try:
            with self._api_session(vm_id) as session:
                self._api_call(session, "PUT", "/boot-source", config["boot-source"])
                for drive in config["drives"]:
                    self._api_call(session, "PUT", f"/drives/{drive['drive_id']}", drive)
                self._api_call(session, "PUT", "/machine-config", config["machine-config"])
                for iface in config["network-interfaces"]:
                    self._api_call(
                        session, "PUT", f"/network-interfaces/{iface['iface_id']}", iface
                    )
                self._api_call(session, "PUT", "/actions", {"action_type": "InstanceStart"})
            return True
        except Exception as e:
            print(f"Failed to configure VM {vm_id}: {e}")
            return False

    # ------------------------------------------------------------- snapshots

    def create_checkpoint(self, vm_id: str, checkpoint_id: str) -> bool:
        """Take a full snapshot of a running VM.

        The snapshot is written inside the jail (the only place Firecracker can
        write), then moved to ``<checkpoint_dir>/<vm_id>-<checkpoint_id>.{snap,mem}``.
        Firecracker only snapshots a paused VM, so the VM is paused around the
        snapshot and always resumed afterwards.

        NOTE(review): a Firecracker snapshot does not include block-device
        contents; a restorable checkpoint also needs a copy of the rootfs taken
        while paused.

        Returns:
            True on success, False otherwise (the error is printed).
        """
        instance = self.instances.get(vm_id)
        if instance is None:
            return False
        if instance["status"] != "running":
            print(f"Cannot checkpoint VM {vm_id}: status is {instance['status']}")
            return False

        name = f"{vm_id}-{checkpoint_id}"
        jail_snap_dir = "/snapshots"
        host_snap_dir = self._host_path(vm_id, jail_snap_dir)

        try:
            os.makedirs(host_snap_dir, exist_ok=True)
            os.chown(host_snap_dir, self.uid, self.gid)

            with self._api_session(vm_id) as session:
                self._api_call(session, "PATCH", "/vm", {"state": "Paused"})
                try:
                    self._api_call(session, "PUT", "/snapshot/create", {
                        "snapshot_type": "Full",
                        "snapshot_path": f"{jail_snap_dir}/{name}.snap",
                        "mem_file_path": f"{jail_snap_dir}/{name}.mem"
                    })
                finally:
                    self._api_call(session, "PATCH", "/vm", {"state": "Resumed"})

            os.makedirs(self.checkpoint_dir, exist_ok=True)
            for ext in ("snap", "mem"):
                shutil.move(
                    os.path.join(host_snap_dir, f"{name}.{ext}"),
                    os.path.join(self.checkpoint_dir, f"{name}.{ext}"),
                )

            print(f"Checkpoint {checkpoint_id} created for VM {vm_id}")
            return True

        except Exception as e:
            print(f"Failed to create checkpoint: {e}")
            return False

    def restore_from_checkpoint(self, vm_id: str, checkpoint_id: str) -> bool:
        """Restore a VM from a checkpoint. NOT IMPLEMENTED; always returns False.

        The previous version destroyed a running VM and then did nothing.
        Destroying a healthy VM without being able to restore it is strictly
        worse, so the running VM is now left untouched.
        """
        # TODO: start a fresh jail, stage the .snap/.mem files plus the matching
        # disk copy, then PUT /snapshot/load.
        print(f"Restoring VM {vm_id} from checkpoint {checkpoint_id} is not implemented")
        return False

    # ---------------------------------------------------------------- status

    def get_vm_status(self, vm_id: str) -> dict:
        """Return a JSON-serializable status dict; ``{"status": "not_found"}`` if unknown."""
        if vm_id not in self.instances:
            return {"status": "not_found"}

        instance = self.instances[vm_id]
        status = {
            "vm_id": vm_id,
            "status": instance["status"],
            "created_at": instance.get("created_at"),
            "started_at": instance.get("started_at"),
            "uptime": None
        }

        if instance.get("started_at"):
            status["uptime"] = time.time() - instance["started_at"]

        # Report a dead child process even if bookkeeping still says "running".
        if instance.get("process"):
            process = instance["process"]
            if process.poll() is not None:
                status["status"] = "exited"
                status["exit_code"] = process.returncode

        return status

    def destroy_vm(self, vm_id: str) -> bool:
        """Stop the VM and remove its jail (including the staged images).

        Returns False if ``vm_id`` is unknown, True otherwise. The instance
        record is kept with status ``"destroyed"``.
        """
        if vm_id not in self.instances:
            return False

        instance = self.instances[vm_id]

        process = instance.get("process")
        if process is not None and process.poll() is None:
            try:
                process.terminate()
                process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                process.kill()
                process.wait()

        log_file = instance.pop("log_file", None)
        if log_file is not None:
            log_file.close()

        host_socket = instance.get("host_api_socket")
        if host_socket and os.path.exists(host_socket):
            os.remove(host_socket)

        shutil.rmtree(self._jail_dir(vm_id), ignore_errors=True)

        instance["status"] = "destroyed"
        print(f"VM {vm_id} destroyed")

        return True

    def list_vms(self) -> list:
        """Return the status dict of every known VM."""
        return [
            {
                "vm_id": vm_id,
                **self.get_vm_status(vm_id)
            }
            for vm_id in self.instances.keys()
        ]


class _UnixHTTPConnection(urllib3.connection.HTTPConnection):
    """urllib3 HTTP connection that dials a Unix domain socket instead of TCP."""

    def __init__(self, socket_path: str, **kwargs):
        # "localhost" only fills the Host header; it is never resolved.
        super().__init__("localhost", **kwargs)
        self.socket_path = socket_path

    def _new_conn(self):
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        # Depending on the urllib3 version, the timeout may still be a
        # "use default" sentinel object here; only real numbers are applied.
        if isinstance(self.timeout, (int, float)):
            sock.settimeout(self.timeout)
        try:
            sock.connect(self.socket_path)
        except OSError:
            sock.close()
            raise
        return sock


class _UnixHTTPConnectionPool(urllib3.connectionpool.HTTPConnectionPool):
    """Connection pool whose connections all go to one Unix socket."""

    def __init__(self, socket_path: str):
        super().__init__("localhost")
        self.socket_path = socket_path

    def _new_conn(self):
        return _UnixHTTPConnection(
            self.socket_path, timeout=self.timeout.connect_timeout
        )


class UnixSocketAdapter(requests.adapters.HTTPAdapter):
    """requests transport adapter that sends HTTP requests over a Unix socket.

    Mount it on a session, e.g.
    ``session.mount("http://", UnixSocketAdapter("/path/api.sock"))``. The
    host part of request URLs is ignored: every request goes to
    ``socket_path``, and only the path/query is put on the request line.

    The previous implementation never used the socket at all and silently
    fell back to TCP ``localhost:80``.
    """

    def __init__(self, socket_path: str):
        self.socket_path = socket_path
        self._pool = None  # created lazily, reused for keep-alive, closed in close()
        super().__init__()

    def get_connection(self, url, proxies=None):
        """Return the (single, cached) pool for this socket; proxies are ignored."""
        if self._pool is None:
            self._pool = _UnixHTTPConnectionPool(self.socket_path)
        return self._pool

    def get_connection_with_tls_context(self, request, verify, proxies=None, cert=None):
        # requests >= 2.32 calls this instead of get_connection().
        return self.get_connection(request.url, proxies)

    # requests 2.32.0/2.32.1 used a private name for the same hook.
    _get_connection = get_connection_with_tls_context

    def request_url(self, request, proxies):
        # Always send origin-form ("/path?query"), even if a proxy was configured.
        return request.path_url

    def close(self):
        super().close()
        if self._pool is not None:
            self._pool.close()
            self._pool = None


# CLI接口
if __name__ == "__main__":
    # Command-line entry point. VM bookkeeping is in-memory, so each invocation
    # only knows the VMs it created itself: `start` creates the VM on demand,
    # while `stop`/`status`/`checkpoint` in a fresh process report "not found"
    # (non-zero exit) rather than pretending to succeed.
    import argparse
    import sys

    parser = argparse.ArgumentParser(description="Firecracker VM Manager")
    parser.add_argument("action", choices=["create", "start", "stop", "status", "list", "checkpoint"])
    parser.add_argument("--vm-id", help="VM identifier")
    parser.add_argument("--vcpu", type=int, default=2)
    parser.add_argument("--memory", type=int, default=512)
    parser.add_argument("--rootfs", help="Path to rootfs image")
    parser.add_argument("--checkpoint-id", help="Checkpoint identifier")

    args = parser.parse_args()

    # Fail fast with a usage message instead of crashing deep inside the manager.
    if args.action != "list" and not args.vm_id:
        parser.error(f"--vm-id is required for '{args.action}'")
    if args.action == "checkpoint" and not args.checkpoint_id:
        parser.error("--checkpoint-id is required for 'checkpoint'")

    manager = FirecrackerManager()
    ok = True

    def _create():
        return manager.create_vm(
            vm_id=args.vm_id,
            vcpu_count=args.vcpu,
            mem_size_mib=args.memory,
            rootfs_path=args.rootfs
        )

    if args.action == "create":
        _create()
        print(f"VM {args.vm_id} created")

    elif args.action == "start":
        if args.vm_id not in manager.instances:
            _create()
        ok = manager.start_vm(args.vm_id)

    elif args.action == "stop":
        ok = manager.destroy_vm(args.vm_id)
        if not ok:
            print(f"VM {args.vm_id} not found in this process")

    elif args.action == "status":
        status = manager.get_vm_status(args.vm_id)
        print(json.dumps(status, indent=2))
        ok = status.get("status") != "not_found"

    elif args.action == "list":
        vms = manager.list_vms()
        print(json.dumps(vms, indent=2))

    elif args.action == "checkpoint":
        ok = manager.create_checkpoint(args.vm_id, args.checkpoint_id)

    sys.exit(0 if ok else 1)

状态持久化方案

Git-based状态管理

以下实现展示了如何使用Git作为状态持久化后端:每个检查点对应一次Git提交,任务中断后可以回到最近的检查点继续执行(断点续跑)。

// lib/checkpoint/gitCheckpoint.ts
import { execFileSync, execSync } from 'child_process';
import { createHash } from 'crypto';
import { existsSync, mkdirSync, readFileSync, writeFileSync } from 'fs';
import { join } from 'path';

/** Metadata written to `.harness/checkpoints.json` and committed with each checkpoint. */
interface CheckpointMetadata {
  checkpointId: string;
  taskId: string;
  step: number;
  timestamp: string;
  description: string;
  /** Caller-supplied state of external systems, restored by the caller. */
  externalState: Record<string, unknown>;
}

/** Commit-message prefix that marks checkpoint commits. */
const CHECKPOINT_PREFIX = '[checkpoint]';
/** Metadata file path relative to the repository root, as used in `git show <rev>:<path>`. */
const METADATA_REPO_PATH = '.harness/checkpoints.json';

/**
 * Persists task progress as Git commits in the workspace repository.
 *
 * Each checkpoint commits the whole workspace plus a metadata file. Restoring
 * hard-resets the workspace to that commit.
 */
export class GitCheckpointManager {
  private workspacePath: string;
  private metadataPath: string;

  constructor(workspacePath: string) {
    this.workspacePath = workspacePath;
    this.metadataPath = join(workspacePath, '.harness', 'checkpoints.json');
    this.ensureGitRepo();
  }

  /** Initialize a repository (with a commit identity) if the workspace has none. */
  private ensureGitRepo(): void {
    const gitDir = join(this.workspacePath, '.git');
    if (!existsSync(gitDir)) {
      this.execGit(['init']);
      this.execGit(['config', 'user.email', 'harness@agent.ai']);
      this.execGit(['config', 'user.name', 'Harness Agent']);
    }
  }

  /**
   * Create a checkpoint: write metadata, commit everything, push best-effort.
   *
   * @returns The new checkpoint id (`cp-<epoch ms>-<step>`).
   */
  async createCheckpoint(
    taskId: string,
    step: number,
    description: string,
    externalState: Record<string, unknown> = {}
  ): Promise<string> {
    const checkpointId = `cp-${Date.now()}-${step}`;

    const metadata: CheckpointMetadata = {
      checkpointId,
      taskId,
      step,
      timestamp: new Date().toISOString(),
      description,
      externalState
    };

    this.saveMetadata(metadata);

    this.execGit(['add', '-A']);
    // Arguments go to git without a shell, so quotes, `$(...)` or backticks in
    // the description are stored literally instead of being executed.
    this.execGit([
      'commit',
      '--allow-empty',
      '-m',
      `${CHECKPOINT_PREFIX} ${checkpointId}: ${description}`
    ]);

    this.pushToRemote();

    console.log(`Checkpoint created: ${checkpointId}`);
    return checkpointId;
  }

  /**
   * Hard-reset the workspace to a checkpoint and remove untracked files.
   *
   * @returns The checkpoint's metadata, or null if it was not found or the
   *   restore failed (the error is logged).
   */
  async restoreCheckpoint(checkpointId: string): Promise<CheckpointMetadata | null> {
    try {
      // Include the trailing ':' so `cp-…-1` cannot match `cp-…-12`.
      const [commitHash] = this.findCheckpointCommits(
        `${CHECKPOINT_PREFIX} ${checkpointId}:`,
        1
      );
      if (!commitHash) {
        console.error(`Checkpoint ${checkpointId} not found`);
        return null;
      }

      this.execGit(['reset', '--hard', commitHash]);
      this.execGit(['clean', '-fd']);

      const metadata = this.loadMetadata();
      if (metadata?.checkpointId === checkpointId) {
        console.log(`Restored to checkpoint: ${checkpointId} (step ${metadata.step})`);
        return metadata;
      }

      return null;
    } catch (error) {
      console.error(`Failed to restore checkpoint: ${error}`);
      return null;
    }
  }

  /**
   * Metadata of the most recent checkpoint commit reachable from any ref.
   *
   * The metadata is read from that commit, not from the working tree, so
   * uncommitted edits or a reset workspace cannot misreport it.
   */
  async getLatestCheckpoint(): Promise<CheckpointMetadata | null> {
    try {
      const [latest] = this.findCheckpointCommits(CHECKPOINT_PREFIX, 1);
      return latest ? this.readMetadataAt(latest) : null;
    } catch {
      return null;
    }
  }

  /** Metadata of every reachable checkpoint commit, newest first. */
  async listCheckpoints(): Promise<CheckpointMetadata[]> {
    try {
      return this.findCheckpointCommits(CHECKPOINT_PREFIX)
        .map((hash) => this.readMetadataAt(hash))
        .filter((metadata): metadata is CheckpointMetadata => metadata !== null);
    } catch {
      return [];
    }
  }

  /**
   * Check that the workspace is clean and, if `.harness/manifest.sha256`
   * exists, that it matches the digest of `git ls-files -s`.
   *
   * The manifest format is that of `git ls-files -s | sha256sum`. The digest
   * is computed in-process; the old shell pipeline wrote `current.manifest`
   * into the workspace, which made every later clean-tree check fail.
   *
   * NOTE(review): the manifest must be git-ignored. If tracked, the listing
   * contains its own blob hash and can never match; if untracked but not
   * ignored, the clean-tree check fails first.
   */
  async verifyConsistency(): Promise<boolean> {
    try {
      const status = this.execGit(['status', '--porcelain']);
      if (status) {
        console.warn('Uncommitted changes detected');
        return false;
      }

      const manifestPath = join(this.workspacePath, '.harness', 'manifest.sha256');
      if (existsSync(manifestPath)) {
        const expected = (readFileSync(manifestPath, 'utf-8').trim().split(/\s+/)[0] ?? '')
          .toLowerCase();
        const actual = createHash('sha256')
          .update(this.execGitRaw(['ls-files', '-s']))
          .digest('hex');
        if (expected !== actual) {
          console.warn('File manifest mismatch');
          return false;
        }
      }

      return true;
    } catch (error) {
      console.error(`Consistency check failed: ${error}`);
      return false;
    }
  }

  /**
   * Push HEAD to `origin/checkpoint-branch` if an `origin` remote exists.
   *
   * Best-effort: failures (offline, non-fast-forward after a restore) are
   * logged, not thrown. The original pushed a local branch that was never
   * created, so it always failed silently.
   */
  private pushToRemote(): void {
    try {
      this.execGit(['remote', 'get-url', 'origin']);
    } catch {
      return; // no remote configured: checkpoints stay local
    }
    try {
      this.execGit(['push', 'origin', 'HEAD:refs/heads/checkpoint-branch']);
    } catch (error) {
      console.warn(`Checkpoint push failed (kept locally): ${error}`);
    }
  }

  /** Hashes of commits whose message contains `fragment` literally, newest first. */
  private findCheckpointCommits(fragment: string, limit?: number): string[] {
    const args = ['log', '--all', '--format=%H', '--fixed-strings', `--grep=${fragment}`];
    if (limit !== undefined) {
      args.push(`--max-count=${limit}`);
    }
    const out = this.execGit(args);
    return out ? out.split('\n') : [];
  }

  /** Parse the metadata file as committed in `rev`; null if absent or invalid. */
  private readMetadataAt(rev: string): CheckpointMetadata | null {
    try {
      return JSON.parse(this.execGit(['show', `${rev}:${METADATA_REPO_PATH}`])) as CheckpointMetadata;
    } catch {
      return null;
    }
  }

  private saveMetadata(metadata: CheckpointMetadata): void {
    // No shell `mkdir -p`: paths with spaces or metacharacters stay safe.
    mkdirSync(join(this.workspacePath, '.harness'), { recursive: true });
    writeFileSync(this.metadataPath, JSON.stringify(metadata, null, 2));
  }

  /** Read the metadata file from the working tree; null if absent or invalid. */
  private loadMetadata(): CheckpointMetadata | null {
    try {
      if (!existsSync(this.metadataPath)) return null;
      const data = readFileSync(this.metadataPath, 'utf-8');
      return JSON.parse(data) as CheckpointMetadata;
    } catch {
      return null;
    }
  }

  /** Run git with `args` (no shell) in the workspace; returns trimmed stdout. */
  private execGit(args: string[]): string {
    return this.execGitRaw(args).trim();
  }

  /**
   * Run git with `args` (no shell) and return raw stdout.
   * Throws `Error("Git command failed: <stderr>")` when git reports an error.
   */
  private execGitRaw(args: string[]): string {
    try {
      return execFileSync('git', args, {
        cwd: this.workspacePath,
        encoding: 'utf-8',
        stdio: ['pipe', 'pipe', 'pipe'],
        // Never block on a credential prompt (e.g. during push).
        env: { ...process.env, GIT_TERMINAL_PROMPT: '0' }
      });
    } catch (error: unknown) {
      const stderr = (error as { stderr?: unknown } | null)?.stderr;
      if (typeof stderr === 'string' && stderr.length > 0) {
        throw new Error(`Git command failed: ${stderr}`);
      }
      throw error;
    }
  }
}

心跳与健康检查服务

// lib/health/heartbeat.ts
import EventEmitter from 'events';

/** Snapshot emitted as the 'heartbeat' event and POSTed to the monitoring endpoint. */
interface HealthStatus {
  taskId: string;
  /** Epoch ms when the snapshot was taken. */
  timestamp: number;
  currentStep: number;
  totalSteps: number;
  /** CPU seconds this process consumed during a ~100 ms sampling window. */
  cpuUsage: number;
  /** V8 heap in use, MB. */
  memoryUsage: number;
  /** Placeholder; always 0 for now. */
  diskUsage: number;
  /** Epoch ms of the last start()/updateStep() call. */
  lastActivity: number;
}

/** Default destination for heartbeat POSTs. */
const DEFAULT_HEARTBEAT_ENDPOINT = 'http://monitoring.internal/heartbeat';

/**
 * Periodic liveness reporting plus stall detection for a long-running task.
 *
 * Events:
 * - 'heartbeat' (HealthStatus): every `interval` ms while running.
 * - 'timeout' ({ taskId, timeSinceLastActivity, currentStep }): emitted once
 *   when no `updateStep()` has happened for more than `timeout` ms. It is
 *   re-armed by the next `updateStep()`, so one stall triggers one recovery,
 *   not one per check.
 */
export class HeartbeatService extends EventEmitter {
  private taskId: string;
  private interval: number;
  private timeout: number;
  private endpoint: string | null;
  private timer: ReturnType<typeof setInterval> | null = null;
  private timeoutTimer: ReturnType<typeof setInterval> | null = null;
  private timeoutReported = false;
  private lastHeartbeat = 0;
  private currentStep = 0;
  private totalSteps = 0;

  /**
   * @param taskId Task identifier included in every event.
   * @param options.interval Heartbeat period in ms (default 30000).
   * @param options.timeout Inactivity threshold in ms (default 90000).
   * @param options.endpoint URL for heartbeat POSTs (default
   *   http://monitoring.internal/heartbeat); `null` or `''` disables posting.
   */
  constructor(
    taskId: string,
    options: {
      interval?: number;
      timeout?: number;
      endpoint?: string | null;
    } = {}
  ) {
    super();
    this.taskId = taskId;
    this.interval = options.interval || 30000;
    this.timeout = options.timeout || 90000;
    this.endpoint = options.endpoint === undefined ? DEFAULT_HEARTBEAT_ENDPOINT : options.endpoint;
  }

  /** Start (or restart) heartbeats and stall detection for a task of `totalSteps` steps. */
  start(totalSteps: number): void {
    // Restarting must not leave the previous timers running.
    this.clearTimers();

    this.totalSteps = totalSteps;
    this.lastHeartbeat = Date.now();
    this.timeoutReported = false;

    this.timer = setInterval(() => {
      this.sendHeartbeat().catch((error: unknown) => {
        console.error('Failed to send heartbeat:', error);
      });
    }, this.interval);

    this.startTimeoutCheck();

    console.log(`Heartbeat service started for task ${this.taskId}`);
  }

  /** Record progress; this also counts as activity and re-arms the timeout. */
  updateStep(step: number): void {
    this.currentStep = step;
    this.lastHeartbeat = Date.now();
    this.timeoutReported = false;
  }

  private async sendHeartbeat(): Promise<void> {
    const status: HealthStatus = {
      taskId: this.taskId,
      timestamp: Date.now(),
      currentStep: this.currentStep,
      totalSteps: this.totalSteps,
      cpuUsage: await this.getCPUUsage(),
      memoryUsage: await this.getMemoryUsage(),
      diskUsage: await this.getDiskUsage(),
      lastActivity: this.lastHeartbeat
    };

    // stop() may have been called while metrics were being sampled.
    if (this.timer === null) return;

    this.emit('heartbeat', status);

    if (!this.endpoint) return;

    try {
      const response = await fetch(this.endpoint, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify(status)
      });
      if (!response.ok) {
        console.error(`Failed to send heartbeat: HTTP ${response.status}`);
      }
    } catch (error) {
      console.error('Failed to send heartbeat:', error);
    }
  }

  private startTimeoutCheck(): void {
    // Check at the heartbeat cadence (never less often than the timeout), so a
    // stall is noticed soon after it crosses the threshold. Checking only every
    // `timeout` ms delayed detection by up to 2x the timeout.
    const checkEvery = Math.min(this.interval, this.timeout);

    this.timeoutTimer = setInterval(() => {
      const timeSinceLastActivity = Date.now() - this.lastHeartbeat;

      if (timeSinceLastActivity > this.timeout && !this.timeoutReported) {
        this.timeoutReported = true;
        this.emit('timeout', {
          taskId: this.taskId,
          timeSinceLastActivity,
          currentStep: this.currentStep
        });
      }
    }, checkEvery);
  }

  /** CPU seconds (user + system) used by this process over a 100 ms window. */
  private async getCPUUsage(): Promise<number> {
    const startUsage = process.cpuUsage();
    await new Promise((resolve) => setTimeout(resolve, 100));
    const endUsage = process.cpuUsage(startUsage);
    return (endUsage.user + endUsage.system) / 1000000; // microseconds -> seconds
  }

  /** V8 heap currently in use, in MB. */
  private async getMemoryUsage(): Promise<number> {
    const usage = process.memoryUsage();
    return usage.heapUsed / 1024 / 1024;
  }

  /** Not implemented yet; always 0. */
  private async getDiskUsage(): Promise<number> {
    return 0;
  }

  private clearTimers(): void {
    if (this.timer) {
      clearInterval(this.timer);
      this.timer = null;
    }
    if (this.timeoutTimer) {
      clearInterval(this.timeoutTimer);
      this.timeoutTimer = null;
    }
  }

  /** Stop both heartbeats and stall detection; no 'timeout' is emitted afterwards. */
  stop(): void {
    this.clearTimers();
    console.log(`Heartbeat service stopped for task ${this.taskId}`);
  }
}

故障恢复协调器

// lib/recovery/recoveryCoordinator.ts
import { GitCheckpointManager } from '../checkpoint/gitCheckpoint';
import { HeartbeatService } from '../health/heartbeat';

/** Caller-supplied task identity and recovery callbacks. */
interface RecoveryContext {
  taskId: string;
  workspacePath: string;
  /** Invoked after a successful restore with the step to resume from. */
  onRecover: (step: number) => Promise<void>;
  /** Invoked once when a recovery attempt cannot complete. */
  onFailure: (error: Error) => Promise<void>;
}

/**
 * Connects stall detection (HeartbeatService) to Git checkpoints: on a
 * heartbeat timeout it restores the latest checkpoint and tells the task to
 * resume from that step.
 */
export class RecoveryCoordinator {
  private context: RecoveryContext;
  private checkpointManager: GitCheckpointManager;
  private heartbeatService: HeartbeatService;
  private isRecovering = false;
  private timeoutListenerAttached = false;

  constructor(context: RecoveryContext) {
    this.context = context;
    this.checkpointManager = new GitCheckpointManager(context.workspacePath);
    this.heartbeatService = new HeartbeatService(context.taskId);
  }

  /**
   * Start heartbeats and trigger recovery on timeout.
   * Safe to call again: the timeout listener is only attached once.
   */
  async startMonitoring(totalSteps: number): Promise<void> {
    if (!this.timeoutListenerAttached) {
      this.timeoutListenerAttached = true;
      this.heartbeatService.on('timeout', (data: { taskId: string; currentStep: number }) => {
        console.warn(`Task ${data.taskId} timed out at step ${data.currentStep}`);
        // EventEmitter ignores the promise a listener returns, so any
        // rejection must be handled here or it would go unhandled.
        this.attemptRecovery().catch((error: unknown) => {
          console.error('Unexpected recovery error:', error);
        });
      });
    }

    this.heartbeatService.start(totalSteps);
  }

  /**
   * Restore the latest checkpoint and resume the task from it.
   *
   * Only one attempt runs at a time; overlapping calls return false.
   * `onFailure` is called exactly once per failed attempt, and an error it
   * throws is logged rather than propagated, so this never rejects.
   *
   * @returns true if the task was resumed, false otherwise.
   */
  async attemptRecovery(): Promise<boolean> {
    if (this.isRecovering) {
      console.log('Recovery already in progress');
      return false;
    }

    this.isRecovering = true;
    console.log('Starting recovery process...');

    try {
      const failure = await this.runRecovery();
      if (failure === null) {
        return true;
      }
      await this.notifyFailure(failure);
      return false;
    } finally {
      this.isRecovering = false;
    }
  }

  /** Run the recovery steps; resolves to null on success or to the error to report. */
  private async runRecovery(): Promise<Error | null> {
    try {
      // 1. Find the latest checkpoint.
      const checkpoint = await this.checkpointManager.getLatestCheckpoint();
      if (!checkpoint) {
        console.error('No checkpoint found, cannot recover');
        return new Error('No checkpoint available');
      }

      // 2. Reset the workspace to it.
      const restored = await this.checkpointManager.restoreCheckpoint(
        checkpoint.checkpointId
      );
      if (!restored) {
        console.error('Failed to restore checkpoint');
        return new Error('Checkpoint restore failed');
      }

      // 3. Verify the workspace (advisory only).
      const consistent = await this.checkpointManager.verifyConsistency();
      if (!consistent) {
        console.warn('State consistency check failed, proceeding with caution');
      }

      // 4. Restore external-system state.
      await this.restoreExternalState(restored.externalState);

      // Reset the stall clock. Otherwise the next timeout check still sees the
      // pre-recovery timestamp and immediately starts another recovery.
      this.heartbeatService.updateStep(restored.step);

      // 5. Let the task continue from the restored step.
      console.log(`Recovery successful, resuming from step ${restored.step}`);
      await this.context.onRecover(restored.step);

      return null;
    } catch (error) {
      console.error('Recovery failed:', error);
      return error instanceof Error ? error : new Error(String(error));
    }
  }

  /** Invoke `onFailure`, logging (not propagating) anything it throws. */
  private async notifyFailure(error: Error): Promise<void> {
    try {
      await this.context.onFailure(error);
    } catch (callbackError) {
      console.error('onFailure callback threw:', callbackError);
    }
  }

  /** Placeholder for re-establishing external-system context; currently only logs. */
  private async restoreExternalState(state: Record<string, any>) {
    if (state.issueId) {
      console.log(`Restoring context for issue: ${state.issueId}`);
    }
    if (state.apiCalls) {
      console.log(`Noting ${state.apiCalls.length} previous API calls`);
    }
    // A real implementation depends on the concrete external systems.
  }

  /** Record progress (heartbeat activity) and commit a checkpoint for `step`. */
  async createCheckpoint(
    step: number,
    description: string,
    externalState: Record<string, any> = {}
  ) {
    this.heartbeatService.updateStep(step);

    await this.checkpointManager.createCheckpoint(
      this.context.taskId,
      step,
      description,
      externalState
    );
  }

  /** Stop heartbeats and stall detection. */
  stop() {
    this.heartbeatService.stop();
  }
}

参考资料

  1. Firecracker API Documentation. (2024).
  2. Node.js Best Practices. (2024).
  3. TypeScript Deep Dive. (2024).
  4. Express.js Security Best Practices. (2024).
  5. Jest Testing Framework. (2024).