These are internal development notes and are not published externally.

Architecture Overview

Version: 2.0 (December 2024)

The Signal Scout backend uses ARQ (Async Redis Queue) for asynchronous task processing, decoupling continuous learning and data analysis into independent worker jobs.
┌─────────────────────────────────────────────────────────────────┐
│                    Signal Scout Backend                          │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│   FastAPI ──→ Orchestrator ──→ Modal continuous_learn()         │
│                                      │                           │
│                    directions[] ─────┘                           │
│                         │                                        │
│                         ▼                                        │
│   ┌─────────────────────────────────────────────────────────┐   │
│   │                 Redis (ARQ Queue)                        │   │
│   │   start_analysis_job → investigate_direction_job (×N)   │   │
│   │                              → finalize_analysis_job     │   │
│   └─────────────────────────────────────────────────────────┘   │
│                         │                                        │
│                         ▼                                        │
│   ┌──────────────┐  ┌──────────────┐                            │
│   │  PostgreSQL  │  │    Modal     │                            │
│   │  (State)     │  │ (Execution)  │                            │
│   └──────────────┘  └──────────────┘                            │
└─────────────────────────────────────────────────────────────────┘

Core Improvements

Before vs. After

| Metric          | Before (Layered)                     | After (ARQ)            |
|-----------------|--------------------------------------|------------------------|
| Modal calls     | ~58                                  | ~5                     |
| Task queue      | None (asyncio only)                  | Redis + ARQ            |
| Fault tolerance | Process crash = work lost            | Jobs persisted, retryable |
| Scalability     | Single process                       | Multiple workers       |
| Complexity      | layered_orchestrator.py (600+ lines) | 4 simple job functions |

Removed Components

  • layered_orchestrator.py - deleted entirely
  • theme_discovery() Modal method - no longer needed
  • Concurrent theme discovery across 8 agents - the Learner now generates directions directly

Job Flow

                    ┌─────────────────────────┐
                    │   ANALYSIS_REQUEST      │
                    │   from Continuous Learn │
                    │                         │
                    │   question: "Why..."    │
                    │   directions: [         │
                    │     {id, title, ...},   │
                    │     {id, title, ...},   │
                    │   ]                     │
                    └───────────┬─────────────┘
                                │
                                ▼
┌───────────────────────────────────────────────────────────────────┐
│                        start_analysis_job                          │
│                                                                    │
│   1. Create analysis record (status: investigating)               │
│   2. Set Redis counter: directions_remaining = N                  │
│   3. Enqueue N investigate_direction_jobs                         │
└───────────────────────────────┬───────────────────────────────────┘

                ┌───────────────┼───────────────┐
                │               │               │
                ▼               ▼               ▼
┌───────────────────┐ ┌───────────────────┐ ┌───────────────────┐
│ investigate_      │ │ investigate_      │ │ investigate_      │
│ direction_job     │ │ direction_job     │ │ direction_job     │
│                   │ │                   │ │                   │
│ Direction 1       │ │ Direction 2       │ │ Direction 3       │
│ → Modal call      │ │ → Modal call      │ │ → Modal call      │
│ → 1-3 hypotheses  │ │ → 1-3 hypotheses  │ │ → 1-3 hypotheses  │
│ → Write to DB     │ │ → Write to DB     │ │ → Write to DB     │
│ → DECR counter    │ │ → DECR counter    │ │ → DECR counter    │
└─────────┬─────────┘ └─────────┬─────────┘ └─────────┬─────────┘
          │                     │                     │
          └──────────┬──────────┴──────────┬──────────┘
                     │                     │
                     ▼                     ▼
              counter > 0?            counter = 0?
                     │                     │
                     ▼                     ▼
                  (wait)         ┌─────────────────────┐
                                 │ finalize_analysis_  │
                                 │ job                 │
                                 │                     │
                                 │ 1. Load hypotheses  │
                                 │ 2. Deduplicate      │
                                 │ 3. FDR correction   │
                                 │ 4. Complete analysis│
                                 └─────────────────────┘
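
The fan-in hinges on the Redis counter. Below is a minimal sketch of that pattern only, assuming ARQ's job context (ctx["redis"] is the worker's ArqRedis connection); run_direction_investigation and persist_hypotheses are placeholder helpers standing in for the real Modal call and DB write in workers/handlers.py:

# Sketch of the counter-based fan-in; not the actual handler code.
# start_analysis_job is assumed to have run
#   SET analysis:{analysis_id}:directions_remaining N
# before enqueueing N investigate_direction_jobs.

async def run_direction_investigation(analysis_id: str, direction: dict) -> list[dict]:
    """Placeholder for the Modal investigate_direction() call."""
    raise NotImplementedError

async def persist_hypotheses(analysis_id: str, direction_id: str, hypotheses: list[dict]) -> None:
    """Placeholder for writing hypotheses to PostgreSQL."""
    raise NotImplementedError

async def investigate_direction_job(ctx: dict, analysis_id: str, direction: dict) -> None:
    redis = ctx["redis"]  # ArqRedis connection injected by the ARQ worker

    # 1. Investigate one direction and persist its hypotheses.
    hypotheses = await run_direction_investigation(analysis_id, direction)
    await persist_hypotheses(analysis_id, direction["id"], hypotheses)

    # 2. Atomic decrement: exactly one job observes the counter hitting 0
    #    and enqueues the finalize step.
    remaining = await redis.decr(f"analysis:{analysis_id}:directions_remaining")
    if remaining == 0:
        await redis.enqueue_job(
            "finalize_analysis_job",
            analysis_id,
            _queue_name="arq:signal-scout",
        )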

New ANALYSIS_REQUEST Format

The Continuous Learner now emits analysis requests with directions included:
{
  "discovery_id": "2024-01-15_market_cap_issue",
  "question": "Why do 1798 companies have impossible market caps?",
  "context": "Only 8 companies globally should exceed $1T",
  "priority": "high",
  "directions": [
    {
      "id": "currency",
      "title": "Currency Conversion Issue",
      "description": "Foreign exchanges may store in local currency",
      "tables": ["stocks", "exchanges"],
      "approach": "Check correlation with non-USD exchanges"
    },
    {
      "id": "data_entry",
      "title": "Data Entry Errors",
      "description": "Values may be typos or missing decimals",
      "tables": ["stocks"],
      "approach": "Look for exact multiples of realistic values"
    }
  ]
}
Key change: previously, 8 agents had to explore discovered "themes" concurrently; now the Learner works out the investigation directions at the moment it surfaces a problem.
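
For reference, the payload maps onto two small models. This is a hypothetical sketch using Pydantic (already available via FastAPI); the names Direction and AnalysisRequest are illustrative, and the fields follow the JSON above:

from pydantic import BaseModel

class Direction(BaseModel):
    id: str
    title: str
    description: str
    tables: list[str]
    approach: str

class AnalysisRequest(BaseModel):
    discovery_id: str
    question: str
    context: str
    priority: str               # e.g. "high"
    directions: list[Direction]

# Usage: validate the dict emitted by continuous_learn()
# request = AnalysisRequest(**analysis_request_dict)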

Worker Configuration

# signal_agent/workers/settings.py

class WorkerSettings:
    functions = [
        start_analysis_job,        # entry point: creates the analysis, fans out directions
        investigate_direction_job, # investigates each direction in parallel
        finalize_analysis_job,     # aggregates, deduplicates, completes
    ]

    redis_settings = get_redis_settings()  # built from the REDIS_URL environment variable
    max_jobs = 10                           # concurrent jobs per worker
    job_timeout = 600                       # 10-minute timeout
    retry_jobs = True
    max_tries = 3
    queue_name = "arq:signal-scout"
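
On the producer side, whatever enqueues the entry job has to target the same queue name. A sketch using ARQ's client API, assuming the connection is built from REDIS_URL; enqueue_analysis is a hypothetical wrapper, and the payload is the ANALYSIS_REQUEST dict shown above:

# Producer-side sketch (e.g. the orchestrator, after continuous_learn() returns).
import os

from arq import create_pool
from arq.connections import RedisSettings

async def enqueue_analysis(analysis_request: dict) -> None:
    redis = await create_pool(RedisSettings.from_dsn(os.environ["REDIS_URL"]))
    try:
        # Queue name must match WorkerSettings.queue_name,
        # otherwise the worker never picks the job up.
        await redis.enqueue_job(
            "start_analysis_job",
            analysis_request,
            _queue_name="arq:signal-scout",
        )
    finally:
        await redis.close()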

Code Locations

| Area            | Component               | File                                       |
|-----------------|-------------------------|--------------------------------------------|
| Workers         | Worker configuration    | workers/settings.py                        |
| Workers         | Job handlers            | workers/handlers.py                        |
| Modal Functions | Continuous learning     | modal/agent.py → continuous_learn()        |
| Modal Functions | Direction investigation | modal/agent.py → investigate_direction()   |
| Orchestrator    | Unified coordination    | orchestrator.py → UnifiedOrchestrator      |
| Prompts         | Learner guide           | prompts_steward.py → GUIDE                 |
| Prompts         | New format definition   | prompts_steward.py → ANALYSIS_REQUEST section |

Deployment

Docker (single container, API + Worker)

CMD ["./start.sh"]
# Environment variables
REDIS_URL=rediss://user:pass@host:6379  # rediss:// enables SSL
DATABASE_URL=postgresql://...

# Startup control
RUN_API=true     # start FastAPI
RUN_WORKER=true  # start the ARQ worker

Local Development

# Start both together
./start.sh

# Or start them separately
PYTHONPATH=src uvicorn signal_agent.main:app --port 8000
PYTHONPATH=src python -m arq signal_agent.workers.settings.WorkerSettings

Redis Keys

| Key                                 | Purpose                    |
|-------------------------------------|----------------------------|
| arq:signal-scout                    | Job queue                  |
| analysis:{id}:directions_remaining  | Tracks remaining directions |

Example Logs

[user_id] Starting analysis: Why do 1798... (3 directions)
[analysis_id] Enqueued direction: Currency Conversion Issue
[analysis_id] Enqueued direction: Data Entry Errors
[analysis_id] Direction currency: 2 hypotheses
[analysis_id] Directions remaining: 2
[analysis_id] Direction data_entry: 1 hypotheses
[analysis_id] Directions remaining: 1
[analysis_id] Direction etf: 1 hypotheses
[analysis_id] Directions remaining: 0
[analysis_id] All directions done, enqueued finalize job
[analysis_id] Analysis completed: 3 significant findings

Design Decisions

| Choice                        | Reason                                              |
|-------------------------------|-----------------------------------------------------|
| ARQ over Celery               | Lightweight, natively async, simple                 |
| Redis counter                 | Tracks direction completion and triggers finalize   |
| Learner generates directions  | Cuts ~50 Modal calls; investigations are more targeted |
| Single-container deployment   | Simpler operations, shared connection pools         |
| REDIS_URL support             | Works with hosted Redis such as Upstash             |

Migrated from layered_orchestrator.py (now deleted). For the previous architecture, see: Data Steward Architecture (parts of that document are outdated).