飞谷云 MySQL 第3讲
MySQL+表连接(下)
一、MySQL+表连接(下)
1、内连接(inner join):结果仅满足条件的行
select * from T1,T2 where T1.stud_code=T2.stud_code;
select * from T1 inner join T2 ON T1.stud_code=T2.stud_code;#效率比上句高
2、左连接(left outer join):
显示左表T1中的所有行,并把右表T2中符合条件加到左表T1中;
右表T2中不符合条件,就不用加入结果表中,并且NULL表示。
select * from T1 left outer join T2 ON T1.stud_code=T2.stud_code;
3、右连接(right outer join)
显示右表T2中的所有行,并把左表T1中符合条件加到右表T2中;
左表T1中不符合条件,就不用加入结果表中,并且NULL表示。
4、全连接(full outer join)
显示左表T1、右表T2两边中的所有行,即把左联结果表 + 右联结果表组合在一起,然后过滤掉重复的。
/*内连接*/
select * from stud_info T1 ,stud_score T2 WHERE T1.stud_code=T2.stud_code;
select * from stud_info T1 inner join stud_score T2 ON T1.stud_code=T2.stud_code;
/*左连接*/
select * from stud_info T1 LEFT OUTER join stud_score T2 ON T1.stud_code=T2.stud_code;
/*右连接*/
select * from stud_info T1 RIGHT OUTER join stud_score T2 ON T1.stud_code=T2.stud_code;
二、pandas 中的 DataFrame的表连接
- 导入必要的包或模块
import numpy as np
import pandas as pd
from pandas import DataFrame
import MySQLdb
两个DataFrame数据集如何进行关系?
df1=DataFrame({'key':['a','a','b','c','c'],'data1':range(5)})
df2=DataFrame({'key':['a','b','d'],'data1':range(3)})
- 内连接
df3=pd.merge(df1,df2,on='key')
df4=pd.merge(df1,df2,on='key',how='inner')
df5=pd.merge(df1,df2,left_on='key',right_on='key',how='inner')
#左连接
df_l=pd.merge(df1,df2,on='key',how='left')
#右连接
df_r=pd.merge(df1,df2,on='key',how='right')
#全连接
df_a=pd.merge(df1,df2,on='key',how='outer')
- 两个表结构相同字段,自动添加后缀,_x,_y,我们也可以自定义,后缀,通过
suffixes=(‘_left’,‘_right’)
df6=pd.merge(df1,df2,on='key',how='inner',suffixes=('_left','_right'))
#选择数据
df7=df6[['key','data1_right']]
#修改列名(data1_right改为data)
df8=df7.rename(columns={'data1_right':'data'})
#保存结果
df8.to_csv('data07.csv')
df8.to_csv('data07.csv',index=False,cols=['key','data'])##忽略行索引
- 把MySQL数据库表导入pandas
conn= MySQLdb.connect(host='localhost',port=3306,user='feigu_mysql', passwd='feigu2016', db='testdb',charset='utf8')
data2 = pd.read_sql('select * from stud_info', conn)
- 如果文件较大,也可可以逐块读,如:chunksize=1000(行)
`inputfile1 = '/home/feigu/python_test/data/stud_score.csv'`
#输入的数据文件
`inputfile2 = '/home/feigu/python_test/data/stud_info.csv'`
#输入的数据文件
data1 = pd.read_csv(inputfile1) #读取数据
data2 = pd.read_csv(inputfile2) #读取数据
df1=DataFrame(data1) #转换为DataFrame格式
df2=DataFrame(data2) #转换为DataFrame格式
#如果要跳过一些行,可以加上参数,skiprow[0,2]
pd.read_csv(inputfile2,skiprow[0,2])跳过第1行,第3行
df.count() #查看非NA值得数量
df.columns #查看列名
df.<tab> #查看df可以使用的方法
df.head(2) #查看前2行
df.tail(2) #查看倒数2行
三、把数据导入Spark-SQL中的DataFrame结构中
#把数据导入spark-rdd
#进入交互式编程环境
spark-shell --master local[2]
==========使用scala语言==========
// 首先用已有的Spark Context对象创建SQLContext对象
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// 导入语句,可以隐式地将RDD转化成DataFrame
import sqlContext.implicits._
// 创建一个表示stud的自定义类
case class stud_score(stud_code:String, sub_code:String, sub_name: String, sub_tech: String, sub_score: Int)
//用文件创建一个RDD
val rdd_st = sc.textFile("file:///home/hadoop/data/stud_score.csv")
//转换为含case的rdd
val rdd_case=rdd_st.map(_.split(",")).map(p => stud_score(p(0), p(1), p(2), p(3), p(4).trim.toInt))
//把RDD转换为DataFrame
val dfstud=rdd_case.toDF()
// 将DataFrame注册为一个表
dfstud.registerTempTable("stud_score")
// 用sqlContext对象提供的sql方法执行SQL语句。
val result80 = sqlContext.sql("SELECT * FROM stud_score where sub_score>=80")
//显示结果
result80.show()
result80.take(6)
//也可以保存
result80.rdd.saveAsTextFile("/home/hadoop/data")