使用dstream.foreachRDD发送数据到外部系统
经过Spark Streaming处理后的数据经常需要推到外部系统,比如缓存、数据库、消息系统、文件系统、实时数据大屏等
放一张官网上的图:
其中,最常用的就是使用方法dstream.foreachRDD
看下最佳用法:
dstream.foreachRDD(rdd -> {
rdd.foreachPartition(partitionOfRecords -> {
// ConnectionPool is a static, lazily initialized pool of connections
Connection connection = ConnectionPool.getConnection();
while (partitionOfRecords.hasNext()) {
connection.send(partitionOfRecords.next());
}
ConnectionPool.returnConnection(connection); // return to the pool for future reuse
});
});
- 循环每个分区
- 在每个分区中,从连接池获取连接(数据库/缓存等)
- 循环操作每条记录,存储或者发送数据
- 释放连接
几个常见的错误/低效用法
- dirver端创建连接, worker端使用连接(序列化/初始化错误等)
dstream.foreachRDD(rdd -> {
Connection connection = createNewConnection(); // executed at the driver
rdd.foreach(record -> {
connection.send(record); // executed at the worker
});
});
- 每条记录创建一个连接(开销太高)
dstream.foreachRDD(rdd -> {
rdd.foreach(record -> {
Connection connection = createNewConnection();
connection.send(record);
connection.close();
});
});
上面的代码会在worker端创建连接并使用,但是每条记录都会创建新的连接
当然可以使用连接池进行优化,但是还有更好的方法
- 每个分区创建一个连接(可进一步使用连接池优化)
dstream.foreachRDD(rdd -> {
rdd.foreachPartition(partitionOfRecords -> {
Connection connection = createNewConnection();
while (partitionOfRecords.hasNext()) {
connection.send(partitionOfRecords.next());
}
connection.close();
});
});
上面的没啥大问题了,使用连接池后就是最上面的最佳用法了