Apache Spark 是一个围绕速度、易用性和复杂分析构建的大数据处理框架,提供了一个全面、统一的框架用于管理各种有着不同性质(文本数据、图表数据等)的数据集和数据源(批量数据或实时的流数据)的大数据处理的需求。 这里简单展示如何用Apache Spark 把数据存储到Postgres数据库。
首先开始spark 服务
pyspark --driver-class-path /opt/spark/jars/postgresql-42.2.12.jar --jars /opt/spark/jars/postgresql-42.2.12.jar
然后删除之前产生的表
import psycopg2 as p2
conn = p2.connect("host=localhost dbname =test user=detian password='p31415926'")
cur = conn.cursor()
cur.execute(""" drop table test.germanydata""")
conn.commit()
然后抽取网络数据,并且存储在Dataframe 里面
import requests
import json
from pyspark.sql import Row
from collections import OrderedDict
from pyspark import SparkContext
from collections import OrderedDict
#Assign URL
URL = "https://api.covid19api.com/country/germany/status/confirmed/live?from=2020-04-01T00:00:00Z&to=2020-05-01T00:00:00Z"
r = requests.get(url =URL)
data = r.json()
#define a function to parse json file to row
def convert_to_row(d: dict) -> Row:
return Row(**OrderedDict(sorted(d.items())))
#convert the data to a dataframe
df=sc.parallelize(data).map(convert_to_row).toDF()
#use only some of the columns
jdbcDF=df.select("Cases", "Country", "Date","Status")
然后通过jdbc driver 连接postgres 并将dataframe 里面的数据写入数据库。
jdbcDF.write \
.format("jdbc") \
.option("url", "jdbc:postgresql://localhost:5432/test") \
.option("dbtable", "test.germanydata") \
.option("user", "detian") \
.option("password", "p31415926") \
.save()
检查数据库的germanydata 表