基于Kafka的高性能流处理平台——Confluent


软件准备

  • Confluent安装包
    下载地址:https://www.confluent.io/download/
    Confluent有两个类型可以下载,企业版(Enterprise)需要付费,可以免费使用30天,我这里使用的是开源版(Open Source)版,版本号是4.1.1

1. Confluent 介绍

(1) Confluent 是什么?

Confluent平台是一个可靠的,高性能的流处理平台,你可以通过这个平台组织和管理各式各样的数据源中的数据。

image.png

(2) Confluent 中有什么?

  • Confluent开源版
    • Confluent Kafka Connectors
      • Kafka Connect JDBC Connector
      • Kafka Connect HDFS Connector
      • Kafka Connect Elasticsearch Connector
      • Kafka Connect S3 Connector
    • Confluent Kafka Clients
      • C/C++ Client Library
      • Python Client Library
      • Go Client Library
      • .Net Client Library
    • Confluent Schema Registry
    • Confluent Kafka REST Proxy
  • Confluent 企业版中增加的功能
    • Automatic Data Balancing
    • Multi-Datacenter Replication
    • Confluent Control Center
    • JMS Client

2. Confluent 开源版安装

(1) 解压安装包,可以看到以下目录:

[root@confluent confluent-4.1.1]# ll
total 24
drwxr-xr-x  3 1000 1000 4096 May 12 08:01 bin
drwxr-xr-x 14 1000 1000 4096 May 12 07:05 etc
drwxr-xr-x  3 1000 1000 4096 May 12 06:47 lib
-rw-r--r--  1 1000 1000  871 May 12 08:02 README
drwxr-xr-x  6 1000 1000 4096 May 12 07:05 share
drwxr-xr-x  2 1000 1000 4096 May 12 08:02 src

(2) 启动confluent

[root@confluent confluent-4.1.1]# bin/confluent start
Using CONFLUENT_CURRENT: /tmp/confluent.I5Y1nzpT
Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]
Starting ksql-server
ksql-server is [UP]

confluent start 会启动 confluent 全部组件,如果想要单独启动,比如单独启动 schema-registry,可以执行以下命令:

schema-registry-start

具体的单独启动各组件的命令,进入 bin 目录下,一看就能明白,不再赘述。

3. 简单使用

(1) 创建 topic

[root@confluent confluent-4.1.1]# bin/kafka-topics \
> --create \
> --zookeeper localhost:2181 \
> --replication-factor 1 \
> --partitions 1 \
> --topic confluent-test-001
Created topic "confluent-test-001".

说明:
confluent 中内嵌了 Kafka 和 Zookeeper,你也可以通过指定不同的 zookeeper 在其他的 kafka 集群中创建 topic 或执行其他操作。

(2) 生产数据

[root@confluent confluent-4.1.1]# bin/ksql-datagen \
> quickstart=users \
> format=json \
> topic=confluent-test-001 \
> maxInterval=1000
[2018-06-22 14:53:19,170] INFO AvroDataConfig values: 
    schemas.cache.config = 1
    enhanced.avro.schema.support = false
    connect.meta.data = true
 (io.confluent.connect.avro.AvroDataConfig:179)
User_5 --> ([ 1513083004885 | 'User_5' | 'Region_9' | 'OTHER' ])
User_8 --> ([ 1508770926089 | 'User_8' | 'Region_3' | 'OTHER' ])
User_9 --> ([ 1504006562725 | 'User_9' | 'Region_5' | 'FEMALE' ])
User_8 --> ([ 1490524175099 | 'User_8' | 'Region_2' | 'OTHER' ])
User_8 --> ([ 1489424770134 | 'User_8' | 'Region_8' | 'MALE' ])
User_1 --> ([ 1516449943408 | 'User_1' | 'Region_4' | 'OTHER' ])
......

以上命令是内嵌的一个kafka-producer脚本,生成随机的用户信息,可以通过 quickstart=[CLICKSTREAM_CODES, CLICKSTREAM, CLICKSTREAM_USERS, ORDERS, RATINGS, USERS, USERS_, PAGEVIEWS] 来生成不同的数据,这个脚本会运行很长时间(官网只说了很长时间,到底多长,没说),除非你手动停止

(3) 使用 KSQL 查询生产的数据

在另一个窗口中,进入KSQL命令行(上一个窗口继续发数据不要停)

[root@confluent confluent-4.1.1]# bin/ksql
                  
                  ===========================================
                  =        _  __ _____  ____  _             =
                  =       | |/ // ____|/ __ \| |            =
                  =       | ' /| (___ | |  | | |            =
                  =       |  <  \___ \| |  | | |            =
                  =       | . \ ____) | |__| | |____        =
                  =       |_|\_\_____/ \___\_\______|       =
                  =                                         =
                  =  Streaming SQL Engine for Apache Kafka® =
                  ===========================================

Copyright 2017 Confluent Inc.

CLI v4.1.1, Server v4.1.1 located at http://localhost:8088

Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!

ksql> 

把生产过来的数据创建为user表:

ksql> CREATE TABLE users (registertime BIGINT, gender VARCHAR, regionid VARCHAR, \
> userid VARCHAR, \interests array<VARCHAR>, contact_info map<VARCHAR, VARCHAR>) \
> WITH (KAFKA_TOPIC='confluent-test-001', VALUE_FORMAT='JSON', KEY = 'userid');

 Message       
---------------
 Table created 
---------------

设置消费偏移量为 "earliest":

ksql> SET 'auto.offset.reset'='earliest';
Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'

查询:

ksql> select * from users;
1529651156298 | User_7 | 1497590434653 | OTHER | Region_7 | User_7 | null | null
1529651158082 | User_9 | 1508375625042 | OTHER | Region_1 | User_9 | null | null
1529651160496 | User_5 | 1501045879443 | MALE | Region_6 | User_5 | null | null
1529651161870 | User_6 | 1514541057484 | FEMALE | Region_5 | User_6 | null | null
1529651162248 | User_3 | 1498247501220 | MALE | Region_1 | User_3 | null | null
1529651162727 | User_1 | 1495368101769 | FEMALE | Region_3 | User_1 | null | null
1529651164048 | User_4 | 1508110530233 | MALE | Region_6 | User_4 | null | null
.....
# 只要生产数据的程序没有停止,这里会一直打印查询结果

4. 关闭服务

[root@confluent confluent-4.1.1]# bin/confluent stop
Using CONFLUENT_CURRENT: /tmp/confluent.I5Y1nzpT
Stopping ksql-server
ksql-server is [DOWN]
Stopping connect
connect is [DOWN]
Stopping kafka-rest
kafka-rest is [DOWN]
Stopping schema-registry
schema-registry is [DOWN]
Stopping kafka
kafka is [DOWN]
Stopping zookeeper
zookeeper is [DOWN]
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。