package com.everdata.spark;
import java.io.IOException;
import java.sql.Array;
import java.sql.Connection;
import java.sql.Statement;
import java.util.Date;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;
import scala.reflect.ClassTag;
public class AppearOne {
//private static CanReportRequireConfigBean configBean ;
private static Long lastTimeMiles = new Date().getTime() - 10 * 60 * 1000;
private static String[] global_args ;
public static void main(String[] args) {
//读取json配置
//generateConfig(args[0]);
global_args = args;
/*if(StringUtils.isNotEmpty(args[1])) {
lastTimeMiles = DateUtil.parseDate(args[0].trim(), DateUtil.COMPACT_MINUTE_FORMAT).getTime();
System.out.println(lastTimeMiles +":"+ args[0]);
}*/
SparkSession spark = SparkSession
.builder()
.appName("AppearOne")
.master("local[1]")
.config("spark.sql.parquet.binaryAsString", "true")
.getOrCreate();
System.out.println("=========="+args[0]);
ClassTag<String> tag = scala.reflect.ClassTag$.MODULE$.apply(String.class);
Broadcast<String> s=spark.sparkContext().broadcast(args[0], tag);
Dataset<Row> parquetFileDF = spark.read().parquet("d://xxx.parquet");
final Dataset<String> baseDS=parquetFileDF.map(new MapFunction<Row,String>(){
private static final long serialVersionUID = 1L;
@Override
public String call(Row value) throws Exception {
return value.getLong(0)+"";
}
},Encoders.STRING());
System.out.println("==================="+baseDS.count());
baseDS.javaRDD().foreach(new VoidFunction<String>() {
private static final long serialVersionUID = 1L;
@Override
public void call(String line) throws Exception {
System.out.println(line);
System.out.println(s.getValue());
}
});
}
}
spark广播变量
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。
推荐阅读更多精彩内容
- 本文全部手写原创,请勿复制粘贴、转载请注明出处,谢谢配合! 前言:Spark是集群部署的,具有很多节点,节点之间的...
- 一.广播变量和累加器的作用累加器(集群规模之间的大变量):做Spark的全局统计使用广播变量(集群规模间的大常量)...
- Broadcast 顾名思义,broadcast 就是将数据从一个节点发送到其他各个节点上去。这样的场景很多,比如...