[Reference: based on the imooc (慕课网) log-analysis course, an introduction to big data with Spark SQL]
0 Introduction
SQL: MySQL, Oracle, DB2, SQL Server
Big data computation on a big data platform: Hive / Spark SQL / Spark Core
Goal: analyze big data directly with SQL statements
Hive's problem: the underlying engine is MapReduce; from Hive 2.x onward it can also run on Spark
Application scenarios
SQL on Hadoop: Hive, Shark (no longer maintained), Impala (from Cloudera; uses a lot of memory), Presto (used heavily at JD.com), Drill
Hive: on MapReduce; SQL is translated into MR jobs that run on the Hadoop cluster. Because shuffle output must be written to disk it is relatively slow, and tasks run at the process level.
Shark: on Spark, built by modifying the Hive source code.
Spark SQL: on Spark, with its own execution-plan parsing.
What they share: the metastore, which records table names, column names and their order, data types, and storage locations; it is usually kept in MySQL.
Note: Hive on Spark means Hive itself with Spark added underneath as an execution engine.
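Because the metadata lives in a shared metastore, Spark SQL can see tables that Hive created. A minimal sketch, assuming hive-site.xml (pointing at the MySQL-backed metastore) is on Spark's conf path; the table name below is a placeholder:
from pyspark.sql import SparkSession

# enableHiveSupport() makes Spark SQL read table metadata from the Hive metastore,
# so tables created in Hive are visible here as well
spark = SparkSession.builder \
    .appName("shared metastore demo") \
    .enableHiveSupport() \
    .getOrCreate()

spark.sql("SHOW TABLES").show()
# DESC FORMATTED prints the metadata kept in the metastore (columns, types, location);
# "some_table" is a placeholder for an existing Hive table
spark.sql("DESC FORMATTED some_table").show(truncate=False)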
1 Spark SQL Overview
Spark SQL is a Spark module for structured data processing.
Integrated
Query structured data with SQL, or with the DataFrame API.
Supported in Java, Scala, Python and R.
Uniform Data Access: connecting to data sources
Connect to any data source the same way.
DataFrames and SQL provide a common way to access a variety of data sources, including Hive, Avro, Parquet, ORC, JSON, and JDBC. You can even join data across these sources.
spark.read.json("s3n://...").registerTempTable("json")
results = spark.sql("""SELECT * FROM people JOIN json ...""")
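registerTempTable is the pre-2.0 API; a sketch of the same cross-source join with the 2.x createOrReplaceTempView API, with placeholder paths and join columns:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("uniform data access").getOrCreate()

# Read two different sources and register each one as a temporary view
spark.read.parquet("<people parquet path>").createOrReplaceTempView("people")
spark.read.json("<json path>").createOrReplaceTempView("json")

# One SQL statement can join across the two sources ("id" is a placeholder column)
results = spark.sql("SELECT * FROM people p JOIN json j ON p.id = j.id")
results.show()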
Hive Integration
Integrates seamlessly with Hive.
Spark SQL supports the HiveQL syntax as well as Hive SerDes and UDFs, allowing you to access existing Hive warehouses.
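A small sketch of the UDF side, reusing the Hive-enabled spark session from the earlier sketch. It registers a Spark SQL UDF callable from SQL (loading Hive's own UDF jars is also possible but not shown); the people table name is a placeholder:
from pyspark.sql.types import StringType

# Register a Python function as a SQL UDF, then call it from a HiveQL-style query
spark.udf.register("to_upper", lambda s: s.upper() if s else None, StringType())
spark.sql("SELECT to_upper(name) FROM people").show()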
Standard Connectivity
Standard JDBC/ODBC connectivity, so external tools can connect to Spark SQL.
Hive is not just SQL, and the same is true of Spark SQL.
Spark SQL: a Spark module that processes structured data and supports both SQL and the DataFrame/Dataset API.
It is a component for processing structured data; the emphasis is on structured data, not on SQL.
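The same query written both ways, as a minimal self-contained sketch with made-up rows:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("sql vs df api").getOrCreate()
# A tiny in-memory DataFrame just to show both styles
df = spark.createDataFrame([("Alice", 25), ("Bob", 19)], ["name", "age"])

# DataFrame API
df.filter(df["age"] > 21).select("name").show()

# SQL over a temporary view: register the DataFrame, then query it with SQL
df.createOrReplaceTempView("people")
spark.sql("SELECT name FROM people WHERE age > 21").show()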
2 Spark SQL Architecture
3 DataFrame/Dataset in Detail
Overview
Dataset was added in Spark 1.6.
A DataFrame is a dataset organized into columns:
column names, column types, and column values.
A DataFrame is a Dataset organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs. The DataFrame API is available in Scala, Java, Python, and R. In Scala and Java, a DataFrame is represented by a Dataset of Rows. In the Scala API, DataFrame is simply a type alias of Dataset[Row]. While, in Java API, users need to use Dataset<Row> to represent a DataFrame.
[Interview question] The difference between RDD and DataFrame: an RDD is a distributed collection of opaque objects with no schema, while a DataFrame carries a schema (named, typed columns), which lets Spark SQL optimize the query plan.
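A small sketch of the contrast, with made-up rows, that can serve as the basis for an answer:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("rdd vs df").getOrCreate()
sc = spark.sparkContext

# RDD: a distributed collection of opaque objects; Spark only sees Python tuples,
# so it cannot optimize beyond the functions you pass to it
rdd = sc.parallelize([("zhangsan", 20), ("lisi", 30)])
print(rdd.map(lambda t: t[0]).collect())

# DataFrame: the same data plus a schema (column names and types); queries are planned
# and optimized by Catalyst, and columns can be referenced by name
df = spark.createDataFrame(rdd, ["name", "age"])
df.printSchema()
df.select("name").show()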
编程
The entry point into all functionality in Spark is the SparkSession class. To create a basic SparkSession, just use SparkSession.builder:
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
Coding it in an IDE: running from the IDE is relatively slow; running in the pyspark shell is faster.
from pyspark.sql import SparkSession

def basic(spark):
    df = spark.read.json("<json file path>")
    df.show()
    df.printSchema()  # print the schema (column names and types)
    df.select("name").show()
    df.select("*").show()
    df.select("name", "age").show()
    df.select(df["name"], df["age"] + 1).show()
    df.filter(df['age'] > 21).show()
    df.filter(df.age > 21).show()
    df.groupBy("age").count().show()

def basic2(spark):
    df = spark.read.json("<json file path>")
    # Register df as the "people" table by creating a temporary view
    df.createOrReplaceTempView("people")
    sqlDF = spark.sql("SELECT * FROM people")
    sqlDF.show()
    # df.createGlobalTempView("people")  # global view, shared across sessions; rarely used
    # sqlDF = spark.sql("SELECT * FROM global_temp.people")

if __name__ == '__main__':
    spark = SparkSession.builder.appName("spark0801").getOrCreate()
    basic(spark)
If you get an error saying the Derby database already exists, delete the metastore_db directory under Hadoop's sbin directory (it is created wherever the job was launched from).
P.S.:
more <filename>: view the contents of a file
Converting between DataFrame and RDD
Spark SQL supports two different methods for converting existing RDDs into Datasets.
1 Inferring the schema using reflection
from pyspark.sql import Row

def new1(spark):
    sc = spark.sparkContext
    # Load a text file and convert each line to a Row.
    lines = sc.textFile("examples/src/main/resources/people.txt")
    parts = lines.map(lambda l: l.split(","))
    people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
    # Infer the schema, and register the DataFrame as a table.
    schemaPeople = spark.createDataFrame(people)
    schemaPeople.printSchema()  # show the DataFrame's metadata (schema)
    schemaPeople.createOrReplaceTempView("people")
    # SQL can be run over DataFrames that have been registered as a table.
    teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
    # The results of SQL queries are DataFrame objects.
    # .rdd returns the content as a pyspark.RDD of Row.
    teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name).collect()
    for name in teenNames:
        print(name)
2 Programmatically specifying the schema
More flexible, but requires more code.
Create an RDD of tuples or lists from the original RDD;
Create the schema represented by a StructType matching the structure of tuples or lists in the RDD created in step 1.
Apply the schema to the RDD via createDataFrame method provided by SparkSession.
from pyspark.sql.types import StructType, StructField, StringType

def new2(spark):
    sc = spark.sparkContext
    # Load a text file and split each line into fields.
    lines = sc.textFile("examples/src/main/resources/people.txt")
    parts = lines.map(lambda l: l.split(","))
    # Each line is converted to a tuple.
    people = parts.map(lambda p: (p[0], p[1].strip()))
    # The schema is encoded in a string.
    schemaString = "name age"
    fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
    schema = StructType(fields)
    # Apply the schema to the RDD.
    schemaPeople = spark.createDataFrame(people, schema)
    # Create a temporary view using the DataFrame.
    schemaPeople.createOrReplaceTempView("people")
    # SQL can be run over DataFrames that have been registered as a table.
    results = spark.sql("SELECT name FROM people")
    results.show()
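A usage sketch for the two functions above, mirroring the __main__ block used earlier (the appName is arbitrary):
if __name__ == '__main__':
    spark = SparkSession.builder.appName("rdd_to_df").getOrCreate()
    new1(spark)
    new2(spark)
    spark.stop()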
Data sources: study the rest on your own.
There is also interaction via JDBC (reading from and writing to relational databases).
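A sketch of the generic data source API and the JDBC path, assuming a SparkSession named spark; the paths, database URL, table names, and credentials are placeholders, and the MySQL JDBC driver jar must be on the classpath:
# Generic data source API: the same read/write calls cover json/parquet/orc/csv, etc.
df = spark.read.csv("<csv path>", header=True, inferSchema=True)
df.write.mode("overwrite").parquet("<output path>")

# JDBC: read a MySQL table into a DataFrame, and write a DataFrame back to another table
jdbc_df = spark.read.format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3306/<db>") \
    .option("dbtable", "<table>") \
    .option("user", "<user>") \
    .option("password", "<password>") \
    .load()

jdbc_df.write.format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3306/<db>") \
    .option("dbtable", "<table>_copy") \
    .option("user", "<user>") \
    .option("password", "<password>") \
    .mode("append") \
    .save()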