二、spark之RDD

image.png

引导

我们在前一篇已经学习了spark的相关概念,并写了一个简单的demo,那么我们本篇开始深入的学习spark其中的最核心的一个概念RDD

2.1、什么是RDD?

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,一个可并行操作的有容错机制的数据集合,是Spark中最基本的数据抽象,我们就把它当成简单的数据集合。

2.2、怎么使用RDD?

有两种方式创建RDD,

  • 1、第二种是初始化一个集合,这种叫做并行集合。
  • 2、第一种是读取外部数据集,如:共享的文件系统,HDFS,HBase,或者其他的Hadoop数据格式的数据源

两种创建方式都需要一样sc对象,sc同上文为JavaSparkContext
创建方法如下图:

//并行集合
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);
//外部文件
JavaRDD<String> lines = sc.textFile("/Users/sunliangliang/Documents/personal/csv/000002.csv");

2.3、RDD操作

spark算子:英文Operator,其实就是操作的意思,我们这里面是指操作运算等等,其实就是指函数化,通过调用函数处理,一个函数可以称之为算子。

RDD的操作分为两种

2.3.1、转化操作

这类称之为Transformation(转换/变换算子),这类操作是延迟计算,即从一个RDD转换成另外一个并不会马上执行,需要等待行动操作的时候才执行。
常见的Transformation算子如下

  • map()
public static void main(String[] args) {
    SparkConf conf = new SparkConf()
            .setAppName("WordCountLocal")
            .setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaRDD<Integer> datas = sc.parallelize(Arrays.asList(1,2,3,4));

    /**通过map算子实现平方**/
    JavaRDD<Integer> result = datas.map(new Function<Integer, Integer>() {
        @Override
        public Integer call(Integer x) throws Exception {
            return x*x; }
    });
    System.out.println(StringUtils.join(result.collect(),","));

}

运行结果如下图:

1,4,9,16
  • fliter()
JavaRDD<Integer> filter = datas.filter(new Function<Integer, Boolean>() {
    @Override
    public Boolean call(Integer x) throws Exception {
        return x>2;
    }
});


我们可以猜想输出为>2的值

3,4
image.png
  • flatMap()
    这是将每个输入元素生成多个输出元素,拍扁的意思,也就是将每个元素按照格式拆分成一行如下图
public static void main(String[] args) {
    SparkConf conf = new SparkConf()
            .setAppName("WordCountLocal")
            .setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaRDD<String> datas = sc.textFile("/Users/sunliangliang/Documents/personal/spark.txt");


    JavaRDD<String> flatMap = datas.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public Iterator<String> call(String s) throws Exception {
            return Arrays.asList(s.split("\\s")).iterator();
        }
    });


    JavaRDD<List<String>> map = datas.map(new Function<String, List<String>>() {
        @Override
        public List<String> call(String s) throws Exception {
            return Arrays.asList(s.split("\\s"));
        }
    });

    System.out.println(StringUtils.join(map.collect(),","));
    System.out.println(StringUtils.join(flatMap.collect(),","));

}

输出结果

[hello, world],[a, new, line],[hello],[the, end]

hello,world,a,new,line,hello,the,end

其中spark.txt中的内容如下

hello world
a new line
hello
the end

我们看到将map是按行拆分,而flapMap 拆成了一个个单词,如下图

image.png

其他常见操作如下图

image.png

2.3.2、行动操作

行动操作主要包含,collect(),reduce(),aggregate()等

  • reduce()
    接收一个函数作为参数,这个函数操作两个相同类型的元素,并返回一个同样类型的数据。最常见的就是叠加等。
public static void reduce(){
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5));
    int sum = rdd.reduce(new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer x, Integer y) throws Exception {
            return x+y;
        }
    });
    System.out.println(sum);
}

输出结果如下

15
  • aggregate()
    TODO
    求平均值
2.4、不同类型间的转换
JavaRDD<Integer> datas = sc.parallelize(Arrays.asList(1,2,3,4,5));
JavaDoubleRDD result = datas.mapToDouble(new DoubleFunction<Integer>() {
    @Override
    public double call(Integer x) throws Exception {
        return (double) x+x;
    }
});
2.5、持久化

通过以下两种方式持久化一个rdd,然后将其保存在美国节点内存。该缓存是一个容错技术。
也就是缓存,有两种方式

  • cache():只是缓存到默认的缓存级别:只使用内存
  • persist():可以自定义缓存级别
    使用方式如下
rdd.persist(StorageLevel.DISK_ONLY());
rdd.cache();

RDDS特性

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,921评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,635评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,393评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,836评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,833评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,685评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,043评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,694评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,671评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,670评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,779评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,424评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,027评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,984评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,214评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,108评论 2 351
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,517评论 2 343

推荐阅读更多精彩内容