pyspark代码片段

##基于anaconda安装python3.7
#参考网址:https://blog.csdn.net/weixin_43215250/article/details/89186733?utm_medium=distribute.pc_aggpage_search_result.none-task-blog-2~aggregatepage~first_rank_ecpm_v1~rank_v31_ecpm-1-89186733.pc_agg_new_rank&utm_term=cdh运行python&spm=1000.2123.3001.4430
sudo yum -y install bzip2


wget https://repo.continuum.io/archive/Anaconda3-5.3.0-Linux-x86_64.sh

bash Anaconda3-5.3.0-Linux-x86_64.sh
/opt/cloudera/parcels/CDH/lib/anaconda3
#配置环境变量
#vim /etc/profile
#export ANACONDA3_HOME=/opt/cloudera/parcels/CDH/lib/anaconda3/
#export PATH=$PATH:$ANACONDA3_HOME/bin
#source /etc/profile
#env |grep PATH

#在cdh配置Spark的Python环境(spark-env.sh)
export PYSPARK_PYTHON=/opt/cloudera/parcels/CDH/lib/anaconda3/bin/python
export PYSPARK_DRIVER_PYTHON=/opt/cloudera/parcels/CDH/lib/anaconda3/bin/python


#pyspark启动命令
spark2-submit \
--master yarn \
--executor-memory 16g \
--num-executors 14 \
--executor-cores 14 \
--driver-memory 8G \
--conf spark.executor.extraJavaOptions=-Xss4096m \
--conf spark.rpc.message.maxSize=768 \
--conf spark.python.worker.memory=4g \
python_test.py


#创建SparkSession
from pyspark import SparkConf
from pyspark.sql import SparkSession
conf = SparkConf().setAppName('test').setMaster('yarn')
spark = SparkSession.builder.config(conf=conf).getOrCreate()


#读取hdfs数据,返回Dataframe(分区表必须添加分区条件)
df = spark.sql("select * from table where date='2022-01-01'")


#spark Dataframe操作
#创建DataFrame
data = [{
        'key': 'value',
        'key': 'value',
        'key': 'value',
        'key': 'value'
}]
df = spark.createDataFrame(data)

#Dataframe转Pandas
df.toPandas()

#新增列(新增a1列填充数据0)
from pyspark.sql.functions import lit
df.withColumn('a1', lit(0))

#更改列名
spark.createDataFrame(df).withColumnRenamed('原列名', '新列名')

#写入hive表
spark.createDataFrame(df).write.format("hive").mode("overwrite").saveAsTable('库名.表名')

#写入hdfs文件
from hdfs.client import Client
client = Client("http://realtime001:50070/")
hdfs_path = "/label/user_lost_predict/dt="+dt+""
--判断文件是否存在,存在删除
if client.status(hdfs_path, strict=False):
    client.delete(hdfs_path, recursive=True)
spark.createDataFrame(out).rdd.map(lambda row: str(row[0]) + "\t" + str(row[1])).repartition(1).saveAsTextFile(hdfs_path)

#写入mysql
mysql_prop = {'user': 'xxx',
              'password': 'xxx'}
mysql_url = 'jdbc:mysql://host:port/database'
spark.createDataFrame(df).write.jdbc(url=mysql_url, table='表名', mode='append', properties=mysql_prop)






#pandas Dataframe操作
.head() --默认查看五条数据
.info --查看所有数据
.info() --查看数据结构

--head 查看详细数据
pd.options.display.max_columns = None
pd.options.display.max_rows = None

#Dataframe转pyspark
spark.createDataFrame(df)

#新增列(新增a1列填充数据0)
df['a1'] = 0

#更改列明
df.rename(columns={'旧列名': '新列名'})

#列名截取
df_index = df.columns[2:-1]

#选取所有的行以及columns为a,b的列;
user_label = df.loc[:, ['a', 'b']]

#合并两个dataframe
import pandas as pd
df = pd.concat([df1, df2])

#条件筛选
new_df = df[df.target == 1]
new_df = df[(df.target == 1) & (df.app_name == 'mfzs')]



#mysql
import pymysql
# 公用文件存放地址
import sys
sys.path.append('/opt/zwww_project/shell_script/commons/')
# 公用变量
from dictionary_variable import *
mysqldb = pymysql.connect(host, port, user,  password,  charset)
# # 获取游标 承载结果
mycursor = mysqldb.cursor()
#创建SQL查询语句并逆序排序
sql = "delete from hadoop_data.cp_book_test where book_id='121444'"
#执行sql语句
mycursor.execute(sql)
#提交语句
mydb.commit()






#pandas函数
--取样本
new_df = df[df.target == 1].sample(n=69)
--查看重复值 
new_df = len(df[df['列名'].duplicated()])
--填充空值
new_df = df.fillna(-999999)
--查看异常值
df_describe = df.describe([0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 0.95, 0.99]).T
--过滤异常
new_df = df[df.字段名 <= df_describe['99%'][14]]




#调用另一个python文件(公共变量文件)
import sys
# 公用文件存放地址
sys.path.append('/opt/zwww_project/shell_script/commons/')
# 公用变量
from dictionary_variable import *


#获取今天(现在时间)
import datetime
today = datetime.date.today()
#日期格式化
today.strftime('%Y-%m-%d')
today.strftime('%Y-%m-%d %H:%M:%S')


#参数解析器
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('--dt', type=str, default=datetime.date.today().strftime('%Y-%m-%d'))
args = parser.parse_args()
dt = args.dt




#安装pyhive
yum install gcc-c++ python-devel.x86_64 cyrus-sasl-devel.x86_64
pip install sasl
pip install thrift
pip install thrift-sasl
pip install PyHive


#使用pyhive连接其他集群将数据写入hive
from pyhive import hive
conn = hive.Connection(host='xxxx', port=19000)
cursor = conn.cursor()
cursor.execute("select * from 库名.表名 where 条件=''")
df = spark.createDataFrame(cursor,schema=['字段1', '字段2', '字段3', '字段4'])
conn.close()
df.write.format("hive").mode("overwrite").saveAsTable('库名.表名')


©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,539评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,911评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,337评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,723评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,795评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,762评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,742评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,508评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,954评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,247评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,404评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,104评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,736评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,352评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,557评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,371评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,292评论 2 352

推荐阅读更多精彩内容