spark学习(二)从hello world开始

  • 每当第一次学习一门语言时,都会写个helloWorld程序,spark也不例外,让我们从spark的helloWorld(word count)开始。在github中,spark有完整的word count源码,代码清晰整洁,是用RDD完成编码的,但上一节提到过spark 2.0之后推荐使用Dataset进行编码,所以本节笔者试着使用Dataset进行word count程序编写。大家可以点击代码连接 查看源码。废话不多说,先看看代码,感受一下。
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.SparkSession;
    import org.junit.BeforeClass;
    import org.junit.Test;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Objects;
    
    /**
     * @program: sparkstudy
     * @description:
     * @author: lin wang
     * @create: 2019-10-29
     **/
    
    public class WordsCount {
        private static SparkSession sparkSession;
    
        @Data
        @AllArgsConstructor
        @NoArgsConstructor
        public static class Words {
            private String word;
            private Long count;
        }
    
        @BeforeClass
        public static void before() {
            sparkSession = SparkSession
                    .builder()
                    .appName("words count")
                    .master("local[*]")
                    .getOrCreate();
        }
    
        @Test
        public void wordsCountInvoke() {
            // read file contents from sound_of_silence.txt
            Dataset<String> dataset = sparkSession
                    .read()
                    .textFile(Objects.requireNonNull(this.getClass().getClassLoader().getResource("sound_of_silence.txt")).toString());
            // calculator words
            List<Words> wordsList = dataset
                    .flatMap((line) -> {
                        List<Words> words = new ArrayList<>();
                        for (String word : line.split(" ")) {
                            words.add(new Words(word, 1L));
                        }
                        return words.iterator();
                    }, Encoders.bean(Words.class))
                    .groupByKey(Words::getWord, Encoders.STRING())
                    .reduceGroups((x, y) -> {
                        x.setCount(x.getCount() + y.getCount());
                        return x;
                    })
                    .map(x -> x._2, Encoders.bean(Words.class))
                    .collectAsList();
            System.out.println("The results are :");
            wordsList.forEach(words -> System.out.println(String.join(":", words.getWord(), String.valueOf(words.getCount()))));
        }
    }
    
    代码中统计了10遍 The sound of silence的歌词,结果如下,为减少篇幅省略了一些词数统计:
    The results are :
    naked:10
    But:10
    speaking:10
    wells:10
    not:10
    softly:10
    Left:10
    sleeping:10
    you:40
    raindrops:10
    more:10
    was:20
    ...
    

后面章节讲解各个算子的使用,本节只感受下spark的运行流程。

  • web UI
    • 当提交spark任务后就可以在UI界面看到spark执行情况,这使得spark调优和debug非常的方便,UI主界面如下,
      • image.png

        UI界面有很多spark的基本概念,需要弄懂spark中各个术语的含义才能真正利用UI界面解决实际问题。在后面的章节中UI会帮助我们更好的理解spark。

  • spark基本概念
    • 上面UI界面包含很多的标签页和术语,我们先了解下spark的基本概念吧。
    • Application:用户提交的spark应用程序;
    • Driver:执行main函数,进行创建管理sparkContext,进行资源申请,任务分配等;
    • Executor:运行在worker节点上的一个进程,该进程负责运行某些task,负责数据存到内存或者磁盘上。并行运行task的数据取决于分配的cpu数量;
    • Worker:集群中可以运行application代码的节点。spark on yarn模式中指的node manager;
    • Task:在executor进程中执行任务的工作单元,多个task组成一个stage;
    • Job:由行动操作划分的一组并行计算;
    • Stage:由shuffle算子划分的一组并行计算;
    • DAGScheduler:根据job构建的基于stage的DAG(有向无环图);
    • TaskScheduler:由taskset提交给woker,决定每个executor运行哪些task;
  • 集群上部署spark程序
    • image.png
    • spark application 在集群上做为独立的进程运行,在主程序中通过SparkContext进行工作协调;
    • SparkContext可以连接不同种类的Cluster Manager,Cluster Manger负责资源申请;
    • SparkContext申请到集群中的资源后(Executor),会在此Executor上进行数据的计算和存储;
    • SparkContext发送应用程序代码到Executor上;
    • SparkContext发送任务到Executor上执行;
    • 关于spark运行的一些知识:
      • 每个应用都有独立的Executor进程,它存在于整个应用程序,并且使用多线程计算执行任务计算;
      • spark不关心Cluster Manager的运行流程;
      • driver端在整个应用程序中一直监听Executor运行;
      • driver端需要和各个Executor通信,所以需要靠近worker nodes;

本节就到这里吧,下节会进行RDD相关知识的学习。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,240评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,328评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,182评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,121评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,135评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,093评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,013评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,854评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,295评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,513评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,678评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,398评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,989评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,636评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,801评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,657评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,558评论 2 352

推荐阅读更多精彩内容