< 返回技术文档列表

使用dstream.foreachRDD发送数据到外部系统

发布时间:2021-04-20 00:18:24⊙投诉举报

使用dstream.foreachRDD发送数据到外部系统
经过Spark Streaming解决后的数据经常需要推到外部系统,比方缓存、数据库、消息系统、文件系统、实时数据大屏等

放一张官网上的图:

图片来自https://spark.apache.org/docs/latest/streaming-programming-guide.html#overview

其中,最常用的就是使用方法dstream.foreachRDD
看下最佳用法:

dstream.foreachRDD(rdd -> {  rdd.foreachPartition(partitionOfRecords -> {    // ConnectionPool is a static, lazily initialized pool of connections    Connection connection = ConnectionPool.getConnection();    while (partitionOfRecords.hasNext()) {      connection.send(partitionOfRecords.next());    }    ConnectionPool.returnConnection(connection); // return to the pool for future reuse  });});
  1. 循环每个分区
  2. 在每个分区中,从连接池获取连接(数据库/缓存等)
  3. 循环操作每条记录,存储或者者发送数据
  4. 释放连接

几个常见的错误/低效用法

  1. dirver端创立连接, worker端使用连接(序列化/初始化错误等)
dstream.foreachRDD(rdd -> {  Connection connection = createNewConnection(); // executed at the driver  rdd.foreach(record -> {    connection.send(record); // executed at the worker  });});
  1. 每条记录创立一个连接(开销太高)
dstream.foreachRDD(rdd -> {  rdd.foreach(record -> {    Connection connection = createNewConnection();    connection.send(record);    connection.close();  });});

上面的代码会在worker端创立连接并使用,但是每条记录都会创立新的连接
当然可以使用连接池进行优化,但是还有更好的方法

  1. 每个分区创立一个连接(可进一步使用连接池优化)
dstream.foreachRDD(rdd -> {  rdd.foreachPartition(partitionOfRecords -> {    Connection connection = createNewConnection();    while (partitionOfRecords.hasNext()) {      connection.send(partitionOfRecords.next());    }    connection.close();  });});

上面的没啥大问题了,使用连接池后就是最上面的最佳用法了

参考
Design Patterns for using foreachRDD


/template/Home/Zkeys/PC/Static