python读取postgresql并进行spark处理

贴出一段程序,主要是对postgresql进行读取并进行spark处理


def operator(x):

print(x[1])

x[2] =='kao'

    if x[4] =='male':

x[4] =1

    elif x[4] =='female':

x[4] =2

    else:

x[4] =0

    print(x[4])

return x

def tuple_convert_to_list(x):

list1 =list(x)

return list1

if __name__ =='__main__':

conn = psycopg2.connect(host='192.168.0.1',

port=5000,

user='test',

password='test20180910',

database='test')

cursor = conn.cursor()

try:

sql ="select *from unicom_2i_hold_schema.tb_user limit 100000"

        print("sql:"+sql)

cursor.execute(sql)

rows = cursor.fetchall()

print(cursor.rowcount)

print(rows)

print(type(rows))

sc = SparkContext(appName="test")

rdd = sc.parallelize(rows)

print(type(rdd))

rdd2 = rdd.map(tuple_convert_to_list)

rdd3 = rdd2.map(operator)

print(type(rdd2))

print(rdd2.take(100))

print(rdd3.take(1000))

finally:

cursor.close;

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容