如何使用 Apache Flume 发送日志数据至 Apache Pulsar

Apache Flume 是一个分布式的、可靠易用的系统,可以有效地收集和汇总来自多种源系统的大量日志数据,或转移这些数据至一个数据中心存储。

Apache Pulsar 是 Yahoo 基于 Apache BookKeeper 开发和开源的下一代分布式消息系统。Apache Pulsar 已经从下一代分布式消息系统演化成为一个流原生数据平台。

本文主要介绍使用 Flume 实现日志搜集,并发送日志数据至 Pulsar 进行消费。

本文主要涉及 Flume 和 Pulsar 服务的搭建、测试、数据的发送及消费,全面讲解了 Flume 如何发送数据到 Pulsar以及Pulsar 如何消费数据。上图虽然画了 3 个 Flume 服务,但是本次测试只使用了一个,目的是快速走通整个流程,消费数据时也是只启动了一个消费端。后期如果有需求,可以根据此次试验快速完成扩展。

安装环境依赖

在搭建本次试验之前,需要安装以下依赖,本次试验是在 Mac 系统上进行的测试。

依赖 安装指南
Docker https://www.docker.com/get-started
Java 8 https://www.oracle.com/technetwork/java/javase/downloads/index.html
Maven 3.5 或更高版本 https://archive.apache.org/dist/maven/maven-3/
Git https://www.linode.com/docs/development/version-control/how-to-install-git-on-linux-mac-and-windows/
Telnet http://www.telnet.org/htm/howto.htm

编译并打包项目

你能通过以下任一方式编译并打包项目:

方法 1:在本地编译并打包本项目

注意

在打包本项目之前,需要安装并配置好 Java 、 Maven 和 Git 的相关环境。

1、从 GitHub 克隆 Flume sink 代码。

git clone https://github.com/streamnative/pulsar-flume-ng-sink.git

2、使用 Maven 将本项目打包。

cd pulsar-flume-ng-sink
mvn clean package

如果出现以下信息,则说明打包成功:

[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------

方法 2:使用 Docker 镜像编译并打包本项目

如果本地不方便安装 Java 和 Maven 环境,可以使用 Docker 环境进行打包。

1、拉取并启动镜像。

本次使用的 Docker 镜像是 maven:3.6-jdk-8 ,带有 Maven 、 Java 和 Git 的相关环境。

docker pull maven:3.6-jdk-8
docker run -d -it --name docker-package maven:3.6-jdk-8 /bin/bash

上述命令会启动一个基于镜像 maven:3.6-jdk-8 名称为 docker-package 的 Docker 服务。

docker ps -a | grep docker-package
580376b38aa8        maven:3.6-jdk-8                              "/usr/local/bin/mvn-…"   5 minutes ago       Up 5 minutes                                    docker-package
docker exec -it docker-package /bin/bash

使用上述命令确认该镜像是否成功启动。成功启动后,我们就已经在 Docker 服务中了。

2、克隆代码并打包。

成功启动该镜像后,可以非常方便地进行打包。类似地,我们可以使用与在本地打包相同的方法,在 Docker 中对本项目进行打包。

git clone https://github.com/streamnative/pulsar-flume-ng-sink.git
cd pulsar-flume-ng-sink/
mvn clean package

如果出现以下信息,则说明打包成功:

[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------

打包成功后,在本目录下的 target 文件夹中会出现一个 jar 包,名字类似于 pulsar-flume-ng-sink-<version>.jar 。

安装 Pulsar

为了方便测试,本次安装 standalone Pulsar,并且使用 Docker 镜像 apachepulsar/pulsar:2.3.0 启动服务。

docker pull apachepulsar/pulsar:2.3.0
docker run -d -it -p 6650:6650 -p 8080:8080 -v $PWD/data:/pulsar/data --name pulsar-flume-standalone apachepulsar/pulsar:2.3.0 bin/pulsar standalone

该服务暴露 6650 和 8080 端口供外部服务使用,该容器将 Pulsar 服务的 data 目录挂载在了当前目录下,并且命名为 pulsar-flume-standalone。
-d 参数表示使该服务使用后台模式运行;-it 参数表示以交互模式运行容器,并为容器分配一个伪输入终端。

docker logs -f pulsar-flume-standalone

07:53:43.844 [pulsar-web-55-6] INFO  org.apache.pulsar.broker.web.PulsarWebResource - Successfully validated clusters on tenant [public]
07:53:43.856 [pulsar-web-55-6] INFO  org.apache.pulsar.broker.admin.impl.NamespacesBase - [null] Created namespace public/default
07:53:43.859 [pulsar-web-55-6] INFO  org.eclipse.jetty.server.RequestLog - 172.17.0.3 - - [28/Apr/2019:07:53:43 +0000] "PUT /admin/v2/namespaces/public/default HTTP/1.1" 204 0 "-" "Jersey/2.27 (HttpUrlConnection 1.8.0_181)" 17
07:53:43.864 [pulsar-web-55-6] INFO  org.apache.pulsar.broker.web.PulsarWebResource - Successfully validated clusters on tenant [public]
07:53:43.868 [pulsar-ordered-OrderedExecutor-1-0-EventThread] INFO  org.apache.pulsar.zookeeper.ZooKeeperDataCache - [State:CONNECTED Timeout:30000 sessionid:0x1000112d2ea000a local:/127.0.0.1:38464 remoteserver:localhost/127.0.0.1:2181 lastZxid:154 xid:41 sent:41 recv:43 queuedpkts:0 pendingresp:0 queuedevents:0] Received ZooKeeper watch event: WatchedEvent state:SyncConnected type:NodeDataChanged path:/admin/policies/public/default
07:53:43.869 [pulsar-web-55-6] INFO  org.apache.pulsar.broker.admin.impl.NamespacesBase - [null] Successfully updated the replication clusters on namespace public/default

使用 docker logs -f pulsar-flume-standalone 命令查看 Docker 日志。如果出现以上的日志信息,则说明 Pulsar 在单机模式下启动成功了。

使用 Flume

使用 Docker 搭建 Flume 环境。

1、启动 Flume 容器。

docker pull maven:3.6-jdk-8
docker run -d -it --link pulsar-flume-standalone -p 44445:44445 --name flume maven:3.6-jdk-8 /bin/bash

以上命令会启动一个名称为 flume 的服务,它链接到 pulsar-flume-standalone 容器,暴露 44445 端口的 Flume 服务。

docker ps -a | grep flume
27c7555e5481        maven:3.6-jdk-8                              "/usr/local/bin/mvn-…"   46 seconds ago      Up 45 seconds               0.0.0.0:44445->44445/tcp                         flume

使用 docker ps 命令可以看到启动的 Flume 服务。

2、安装 Flume 服务。

(1)进入 Flume 容器。

docker exec -it flume /bin/bash

(2)下载并解压 Apache Flume 压缩包到当前目录。

wget http://apache.01link.hk/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz
tar -zxvf apache-flume-1.9.0-bin.tar.gz

(3)退出 Flume 容器。

exit

3、部署 pulsar-flume-ng-sink。

(1)修改配置文件。本次测试需使用以下 2 个配置文件:

  • 配置文件 flume-example.conf

    flume-example.conf 中兼容 Flume 的配置。更多关于 Flume 配置的信息,参阅 Flume 官方文档,此处简单介绍以下信息:

    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = 0.0.0.0
    a1.sources.r1.port = 44445
    
    ## Describe the sink
    a1.sinks.k1.type = org.apache.flume.sink.pulsar.PulsarSink
    # Configure the pulsar service url (without `pulsar://`)
    a1.sinks.k1.serviceUrl = pulsar-flume-standalone:6650
    a1.sinks.k1.topicName = flume-test-topic
    a1.sinks.k1.producerName = flume-test-producer
    
    参数 说明
    a1.sources.r1.type 指定数据来源。为了方便测试,此处指定 netcat 可以使用 Telnet 工具进行测试。
    a1.sources.r1.bind 绑定本机的 IP 地址。
    a1.sources.r1.port 绑定本机的 44445 端口。启动容器会映射该端口映射,方便测试。
    a1.sinks.k1.type 指定使用的 PulsarSink 类。这个不需要更改。
    a1.sinks.k1.serviceUrl 指定 Pulsar 服务的地址。你可以根据实际需求,更改该参数。此处使用容器地址是 pulsar-flume-standalone:6650。如需支持 SSL,可以配置打开 useTLS 选项。
    a1.sinks.k1.topicName 指定需要发送至 Pulsar 的目标 Topic。如果 Pulsar 服务没有该 Topic ,则会自动新建该 Topic。
    a1.sinks.k1.producerName 指定 sink 的名称,不能重复。

    以上是一些比较简单的配置,根据这些配置,我们可以完成本次试验。更多关于生产环境参数的置、客户端配置和 Producer 配置等信息,参阅 pulsar-flume-ng-sink 的配置

  • 配置文件 flume-env.sh

    flume-env.sh 文件是 Flume 在启动的时需要使用的一些库的查找路径,需要将已打包好的 flume-ng-pulsar-sink-<version>.jar 的 jar 包复制到该路径下。

    以下是本次测使用的 flume-env.sh 配置:

    #!/bin/sh
    export JAVA_HOME=/docker-java-home
    
    FLUME_CLASSPATH=/docker-java-home/lib
    

(2)复制配置文件至 Flume 服务器。

按照以上步骤修改完配置文件后,可以将这些配置复制到 Flume 服务所在的容器。

docker cp flume-example.conf flume:/apache-flume-1.9.0-bin/conf/
docker cp flume-env.sh flume:/apache-flume-1.9.0-bin/conf/

(3)复制 flume-ng-pulsar-sink-<version>.jar 至 Flume 容器。

使用以下任一方法:

  • 方法 1

    如果在本地进行了打包,使用以下命令可以将该 jar 包复制到 Flume 容器。
    进入打包文件的根目录,使用 Docker 命令进行复制。

    cd pulsar-flume-ng-sink
    docker cp target/flume-ng-pulsar-sink-1.9.0.jar flume:apache-flume-1.9.0-bin/lib/
    
  • 方法 2

    如果在容器中进行了打包,首先将容器中已打包的 jar 包复制到本地,再使用 Docker 命令复制到 Flume 容器中。

    docker cp docker-package:/pulsar-flume-ng-sink/target/flume-ng-pulsar-sink-1.9.0.jar .
    docker cp flume-ng-pulsar-sink-1.9.0.jar flume:apache-flume-1.9.0-bin/lib/
    

    至此,已成功构建启动 Flume 服务所需要的环境和配置文件。

4、启动 Flume 服务。

进入 Flume 容器并启动 flume agent。

docker exec -it flume /bin/bash
apache-flume-1.9.0-bin/bin/flume-ng agent --conf apache-flume-1.9.0-bin/conf/ -f apache-flume-1.9.0-bin/conf/flume-example.conf -n a1

在以上命令中,--conf 指定 Flume 服务的配置文件夹,-f 指定配置文件,-n 指定名称(需要与配置文件里的相同)。

如果出现如下信息,则说明 flume-ng agent 启动成功。

Info: Sourcing environment configuration script /apache-flume-1.9.0-bin/conf/flume-env.sh
Info: Including Hive libraries found via () for Hive access
+ exec /docker-java-home/bin/java -Xmx20m -cp '/apache-flume-1.9.0-bin/conf:/apache-flume-1.9.0-bin/lib/*:/docker-java-home/lib:/lib/*' -Djava.library.path= org.apache.flume.node.Application -f apache-flume-1.9.0-bin/conf/flume-example.conf -n a1

5、启动消费端。

在新的窗口中进入 Pulsar 的容器,再启动消费端(用于消费 Flume 发出的信息)。

测试使用的脚本文件在这里。脚本文件内容如下:

import pulsar

client = pulsar.Client('pulsar://localhost:6650')
consumer = client.subscribe('flume-test-topic',
                            subscription_name='sub')

while True:
    msg = consumer.receive()
    print("Received message: '%s'" % msg.data())
    consumer.acknowledge(msg)

client.close()

首先,该脚本会连接到本地 Pulsar 服务的 6650 端口。由于在 Pulsar 服务所在的容器中启动的该脚本,因此,服务地址是 localhost:6650。该脚本会在 flume-test-topic Topic 上进行消费,该 Topic 正是我们在 flume-example.conf 中配置的 Topic ,并且指定订阅名称为 sub。

(1)复制测试脚本文件到 Pulsar 服务所在的容器。

```
cd pulsar-flume-ng-sink
docker cp src/test/python/pulsar-flume.py pulsar-flume-standalone:/pulsar/
```

(2)进入 Pulsar 服务容器。

```
docker exec -it pulsar-flume-standalone /bin/bash
```

(3)启动消费者脚本。

  ```
  python pulsar-flume.py
  2019-04-28 09:01:43.581 INFO  Client:88 | Subscribing on Topic :flume-test-topic
  2019-04-28 09:01:43.582 INFO  ConnectionPool:72 | Created connection for pulsar://localhost:6650
  2019-04-28 09:01:43.585 INFO  ClientConnection:300 | [127.0.0.1:56088 -> 127.0.0.1:6650] Connected to broker
  2019-04-28 09:01:43.590 INFO  HandlerBase:52 | [persistent://public/default/flume-test-topic, sub, 0] Getting connection from pool
  2019-04-28 09:01:43.590 INFO  ConnectionPool:72 | Created connection for pulsar://c063d9580d4c:6650
  2019-04-28 09:01:43.591 INFO  ClientConnection:302 | [127.0.0.1:56090 -> 127.0.0.1:6650] Connected to broker through proxy. Logical broker: pulsar://c063d9580d4c:6650
  2019-04-28 09:01:43.614 INFO  ConsumerImpl:169 | [persistent://public/default/flume-test-topic, sub, 0] Created consumer on broker [127.0.0.1:56090 -> 127.0.0.1:6650]
  ```

启动脚本后出现以上信息,则说明消费端已经成功启动。

6、发送消息进行测试。

如前文所述,本次测试使用 Telnet 工具。

开启一个新的窗口,使用以下 Telnet 命令,发送数据到 Flume 的 44445 端口。

➜  ~ telnet localhost 44445
Trying ::1...
Connected to localhost.
Escape character is '^]'.
hello
OK
world
OK

此时,在刚才开启的消费端可以看到通过 Telnet 发送的信息,内容大致如下:

'eceived message: 'hello
'eceived message: 'world

以上正是通过 Telnet 发送到 Flume 44445 端口的数据。

清理环境

完成本次测试后,使用以下命令停止并且删除相关容器:

docker stop flume
docker rm flume
docker stop pulsar-flume-standalone
docker rm pulsar-flume-standalone
docker stop docker-package
docker rm docker-package

更多信息

Apache Pulsar Connector 系列文章:

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,588评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,456评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,146评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,387评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,481评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,510评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,522评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,296评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,745评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,039评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,202评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,901评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,538评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,165评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,415评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,081评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,085评论 2 352

推荐阅读更多精彩内容