添加依赖
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <!-- Project coordinates. The version is a SNAPSHOT so that the packaged
       artifact is named spark-learning-1.0-SNAPSHOT.jar, which is the file
       name the scp / spark-submit commands later in these notes expect. -->
  <groupId>org.baozi</groupId>
  <artifactId>spark-learning</artifactId>
  <version>1.0-SNAPSHOT</version>
  <inceptionYear>2008</inceptionYear>

  <licenses>
    <license>
      <name>My License</name>
      <url>http://....</url>
      <distribution>repo</distribution>
    </license>
  </licenses>

  <!-- Versions shared by the dependencies below. The Scala binary version
       (2.11) must match the _2.11 suffix of the Spark artifacts. -->
  <properties>
    <scala.version>2.11.8</scala.version>
    <spark.version>2.1.0</spark.version>
  </properties>

  <dependencies>
    <!-- Scala -->
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <!-- Spark SQL -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
  </dependencies>

  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
      <!-- Compiles the Scala sources (main and test) during the Maven build.
           The plugin version is pinned so the build is reproducible and Maven
           does not warn about a missing plugin version. -->
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <version>2.15.2</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
/**
 * Minimal example of the Spark 1.x entry point, `SQLContext`: loads a JSON
 * file, prints its inferred schema and shows its rows.
 *
 * Usage: `SQLContextApp <path-to-json>`
 */
object SQLContextApp {

  /**
   * @param args command-line arguments; `args(0)` is the path of the JSON
   *             file to load. The program prints a usage message and exits
   *             with status 1 when it is missing.
   */
  def main(args: Array[String]): Unit = {
    // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
    val jsonPath = args.headOption.getOrElse {
      System.err.println("Usage: SQLContextApp <path-to-json>")
      sys.exit(1)
    }

    // 1) Create the SQLContext
    val conf = new SparkConf()
    // Setting the app name / master in code is only needed when running from
    // the IDE; spark-shell and spark-submit supply them.
    conf.setAppName("SQLContextApp").setMaster("local[2]")
    val ctx = new SparkContext(conf)

    try {
      val sql = new SQLContext(ctx) // Spark 1.x entry point, now deprecated

      // 2) Process the data
      val people = sql.read.format("json").load(jsonPath)
      people.printSchema()
      people.show()
      /*
      Input: /Users/baozi/dev/tools/spark/examples/src/main/resources/people.json
      $ cat people.json
      {"name":"Michael"}
      {"name":"Andy", "age":30}
      {"name":"Justin", "age":19}

      Output:
      root
       |-- age: long (nullable = true)
       |-- name: string (nullable = true)

      +----+-------+
      | age|   name|
      +----+-------+
      |null|Michael|
      |  30|   Andy|
      |  19| Justin|
      +----+-------+
      */
    } finally {
      // 3) Release resources, even when loading or showing the data fails
      ctx.stop()
    }
  }
}
HiveContext
Hive准备(数据文件 emp-data 中各字段之间用 Tab 分隔,与下面建表语句中的 '\t' 对应)
$ vim emp-data
1 baozi1
2 baozi2
3 baozi3
启动hive
$ start-dfs.sh
$ start-yarn.sh
$ systemctl start mysqld
$ hive
创建表
create table emp(id int,name string) row format delimited fields terminated by '\t';
加载数据
load data local inpath '/home/user000/data/emp-data' into table emp;
查询
select * from emp;
添加依赖
<!-- Spark Hive -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
示例
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Minimal example of the Spark 1.x Hive entry point, `HiveContext`: shows the
 * rows of the Hive table `emp`.
 *
 * Like `SQLContext`, `HiveContext` is deprecated in favour of `SparkSession`.
 * No app name or master is set in code, so both must be supplied by the
 * launcher (e.g. spark-submit).
 */
object HiveContextApp {

  /**
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    val sc = new SparkContext(conf)

    try {
      val hive = new HiveContext(sc)
      // Process the data
      hive.table("emp").show()
    } finally {
      // Release resources, even when the table lookup or the query fails
      sc.stop()
    }
  }
}
上传测试
$ mvn clean package -DskipTests
$ scp ./target/spark-learning-1.0-SNAPSHOT.jar user000@host000:/home/user000/jars
spark-submit用法:
./bin/spark-submit \
--class <main-class> \
--master <master-url> \
--deploy-mode <deploy-mode> \
--conf <key>=<value> \
... # other options
<application-jar> \
[application-arguments]
因为我的 Hive 使用 MySQL 存储元数据,所以需要把 MySQL 驱动包放入 spark/jars 目录,或者在执行 spark-submit 时加上参数 --jars /home/user000/doc/mysql-connector-java-5.1.45.jar。另外,需要将 Hive 的 hive-site.xml 复制到 spark/conf 目录,Spark 才能找到 Hive 的元数据库。
$ spark-submit --master local[2] \
--class HiveContextApp \
~/jars/spark-learning-1.0-SNAPSHOT.jar
SparkSession
// SparkSession takes over the role of both SQLContext and HiveContext.
// NOTE(review): this fragment assumes org.apache.spark.sql.SparkSession is
// imported; the import is not shown here.
val spark = SparkSession.builder()
// The commented-out settings below are only needed when running locally
// .appName("SparkSessionApp")
// .master("local[2]")
// .config("spark.driver.host", "localhost")
// NOTE(review): to read Hive tables the way HiveContext did, Hive support
// presumably has to be enabled on the builder (enableHiveSupport) - confirm
// against the Spark documentation before relying on this snippet for Hive.
.getOrCreate()