Substreams 流式框架入门:Solana 链上数据实时处理

Substreams 流式框架入门:Solana 链上数据实时处理

本文介绍 Substreams 流式框架的核心概念,以 Solana 生态为例,讲解 WASM 运行时、Protobuf 序列化、模块化架构等关键技术。

背景

在区块链数据处理领域,传统的索引方案往往面临以下挑战:

  • 延迟高:轮询式抓取无法做到实时响应
  • 扩展难:新增协议需要大量重构
  • 性能瓶颈:单线程处理难以应对高吞吐

Substreams 是 StreamingFast 推出的流式数据处理框架,专为区块链数据索引而设计。本文将以 Solana 生态为例,介绍其核心原理与实践。


核心概念

1. 流式处理模型

Substreams 采用推送模型,由节点主动推送区块数据,而非客户端轮询:

1
2
传统模式: Client -> Request -> Node -> Response (轮询)
Substreams: Node -> Block Stream -> Client (推送)

这种模式的优势:

  • 低延迟:新区块产生即刻推送
  • 高效利用:避免无效轮询请求
  • 断点续传:通过游标(cursor)实现精确恢复

2. WebAssembly 运行时

Substreams 的数据处理逻辑编译为 WebAssembly (WASM),在沙箱环境中执行:

1
2
# 编译为 WASM
cargo build --target wasm32-unknown-unknown --release

WASM 的优势:

  • 安全性:沙箱隔离,防止恶意代码
  • 跨平台:一次编译,多处运行
  • 高性能:接近原生的执行速度
  • 可移植:模块可分发复用

3. Protocol Buffers 序列化

数据格式采用 Protocol Buffers,相比 JSON 有显著优势:

特性 Protobuf JSON
序列化大小 小(二进制) 大(文本)
解析速度
Schema 强类型
向后兼容 支持 需手动处理

定义示例:

1
2
3
4
5
6
7
8
syntax = "proto3";
package solana_hub;

message SolanaHubBlockEvents {
pumpfun.PumpfunBlockEvents pumpfun_events = 1;
raydium_amm.RaydiumAmmBlockEvents raydium_events = 2;
meteora_dlmm.MeteoraDlmmBlockEvents meteora_dlmm_events = 3;
}

模块化架构设计

solana-substreams 项目为例,展示模块化架构:

1
2
3
4
5
6
7
8
.
├── solana_hub/ # 核心汇聚模块
├── spl_token/ # SPL Token 事件处理
├── system_program/ # 系统程序事件
├── raydium_amm/ # Raydium AMM 协议
├── pumpfun/ # Pumpfun Meme 币平台
├── meteora_dlmm/ # Meteora DLMM 协议
└── orca_whirlpools/ # Orca 集中流动性协议

模块独立性

每个模块独立封装,可单独运行:

1
2
3
4
5
# 单独运行 Raydium 模块
cd raydium_amm && substreams gui

# 单独运行 Pumpfun 模块
cd pumpfun && substreams gui

统一汇聚层

solana_hub 作为汇聚层,整合所有子模块数据流:

1
2
3
4
5
6
7
8
// 消费者只需订阅一个流即可获取全量事件
let stream = SubstreamsStream::new(
endpoint,
"solana_hub",
output_module,
start_block,
cursor,
);

实战:游标持久化

在流式处理中,游标(cursor) 标记当前处理位置,用于断点续传。以下是实际项目中的游标批处理实现:

批量缓存策略

1
2
3
4
5
6
7
8
9
10
11
12
// 达到 10000 个游标时批量提交
const CURSOR_BATCH_SIZE: usize = 10000;
// 每小时强制刷新
const CURSOR_FLUSH_INTERVAL_MS: u64 = 3600000;

fn enqueue_cursor(platform: &str, cursor: String) {
let mut cursors = LATEST_CURSORS.lock().unwrap();
cursors.insert(platform.to_string(), cursor);

let count = CURSOR_COUNTER.fetch_add(1, Ordering::SeqCst);
// 批量提交逻辑...
}

Redis Pipeline 持久化

1
2
3
4
5
6
7
8
9
10
11
12
fn flush_cursors(force: bool) -> Result<usize, anyhow::Error> {
let mut pipeline = redis::pipe();
pipeline.atomic(); // 原子操作

for (platform, cursor) in cursors.iter() {
let key = format!("{}_last_cursor", platform);
pipeline.set(&key, cursor);
}

pipeline.query(&mut conn)?; // 一次网络往返完成所有写入
Ok(count)
}

构建与部署

环境准备

1
2
3
4
5
6
7
8
# 安装 Substreams CLI
curl https://substreams.streamingfast.io/install | bash

# 设置 API 密钥
export STREAMINGFAST_KEY=your_api_key_here

# 安装 Rust target
rustup target add wasm32-unknown-unknown

构建流程

1
2
3
4
5
6
7
8
9
10
11
# 生成 Protobuf 代码
buf generate

# 编译为 WASM
make build

# 打包为 Substreams 包
make package

# 启动数据流
make stream START=344531182

性能优化要点

1. 批量处理

  • 游标批量持久化(减少 Redis 网络往返)
  • Kafka 批量投递(提高吞吐量)

2. 并发模型

  • 使用 Tokio 异步运行时
  • 多模块并行处理

3. 内存管理

  • 避免不必要的克隆
  • 使用引用计数智能指针

应用场景

  1. DeFi 数据索引:实时捕获交易、流动性变化
  2. NFT 追踪:监控铸造、转移事件
  3. 链上监控:特定地址或合约活动追踪
  4. 数据分析:构建实时数据仓库

总结

Substreams 为区块链数据处理提供了现代化的解决方案:

  • 流式推送:低延迟实时处理
  • WASM 运行时:安全、高效、可移植
  • Protobuf 序列化:紧凑、快速、强类型
  • 模块化架构:易扩展、可复用

对于 Solana 生态的开发者,Substreams 是构建高性能数据基础设施的理想选择。


参考资料