Architecture Overview
Version: 2.0 (December 2024)
The Signal Scout backend uses ARQ (Async Redis Queue) for asynchronous task processing, decoupling continuous learning and data analysis into independent worker jobs.
┌─────────────────────────────────────────────────────────────────┐
│ Signal Scout Backend │
├─────────────────────────────────────────────────────────────────┤
│ │
│ FastAPI ──→ Orchestrator ──→ Modal continuous_learn() │
│ │ │
│ directions[] ─────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Redis (ARQ Queue) │ │
│ │ start_analysis_job → investigate_direction_job (×N) │ │
│ │ → finalize_analysis_job │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ PostgreSQL │ │ Modal │ │
│ │ (State) │ │ (Execution) │ │
│ └──────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────────────┘
Core Improvements
Before vs. After
| Metric | Before (Layered) | After (ARQ) |
|---|---|---|
| Modal calls | ~58 | ~5 |
| Task queue | None (asyncio only) | Redis + ARQ |
| Fault tolerance | Process crash = work lost | Jobs persisted, retryable |
| Scalability | Single process | Multiple workers |
| Complexity | layered_orchestrator.py (600+ lines) | 4 simple job functions |
Removed Components
- ❌ layered_orchestrator.py - removed entirely
- ❌ theme_discovery() Modal method - no longer needed
- ❌ 8-agent concurrent theme discovery - the Learner now generates directions directly
Job Flow
┌─────────────────────────┐
│ ANALYSIS_REQUEST │
│ from Continuous Learn │
│ │
│ question: "Why..." │
│ directions: [ │
│ {id, title, ...}, │
│ {id, title, ...}, │
│ ] │
└───────────┬─────────────┘
│
▼
┌───────────────────────────────────────────────────────────────────┐
│ start_analysis_job │
│ │
│ 1. Create analysis record (status: investigating) │
│ 2. Set Redis counter: directions_remaining = N │
│ 3. Enqueue N investigate_direction_jobs │
└───────────────────────────────┬───────────────────────────────────┘
│
┌───────────────┼───────────────┐
│ │ │
▼ ▼ ▼
┌───────────────────┐ ┌───────────────────┐ ┌───────────────────┐
│ investigate_ │ │ investigate_ │ │ investigate_ │
│ direction_job │ │ direction_job │ │ direction_job │
│ │ │ │ │ │
│ Direction 1 │ │ Direction 2 │ │ Direction 3 │
│ → Modal call │ │ → Modal call │ │ → Modal call │
│ → 1-3 hypotheses │ │ → 1-3 hypotheses │ │ → 1-3 hypotheses │
│ → Write to DB │ │ → Write to DB │ │ → Write to DB │
│ → DECR counter │ │ → DECR counter │ │ → DECR counter │
└─────────┬─────────┘ └─────────┬─────────┘ └─────────┬─────────┘
│ │ │
└──────────┬──────────┴──────────┬──────────┘
│ │
▼ ▼
counter > 0? counter = 0?
│ │
▼ ▼
(wait) ┌─────────────────────┐
│ finalize_analysis_ │
│ job │
│ │
│ 1. Load hypotheses │
│ 2. Deduplicate │
│ 3. FDR correction │
│ 4. Complete analysis│
└─────────────────────┘
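The fan-in step at the bottom of the diagram hinges on the Redis counter: each investigate_direction_job decrements it after writing its hypotheses, and only the job that brings it to zero enqueues the finalize job. A minimal sketch of that control flow, with an in-memory dict standing in for Redis DECR (the function names here are illustrative, not the project's actual handlers):

```python
# In-memory stand-in for the Redis key analysis:{id}:directions_remaining.
counters = {}

def set_remaining(analysis_id, n):
    """start_analysis_job sets the counter before fanning out N jobs."""
    counters[f"analysis:{analysis_id}:directions_remaining"] = n

def decr_and_check(analysis_id):
    """Called at the end of each investigate_direction_job.

    Returns True only for the job that brings the counter to zero,
    i.e. the one that should enqueue finalize_analysis_job.
    """
    key = f"analysis:{analysis_id}:directions_remaining"
    counters[key] -= 1
    return counters[key] == 0

set_remaining("a1", 3)
results = [decr_and_check("a1") for _ in range(3)]
# Only the last decrement triggers the finalize job: [False, False, True]
```

With real Redis, DECR is atomic, so two jobs finishing at the same instant cannot both observe zero; the dict version above is single-threaded and only illustrates the control flow.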
ANALYSIS_REQUEST New Format
The Continuous Learner now emits analysis requests with directions attached:
{
  "discovery_id": "2024-01-15_market_cap_issue",
  "question": "Why do 1798 companies have impossible market caps?",
  "context": "Only 8 companies globally should exceed $1T",
  "priority": "high",
  "directions": [
    {
      "id": "currency",
      "title": "Currency Conversion Issue",
      "description": "Foreign exchanges may store in local currency",
      "tables": ["stocks", "exchanges"],
      "approach": "Check correlation with non-USD exchanges"
    },
    {
      "id": "data_entry",
      "title": "Data Entry Errors",
      "description": "Values may be typos or missing decimals",
      "tables": ["stocks"],
      "approach": "Look for exact multiples of realistic values"
    }
  ]
}
Key change: previously, 8 concurrent agents explored the data to discover "themes"; now the Learner works out the investigation directions at the moment it surfaces a problem.
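The shape of this payload can be captured with plain dataclasses. The sketch below mirrors the field names from the JSON above; the models themselves are hypothetical, not the project's actual schema:

```python
from dataclasses import dataclass

@dataclass
class Direction:
    id: str
    title: str
    description: str
    tables: list
    approach: str

@dataclass
class AnalysisRequest:
    discovery_id: str
    question: str
    context: str
    priority: str
    directions: list

def parse_request(payload: dict) -> AnalysisRequest:
    """Build an AnalysisRequest from a raw ANALYSIS_REQUEST dict."""
    directions = [Direction(**d) for d in payload["directions"]]
    return AnalysisRequest(**{**payload, "directions": directions})

request = parse_request({
    "discovery_id": "2024-01-15_market_cap_issue",
    "question": "Why do 1798 companies have impossible market caps?",
    "context": "Only 8 companies globally should exceed $1T",
    "priority": "high",
    "directions": [
        {"id": "currency", "title": "Currency Conversion Issue",
         "description": "Foreign exchanges may store in local currency",
         "tables": ["stocks", "exchanges"],
         "approach": "Check correlation with non-USD exchanges"},
    ],
})
```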
Worker Configuration
# signal_agent/workers/settings.py
class WorkerSettings:
    functions = [
        start_analysis_job,         # entry point: creates the analysis, dispatches directions
        investigate_direction_job,  # investigates each direction in parallel
        finalize_analysis_job,      # aggregates, deduplicates, completes
    ]
    redis_settings = get_redis_settings()  # from the REDIS_URL environment variable
    max_jobs = 10       # concurrent jobs
    job_timeout = 600   # 10-minute timeout
    retry_jobs = True
    max_tries = 3
    queue_name = "arq:signal-scout"
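For context, an ARQ job function takes the worker's context dict as its first argument; everything after that comes from enqueue_job(). A stub with that shape (the body is a placeholder; the real handler in workers/handlers.py calls Modal and writes to PostgreSQL):

```python
import asyncio

# Stub with the signature ARQ expects: ctx first, then the enqueue_job() args.
async def investigate_direction_job(ctx: dict, analysis_id: str, direction: dict) -> int:
    # Placeholder: pretend the Modal call returned two hypotheses for this direction.
    hypotheses = [f"{direction['id']}-h{i}" for i in (1, 2)]
    return len(hypotheses)

# ARQ would invoke this itself inside the worker; calling it directly shows the shape.
count = asyncio.run(investigate_direction_job({}, "a1", {"id": "currency"}))
# count == 2
```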
Code Locations
| Component | File |
|---|---|
| Workers | |
| Worker configuration | workers/settings.py |
| Job handlers | workers/handlers.py |
| Modal Functions | |
| Continuous learning | modal/agent.py → continuous_learn() |
| Direction investigation | modal/agent.py → investigate_direction() |
| Orchestrator | |
| Unified coordination | orchestrator.py → UnifiedOrchestrator |
| Prompts | |
| Learner guide | prompts_steward.py → GUIDE |
| New format definition | prompts_steward.py → ANALYSIS_REQUEST section |
Docker (single container, API + Worker)
# Environment variables
REDIS_URL=rediss://user:pass@host:6379   # rediss:// enables SSL
DATABASE_URL=postgresql://...
# Startup control
RUN_API=true      # start FastAPI
RUN_WORKER=true   # start the ARQ worker
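The RUN_API / RUN_WORKER toggles suggest entrypoint logic like the following. This is a hypothetical sketch (the actual start.sh is not shown in this doc), only illustrating how the two flags select which processes the single container launches:

```shell
# Decide which processes to launch based on the two startup flags.
services_to_start() {
  out=""
  [ "${RUN_API:-false}" = "true" ] && out="api"
  [ "${RUN_WORKER:-false}" = "true" ] && out="${out:+$out }worker"
  echo "${out:-none}"
}

RUN_API=true
RUN_WORKER=true
services_to_start   # prints: api worker
```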
Local Development
# Start both together
./start.sh
# Or start them separately
PYTHONPATH=src uvicorn signal_agent.main:app --port 8000
PYTHONPATH=src python -m arq signal_agent.workers.settings.WorkerSettings
Redis Keys
| Key | Purpose |
|---|---|
| arq:signal-scout | Job queue |
| analysis:{id}:directions_remaining | Tracks remaining directions |
Log Example
[user_id] Starting analysis: Why do 1798... (3 directions)
[analysis_id] Enqueued direction: Currency Conversion Issue
[analysis_id] Enqueued direction: Data Entry Errors
[analysis_id] Direction currency: 2 hypotheses
[analysis_id] Directions remaining: 2
[analysis_id] Direction data_entry: 1 hypotheses
[analysis_id] Directions remaining: 1
[analysis_id] Direction etf: 1 hypotheses
[analysis_id] Directions remaining: 0
[analysis_id] All directions done, enqueued finalize job
[analysis_id] Analysis completed: 3 significant findings
Design Decisions
| Choice | Rationale |
|---|---|
| ARQ over Celery | Lightweight, natively async, simple |
| Redis counter | Tracks direction completion and triggers the finalize job |
| Learner generates directions | Saves ~50 Modal calls and is more targeted |
| Single-container deployment | Simpler operations, shared connection pools |
| REDIS_URL support | Compatible with hosted Redis such as Upstash |