OpenLake: Building a Cloud-Native Data Lake

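Big data and cloud native have long been two hot topics in infrastructure, and more and more big data infrastructure is moving toward cloud native: Pulsar is a cloud-native message queue, and Snowflake offers a cloud-native data warehouse. I wanted to explore how the two combine and came across a very interesting project, OpenLake. Hosted under the minio organization, it builds a data lake on Kubernetes out of MinIO, Spark, Kafka, Dremio, and Iceberg, which makes it a great learning resource. This post records my setup process and what I learned along the way.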

0. Prepare the Kubernetes environment

If you already have a cluster, skip this section. I wanted to experiment quickly, so I run k3s on Linux via docker-compose.

Install docker compose by following the official guide and running these commands:

sudo curl -L "https://github.com/docker/compose/releases/download/1.25.0/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
sudo chmod +x /usr/local/bin/docker-compose

Create a local directory to hold the k3s configuration and data:

mkdir -p /home/xiowang/k3s
mkdir -p /home/xiowang/k3s/data

Create docker-compose.yaml to start k3s:

#vim /home/xiowang/k3s/docker-compose.yaml
version: '3'
services:
  server:
    image: rancher/k3s:v1.20.2-k3s1
    container_name: k3s_server
    hostname: xiowang.dev
    command: server --tls-san=xiowang.dev
    tmpfs:
    - /run
    - /var/run
    privileged: true
    restart: always
    ports:
    - 6443:6443
    - 443:30443
    - 80:30080
    - 50050:30050
    - 50051:30051
    environment:
    - K3S_TOKEN=16963276443662
    - K3S_KUBECONFIG_OUTPUT=/root/.kube/config
    - K3S_KUBECONFIG_MODE=600
    volumes:
    - /var/lib/rancher/k3s:/var/lib/rancher/k3s
    - /etc/rancher:/etc/rancher
    - /home/xiowang/k3s/.kube:/root/.kube
    - /home/xiowang/k3s/data:/data:shared,rw

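The configuration above does the following:

  1. Exposes local port 6443 for the Kubernetes apiserver so that kubectl can manage the cluster;
  2. Maps local ports 443 and 80 to the cluster NodePorts 30443 and 30080, respectively;
  3. Saves the kubeconfig to /home/xiowang/k3s/.kube;
  4. Mounts the cluster's /data directory at /home/xiowang/k3s/data;
  5. Maps local ports 50050 and 50051 to NodePorts 30050 and 30051, reserved for exposing MinIO later.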
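
To make the port layout easier to check, here is a minimal Python sketch that parses the `ports` entries from the compose file above into a host-port to container-port table. The helper name `parse_port_mappings` is my own and is not part of docker-compose; it only handles the simple "host:container" form used here.

```python
# Port entries copied from the docker-compose.yaml above ("host:container").
PORTS = ["6443:6443", "443:30443", "80:30080", "50050:30050", "50051:30051"]


def parse_port_mappings(entries):
    """Return {host_port: container_port} for simple "host:container" entries."""
    mapping = {}
    for entry in entries:
        host, container = entry.split(":")
        mapping[int(host)] = int(container)
    return mapping


mapping = parse_port_mappings(PORTS)
for host_port, node_port in sorted(mapping.items()):
    print(f"localhost:{host_port} -> k3s container port {node_port}")
```

Any NodePort that a Service should expose on the host needs a matching entry in this table, otherwise it is only reachable from inside the k3s container.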

Start k3s:

cd /home/xiowang/k3s
docker-compose up -d

Since local ports 80 and 443 map to the cluster NodePorts 30080 and 30443, edit the traefik Service so that its ports 80 and 443 use NodePorts 30080 and 30443, respectively:

#kubectl -n kube-system edit svc traefik
apiVersion: v1
kind: Service
metadata:
  annotations:
    meta.helm.sh/release-name: traefik
    meta.helm.sh/release-namespace: kube-system
  creationTimestamp: "2023-05-17T09:29:42Z"
  labels:
    app: traefik
    app.kubernetes.io/managed-by: Helm
    chart: traefik-1.81.0
    heritage: Helm
    release: traefik
  name: traefik
  namespace: kube-system
  resourceVersion: "368985"
  uid: 7bbb1758-ca01-4e84-b166-dae950613adf
spec:
  clusterIP: 10.43.115.234
  clusterIPs:
  - 10.43.115.234
  externalTrafficPolicy: Cluster
  ports:
  - name: http
    nodePort: 30080
    port: 80
    protocol: TCP
    targetPort: http
  - name: https
    nodePort: 30443
    port: 443
    protocol: TCP
    targetPort: https
  selector:
    app: traefik
    release: traefik
  sessionAffinity: None
  type: LoadBalancer
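
Instead of editing the Service interactively, the same change could be expressed as a JSON patch. The sketch below only builds the patch document in Python. It assumes the port order shown above (index 0 is http, index 1 is https), that both ports already have a nodePort assigned, and that the cluster uses the default NodePort range 30000-32767. The function name `build_nodeport_patch` is hypothetical.

```python
import json

# Default NodePort range in Kubernetes (assumed unchanged for this k3s cluster).
NODEPORT_MIN, NODEPORT_MAX = 30000, 32767


def build_nodeport_patch(node_ports):
    """Build a JSON Patch that sets spec.ports[i].nodePort for each index i."""
    ops = []
    for index, node_port in enumerate(node_ports):
        if not NODEPORT_MIN <= node_port <= NODEPORT_MAX:
            raise ValueError(f"nodePort {node_port} is outside the default range")
        ops.append({
            "op": "replace",
            "path": f"/spec/ports/{index}/nodePort",
            "value": node_port,
        })
    return ops


# Index 0 (http) -> 30080, index 1 (https) -> 30443, as in the Service above.
patch = build_nodeport_patch([30080, 30443])
print(json.dumps(patch))
```

The printed JSON could then be passed to `kubectl -n kube-system patch svc traefik --type=json -p '<json>'`.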

Copy /home/xiowang/k3s/.kube/config to ~/.kube/config on the local machine (kubecm is recommended for managing kubeconfigs):

cp /home/xiowang/k3s/.kube/config ~/.kube/config

Now we can begin the OpenLake journey.

1. Install and configure MinIO

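There are two recommended ways to install MinIO on Kubernetes:

  1. Install MinIO with the operator (see its guide)
  2. Install MinIO with helm (see its guide)

I have not had time to study the operator's CRDs yet, so I install MinIO with helm as follows: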

helm repo add minio https://charts.min.io/
# Set the credentials and disk size; TLS is off by default (adjust as needed; I use 20Gi for testing)
helm upgrade --install --namespace minio --set rootUser=rootuser,rootPassword=rootpass123,persistence.size=20Gi,resources.requests.memory=100Mi,resources.limits.memory=2Gi,replicas=3,service.type=NodePort,consoleService.type=NodePort one --create-namespace minio/minio
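
The long `--set` argument is easier to read once expanded into the nested structure it overrides. The following is a simplified Python sketch of that expansion: it splits on commas and dots only and keeps every value as a string, whereas helm itself also handles escaping, lists, and type inference. The function name `parse_set_string` is my own.

```python
import json


def parse_set_string(set_string):
    """Expand "a.b=1,c=2" into a nested dict, keeping all values as strings."""
    values = {}
    for pair in set_string.split(","):
        key, value = pair.split("=", 1)
        node = values
        *parents, leaf = key.split(".")
        for part in parents:
            node = node.setdefault(part, {})
        node[leaf] = value
    return values


# The same overrides as in the helm command above.
SET_STRING = (
    "rootUser=rootuser,rootPassword=rootpass123,persistence.size=20Gi,"
    "resources.requests.memory=100Mi,resources.limits.memory=2Gi,"
    "replicas=3,service.type=NodePort,consoleService.type=NodePort"
)

values = parse_set_string(SET_STRING)
print(json.dumps(values, indent=2))
```

The printed structure corresponds to what one would otherwise write in a values file and pass with `-f`.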

Output of a successful deployment:

NAME: one
LAST DEPLOYED: Mon May 22 12:22:31 2023
NAMESPACE: minio
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
MinIO can be accessed via port 9000 on the following DNS name from within your cluster:
one-minio.minio.svc.cluster.local

To access MinIO from localhost, run the below commands:

  1. export POD_NAME=$(kubectl get pods --namespace minio -l "release=one" -o jsonpath="{.items[0].metadata.name}")

  2. kubectl port-forward $POD_NAME 9000 --namespace minio

Read more about port forwarding here: http://kubernetes.io/docs/user-guide/kubectl/kubectl_port-forward/

You can now access MinIO server on http://localhost:9000. Follow the below steps to connect to MinIO server with mc client:

  1. Download the MinIO mc client - https://min.io/docs/minio/linux/reference/minio-mc.html#quickstart

  2. export MC_HOST_one-minio-local=http://$(kubectl get secret --namespace minio one-minio -o jsonpath="{.data.rootUser}" | base64 --decode):$(kubectl get secret --namespace minio one-minio -o jsonpath="{.data.rootPassword}" | base64 --decode)@localhost:9000

  3. mc ls one-minio-local

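Note that the variable name MC_HOST_one-minio-local in the output above is not a valid shell variable name because it contains hyphens, so I use MC_HOST_one_minio_local instead: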

  2. export MC_HOST_one_minio_local=http://$(kubectl get secret --namespace minio one-minio -o jsonpath="{.data.rootUser}" | base64 --decode):$(kubectl get secret --namespace minio one-minio -o jsonpath="{.data.rootPassword}" | base64 --decode)@localhost:9000

  3. mc ls one_minio_local
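
The renaming above follows a simple rule: shell variable names may contain only letters, digits, and underscores, and must not start with a digit. A small Python sketch of that check, with hypothetical helper names `is_valid_env_name` and `mc_host_var`:

```python
import re

# Shell variable names: letters, digits, underscores; no leading digit.
_VALID_NAME = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def is_valid_env_name(name):
    """Return True if name can be used with `export` in a POSIX shell."""
    return _VALID_NAME.fullmatch(name) is not None


def mc_host_var(alias):
    """Derive an MC_HOST_<alias> variable name, replacing invalid characters."""
    safe_alias = re.sub(r"[^A-Za-z0-9_]", "_", alias)
    return f"MC_HOST_{safe_alias}"


print(is_valid_env_name("MC_HOST_one-minio-local"))
print(mc_host_var("one-minio-local"))
```

Because the mc alias is taken from the part of the variable name after `MC_HOST_`, the alias used in later commands becomes `one_minio_local`.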

To reach the MinIO console, forward port 9001 and log in at localhost:9001 with the root account:

export POD_NAME=$(kubectl get pods --namespace minio -l "release=one" -o jsonpath="{.items[0].metadata.name}")

kubectl port-forward $POD_NAME 9001 --namespace minio

Bucket (S3 API) access goes through port 9000:

export POD_NAME=$(kubectl get pods --namespace minio -l "release=one" -o jsonpath="{.items[0].metadata.name}")
kubectl port-forward $POD_NAME 9000 --namespace minio
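
To confirm that a port-forward is actually listening before pointing mc or a browser at it, a plain TCP check is enough. This is a minimal sketch using only the standard library; the function name `is_port_open` is my own, and it only tests that something accepts connections, not that it is MinIO.

```python
import socket


def is_port_open(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    # 9000 is the S3 API port and 9001 the console port forwarded above.
    for port in (9000, 9001):
        print(port, is_port_open("localhost", port))
```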

2. Set up Spark on k8s

spark on k8s is an open-source project started at Google (not an official Google product). It uses an operator to schedule k8s resources, which makes it easy to run Spark on k8s.

Install it with helm:

helm repo add spark-operator https://googlecloudplatform.github.io/spark-on-k8s-operator
helm install my-release spark-operator/spark-operator \
--namespace spark-operator \
--set webhook.enable=true \
--set image.repository=openlake/spark-operator \
--set image.tag=3.3.2 \
--create-namespace

Verify the deployment:

kubectl get pods -n spark-operator

You should see the my-release pod:

NAME                                           READY   STATUS      RESTARTS   AGE
my-release-spark-operator-6547984586-xzw4p     1/1     Running     2          4d20h

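Following the official example, deploy a Spark application that computes pi and save it as spark-pi.yaml. Note that helm has already created RBAC for serviceAccount my-release-spark in the spark-operator namespace, so all Spark apps here live in the spark-operator namespace and run as serviceAccount my-release-spark: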

apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
    name: pyspark-pi
    namespace: spark-operator
spec:
    type: Python
    pythonVersion: "3"
    mode: cluster
    image: "openlake/spark-py:3.3.2"
    imagePullPolicy: Always
    mainApplicationFile: local:///opt/spark/examples/src/main/python/pi.py
    sparkVersion: "3.3.2"
    restartPolicy:
        type: OnFailure
        onFailureRetries: 3
        onFailureRetryInterval: 10
        onSubmissionFailureRetries: 5
        onSubmissionFailureRetryInterval: 20
    driver:
        cores: 1
        coreLimit: "1200m"
        memory: "512m"
        labels:
            version: 3.3.2
        serviceAccount: my-release-spark
    executor:
        cores: 1
        instances: 1
        memory: "512m"
        labels:
            version: 3.3.2

Apply the manifest:

kubectl apply -f spark-pi.yaml

Check the sparkapp and pods:

#kubectl -n spark-operator get sparkapp,pod
NAME                                               STATUS      ATTEMPTS   START                  FINISH                 AGE
sparkapplication.sparkoperator.k8s.io/pyspark-pi   COMPLETED   1          2023-05-22T06:43:24Z   2023-05-22T06:44:37Z   4m50s

NAME                                               READY   STATUS      RESTARTS   AGE
pod/my-release-spark-operator-webhook-init-xzx9c   0/1     Completed   0          4d20h
pod/my-release-spark-operator-6547984586-xzw4p     1/1     Running     2          4d20h
pod/pyspark-pi-driver                              0/1     Completed   0          4m46s

Looking at the last 20 log lines, you can see that DAGScheduler finished the job:

#kubectl logs pyspark-pi-driver -n spark-operator --tail 20
23/05/22 06:44:35 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
23/05/22 06:44:35 INFO DAGScheduler: ResultStage 0 (reduce at /opt/spark/examples/src/main/python/pi.py:42) finished in 1.571 s
23/05/22 06:44:35 INFO DAGScheduler: Job 0 is finished. Cancelling potential speculative or zombie tasks for this job
23/05/22 06:44:35 INFO TaskSchedulerImpl: Killing all running tasks in stage 0: Stage finished
23/05/22 06:44:35 INFO DAGScheduler: Job 0 finished: reduce at /opt/spark/examples/src/main/python/pi.py:42, took 1.615592 s
Pi is roughly 3.145160
23/05/22 06:44:35 INFO SparkUI: Stopped Spark web UI at http://pyspark-pi-60e9d1884232c31f-driver-svc.spark-operator.svc:4040
23/05/22 06:44:35 INFO KubernetesClusterSchedulerBackend: Shutting down all executors
23/05/22 06:44:35 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asking each executor to shut down
23/05/22 06:44:35 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed.
23/05/22 06:44:36 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
23/05/22 06:44:36 INFO MemoryStore: MemoryStore cleared
23/05/22 06:44:36 INFO BlockManager: BlockManager stopped
23/05/22 06:44:36 INFO BlockManagerMaster: BlockManagerMaster stopped
23/05/22 06:44:36 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
23/05/22 06:44:36 INFO SparkContext: Successfully stopped SparkContext
23/05/22 06:44:36 INFO ShutdownHookManager: Shutdown hook called
23/05/22 06:44:36 INFO ShutdownHookManager: Deleting directory /var/data/spark-4e8e1507-e941-4437-b0b5-18818fc8865f/spark-4f604216-aeab-4679-83ce-f2527613ec66
23/05/22 06:44:36 INFO ShutdownHookManager: Deleting directory /tmp/spark-00ddbc21-98da-4ad8-9905-fcf0e8d64129
23/05/22 06:44:36 INFO ShutdownHookManager: Deleting directory /var/data/spark-4e8e1507-e941-4437-b0b5-18818fc8865f/spark-4f604216-aeab-4679-83ce-f2527613ec66/pyspark-10201045-5307-40dc-b6b2-d7e5447763c4
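
The "Pi is roughly ..." line comes from a Monte Carlo estimate: sample random points in the square [-1, 1] x [-1, 1] and count how many fall inside the unit circle. Here is a plain-Python sketch of the same idea without Spark. It illustrates the technique rather than reproducing pi.py, and the sample count and seed are arbitrary choices of mine.

```python
import random


def estimate_pi(samples, seed=None):
    """Estimate pi from the share of random points inside the unit circle."""
    rng = random.Random(seed)
    inside = 0
    for _ in range(samples):
        x = rng.uniform(-1.0, 1.0)
        y = rng.uniform(-1.0, 1.0)
        if x * x + y * y <= 1.0:
            inside += 1
    # Area of circle / area of square = pi / 4.
    return 4.0 * inside / samples


estimate = estimate_pi(100_000, seed=42)
print(f"Pi is roughly {estimate:f}")
```

In the Spark version the sampling loop is spread across executors and the per-point results are summed with a reduce, which is the `reduce at pi.py:42` step visible in the log.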

3. Analyze data on MinIO with Spark

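Prepare a Dockerfile and the build commands first, because we will edit the py files and push images often.
I use the image from the original OpenLake guide as the base image, since it already has the Spark dependencies installed: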

FROM openlake/sparkjob-demo:3.3.2

WORKDIR /app

COPY *.py .

Build and push the Docker image as follows (Docker Hub is often unreachable for me, so I created an image registry on Huawei Cloud; adjust to your own setup):

docker build -t xiowang/spark-minio:3.3.2 .
docker tag xiowang/spark-minio:3.3.2 swr.cn-north-4.myhuaweicloud.com/xiowang/spark-minio:3.3.2
docker push swr.cn-north-4.myhuaweicloud.com/xiowang/spark-minio:3.3.2

Download the NYC taxi trip data (~112M rows, ~10GB):

wget https://data.cityofnewyork.us/api/views/t29m-gskq/rows.csv

Create the bucket in MinIO:

mc mb one_minio_local/openlake/spark/sample-data

Copy the taxi data to MinIO:

mc cp rows.csv one_minio_local/openlake/spark/sample-data/

Install pyspark from within Jupyter:

pip3 install pyspark

In MinIO, create a user with read/write access to one_minio_local/openlake and generate an access key and secret for it, then export them:

export AWS_ACCESS_KEY_ID=3436ZpuHMvI5EEoR
export AWS_SECRET_ACCESS_KEY=6US0FDsSFdlg5DzbWPPJtS1UeL75Rb0G
export ENDPOINT=one-minio.minio:9000
export OUTPUT_PATH=s3a://openlake/spark/result/taxi
export INPUT_PATH=s3a://openlake/spark/sample-data/rows.csv

Create a k8s secret holding this information:

kubectl create secret generic minio-secret \
    --from-literal=AWS_ACCESS_KEY_ID=3436ZpuHMvI5EEoR \
    --from-literal=AWS_SECRET_ACCESS_KEY=6US0FDsSFdlg5DzbWPPJtS1UeL75Rb0G \
    --from-literal=ENDPOINT=http://one-minio.minio:9000 \
    --from-literal=AWS_REGION=us-east-1 \
    --namespace spark-operator
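
Keep in mind that Kubernetes stores Secret values base64-encoded in the `.data` field, which is an encoding and not encryption. The sketch below shows the round trip in Python using only the two non-sensitive literals from the command above. The helper names are my own.

```python
import base64

# Non-sensitive literals taken from the kubectl command above.
LITERALS = {
    "ENDPOINT": "http://one-minio.minio:9000",
    "AWS_REGION": "us-east-1",
}


def encode_secret_data(literals):
    """Base64-encode each value the way a Secret's .data field stores it."""
    return {
        key: base64.b64encode(value.encode("utf-8")).decode("ascii")
        for key, value in literals.items()
    }


def decode_secret_data(data):
    """Reverse encode_secret_data, e.g. for values read back with kubectl."""
    return {
        key: base64.b64decode(value).decode("utf-8")
        for key, value in data.items()
    }


encoded = encode_secret_data(LITERALS)
print(encoded)
print(decode_secret_data(encoded))
```

This is also why the helm notes earlier pipe `kubectl get secret ... -o jsonpath=...` through `base64 --decode`.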

Deploy the sparkapp:

apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
    name: spark-minio
    namespace: spark-operator
spec:
    type: Python
    pythonVersion: "3"
    mode: cluster
    image: "swr.cn-north-4.myhuaweicloud.com/xiowang/spark-minio:3.3.2"
    imagePullPolicy: Always
    mainApplicationFile: local:///app/main.py
    sparkVersion: "3.3.2"
    restartPolicy:
        type: OnFailure
        onFailureRetries: 3
        onFailureRetryInterval: 10
        onSubmissionFailureRetries: 5
        onSubmissionFailureRetryInterval: 20
    driver:
        cores: 1
        memory: "1024m"
        labels:
            version: 3.3.2
        serviceAccount: my-release-spark
        env:
            - name: INPUT_PATH
              value: "s3a://openlake/spark/sample-data/rows.csv"
            - name: OUTPUT_PATH
              value: "s3a://openlake/spark/result/taxi"
            - name: SSL_ENABLED
              value: "true"
            - name: AWS_REGION
              valueFrom:
                  secretKeyRef:
                      name: minio-secret
                      key: AWS_REGION
            - name: AWS_ACCESS_KEY_ID
              valueFrom:
                  secretKeyRef:
                      name: minio-secret
                      key: AWS_ACCESS_KEY_ID
            - name: AWS_SECRET_ACCESS_KEY
              valueFrom:
                  secretKeyRef:
                      name: minio-secret
                      key: AWS_SECRET_ACCESS_KEY
            - name: ENDPOINT
              valueFrom:
                  secretKeyRef:
                      name: minio-secret
                      key: ENDPOINT
    executor:
        cores: 1
        instances: 3
        memory: "1024m"
        labels:
            version: 3.3.2
        env:
            - name: INPUT_PATH
              value: "s3a://openlake/spark/sample-data/rows.csv"
            - name: OUTPUT_PATH
              value: "s3a://openlake/spark/result/taxi"
            - name: SSL_ENABLED
              value: "true"
            - name: AWS_REGION
              valueFrom:
                  secretKeyRef:
                      name: minio-secret
                      key: AWS_REGION
            - name: AWS_ACCESS_KEY_ID
              valueFrom:
                  secretKeyRef:
                      name: minio-secret
                      key: AWS_ACCESS_KEY_ID
            - name: AWS_SECRET_ACCESS_KEY
              valueFrom:
                  secretKeyRef:
                      name: minio-secret
                      key: AWS_SECRET_ACCESS_KEY
            - name: ENDPOINT
              valueFrom:
                  secretKeyRef:
                      name: minio-secret
                      key: ENDPOINT
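
The driver and executor sections above repeat the same seven environment variables. One way to avoid the duplication is to generate the manifest from a script. The following is a sketch under the assumption that a JSON manifest is acceptable (kubectl accepts JSON as well as YAML); the helper names `build_env` and `build_role` are hypothetical and the values are copied from the YAML above.

```python
import json

SECRET_NAME = "minio-secret"
PLAIN_ENV = {
    "INPUT_PATH": "s3a://openlake/spark/sample-data/rows.csv",
    "OUTPUT_PATH": "s3a://openlake/spark/result/taxi",
    "SSL_ENABLED": "true",
}
SECRET_KEYS = ["AWS_REGION", "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "ENDPOINT"]


def build_env():
    """Build the env list shared by the driver and the executors."""
    env = [{"name": name, "value": value} for name, value in PLAIN_ENV.items()]
    for key in SECRET_KEYS:
        env.append({
            "name": key,
            "valueFrom": {"secretKeyRef": {"name": SECRET_NAME, "key": key}},
        })
    return env


def build_role(env, **extra):
    """Common driver/executor settings plus role-specific extras."""
    role = {"cores": 1, "memory": "1024m", "labels": {"version": "3.3.2"}, "env": env}
    role.update(extra)
    return role


env = build_env()
manifest = {
    "apiVersion": "sparkoperator.k8s.io/v1beta2",
    "kind": "SparkApplication",
    "metadata": {"name": "spark-minio", "namespace": "spark-operator"},
    "spec": {
        "type": "Python",
        "pythonVersion": "3",
        "mode": "cluster",
        "image": "swr.cn-north-4.myhuaweicloud.com/xiowang/spark-minio:3.3.2",
        "imagePullPolicy": "Always",
        "mainApplicationFile": "local:///app/main.py",
        "sparkVersion": "3.3.2",
        "restartPolicy": {
            "type": "OnFailure",
            "onFailureRetries": 3,
            "onFailureRetryInterval": 10,
            "onSubmissionFailureRetries": 5,
            "onSubmissionFailureRetryInterval": 20,
        },
        "driver": build_role(env, serviceAccount="my-release-spark"),
        "executor": build_role(env, instances=3),
    },
}

print(json.dumps(manifest, indent=2))
```

Redirecting the output to a file such as spark-minio.json and running `kubectl apply -f` on it should be equivalent to applying the YAML above.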

The Python script run by this sparkapp is shown below. It counts all rows and filters the trips with more than 6 passengers:

import logging
import os

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, LongType, DoubleType, StringType

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
logger = logging.getLogger("MinIOSparkJob")

spark = SparkSession.builder.getOrCreate()


def load_config(spark_context: SparkContext):
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.access.key", os.getenv("AWS_ACCESS_KEY_ID", "openlakeuser"))
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.secret.key",
                                                 os.getenv("AWS_SECRET_ACCESS_KEY", "openlakeuser"))
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.endpoint", os.getenv("ENDPOINT", "play.min.io:50000"))
    # spark_context._jsc.hadoopConfiguration().set("fs.s3a.connection.ssl.enabled", os.getenv("SSL_ENABLED", "true"))
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true")
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.attempts.maximum", "1")
    # spark_context._jsc.hadoopConfiguration().set("fs.s3a.connection.establish.timeout", "5000")
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.connection.timeout", "10000")


load_config(spark.sparkContext)


# Define schema for NYC Taxi Data
schema = StructType([
    StructField('VendorID', LongType(), True),
    StructField('tpep_pickup_datetime', StringType(), True),
    StructField('tpep_dropoff_datetime', StringType(), True),
    StructField('passenger_count', DoubleType(), True),
    StructField('trip_distance', DoubleType(), True),
    StructField('RatecodeID', DoubleType(), True),
    StructField('store_and_fwd_flag', StringType(), True),
    StructField('PULocationID', LongType(), True),
    StructField('DOLocationID', LongType(), True),
    StructField('payment_type', LongType(), True),
    StructField('fare_amount', DoubleType(), True),
    StructField('extra', DoubleType(), True),
    StructField('mta_tax', DoubleType(), True),
    StructField('tip_amount', DoubleType(), True),
    StructField('tolls_amount', DoubleType(), True),
    StructField('improvement_surcharge', DoubleType(), True),
    StructField('total_amount', DoubleType(), True)])

# Read CSV file from MinIO
df = spark.read.option("header", "true").schema(schema).csv(
    os.getenv("INPUT_PATH", "s3a://openlake/spark/sample-data/taxi-data.csv"))
# Filter dataframe based on passenger_count greater than 6
large_passengers_df = df.filter(df.passenger_count > 6)

total_rows_count = df.count()
filtered_rows_count = large_passengers_df.count()
# File Output Committer is used to write the output to the destination (Not recommended for Production)
large_passengers_df.write.format("csv").option("header", "true").save(
    os.getenv("OUTPUT_PATH", "s3a://openlake-tmp/spark/nyc/taxis_small"))

logger.info(f"Total Rows for NYC Taxi Data: {total_rows_count}")
logger.info(f"Total Rows for Passenger Count > 6: {filtered_rows_count}")
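
To see what the job computes without a cluster, the same count-and-filter logic can be tried on a tiny in-memory CSV with the standard library. The three sample rows below are made up for illustration and contain only a subset of the real columns; `count_large_parties` is a hypothetical helper, not part of the OpenLake code.

```python
import csv
import io

# Hypothetical three-row sample with only a few of the real columns.
SAMPLE_CSV = """VendorID,passenger_count,trip_distance
1,2,1.5
2,7,3.2
1,,0.8
"""


def count_large_parties(csv_text, threshold=6):
    """Return (total rows, rows with passenger_count > threshold)."""
    total = 0
    large = 0
    for row in csv.DictReader(io.StringIO(csv_text)):
        total += 1
        raw = row["passenger_count"]
        # Empty values are skipped here; in Spark a NULL never satisfies "> 6".
        if raw and float(raw) > threshold:
            large += 1
    return total, large


total_rows, filtered_rows = count_large_parties(SAMPLE_CSV)
print(total_rows, filtered_rows)
```

The Spark job does the same thing at scale: `df.count()` gives the total and `df.filter(df.passenger_count > 6).count()` gives the filtered count.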

If the code above misbehaves, create a debug pod like the one below and debug from inside it:

apiVersion: v1
kind: Pod
metadata:
  name: debug-pod
  namespace: spark-operator
spec:
  containers:
    - name: spark-minio
      image: swr.cn-north-4.myhuaweicloud.com/xiowang/spark-minio:3.3.2
      command: ["sleep"]
      args: ["infinity"]
      env:
          - name: INPUT_PATH
            value: "s3a://openlake/spark/sample-data/rows.csv"
          - name: OUTPUT_PATH
            value: "s3a://openlake/spark/result/taxi"
          - name: SSL_ENABLED
            value: "true"
          - name: AWS_REGION
            valueFrom:
                secretKeyRef:
                    name: minio-secret
                    key: AWS_REGION
          - name: AWS_ACCESS_KEY_ID
            valueFrom:
                secretKeyRef:
                    name: minio-secret
                    key: AWS_ACCESS_KEY_ID
          - name: AWS_SECRET_ACCESS_KEY
            valueFrom:
                secretKeyRef:
                    name: minio-secret
                    key: AWS_SECRET_ACCESS_KEY
          - name: ENDPOINT
            valueFrom:
                secretKeyRef:
                    name: minio-secret
                    key: ENDPOINT

Finally, check the logs to see the printed results:

#kubectl -n spark-operator logs spark-minio-driver
2023-05-25 01:13:49,104 - MinIOSparkJob - INFO - Total Rows for NYC Taxi Data: 112234626
2023-05-25 01:13:49,104 - MinIOSparkJob - INFO - Total Rows for Passenger Count > 6: 1066

4. Analyze data on MinIO with Iceberg in Spark

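Iceberg is an open table format originally open-sourced by Netflix. Put simply, it lets data engineers use SQL to work with tables stored as data files such as Parquet, and it supports snapshots; see the official site for details.
The original guide hard-codes some addresses, so I modified the script and saved it as main-iceberg.py: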

import logging
import os
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, LongType, DoubleType, StringType

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
logger = logging.getLogger("MinIOSparkJob")


# adding iceberg configs
conf = (
    SparkConf()
    .set("spark.sql.extensions",
         "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") # Use Iceberg with Spark
    .set("spark.sql.catalog.demo", "org.apache.iceberg.spark.SparkCatalog")
    .set("spark.sql.catalog.demo.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .set("spark.sql.catalog.demo.warehouse", os.getenv("WAREHOUSE", "s3a://openlake/warehouse/"))
    .set("spark.sql.catalog.demo.s3.endpoint", os.getenv("ENDPOINT", "play.min.io:50000"))
    .set("spark.sql.defaultCatalog", "demo") # Name of the Iceberg catalog
    .set("spark.sql.catalogImplementation", "in-memory")
    .set("spark.sql.catalog.demo.type", "hadoop") # Iceberg catalog type
    .set("spark.executor.heartbeatInterval", "300000")
    .set("spark.network.timeout", "400000")
)

spark = SparkSession.builder.config(conf=conf).getOrCreate()

# Disable below line to see INFO logs
spark.sparkContext.setLogLevel("ERROR")


def load_config(spark_context: SparkContext):
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.access.key", os.getenv("AWS_ACCESS_KEY_ID", "openlakeuser"))
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.secret.key",
                                                 os.getenv("AWS_SECRET_ACCESS_KEY", "openlakeuser"))
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.endpoint", os.getenv("ENDPOINT", "play.min.io:50000"))
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.connection.ssl.enabled", "true")
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true")
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.attempts.maximum", "1")
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.connection.establish.timeout", "5000")
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.connection.timeout", "10000")


load_config(spark.sparkContext)

# Define schema for NYC Taxi Data
schema = StructType([
    StructField('VendorID', LongType(), True),
    StructField('tpep_pickup_datetime', StringType(), True),
    StructField('tpep_dropoff_datetime', StringType(), True),
    StructField('passenger_count', DoubleType(), True),
    StructField('trip_distance', DoubleType(), True),
    StructField('RatecodeID', DoubleType(), True),
    StructField('store_and_fwd_flag', StringType(), True),
    StructField('PULocationID', LongType(), True),
    StructField('DOLocationID', LongType(), True),
    StructField('payment_type', LongType(), True),
    StructField('fare_amount', DoubleType(), True),
    StructField('extra', DoubleType(), True),
    StructField('mta_tax', DoubleType(), True),
    StructField('tip_amount', DoubleType(), True),
    StructField('tolls_amount', DoubleType(), True),
    StructField('improvement_surcharge', DoubleType(), True),
    StructField('total_amount', DoubleType(), True)])

# Read CSV file from MinIO
df = spark.read.option("header", "true").schema(schema).csv(
    os.getenv("INPUT_PATH", "s3a://openlake/spark/sample-data/taxi-data.csv"))

# Create Iceberg table "nyc.taxis_large" from RDD
df.write.mode("overwrite").saveAsTable("nyc.taxis_large")

# Query table row count
count_df = spark.sql("SELECT COUNT(*) AS cnt FROM nyc.taxis_large")
total_rows_count = count_df.first().cnt
logger.info(f"Total Rows for NYC Taxi Data: {total_rows_count}")

# Rename column "fare_amount" in nyc.taxis_large to "fare"
spark.sql("ALTER TABLE nyc.taxis_large RENAME COLUMN fare_amount TO fare")

# Rename column "trip_distance" in nyc.taxis_large to "distance"
spark.sql("ALTER TABLE nyc.taxis_large RENAME COLUMN trip_distance TO distance")

# Add description to the new column "distance"
spark.sql(
    "ALTER TABLE nyc.taxis_large ALTER COLUMN distance COMMENT 'The elapsed trip distance in miles reported by the taximeter.'")

# Move "distance" next to "fare" column
spark.sql("ALTER TABLE nyc.taxis_large ALTER COLUMN distance AFTER fare")

# Add new column "fare_per_distance" of type float
spark.sql("ALTER TABLE nyc.taxis_large ADD COLUMN fare_per_distance FLOAT AFTER distance")

# Check the snapshots available
snap_df = spark.sql("SELECT * FROM nyc.taxis_large.snapshots")
snap_df.show()  # prints all the available snapshots (1 till now)

# Populate the new column "fare_per_distance"
logger.info("Populating fare_per_distance column...")
spark.sql("UPDATE nyc.taxis_large SET fare_per_distance = fare/distance")

# Check the snapshots available
logger.info("Checking snapshots...")
snap_df = spark.sql("SELECT * FROM nyc.taxis_large.snapshots")
snap_df.show()  # prints all the available snapshots (2 now) since previous operation will create a new snapshot

# Query the table to see the results
res_df = spark.sql("""SELECT VendorID
                            ,tpep_pickup_datetime
                            ,tpep_dropoff_datetime
                            ,fare
                            ,distance
                            ,fare_per_distance
                            FROM nyc.taxis_large LIMIT 15""")
res_df.show()

# Delete rows from "fare_per_distance" based on criteria
logger.info("Deleting rows from fare_per_distance column...")
spark.sql("DELETE FROM nyc.taxis_large WHERE fare_per_distance > 4.0 OR distance > 2.0")
spark.sql("DELETE FROM nyc.taxis_large WHERE fare_per_distance IS NULL")

# Check the snapshots available
logger.info("Checking snapshots...")
snap_df = spark.sql("SELECT * FROM nyc.taxis_large.snapshots")
snap_df.show()  # prints all the available snapshots (4 now) since previous operations will create 2 new snapshots

# Query table row count
count_df = spark.sql("SELECT COUNT(*) AS cnt FROM nyc.taxis_large")
total_rows_count = count_df.first().cnt
logger.info(f"Total Rows for NYC Taxi Data after delete operations: {total_rows_count}")

# Partition table based on "VendorID" column
logger.info("Partitioning table based on VendorID column...")
spark.sql("ALTER TABLE nyc.taxis_large ADD PARTITION FIELD VendorID")

# Query Metadata tables like snapshot, files, history
logger.info("Querying Snapshot table...")
snapshots_df = spark.sql("SELECT * FROM nyc.taxis_large.snapshots ORDER BY committed_at")
snapshots_df.show()  # shows all the snapshots in ascending order of committed_at column

logger.info("Querying Files table...")
files_count_df = spark.sql("SELECT COUNT(*) AS cnt FROM nyc.taxis_large.files")
total_files_count = files_count_df.first().cnt
logger.info(f"Total Data Files for NYC Taxi Data: {total_files_count}")

spark.sql("""SELECT file_path,
                    file_format,
                    record_count,
                    null_value_counts,
                    lower_bounds,
                    upper_bounds
                    FROM nyc.taxis_large.files LIMIT 1""").show()

# Query history table
logger.info("Querying History table...")
hist_df = spark.sql("SELECT * FROM nyc.taxis_large.history")
hist_df.show()

# Time travel to initial snapshot
logger.info("Time Travel to initial snapshot...")
snap_df = spark.sql("SELECT snapshot_id FROM nyc.taxis_large.history LIMIT 1")
spark.sql(f"CALL demo.system.rollback_to_snapshot('nyc.taxis_large', {snap_df.first().snapshot_id})")

# Query the table to see the results
res_df = spark.sql("""SELECT VendorID
                            ,tpep_pickup_datetime
                            ,tpep_dropoff_datetime
                            ,fare
                            ,distance
                            ,fare_per_distance
                            FROM nyc.taxis_large LIMIT 15""")
res_df.show()

# Query history table
logger.info("Querying History table...")
hist_df = spark.sql("SELECT * FROM nyc.taxis_large.history")
hist_df.show()  # 1 new row

# Query table row count
count_df = spark.sql("SELECT COUNT(*) AS cnt FROM nyc.taxis_large")
total_rows_count = count_df.first().cnt
logger.info(f"Total Rows for NYC Taxi Data after time travel: {total_rows_count}")
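
The snapshot and rollback behavior exercised by the script can be pictured with a toy model: every write produces a new immutable snapshot, and a rollback only changes which snapshot is current while adding a row to the history. The class below is my own simplification for illustration and says nothing about how Iceberg stores metadata internally.

```python
class ToyTable:
    """Toy model of snapshot-based versioning (not Iceberg's implementation)."""

    def __init__(self, rows):
        self.snapshots = {}  # snapshot_id -> immutable tuple of rows
        self.history = []    # snapshot ids in the order they became current
        self._next_id = 1
        self._commit(rows)

    def _commit(self, rows):
        snapshot_id = self._next_id
        self._next_id += 1
        self.snapshots[snapshot_id] = tuple(rows)
        self.history.append(snapshot_id)
        return snapshot_id

    @property
    def current_rows(self):
        return self.snapshots[self.history[-1]]

    def delete_where(self, predicate):
        """Write a new snapshot without the rows matching predicate."""
        return self._commit(r for r in self.current_rows if not predicate(r))

    def rollback_to(self, snapshot_id):
        """Make an older snapshot current again; no new snapshot is written."""
        if snapshot_id not in self.snapshots:
            raise KeyError(snapshot_id)
        self.history.append(snapshot_id)


table = ToyTable([{"fare": 10.0, "distance": 5.0}, {"fare": 6.0, "distance": 1.0}])
first_id = table.history[0]
table.delete_where(lambda row: row["distance"] > 2.0)
rows_after_delete = len(table.current_rows)
table.rollback_to(first_id)
print(rows_after_delete, len(table.current_rows), len(table.snapshots), table.history)
```

In the toy model the rollback adds one history entry and no snapshot, which matches the "1 new row" comment on the history query in the script.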

Create the sparkapp and save it as spark-iceberg-minio.yaml:

apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
    name: spark-iceberg-minio
    namespace: spark-operator
spec:
    type: Python
    pythonVersion: "3"
    mode: cluster
    image: "swr.cn-north-4.myhuaweicloud.com/xiowang/spark-minio:3.3.2"
    imagePullPolicy: Always
    mainApplicationFile: local:///app/main-iceberg.py
    sparkVersion: "3.3.2"
    restartPolicy:
        type: OnFailure
        onFailureRetries: 3
        onFailureRetryInterval: 10
        onSubmissionFailureRetries: 5
        onSubmissionFailureRetryInterval: 20
    driver:
        cores: 1
        memory: "1024m"
        labels:
            version: 3.3.2
        serviceAccount: my-release-spark
        env:
            - name: INPUT_PATH
              value: "s3a://openlake/spark/sample-data/rows.csv"
            - name: OUTPUT_PATH
              value: "s3a://openlake/spark/result/taxi"
            - name: SSL_ENABLED
              value: "true"
            - name: WAREHOUSE
              value: "s3a://openlake/warehouse"
            - name: AWS_REGION
              valueFrom:
                  secretKeyRef:
                      name: minio-secret
                      key: AWS_REGION
            - name: AWS_ACCESS_KEY_ID
              valueFrom:
                  secretKeyRef:
                      name: minio-secret
                      key: AWS_ACCESS_KEY_ID
            - name: AWS_SECRET_ACCESS_KEY
              valueFrom:
                  secretKeyRef:
                      name: minio-secret
                      key: AWS_SECRET_ACCESS_KEY
            - name: ENDPOINT
              valueFrom:
                  secretKeyRef:
                      name: minio-secret
                      key: ENDPOINT
    executor:
        cores: 1
        instances: 3
        memory: "1024m"
        labels:
            version: 3.3.2
        env:
            - name: INPUT_PATH
              value: "s3a://openlake/spark/sample-data/rows.csv"
            - name: OUTPUT_PATH
              value: "s3a://openlake/spark/result/taxi"
            - name: SSL_ENABLED
              value: "true"
            - name: WAREHOUSE
              value: "s3a://openlake/warehouse"
            - name: AWS_REGION
              valueFrom:
                  secretKeyRef:
                      name: minio-secret
                      key: AWS_REGION
            - name: AWS_ACCESS_KEY_ID
              valueFrom:
                  secretKeyRef:
                      name: minio-secret
                      key: AWS_ACCESS_KEY_ID
            - name: AWS_SECRET_ACCESS_KEY
              valueFrom:
                  secretKeyRef:
                      name: minio-secret
                      key: AWS_SECRET_ACCESS_KEY
            - name: ENDPOINT
              valueFrom:
                  secretKeyRef:
                      name: minio-secret
                      key: ENDPOINT

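Summary

Following the OpenLake guide, this post tried out how Spark on k8s processes data stored in MinIO. The Spark ecosystem feels quite mature, and handling structured and semi-structured data is very convenient. There are also things I am not used to: middleware such as Iceberg is pulled in as jars rather than run as a service, which means updating it requires rebuilding the container image.

The original OpenLake guide also covers Dremio as the query layer and Spark streaming with Kafka. Give them a try if you are interested.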
