Logo
热心市民王先生

技术架构

技术研究 人工智能 Telegram

投资日报自动化系统采用分层架构设计,从数据获取、数据处理、分析生成到消息推送,各层职责清晰,便于维护和扩展。

核心架构设计

系统总体架构

投资日报自动化系统采用分层架构设计,从数据获取、数据处理、分析生成到消息推送,各层职责清晰,便于维护和扩展。

graph TB
    subgraph "数据层"
        A1[PostgreSQL 数据库]
        A2[Redis 缓存]
        A3[文件存储]
    end

    subgraph "服务层"
        B1[数据抓取模块]
        B2[技术指标计算模块]
        B3[大模型集成模块]
        B4[多渠道推送模块]
        B5[用户订阅管理模块]
        B6[定时任务调度器]
    end

    subgraph "接口层"
        C1[CLI 主程序]
        C2[配置管理器]
        C3[日志系统]
    end

    subgraph "外部服务"
        D1[股票数据源 API]
        D2[大模型 API]
        D3[钉钉/飞书/Telegram API]
        D4[邮件服务]
    end

    C1 --> B1
    C1 --> B2
    C1 --> B3
    C1 --> B4
    C1 --> B5
    C1 --> B6

    B1 --> A2
    B1 --> A1
    B1 --> D1

    B2 --> A2
    B2 --> A1

    B3 --> A3
    B3 --> D2

    B4 --> D3
    B4 --> D4

    B5 --> A1

    B6 --> B1
    B6 --> B3
    B6 --> B4

数据流转过程

系统的核心业务流程是从数据抓取开始,经过技术指标计算、大模型分析,最终生成日报并推送给用户。

sequenceDiagram
    participant U as 用户
    participant C as CLI 主程序
    participant S as 调度器
    participant DS as 数据抓取
    participant TC as 技术指标
    participant AI as 大模型
    participant P as 推送模块
    participant N as 通知渠道

    U->>C: 启动服务 / 手动触发
    C->>S: 启动定时任务

    Note over S,N: 每日收盘后执行

    S->>DS: 触发数据抓取
    DS->>DS: 从多数据源抓取行情
    DS->>DS: 缓存到 Redis
    DS-->>S: 返回抓取结果

    S->>TC: 触发指标计算
    TC->>TC: 计算 MA、MACD、RSI 等
    TC->>TC: 生成图表数据
    TC-->>S: 返回指标数据

    S->>AI: 触发大模型分析
    AI->>AI: 组装 Prompt
    AI->>AI: 调用 GLM-4.7 API
    alt 主模型失败
        AI->>AI: 切换到 Kimi 2.5
    end
    AI->>AI: 解析输出
    AI-->>S: 返回日报内容

    S->>P: 触发消息推送
    P->>P: 格式化消息(Markdown/HTML)
    P->>N: 推送到各渠道
    loop 多渠道推送
        P->>DingTalk: 推送
        P->>Feishu: 推送
        P->>Telegram: 推送
        P->>Email: 推送
    end
    P-->>S: 返回推送结果

    S-->>C: 任务完成
    C-->>U: 返回状态

模块设计详解

数据抓取模块

模块职责

数据抓取模块负责从多个权威数据源获取股票市场的实时行情和历史数据,是整个系统的基础数据来源。

核心组件

1. 数据源抽象层(DataSource Interface) 定义统一的数据源接口,支持灵活接入新的数据源。接口包含以下方法:

class DataSource(ABC):
    @abstractmethod
    def fetch_realtime_price(self, symbol: str) -> dict:
        """获取实时价格数据"""
        pass

    @abstractmethod
    def fetch_historical_data(self, symbol: str, start_date: str, end_date: str) -> pd.DataFrame:
        """获取历史行情数据"""
        pass

    @abstractmethod
    def get_stock_list(self, market: str) -> list:
        """获取股票列表"""
        pass

2. 数据源适配器(Adapters) 为每个数据源实现适配器,处理不同数据源的 API 调用和格式转换:

  • 东方财富适配器(EastMoneyAdapter):A股市场首选,数据权威且免费
  • 雅虎财经适配器(YahooFinanceAdapter):美股和港股数据源
  • Alpha Vantage 适配器(AlphaVantageAdapter):美股备用数据源,提供更多技术指标

3. 缓存管理器(Cache Manager) 使用 Redis 实现多级缓存策略:

  • 一级缓存(内存缓存):存储最近访问的数据,TTL 为 1 分钟
  • 二级缓存(Redis 缓存):存储历史数据,TTL 根据数据类型设定
    • 实时行情:1 分钟
    • 技术指标:15 分钟
    • 历史行情:1 天

4. 限流控制器(Rate Limiter) 为防止触发数据源 API 的限流策略,实现以下机制:

  • 请求队列管理
  • 指数退避重试策略
  • 多账号轮换(如适用)
  • 请求频率监控和告警

为什么这样设计

数据抓取模块采用抽象层+适配器的模式,有以下优势:

  1. 可扩展性:新增数据源只需实现接口,无需修改核心逻辑
  2. 高可用性:多个数据源互为备份,任一数据源故障不影响系统运行
  3. 性能优化:缓存机制大幅减少 API 调用次数,降低成本
  4. 合规性:限流控制确保遵守数据源的使用条款

技术指标计算模块

模块职责

技术指标计算模块负责对抓取的行情数据进行分析,计算各类技术指标,为后续的 AI 分析提供量化依据。

核心组件

1. 指标计算引擎(Indicator Engine) 使用 TA-Lib 库实现主流技术指标的计算:

class IndicatorEngine:
    def calculate_ma(self, data: pd.DataFrame, periods: list) -> dict:
        """计算移动平均线"""
        result = {}
        for period in periods:
            result[f'MA{period}'] = ta.SMA(data['close'], timeperiod=period)
        return result

    def calculate_macd(self, data: pd.DataFrame) -> dict:
        """计算 MACD 指标"""
        macd, signal, hist = ta.MACD(data['close'])
        return {
            'DIF': macd,
            'DEA': signal,
            'MACD': hist
        }

    def calculate_rsi(self, data: pd.DataFrame, period: int = 14) -> pd.Series:
        """计算 RSI 指标"""
        return ta.RSI(data['close'], timeperiod=period)

2. 图表生成器(Chart Generator) 生成技术指标的图表数据,支持多种图表类型:

  • K 线图(Candlestick)
  • 折线图(Line Chart)
  • 面积图(Area Chart)
  • 组合图(支持多指标叠加)

3. 指标信号解读器(Signal Interpreter) 根据技术指标的数值和趋势,生成交易信号:

class SignalInterpreter:
    def interpret_macd(self, macd_data: dict) -> str:
        """解读 MACD 信号"""
        dif = macd_data['DIF'][-1]
        dea = macd_data['DEA'][-1]
        hist = macd_data['MACD'][-1]

        if dif > dea and hist > 0:
            return "金叉,买入信号"
        elif dif < dea and hist < 0:
            return "死叉,卖出信号"
        else:
            return "震荡市,观望"

为什么这样设计

技术指标计算模块设计考虑了以下因素:

  1. 准确性:使用成熟的 TA-Lib 库,确保计算结果的专业性
  2. 性能:向量化计算,大幅提升计算速度
  3. 可扩展性:指标引擎支持自定义指标,便于添加新的技术分析方法
  4. 可视化:图表生成器为报告提供丰富的视觉元素

大模型集成模块

模块职责

大模型集成模块负责将市场数据、技术指标转化为结构化 Prompt,调用大模型 API 生成投资分析报告。

核心组件

1. 模型客户端抽象层(Model Client Interface) 定义统一的模型调用接口,支持多个大模型提供商:

class ModelClient(ABC):
    @abstractmethod
    def generate_report(self, prompt: str) -> str:
        """生成投资报告"""
        pass

    @abstractmethod
    def get_usage_stats(self) -> dict:
        """获取使用统计"""
        pass

2. 模型适配器(Model Adapters)

  • GLM-4.7 适配器(GLM4Client):主模型,使用智谱 AI 的 API
  • Kimi 2.5 适配器(KimiClient):备用模型,使用月之暗面的 API

3. Prompt 模板引擎(Prompt Template Engine) 使用 Jinja2 模板引擎,生成结构化 Prompt:

# daily_report_prompt.j2
你是一位专业的投资分析师,请基于以下数据生成投资日报。

## 市场概况
{{ market_overview }}

## 技术指标分析
{% for indicator in technical_indicators %}
- {{ indicator.name }}: {{ indicator.value }}(解读:{{ indicator.interpretation }})
{% endfor %}

## 要求
1. 分析至少 3 个行业的投资机会
2. 每个行业至少推荐 3 只股票
3. 提供明确的买入/卖出/持有建议
4. 给出详细的投资理由
5. 添加风险提示和免责声明

请生成 Markdown 格式的报告。

4. 输出解析器(Output Parser) 解析大模型的输出,提取结构化信息:

class OutputParser:
    def parse_report(self, raw_output: str) -> dict:
        """解析大模型输出"""
        lines = raw_output.split('\n')
        sections = {
            'market_overview': [],
            'sector_analysis': [],
            'stock_recommendations': [],
            'risk_warnings': []
        }

        current_section = None
        for line in lines:
            if '市场概况' in line:
                current_section = 'market_overview'
            elif '行业分析' in line:
                current_section = 'sector_analysis'
            elif '股票推荐' in line:
                current_section = 'stock_recommendations'
            elif '风险提示' in line:
                current_section = 'risk_warnings'
            elif current_section:
                sections[current_section].append(line)

        return sections

5. 配额管理器(Quota Manager) 监控和管理大模型 API 的调用配额:

  • 跟踪每日 token 使用量
  • 预算控制,超预算时降级服务
  • 成本优化,使用更便宜的模型处理简单任务

为什么这样设计

大模型集成模块的设计考虑了以下关键因素:

  1. 模型多样性:支持多个模型提供商,避免单点依赖
  2. 成本可控:配额管理防止意外超支
  3. 输出结构化:输出解析器确保报告格式统一
  4. Prompt 优化:模板引擎提高 Prompt 质量和一致性

多渠道推送模块

模块职责

多渠道推送模块负责将生成的投资日报推送到用户订阅的各个渠道,确保信息及时送达。

核心组件

1. 推送接口抽象层(Notifier Interface) 定义统一的推送接口,支持多种消息渠道:

class Notifier(ABC):
    @abstractmethod
    def send_message(self, message: str, recipient: str) -> bool:
        """发送消息"""
        pass

    @abstractmethod
    def format_message(self, content: dict) -> str:
        """格式化消息内容"""
        pass

2. 渠道适配器(Channel Adapters)

  • 钉钉适配器(DingTalkNotifier)

    • 使用钉钉机器人 Webhook
    • 支持 Markdown 格式
    • 消息长度限制:2048 字节
  • 飞书适配器(FeishuNotifier)

    • 使用飞书机器人 API
    • 支持富文本和交互式消息
    • 消息长度限制:更灵活
  • Telegram 适配器(TelegramNotifier)

    • 使用 Telegram Bot API
    • 支持 MarkdownV2 格式
    • 消息长度限制:4096 字符
  • 邮件适配器(EmailNotifier)

    • 使用 SMTP 或第三方邮件服务(如 SendGrid)
    • 支持 HTML 格式
    • 支持附件(图表)

3. 消息模板系统(Message Template System) 根据不同渠道的格式要求,生成适配的消息内容:

class MessageTemplate:
    def format_for_chat(self, content: dict) -> str:
        """格式化为聊天应用消息(Markdown)"""
        message = f"# 📊 {content['date']} 投资日报\n\n"
        message += f"## 🌏 市场概况\n{content['market_overview']}\n\n"

        for sector in content['sectors']:
            message += f"## 🏢 {sector['name']}\n"
            message += f"{sector['analysis']}\n\n"

            for stock in sector['stocks']:
                message += f"### 📈 {stock['symbol']}\n"
                message += f"- 价格:{stock['price']}\n"
                message += f"- 建议:{stock['recommendation']}\n"
                message += f"- 理由:{stock['reason']}\n\n"

        message += f"## ⚠️ 风险提示\n{content['risk_warnings']}\n\n"
        message += "---\n*本报告由 AI 生成,仅供参考,不构成投资建议*"

        return message

    def format_for_email(self, content: dict) -> str:
        """格式化为邮件消息(HTML)"""
        # HTML 模板,包含响应式样式
        pass

4. 推送管理器(Push Manager) 管理多渠道推送的协调工作:

class PushManager:
    def __init__(self, notifiers: list):
        self.notifiers = notifiers

    def push_report(self, report: dict, user: dict) -> dict:
        """推送日报到用户订阅的所有渠道"""
        results = {}

        for channel in user['channels']:
            notifier = self.get_notifier(channel['type'])
            if notifier:
                try:
                    message = notifier.format_message(report)
                    success = notifier.send_message(message, channel['recipient'])
                    results[channel['type']] = {
                        'success': success,
                        'timestamp': datetime.now()
                    }
                except Exception as e:
                    results[channel['type']] = {
                        'success': False,
                        'error': str(e),
                        'timestamp': datetime.now()
                    }

        return results

为什么这样设计

多渠道推送模块的设计考虑了以下因素:

  1. 统一接口:抽象层简化了新渠道的接入
  2. 格式适配:模板系统确保消息在各渠道显示效果良好
  3. 高可靠性:支持重试和失败告警
  4. 可扩展性:轻松添加新的推送渠道

用户订阅管理模块

模块职责

用户订阅管理模块负责管理用户的订阅偏好、关注行业、关注股票等信息,实现个性化推送。

核心组件

1. 用户数据模型(User Data Model) 使用 SQLAlchemy ORM 定义用户数据结构:

class User(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(String(100))
    email = Column(String(255))
    created_at = Column(DateTime, default=datetime.now)

    subscriptions = relationship('Subscription', back_populates='user')

class Subscription(Base):
    __tablename__ = 'subscriptions'

    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey('users.id'))
    sector_id = Column(Integer, ForeignKey('sectors.id'))
    created_at = Column(DateTime, default=datetime.now)

    user = relationship('User', back_populates='subscriptions')
    sector = relationship('Sector')

class Watchlist(Base):
    __tablename__ = 'watchlist'

    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey('users.id'))
    stock_symbol = Column(String(20))
    stock_name = Column(String(100))
    market = Column(String(10))  # A-share, HK, US
    created_at = Column(DateTime, default=datetime.now)

2. 订阅管理 CLI(Subscription CLI) 提供命令行界面管理订阅:

# 添加行业订阅
investment-daily-report subscribe --sector=科技

# 添加股票到关注列表
investment-daily-report watchlist --add=AAPL

# 查看当前订阅
investment-daily-report list-subscriptions

# 删除订阅
investment-daily-report unsubscribe --sector=金融

3. 个性化内容生成器(Personalization Engine) 根据用户的订阅偏好,生成个性化的日报内容:

class PersonalizationEngine:
    def generate_personalized_report(self, user: User, base_report: dict) -> dict:
        """生成个性化日报"""
        personalized_report = {
            'user_name': user.name,
            'subscribed_sectors': [],
            'watchlist_stocks': []
        }

        # 筛选用户订阅的行业
        user_sectors = [sub.sector.name for sub in user.subscriptions]
        for sector in base_report['sectors']:
            if sector['name'] in user_sectors:
                personalized_report['subscribed_sectors'].append(sector)

        # 筛选关注列表中的股票
        watchlist_symbols = [stock.stock_symbol for stock in user.watchlist]
        for sector in base_report['sectors']:
            for stock in sector['stocks']:
                if stock['symbol'] in watchlist_symbols:
                    personalized_report['watchlist_stocks'].append(stock)

        return personalized_report

为什么这样设计

用户订阅管理模块的设计考虑了以下因素:

  1. 数据持久化:使用关系型数据库确保数据安全
  2. 易用性:CLI 命令简化了用户操作
  3. 个性化:根据订阅偏好生成定制化内容
  4. 可扩展性:支持添加新的订阅类型(如主题、策略等)

定时任务调度器

模块职责

定时任务调度器负责按照配置的时间表,自动触发数据抓取、日报生成和消息推送任务。

核心组件

1. 调度器配置(Scheduler Configuration) 使用 YAML 配置文件定义调度规则:

scheduler:
  jobs:
    - name: a_share_market_report
      trigger:
        type: cron
        cron_expression: "0 15 * * 1-5"  # 周一到周五 15:00
        timezone: "Asia/Shanghai"
      task: generate_report
      params:
        market: A-share

    - name: us_market_report
      trigger:
        type: cron
        cron_expression: "0 16 * * 1-5"  # 周一到周五 16:00
        timezone: "America/New_York"
      task: generate_report
      params:
        market: US

    - name: data_cleanup
      trigger:
        type: interval
        interval_hours: 24
      task: cleanup_cache

2. 任务执行器(Task Executor) 执行具体的业务逻辑:

class TaskExecutor:
    def generate_report(self, market: str):
        """生成投资日报"""
        # 1. 抓取数据
        data = self.data_scraper.scrape(market)

        # 2. 计算指标
        indicators = self.indicator_engine.calculate(data)

        # 3. 生成报告
        report = self.ai_model.generate(data, indicators)

        # 4. 推送给用户
        self.push_manager.push_report(report)

    def cleanup_cache(self):
        """清理缓存"""
        self.cache_manager.cleanup()

3. 任务监控器(Task Monitor) 监控任务执行状态:

  • 记录任务开始和结束时间
  • 记录任务执行结果(成功/失败)
  • 记录错误日志
  • 发送任务失败告警

为什么这样设计

定时任务调度器的设计考虑了以下因素:

  1. 灵活性:支持 cron 表达式和间隔调度
  2. 可监控:任务监控器确保问题可追溯
  3. 高可用:任务失败时有重试和告警机制
  4. 多时区支持:适应不同市场的交易时间

设计哲学

为什么选择这种架构

投资日报自动化系统的架构设计遵循以下设计原则:

1. 分层解耦 将系统分为数据层、服务层和接口层,每层职责单一,降低耦合度。这使得:

  • 某一层的技术选型不影响其他层
  • 便于团队协作,不同团队可以并行开发
  • 便于测试,可以针对每层单独进行单元测试

2. 依赖倒置 高层模块不依赖低层模块,而是都依赖于抽象接口。这使得:

  • 可以灵活替换具体实现(如更换数据源或模型)
  • 符合开闭原则,对扩展开放,对修改关闭
  • 提高代码的可测试性,便于 mock

3. 配置驱动 使用配置文件管理业务逻辑,避免硬编码。这使得:

  • 无需修改代码即可调整系统行为
  • 支持多环境配置(开发、测试、生产)
  • 便于运维人员管理系统

4. 容错设计 系统各环节都有容错机制,确保部分故障不影响整体服务。这使得:

  • 数据源故障时可切换到备用数据源
  • 大模型故障时可切换到备用模型
  • 推送失败时有重试和告警机制

为什么这个架构适合此场景

投资日报自动化系统有以下特点,使得这种架构尤其适合:

1. 数据源多样且不稳定 金融市场数据来自多个源头,API 稳定性不一。抽象层+适配器的模式使得:

  • 可以灵活接入新数据源
  • 数据源故障时快速切换
  • 统一数据格式,便于后续处理

2. 大模型成本高昂 大模型 API 调用成本较高,需要精细化管理。配额管理和缓存机制使得:

  • 避免重复调用,降低成本
  • 监控使用量,防止超支
  • 支持降级策略,降低成本

3. 用户需求多样化 不同用户关注不同的行业和股票。订阅管理模块使得:

  • 实现个性化推送
  • 支持用户自主管理订阅
  • 降低信息过载

4. 推送渠道多样化 用户习惯使用不同的通讯工具。多渠道推送模块使得:

  • 覆盖更广的用户群体
  • 适应不同渠道的格式要求
  • 确保消息及时送达

核心参考资料