##基于anaconda安装python3.7
#参考网址:https://blog.csdn.net/weixin_43215250/article/details/89186733?utm_medium=distribute.pc_aggpage_search_result.none-task-blog-2~aggregatepage~first_rank_ecpm_v1~rank_v31_ecpm-1-89186733.pc_agg_new_rank&utm_term=cdh运行python&spm=1000.2123.3001.4430
sudo yum -y install bzip2
wget https://repo.continuum.io/archive/Anaconda3-5.3.0-Linux-x86_64.sh
bash Anaconda3-5.3.0-Linux-x86_64.sh
/opt/cloudera/parcels/CDH/lib/anaconda3
#配置环境变量
#vim /etc/profile
#export ANACONDA3_HOME=/opt/cloudera/parcels/CDH/lib/anaconda3/
#export PATH=$PATH:$ANACONDA3_HOME/bin
#source /etc/profile
#env |grep PATH
#在cdh配置Spark的Python环境(spark-env.sh)
export PYSPARK_PYTHON=/opt/cloudera/parcels/CDH/lib/anaconda3/bin/python
export PYSPARK_DRIVER_PYTHON=/opt/cloudera/parcels/CDH/lib/anaconda3/bin/python
#pyspark启动命令
spark2-submit \
--master yarn \
--executor-memory 16g \
--num-executors 14 \
--executor-cores 14 \
--driver-memory 8G \
--conf spark.executor.extraJavaOptions=-Xss4096m \
--conf spark.rpc.message.maxSize=768 \
--conf spark.python.worker.memory=4g \
python_test.py
#创建SparkSession
from pyspark import SparkConf
from pyspark.sql import SparkSession
conf = SparkConf().setAppName('test').setMaster('yarn')
spark = SparkSession.builder.config(conf=conf).getOrCreate()
#读取hdfs数据,返回Dataframe(分区表必须添加分区条件)
df = spark.sql("select * from table where date='2022-01-01'")
#spark Dataframe操作
#创建DataFrame
data = [{
'key': 'value',
'key': 'value',
'key': 'value',
'key': 'value'
}]
df = spark.createDataFrame(data)
#Dataframe转Pandas
df.toPandas()
#新增列(新增a1列填充数据0)
from pyspark.sql.functions import lit
df.withColumn('a1', lit(0))
#更改列名
spark.createDataFrame(df).withColumnRenamed('原列名', '新列名')
#写入hive表
spark.createDataFrame(df).write.format("hive").mode("overwrite").saveAsTable('库名.表名')
#写入hdfs文件
from hdfs.client import Client
client = Client("http://realtime001:50070/")
hdfs_path = "/label/user_lost_predict/dt="+dt+""
--判断文件是否存在,存在删除
if client.status(hdfs_path, strict=False):
client.delete(hdfs_path, recursive=True)
spark.createDataFrame(out).rdd.map(lambda row: str(row[0]) + "\t" + str(row[1])).repartition(1).saveAsTextFile(hdfs_path)
#写入mysql
mysql_prop = {'user': 'xxx',
'password': 'xxx'}
mysql_url = 'jdbc:mysql://host:port/database'
spark.createDataFrame(df).write.jdbc(url=mysql_url, table='表名', mode='append', properties=mysql_prop)
#pandas Dataframe操作
.head() --默认查看五条数据
.info --查看所有数据
.info() --查看数据结构
--head 查看详细数据
pd.options.display.max_columns = None
pd.options.display.max_rows = None
#Dataframe转pyspark
spark.createDataFrame(df)
#新增列(新增a1列填充数据0)
df['a1'] = 0
#更改列明
df.rename(columns={'旧列名': '新列名'})
#列名截取
df_index = df.columns[2:-1]
#选取所有的行以及columns为a,b的列;
user_label = df.loc[:, ['a', 'b']]
#合并两个dataframe
import pandas as pd
df = pd.concat([df1, df2])
#条件筛选
new_df = df[df.target == 1]
new_df = df[(df.target == 1) & (df.app_name == 'mfzs')]
#mysql
import pymysql
# 公用文件存放地址
import sys
sys.path.append('/opt/zwww_project/shell_script/commons/')
# 公用变量
from dictionary_variable import *
mysqldb = pymysql.connect(host, port, user, password, charset)
# # 获取游标 承载结果
mycursor = mysqldb.cursor()
#创建SQL查询语句并逆序排序
sql = "delete from hadoop_data.cp_book_test where book_id='121444'"
#执行sql语句
mycursor.execute(sql)
#提交语句
mydb.commit()
#pandas函数
--取样本
new_df = df[df.target == 1].sample(n=69)
--查看重复值
new_df = len(df[df['列名'].duplicated()])
--填充空值
new_df = df.fillna(-999999)
--查看异常值
df_describe = df.describe([0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 0.95, 0.99]).T
--过滤异常
new_df = df[df.字段名 <= df_describe['99%'][14]]
#调用另一个python文件(公共变量文件)
import sys
# 公用文件存放地址
sys.path.append('/opt/zwww_project/shell_script/commons/')
# 公用变量
from dictionary_variable import *
#获取今天(现在时间)
import datetime
today = datetime.date.today()
#日期格式化
today.strftime('%Y-%m-%d')
today.strftime('%Y-%m-%d %H:%M:%S')
#参数解析器
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('--dt', type=str, default=datetime.date.today().strftime('%Y-%m-%d'))
args = parser.parse_args()
dt = args.dt
#安装pyhive
yum install gcc-c++ python-devel.x86_64 cyrus-sasl-devel.x86_64
pip install sasl
pip install thrift
pip install thrift-sasl
pip install PyHive
#使用pyhive连接其他集群将数据写入hive
from pyhive import hive
conn = hive.Connection(host='xxxx', port=19000)
cursor = conn.cursor()
cursor.execute("select * from 库名.表名 where 条件=''")
df = spark.createDataFrame(cursor,schema=['字段1', '字段2', '字段3', '字段4'])
conn.close()
df.write.format("hive").mode("overwrite").saveAsTable('库名.表名')
pyspark代码片段
©著作权归作者所有,转载或内容合作请联系作者
- 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
- 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
- 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
推荐阅读更多精彩内容
- 1.copy:@property(nonatomic, copy)NSString*<#string#>;2.st...
- 代码片段 Xcode的代码片段(Code Snippets)创建自定义的代码片段,当你重用这些代码片段时,会给你带...
- spark 1.6 的数据抽取代码 插入数据 采用 dataframe下面是python版的 主要代码在 main...
- 摘要 自定义对象的布尔值真假,可以让我们的代码更pythonic; 善用 any() / all() 可以使代码更...