Redis Pipeline 批处理:游标持久化的高性能实践

Redis Pipeline 批处理:游标持久化的高性能实践

本文讲解在爬虫和流处理场景下,如何利用 Redis Pipeline 实现批量游标持久化,大幅减少网络往返,提升系统吞吐。

背景

在数据流处理系统中,游标(cursor) 是标记处理进度的重要机制。每次处理完一批数据后,需要保存当前位置,以便故障恢复时从断点继续。

传统做法是每处理一条数据就更新一次游标:

1
2
3
4
// 传统方式:每次单独写入
for (platform, cursor) in cursors {
redis::cmd("SET").arg(&key).arg(&cursor).query(&mut conn)?;
}

问题在于:每次 SET 操作都是一次完整的网络往返。假设 Redis 延迟 1ms,处理 10000 个游标就需要 10 秒!


Pipeline 原理

Redis Pipeline 允许将多条命令打包发送,一次性获取所有响应:

1
2
3
4
5
6
7
8
9
传统模式:
Client -> SET key1 -> Server -> OK -> Client (1 RTT)
Client -> SET key2 -> Server -> OK -> Client (1 RTT)
... 总计 N 次 RTT

Pipeline 模式:
Client -> SET key1, SET key2, SET key3... -> Server
Server -> OK, OK, OK... -> Client
... 总计 1 次 RTT

性能对比

操作数量 传统方式 (RTT=1ms) Pipeline
100 100ms ~2ms
1000 1000ms ~3ms
10000 10000ms ~5ms

实战:游标批处理持久化

以下代码来自实际的 Solana 数据流处理项目。

批量缓存策略

首先在内存中缓存游标,达到阈值后批量提交:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Mutex;
use std::collections::HashMap;

const CURSOR_BATCH_SIZE: usize = 10000; // 达到 10000 个时批量提交
const CURSOR_FLUSH_INTERVAL_MS: u64 = 3600000; // 每小时强制刷新

lazy_static! {
static ref LATEST_CURSORS: Mutex<HashMap<String, String>> = Mutex::new(HashMap::new());
static ref CURSOR_COUNTER: AtomicU64 = AtomicU64::new(0);
}

fn enqueue_cursor(platform: &str, cursor: String) {
let mut cursors = LATEST_CURSORS.lock().unwrap();
cursors.insert(platform.to_string(), cursor);

let count = CURSOR_COUNTER.fetch_add(1, Ordering::SeqCst);
if count % 10 == 0 {
debug!("已缓存 {} 个游标", count);
}
}

Pipeline 批量持久化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
use redis::Commands;
use std::time::Instant;

fn flush_cursors(force: bool) -> Result<usize, anyhow::Error> {
let flush_start = Instant::now();
let mut cursors = LATEST_CURSORS.lock().unwrap();

// 不满足条件则跳过
if cursors.is_empty() || (!force && cursors.len() < CURSOR_BATCH_SIZE) {
return Ok(0);
}

let mut conn = get_conn();

// 创建 Pipeline
let mut pipeline = redis::pipe();
pipeline.atomic(); // 标记为原子操作

let count = cursors.len();
for (platform, cursor) in cursors.iter() {
let key = format!("{}_last_cursor", platform);
pipeline.set(&key, cursor); // 添加命令到 Pipeline
}

// 一次网络往返执行所有命令
match pipeline.query::<()>(&mut conn) {
Ok(_) => {
debug!("批量持久化 {} 个游标完成,耗时: {:.0}ms",
count, flush_start.elapsed().as_millis() as f64);
cursors.clear();
Ok(count)
},
Err(e) => {
error!("批量持久化游标失败: {:?}", e);
Err(anyhow::anyhow!("Redis批量操作失败: {}", e))
}
}
}

定时刷新任务

结合 Tokio 异步运行时,实现定时刷新:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
use tokio::sync::mpsc::Receiver;
use tokio::time::interval;
use std::time::Duration;

async fn start_cursor_flush_task(mut shutdown_rx: Receiver<()>) {
let mut interval = interval(Duration::from_millis(CURSOR_FLUSH_INTERVAL_MS));

loop {
tokio::select! {
_ = interval.tick() => {
match flush_cursors(true) {
Ok(count) if count > 0 => {
debug!("定时刷新了 {} 个游标", count);
},
Err(e) => error!("定时刷新游标失败: {:?}", e),
_ => {}
}
}
_ = shutdown_rx.recv() => {
info!("接收到关闭信号,进行最终游标刷新");
let _ = flush_cursors(true); // 退出前确保数据不丢失
break;
}
}
}
}

关键设计要点

1. 双重触发机制

  • 数量触发:达到 CURSOR_BATCH_SIZE 时提交
  • 时间触发:定时强制刷新,避免数据积压过久

2. 原子性保证

1
pipeline.atomic();  // 使用 MULTI/EXEC 包裹

这确保所有命令作为一个事务执行,要么全部成功,要么全部失败。

3. 优雅关闭

1
2
3
4
_ = shutdown_rx.recv() => {
let _ = flush_cursors(true); // 最终刷新
break;
}

程序退出前执行最后一次刷新,确保内存中的游标不丢失。


性能优化建议

Pipeline 大小控制

Pipeline 不是越大越好:

  • 过大:内存占用高,可能阻塞其他操作
  • 过小:网络往返次数多

建议根据业务场景测试确定最佳批量大小:

1
2
3
4
// 测试不同批量大小
for batch_size in [100, 500, 1000, 5000, 10000] {
benchmark_flush(batch_size);
}

连接池复用

1
2
3
4
5
6
7
8
9
lazy_static! {
static ref REDIS_CLIENT: redis::Client = {
redis::Client::open(redis_endpoint).expect("Failed to create Redis client")
};
}

fn get_conn() -> redis::Connection {
REDIS_CLIENT.get_connection().unwrap()
}

避免每次操作都创建新连接。


应用场景

  1. 流式数据处理:Kafka consumer offset、Substreams cursor
  2. 爬虫进度追踪:记录爬取位置,支持断点续爬
  3. 分布式任务调度:Worker 心跳和进度汇报
  4. 实时数据同步:CDC 场景的 binlog 位置记录

完整示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
use redis::Commands;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

pub struct CursorManager {
cursors: Mutex<HashMap<String, String>>,
batch_size: usize,
redis_client: redis::Client,
}

impl CursorManager {
pub fn new(redis_url: &str, batch_size: usize) -> Self {
Self {
cursors: Mutex::new(HashMap::new()),
batch_size,
redis_client: redis::Client::open(redis_url).unwrap(),
}
}

pub fn update(&self, key: &str, cursor: String) {
let mut cursors = self.cursors.lock().unwrap();
cursors.insert(key.to_string(), cursor);
}

pub fn flush(&self) -> Result<usize, redis::RedisError> {
let mut cursors = self.cursors.lock().unwrap();
if cursors.len() < self.batch_size {
return Ok(0);
}

let mut conn = self.redis_client.get_connection()?;
let mut pipe = redis::pipe();
pipe.atomic();

let count = cursors.len();
for (key, cursor) in cursors.iter() {
pipe.set(key, cursor);
}

pipe.query(&mut conn)?;
cursors.clear();
Ok(count)
}
}

总结

Redis Pipeline 是提升批量操作性能的利器:

优化点 效果
减少网络往返 延迟降低 10-100 倍
批量提交 提高吞吐量
原子操作 保证数据一致性
定时刷新 防止数据积压

在流处理、爬虫等需要频繁更新状态的场景中,Pipeline + 批量缓存是标准的高性能实践。


参考资料