spark筑基篇-01-Eclipse开发Spark HelloWorld

[TOC]

1 前言

Spark这么火,越来越多的小伙伴开始搞大数据。
通过多方查阅资料,这个单机版的Spark的HelloWorld终于跑出来了。
此HelloWorld非彼HelloWorld,并不是打印出HelloWorld那么简单,而是一个单词统计程序,就是统计出一个文件中单词出现的次数并排序。

会通过原生的scala的方式,传统的java方式和java8的方式分别实现同一功能。

其实单机版和运行于集群之上的Spark程序,差别就在于运行环境,开发流程是一样的。
以后的文章会记录如何建立集群。

另外,该系列文章会在本人闲暇时同时在 CSDN简书 更新。

欢迎各位道友纠错。

2 环境搭建

本人所使用环境如下:

C:\Users\hylexus>java -version
java version "1.8.0_91"
Java(TM) SE Runtime Environment (build 1.8.0_91-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.91-b14, mixed mode)

C:\Users\hylexus>scala -version
Scala code runner version 2.11.8 -- Copyright 2002-2016, LAMP/EPFL

Eclipse for scala:

Scala IDE build of Eclipse SDK
Build id: 4.4.1-vfinal-2016-05-04T11:16:00Z-Typesafe

此处scala版和java版都将使用maven来管理依赖,如何使用maven创建scala工程,请看本人另一文章 http://blog.csdn.net/hylexus/article/details/52602774

注意:使用的spark-core_2.11依赖的jar文件多的吓人,耐心等待下载jar吧…………_

2.1 scala版

pom.xml部分内容如下:

<properties>
    <maven.compiler.source>1.6</maven.compiler.source>
    <maven.compiler.target>1.6</maven.compiler.target>
    <encoding>UTF-8</encoding>
    <scala.version>2.11.5</scala.version>
    <scala.compat.version>2.11</scala.compat.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.0.0</version>
    </dependency>


    <!-- Test -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.11</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.specs2</groupId>
        <artifactId>specs2-junit_2.11</artifactId>
        <version>2.4.16</version>
    </dependency>
</dependencies>

<build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
        <plugin>
            <!-- see http://davidb.github.com/scala-maven-plugin -->
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.0</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                    <configuration>
                        <args>
                            <!-- <arg>-make:transitive</arg> -->
                            <arg>-dependencyfile</arg>
                            <arg>${project.build.directory}/.scala_dependencies</arg>
                        </args>
                    </configuration>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.5.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
                <compilerArguments>
                    <!-- <extdirs>src/main/webapp/plugins/ueditor/jsp/lib</extdirs> -->
                </compilerArguments>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-surefire-plugin</artifactId>
            <version>2.18.1</version>
            <configuration>
                <useFile>false</useFile>
                <disableXmlReport>true</disableXmlReport>
                <!-- If you have classpath issue like NoDefClassError,... -->
                <!-- useManifestOnlyJar>false</useManifestOnlyJar -->
                <includes>
                    <include>**/*Test.*</include>
                    <include>**/*Suite.*</include>
                </includes>
            </configuration>
        </plugin>
    </plugins>
</build>

2.2 java版

pom.xml文件内容如下:

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.0.0</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.11</version>
        <scope>test</scope>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.5.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
                <compilerArguments>
                    <!-- <extdirs>src/main/webapp/plugins/ueditor/jsp/lib</extdirs> -->
                </compilerArguments>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-surefire-plugin</artifactId>
            <version>2.18.1</version>
            <configuration>
                <useFile>false</useFile>
                <disableXmlReport>true</disableXmlReport>
                <!-- If you have classpath issue like NoDefClassError,... -->
                <!-- useManifestOnlyJar>false</useManifestOnlyJar -->
                <includes>
                    <include>**/*Test.*</include>
                    <include>**/*Suite.*</include>
                </includes>
            </configuration>
        </plugin>
    </plugins>
</build>

3 代码

3.1 scala-低调版

object WordCount {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.setAppName("WordCount") //
            .setMaster("local")

        val sc = new SparkContext(conf)

        val fileName = "E:/workspace/eclipse/scala/spark-base/src/main/scala/log4j.properties"
        //获取文件内容
        val lines = sc.textFile(fileName, 1)
        //分割单词,此处仅以空格分割作为示例
        val words = lines.flatMap(line => line.split(" "))
        //String===>(word,count),count==1
        val pairs = words.map(word => (word, 1))
        //(word,1)==>(word,count)
        val result = pairs.reduceByKey((word, acc) => word + acc)
        //sort by count DESC
        val sorted=result.sortBy(e => { e._2 }, false, 1)

        val mapped=sorted.map(e => (e._2, e._1))

        mapped.foreach(e => { println("【" + e._1 + "】出现了" + e._2 + "次") })

        sc.stop()
    }
}

3.2 scala-高调版

object WordCount {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf
        conf.setAppName("rank test").setMaster("local")

        val sc = new SparkContext(conf)

        val fileName = "E:/workspace/eclipse/scala/spark-base/src/main/scala/log4j.properties"

        sc.textFile(fileName, 1) //lines
            .flatMap(_.split(" ")) //all words
            .map(word => (word, 1)) //to pair
            .reduceByKey(_ + _) //count
            .map(e => (e._2, e._1)) //
            .sortByKey(false, 1) //
            .map(e => (e._2, e._1)) //
            .foreach(e => { println("【" + e._1 + "】出现了" + e._2 + "次") })

        sc.stop()

    }
}

3.3 java-传统版

代码恶心的没法看啊……
到处都是匿名内部类……
还好有java8的lambda来拯救你

import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

public class WordCount {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf();

        conf.setAppName("WordCounter")//
                .setMaster("local");

        String fileName = "E:/workspace/eclipse/scala/spark-base/src/main/scala/log4j.properties";

        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> lines = sc.textFile(fileName, 1);

        JavaRDD<String> words = lines
                .flatMap(new FlatMapFunction<String, String>() {
                    private static final long serialVersionUID = 1L;

                    // 以前的版本好像是Iterable而不是Iterator
                    @Override
                    public Iterator<String> call(String line) throws Exception {
                        return Arrays.asList(line.split(" ")).iterator();
                    }
                });

        JavaPairRDD<String, Integer> pairs = words
                .mapToPair(new PairFunction<String, String, Integer>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, Integer> call(String word)
                            throws Exception {
                        return new Tuple2<String, Integer>(word, 1);
                    }
                });

        JavaPairRDD<String, Integer> result = pairs.reduceByKey(
                new Function2<Integer, Integer, Integer>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Integer call(Integer e, Integer acc)
                            throws Exception {
                        return e + acc;
                    }
                }, 1);

        result.map(
                new Function<Tuple2<String, Integer>, Tuple2<String, Integer>>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, Integer> call(
                            Tuple2<String, Integer> v1) throws Exception {
                        return new Tuple2<>(v1._1, v1._2);
                    }
                })//
                .sortBy(new Function<Tuple2<String, Integer>, Integer>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Integer call(Tuple2<String, Integer> v1)
                            throws Exception {
                        return v1._2;
                    }
                }, false, 1)//
                .foreach(new VoidFunction<Tuple2<String, Integer>>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public void call(Tuple2<String, Integer> e)
                            throws Exception {
                        System.out.println("【" + e._1 + "】出现了" + e._2 + "次");
                    }
                });
        sc.close();

    }
}

3.4 java-lambda版

用上java8的lambda之后,还是挺清爽的嘛_

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class WordCountByJava8 {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf();

        conf.setAppName("WordCounter")//
                .setMaster("local");

        String fileName = "src/main/java/hylexus/spark/test1/WordCountByJava8.java";

        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> lines = sc.textFile(fileName, 1);

        lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator())
                .mapToPair(word -> new Tuple2<>(word, 1))
                .reduceByKey((e, acc) -> e + acc, 1)
                .map(e -> new Tuple2<>(e._1, e._2))
                .sortBy(e -> e._2, false, 1)
                .foreach(e -> {
                    System.out.println("【" + e._1 + "】出现了" + e._2 + "次");
                });
        sc.close();

    }
}

4 运行效果

//...............
//...............
【->】出现了6次
【+】出现了5次
【import】出现了5次
【new】出现了4次
【=】出现了4次
//...............
//...............
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,133评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,682评论 3 390
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,784评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,508评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,603评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,607评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,604评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,359评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,805评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,121评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,280评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,959评论 5 339
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,588评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,206评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,442评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,193评论 2 367
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,144评论 2 352

推荐阅读更多精彩内容