spark1.5.x升级spark2.1.1代码改动

背景

公司一直用spark1.5.2, 最近将其升级至spark2.1.1。本文的总结升级过程需要改动的地方。涉及了spark普通项目和spark streaming项目,项目均为java语言开发。

改动一:flatMap and mapPartitions 返回iterator

对于spark1.5,flatMap和mapPartitions算子返回的是集合,比如list。而升级到spark2之后需要返回迭代器iterator。
参见官方说明:spark2.0 官方更新说明

Java RDD’s flatMap and mapPartitions functions used to require functions returning Java Iterable. They have been updated to require functions returning Java iterator so the functions do not need to materialize all the data.

  • spark1.5 代码 return list
JavaPairRDD<String, String> spark1FlatMap = pariRdd.flatMapToPair(tuple2 -> {
     List list = new ArrayList();
     list.add(tuple2);
     return list;
 });
  • spark2 代码 return iterator
JavaPairRDD<String, String> spark2FlatMap = pariRdd.flatMapToPair(tuple2 -> {
     List list = new ArrayList();
     list.add(tuple2);
     return list.iterator();
 });

改动二:foreachRDD 不再return null

  • spark1.5 代码需return null
    saveDstream.foreachRDD(rdd -> {
        rdd.unpersist();
        return null;
    });
  • spark2.1 代码不用return null
    saveDstream.foreachRDD(rdd -> {
        rdd.unpersist();
    });

改动三:spark streaming mapWithState 部分方法不兼容

mapWithState中更新状态的函数中我使用的Optional来自com.google.common.base,升级到spark2之后不可用。

 private JavaPairDStream<String, HashMap> updateState(JavaPairDStream<String, JSONObject> pairs) {
        Function2<List<JSONObject>, Optional<HashMap>, Optional<HashMap>> updateFunction =
                (newValues, oldStateOptional) -> {
                    HashMap newState = oldStateOptional.or(new HashMap());
         };
        JavaPairDStream<String, HashMap> statedPairs = pairs.updateStateByKey(updateFunction);
        return statedPairs;

改动四:spark streaming 中JavaStreamingContextFactory类 废弃

在streaming中用于driver重启,构造新JavaStreamingContext的JavaStreamingContextFactory类已经在spark2中不存在。

多版本部署

如果想同时运行多个版本的spark,只需下载相应版本,然后在运行脚本之前,export SPARK_HOME=指定版本位置,再用SPARK_HOME下面的spark-submit提交作业即可。样例如下:

#!/bin/sh
export SPARK_HOME=/usr/local/spark-2.1.1
$SPARK_HOME/bin/spark-submit  --class cn.your_package.your_main \
    --master yarn \
    --deploy-mode cluster \
    --driver-memory 1g \
    --executor-memory 3g \
    --num-executors 4 \
    --executor-cores 2 \
    ...

以上。
如需知道更多改动,参照官网各版本(spark2.0.0spark2.1.0)的Behavior Changes模块。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容