我们要在以太坊主网做全网扫描的实时 AML 风险监控报警,约束如下:
Transfer 事件我们并不处理全链所有交易,而是处理指定 200 合约的 Transfer logs:
核心采用“可回放的流式管道”,将采集与计算解耦:
eth_getLogs 拉取:
Transfer(address,address,uint256) 事件签名block_hash,发现重组则标记相关事件 INVALIDATED,并触发撤销/更正机制alerts:对外可见报警cases/tasks:下游多 worker 归因/检验任务(可选强制)对每条 Transfer log 定义:
event_id = (chain_id, tx_hash, log_index)用于:
chain_id, block_number, block_hash, block_timetx_hash, log_indextoken_contract, amount_rawfrom, toamount_usd_approx(稳定币近似等价 USD,但仍需处理 decimals)对每个 confirmed Transfer 事件,派生两条评估单元(assessment):
subject = from, counterparty = to, direction = outsubject = to, counterparty = from, direction = in好处:
subject_id命中强标签(举例):
特点:
基于近 5m/1h/24h:
建议以离线预计算为主、在线查询为辅:
即使我们主线是 confirmed 才报警,仍必须具备撤销能力(应对 reorg、未来 pending 预警、规则更正)。
建议对外协议定义 3 类消息:
ALERT:逐笔报警HOT_SUMMARY:热门地址汇总报警CANCEL:撤销/取消(指向已有的 alert_id/summary_id)幂等要求:
reason_code 需 cooldown(例如 30 分钟)防重复关键点:热门维持不能依赖“逐笔报警次数”,否则进入热门后不逐笔会导致热门无法维持。必须用“事件强度/风险积分”来维持。
对每条 confirmed assessment(subject 视角)产生一个事件权重 [w]:
high => [w = 5]med => [w = 2]low => [w = 0](不计入热门积分,减少噪声与状态规模)滚动 24h 积分:
建议初始阈值:
解释:
以 subject_id 为 key,保存:
score_buckets[144]:过去 24h 每个 10 分钟桶的积分bucket_start_ts:当前桶起始时间(对齐到 10 分钟)cursor:当前桶索引S24:24h 总积分is_hot:当前是否热门更新方法:
该预算仅作用于“非热门状态”。状态字段:
sent_alert_times[]:最近 24h 逐笔报警发送时间(保留最近 3~10 个即可)sent_count_24h:可由数组长度计算last_alert_time_by_reason[reason_code]:原因级节流(cooldown)判断顺序(推荐):
is_hot=true:不走逐笔预算,直接聚合汇总的唯一键建议:
summary_id = (chain_id, subject_id, bucket_start_ts)要求:
每个 10 分钟桶内聚合:
tx_count_in/outsum_amount_in/out(稳定币可近似 USD;或按 token 汇总)unique_counterparty_count(可近似)top_counterparties_by_amount(Top 5)top_counterparties_by_risk(Top 5)top_reasons(Top 3:anchor/behavior/exposure 子类)highest_severityevidence_refs(1–3 条代表性证据引用:A/C/B 策略)
两种推荐方式(二选一):
方式 A(更准):定时 flush
方式 B(更简):跨桶时 flush 上一桶
你要求“每 10 分钟一次”,更匹配方式 A。
建议将“报警”与“归因任务”拆开:
alerts topic:对外/运营消费cases 或 tasks topic:给多个 worker 的检验任务热门地址场景:
summary_id 可对应一个 case_id(一桶一 case)case_update:
并发与幂等:
case_id 稳定subject_id,减少同一地址跨 worker 冲突建议按“区块高度”驱动采集:
eth_getLogs 拉取范围 [B, B] 的 logsmined_events topic为什么要按块拉:
幂等:
event_id=(tx_hash,log_index) 去重/upsert维护 block_number -> block_hash 映射(至少覆盖最近若干百块):
N=2 已显著降低 reorg 概率,但不能假设为 0。
两条主流路径:
confirmed_events topicblocks 表记录高度/hashevents_by_block 存 block_number -> event_ids流处理的优势:天然适合“回放、扩容、状态一致性”。
address -> {tags, risk_level, entity_type, confidence, label_version}即使你实时不做复杂图搜索,也要保证“证据可回放”:
tx_hash, log_index, token, from, to, amount, block_number这样订阅增强、人工审核、审计复盘都有稳定抓手。
label_version, as_of, source, confidence本设计的核心特点与扩展方向:
说明:方框=服务/组件;圆柱=Kafka Topic(日志流);箭头=数据流向
Producer=写入 Kafka;Consumer=从 Kafka 读取
flowchart LR
%% ===== Sources =====
subgraph ETH[以太坊网络 / RPC 节点]
RPC[(RPC / Archive Node)]
end
%% ===== Ingest =====
subgraph ING[采集层]
IDX["Go Indexer / Ingest Service\n(Producer)"]
end
%% ===== Kafka =====
subgraph KFK[Kafka Cluster]
T_MINED[(Topic: mined_events\nkey: token_contract 或 block_number)]
T_CONF[(Topic: confirmed_events\nkey: subject_id 或 tx_hash)]
T_ALERT[(Topic: alerts\n逐笔 ALERT + 撤销 CANCEL)]
T_SUM[(Topic: hot_summaries\n每10分钟 HOT_SUMMARY)]
T_CASE[(Topic: cases/tasks\n给归因/检验 Worker)]
T_UPD[(Topic: case_updates\nWorker 回写结果)]
T_LABEL[(Topic: label_updates\n标签/画像更新流)]
end
%% ===== Processing =====
subgraph PROC[处理层]
CM["Confirmation Manager (N=2)\nConsumer+Producer"]
RE[Risk Engine\nConsumer+Producer]
AE[Alert/Hot Engine\nConsumer+Producer]
end
%% ===== Data Stores =====
subgraph DS[数据存储/服务]
LKV[(Labels/Profile KV\nRedis/Scylla/etc.)]
CH[(ClickHouse/Pinot\n事件明细/查询)]
OBJ[(Object Storage\n冷归档 Parquet)]
end
%% ===== Workers =====
subgraph WRK[下游归因与检验]
W1["Worker Group\n(Consumer)"]
end
%% ===== Sinks =====
subgraph SINK[下游消费/通知]
OPS[运营台/告警系统]
SUB[订阅推送/Webhook]
end
%% ===== Flows =====
RPC -->|eth_getLogs by block| IDX
IDX -->|Transfer logs 规范化事件| T_MINED
T_MINED -->|按块缓冲&reorg检测| CM
CM -->|达到 N=2 输出| T_CONF
CM -->|reorg 导致失效| T_ALERT
T_CONF -->|生成 subject/counterparty 评估| RE
RE -->|带风险信号的评估事件| AE
AE -->|非热门: 逐笔报警| T_ALERT
AE -->|热门: 10分钟汇总| T_SUM
AE -->|"归因/检验任务(可选)"| T_CASE
T_CASE --> W1
W1 -->|结论/归因/误报标记| T_UPD
T_UPD -->|"更新标签/画像(回灌)"| LKV
T_LABEL -->|标签更新订阅| RE
%% Evidence storage
T_MINED --> CH
T_CONF --> CH
T_ALERT --> CH
T_SUM --> CH
CH --> OBJ
%% Downstream notifications
T_ALERT --> OPS
T_SUM --> OPS
T_ALERT --> SUB
T_SUM --> SUB
mined_events)mined_events) + Producer(confirmed_events/alerts(CANCEL))confirmed_events + label_updates) + Producer(评估输出给 Alert Engine)alerts/hot_summaries/cases)cases) + Producer(case_updates)label_updates 广播)如果你告诉我你希望 mined_events 的分区 key 用 block_number 还是 token_contract(以及 confirmed_events 用 subject_id 还是 tx_hash),我可以把“按分区保证顺序/并行度”的版本也画出来。