Logo
热心市民王先生

核心实现验证

硅基写手 代码验证

Star Office UI 关键代码验证:状态管理API、多Agent注册机制、Phaser动画系统实现分析

状态管理核心实现

Flask 后端架构

项目采用单文件 Flask 应用设计,所有核心逻辑集中在 backend/app.py

# 状态存储核心逻辑
import json
from flask import Flask, jsonify, request
from datetime import datetime

app = Flask(__name__)

STATE_FILE = 'state.json'
JOIN_KEYS_FILE = 'join-keys.json'

def load_state():
    """从 JSON 文件加载状态"""
    try:
        with open(STATE_FILE, 'r') as f:
            return json.load(f)
    except FileNotFoundError:
        return {'status': 'idle', 'message': '', 'last_update': None}

def save_state(state):
    """保存状态到 JSON 文件"""
    state['last_update'] = datetime.utcnow().isoformat() + 'Z'
    with open(STATE_FILE, 'w') as f:
        json.dump(state, f, indent=2)

@app.route('/status', methods=['GET'])
def get_status():
    """获取主 Agent 状态"""
    state = load_state()
    return jsonify(state)

@app.route('/set_state', methods=['POST'])
def set_state():
    """设置主 Agent 状态"""
    data = request.json
    valid_statuses = ['idle', 'writing', 'researching', 'executing', 'syncing', 'error']
    
    if data.get('status') not in valid_statuses:
        return jsonify({'error': 'Invalid status'}), 400
    
    state = {
        'status': data['status'],
        'message': data.get('message', ''),
        'agent_id': 'main'
    }
    save_state(state)
    return jsonify({'success': True, 'state': state})

状态更新脚本

项目提供了便捷的状态更新脚本 set_state.py

#!/usr/bin/env python3
"""状态更新命令行工具"""
import sys
import requests

def main():
    if len(sys.argv) < 2:
        print("Usage: python set_state.py <status> [message]")
        print("Statuses: idle, writing, researching, executing, syncing, error")
        sys.exit(1)
    
    status = sys.argv[1]
    message = sys.argv[2] if len(sys.argv) > 2 else ""
    
    response = requests.post(
        'http://127.0.0.1:18791/set_state',
        json={'status': status, 'message': message}
    )
    
    if response.status_code == 200:
        print(f"✅ State updated: {status} - {message}")
    else:
        print(f"❌ Error: {response.text}")

if __name__ == '__main__':
    main()

使用示例

# 设置为工作状态
python3 set_state.py writing "正在整理文档"

# 设置为同步状态
python3 set_state.py syncing "同步进度中"

# 设置为错误状态
python3 set_state.py error "发现问题,排查中"

多 Agent 注册机制

Agent 注册 API

# 多 Agent 管理核心逻辑

# 存储注册的 Agent 列表
registered_agents = {}

def load_join_keys():
    """加载有效的 Join Keys"""
    try:
        with open(JOIN_KEYS_FILE, 'r') as f:
            return json.load(f).get('keys', [])
    except FileNotFoundError:
        return []

@app.route('/join-agent', methods=['POST'])
def join_agent():
    """访客 Agent 加入办公室"""
    data = request.json
    
    # 验证 Join Key
    valid_keys = load_join_keys()
    key_match = next(
        (k for k in valid_keys if k['key'] == data.get('join_key')),
        None
    )
    
    if not key_match:
        return jsonify({'error': 'Invalid join key'}), 403
    
    # 注册 Agent
    agent_id = f"guest_{len(registered_agents) + 1}"
    registered_agents[agent_id] = {
        'name': data.get('name', key_match.get('name', 'Guest')),
        'status': 'idle',
        'message': '',
        'joined_at': datetime.utcnow().isoformat()
    }
    
    return jsonify({
        'success': True,
        'agent_id': agent_id,
        'name': registered_agents[agent_id]['name']
    })

@app.route('/agent-push', methods=['POST'])
def agent_push():
    """访客 Agent 推送状态"""
    data = request.json
    agent_id = data.get('agent_id')
    
    if agent_id not in registered_agents:
        return jsonify({'error': 'Agent not registered'}), 404
    
    registered_agents[agent_id].update({
        'status': data.get('status', 'idle'),
        'message': data.get('message', ''),
        'last_update': datetime.utcnow().isoformat()
    })
    
    return jsonify({'success': True})

@app.route('/agents', methods=['GET'])
def get_agents():
    """获取所有 Agent 列表"""
    # 合并主 Agent 和访客 Agent
    all_agents = [
        {'agent_id': 'main', **load_state()}
    ]
    all_agents.extend([
        {'agent_id': aid, **info}
        for aid, info in registered_agents.items()
    ])
    return jsonify(all_agents)

Agent 访问流程

sequenceDiagram
    participant Agent as 访客 Agent
    participant API as Flask API
    participant Storage as JSON Storage
    
    Note over Agent: 1. 获取 Join Key
    Agent->>API: POST /join-agent (join_key, name)
    API->>Storage: 验证 join-keys.json
    Storage-->>API: Key 有效
    API-->>Agent: agent_id, success
    
    Note over Agent: 2. 推送状态更新
    loop 定期更新
        Agent->>API: POST /agent-push (agent_id, status, message)
        API->>Storage: 更新 registered_agents
        API-->>Agent: success
    end
    
    Note over Agent: 3. 离开办公室
    Agent->>API: POST /leave-agent (agent_id)
    API->>Storage: 移除注册信息
    API-->>Agent: success

Phaser.js 前端实现

游戏场景初始化

// frontend/layout.js 核心结构

const config = {
    type: Phaser.AUTO,
    width: 800,
    height: 600,
    parent: 'game-container',
    pixelArt: true,  // 保持像素风格清晰
    scene: {
        preload: preload,
        create: create,
        update: update
    }
};

const game = new Phaser.Game(config);

function preload() {
    // 加载办公室地图
    this.load.image('office-map', 'assets/office-background.webp');
    
    // 加载 Agent 精灵表
    this.load.spritesheet('main-agent', 'assets/agent-spritesheet.webp', {
        frameWidth: 32,
        frameHeight: 48
    });
    
    // 加载访客角色(备选)
    this.load.spritesheet('guest-agent', 'assets/guest-spritesheet.webp', {
        frameWidth: 32,
        frameHeight: 48
    });
}

function create() {
    // 创建办公室背景
    const map = this.add.image(400, 300, 'office-map');
    
    // 创建主 Agent 精灵
    this.mainAgent = this.add.sprite(200, 300, 'main-agent');
    this.mainAgent.setScale(2);
    
    // 定义状态区域位置
    this.areas = {
        'idle': { x: 150, y: 400 },      // 休息区
        'writing': { x: 400, y: 250 },   // 工作区
        'researching': { x: 600, y: 200 }, // 书架区
        'executing': { x: 350, y: 350 }, // 操作台
        'syncing': { x: 500, y: 400 },   // 同步区
        'error': { x: 700, y: 350 }      // Bug 区
    };
    
    // 创建动画
    createAnimations.call(this);
    
    // 创建状态气泡
    this.statusBubble = this.add.container(0, 0);
    const bubble = this.add.graphics();
    // ... 气泡绘制逻辑
}

function createAnimations() {
    // 定义各状态的动画帧
    const animations = [
        { key: 'idle', frames: [0, 1, 2, 3], frameRate: 4 },
        { key: 'writing', frames: [4, 5, 6, 7], frameRate: 8 },
        { key: 'researching', frames: [8, 9, 10, 11], frameRate: 6 },
        { key: 'executing', frames: [12, 13, 14, 15], frameRate: 10 },
        { key: 'syncing', frames: [16, 17, 18, 19], frameRate: 8 },
        { key: 'error', frames: [20, 21, 22, 23], frameRate: 6 }
    ];
    
    animations.forEach(anim => {
        this.anims.create({
            key: anim.key,
            frames: this.anims.generateFrameNumbers('main-agent', {
                frames: anim.frames
            }),
            frameRate: anim.frameRate,
            repeat: -1
        });
    });
}

状态轮询与动画切换

function update() {
    // 每 1 秒轮询一次状态
    if (!this.lastPoll || Date.now() - this.lastPoll > 1000) {
        this.lastPoll = Date.now();
        fetchStatus.call(this);
    }
}

async function fetchStatus() {
    try {
        const response = await fetch('/status');
        const state = await response.json();
        
        // 更新动画
        this.mainAgent.play(state.status);
        
        // 移动到对应区域
        const targetPos = this.areas[state.status];
        this.tweens.add({
            targets: this.mainAgent,
            x: targetPos.x,
            y: targetPos.y,
            duration: 500,
            ease: 'Power2'
        });
        
        // 更新状态气泡
        updateStatusBubble.call(this, state.message);
        
    } catch (error) {
        console.error('Status fetch failed:', error);
    }
}

function updateStatusBubble(message) {
    // 更新气泡文本
    if (this.bubbleText) {
        this.bubbleText.setText(message);
    }
    
    // 控制气泡显示/隐藏
    this.statusBubble.setVisible(!!message);
}

配置与部署

配置文件结构

// state.sample.json - 状态配置模板
{
  "status": "idle",
  "message": "",
  "last_update": null,
  "agent_id": "main"
}

// join-keys.json - 访客密钥配置
{
  "keys": [
    {
      "key": "guest-example-key-2026",
      "name": "Assistant Bot",
      "expires": "2026-12-31",
      "created_by": "admin"
    }
  ]
}

启动脚本

#!/bin/bash
# backend/run.sh

cd "$(dirname "$0")"

# 检查依赖
if ! command -v python3 &> /dev/null; then
    echo "Python3 is required"
    exit 1
fi

# 安装依赖
pip3 install -r requirements.txt

# 启动服务
echo "Starting Star Office UI backend..."
python3 app.py

依赖清单

# backend/requirements.txt
flask>=2.3.0
flask-cors>=4.0.0
requests>=2.31.0

集成示例

与 LangChain Agent 集成

# 示例:LangChain Agent 状态推送
from langchain.agents import AgentExecutor
import requests

class StateReportingAgent:
    """具备状态上报能力的 Agent 包装器"""
    
    def __init__(self, agent_executor, office_url, agent_id):
        self.agent = agent_executor
        self.office_url = office_url
        self.agent_id = agent_id
    
    def update_status(self, status, message=""):
        """更新办公室状态"""
        requests.post(
            f"{self.office_url}/agent-push",
            json={
                "agent_id": self.agent_id,
                "status": status,
                "message": message
            }
        )
    
    def run(self, input_text):
        """执行任务并上报状态"""
        self.update_status("researching", "分析任务需求...")
        
        try:
            self.update_status("executing", "执行任务中...")
            result = self.agent.run(input_text)
            
            self.update_status("syncing", "同步结果...")
            return result
            
        except Exception as e:
            self.update_status("error", f"执行失败: {str(e)}")
            raise
        
        finally:
            self.update_status("idle", "待命中")

与 Discord Bot 集成

# 示例:Discord Bot 状态同步
import discord
from discord.ext import commands

class OfficeSyncBot(commands.Bot):
    def __init__(self, office_url):
        super().__init__(command_prefix='!')
        self.office_url = office_url
    
    async def on_ready(self):
        """Bot 启动时更新办公室状态"""
        requests.post(
            f"{self.office_url}/set_state",
            json={"status": "idle", "message": "Discord Bot 在线"}
        )
    
    @commands.command()
    async def working(self, ctx, *, task):
        """报告正在工作"""
        requests.post(
            f"{self.office_url}/set_state",
            json={"status": "writing", "message": task}
        )
        await ctx.send(f"📝 已更新状态:{task}")

验证结论

技术可行性评估

评估项结论说明
状态管理✅ 可行JSON 文件存储适合轻量级场景
多 Agent 支持✅ 可行API 设计合理,扩展性良好
前端动画✅ 可行Phaser.js 成熟稳定
移动端适配⚠️ 部分可行需要优化性能和触控交互
大规模扩展❌ 需改造JSON 存储是瓶颈,需引入数据库

关键代码质量

  1. 简洁性:代码结构清晰,无冗余复杂度
  2. 可读性:函数命名规范,注释适当
  3. 可扩展性:API 设计遵循 RESTful 规范,易于集成
  4. 安全性:Join Key 机制提供基础访问控制

改进建议

  1. 错误处理:增加更完善的异常捕获和日志记录
  2. 状态持久化:考虑引入 SQLite 提升可靠性
  3. WebSocket 支持:从轮询升级为推送,提升实时性
  4. 测试覆盖:添加单元测试和集成测试

小结

Star Office UI 的核心实现展现了”最小可行”的设计哲学:用最少的代码实现核心功能,同时保持良好的扩展性。

Flask + Phaser.js 的组合经过验证,足以支撑当前的功能需求。JSON 文件存储虽然简单,但换来了极致的部署便捷性。多 Agent 注册机制设计合理,为未来扩展奠定了基础。

下一章节将分析项目的风险和发展前景,给出最终评估结论。


参考资料