Pulsar 快速开始(集群搭建+Java demo)

Apache Pulsar集群部署手册

1 集群组成

  1. 搭建 Pulsar 集群至少需要 3 个组件:ZooKeeper 集群、BookKeeper 集群和 broker 集群(BrokerPulsar 的自身实例)。这三个集群组件如下:
  • ZooKeeper 集群(多 个 ZooKeeper 节点组成)

  • bookie 集群(也称为 BookKeeper 集群,多个 BookKeeper 节点组成)

  • broker 集群(多 个 Pulsar 节点组成)

  1. Pulsar 的安装包已包含了搭建集群所需的各个组件库。无需单独下载 ZooKeeper 安装包和 BookKeeper 安装包。

2 准备工作

  • 必须条件

  1. 在所需服务器上安装JDK(要求版本不低于 JDK 8),安装略过

  2. 将pulsar安装包上传至所需服务器,目前安装包可以在https://pulsar.apache.org/en/download/中下载

  3. 解压安装包,以/home/kafka路径为例,$tar -zxvf apache-pulsar-2.6.1-bin.tar.gz -C /home/kafka/

  • 安装建议

    官方建议需要6台机器:

    • 3 台用于运行ZooKeeper 集群,建议使用性能较弱的机器,Pulsar仅将ZooKeeper用于与协调有关的定期任务和与配置有关的任务,而不用于基本操作。

    • 3 用于运行bookie 集群和broker 集群,建议使用性能强劲的机器。

    但是也可以在一台机器上同时部署ZooKeeperbookiebroker ,也就是最少需要三台机器就可以部署一个Pulsar 集群。

    其实也还可以在3台机器上部署ZooKeeper 集群,另外3台机器部署bookie 集群,另3台机器部署broker 集群,也就是共需要消耗9台机器。

    总结,部署一个Pulsar 集群(包含一个ZooKeeper 集群(3 个 ZooKeeper 节点组成),一个bookie 集群(也称为 BookKeeper 集群,3 个 BookKeeper 节点组成),一个broker 集群(3 个 Pulsar 节点组成)),最少需要3台机器,官方建议6台机器,最多需要9台机器。

    下文将以3台机器为例介绍部署方法,三台机器IP地址分别为:

    • 172.21.2.103

    • 172.21.2.104

    • 172.21.2.105

3 部署流程

3.1 zookeeper安装

Pulsar安装包内包含了zookeeper,也可以自建zookeeper,自建zookeeper的方式略。安装Pulsar安装包内的zookeeper方法如下:

  1. 通过cd /home/kafka/apache-pulsar-2.6.1-bin 进入到Pulsar根目录

  2. 通过vim ./conf/zookeeper.conf修改配置文件(三个节点上都需执行此操作),新增或修改如下关键配置项:


#dataDir是修改,其他都是新增

dataDir=/home/kafka/data/zookeeper/data

dataLogDir=/home/kafka/data/zookeeper/log

server.1=172.21.2.103:2888:3888

server.2=172.21.2.104:2888:3888

server.3=172.21.2.105:2888:3888

参数说明:
dataDir:当前zookeeper节点的数据存放目录
dataLogDir:当前zookeeper节点的日志存放目录
server.1~3:为zookeeper集群的各节点指定编号

  1. 在每个zookeeper节点的机器上,新建如下文件目录:
  • data:ZooKeeper使用的数据存储目录

    mkdir -pv /home/kafka/data/zookeeper/data

  • log:ZooKeeper使用的日志存储目录

    mkdir -pv /home/kafka/data/zookeeper/log

  1. 为每个zookeeper节点新建myid,分别在指定的sever上写入配置文件中指定的编号:
  • 在server.1服务器上执行bash命令:

    echo 1 > /home/kafka/data/zookeeper/data/myid

  • 在server.2服务器上执行bash命令:

    echo 2 > /home/kafka/data/zookeeper/data/myid

  • 在server.3服务器上执行bash命令:

    echo 3 > /home/kafka/data/zookeeper/data/myid

  1. 执行后台运行命令,这个命令是启动zookeeper:

bin/pulsar-daemon start zookeeper

  1. 执行zookeeper客户端连接命令:

bin/pulsar zookeeper-shell

客户端正常连接,就算zookeeper启动好了

  1. 在另外两台服务器上也执行bin/pulsar-daemon start zookeeper之后,在其中一个zookeeper节点的机器上,初始化集群元数(总共只需执行一次):

例如在172.21.2.103上:

```bash

bin/pulsar initialize-cluster-metadata \

    --cluster pulsar-cluster-zk \

    --zookeeper 172.21.2.103:2181 \

    --configuration-store 172.21.2.103:2181 \

    --web-service-url http://172.21.2.103:8080,172.21.2.104:8080,172.21.2.105:8080 \

    --web-service-url-tls https://172.21.2.103:8443,172.21.2.104:8443,172.21.2.105:8443 \

    --broker-service-url pulsar://172.21.2.103:6650,172.21.2.104:6650,172.21.2.105:6650 \

    --broker-service-url-tls pulsar+ssl://172.21.2.103:6651,172.21.2.104:6651,172.21.2.105:6651

```

> ##### 集群元数据说明

> - cluster

>  集群名称

> - zookeeper

>  ZooKeeper集群连接参数,仅需要包含ZooKeeper集群中的一个节点即可

> - configuration-store

>  Pulsar实例的配置存储集群(ZooKeeper),多集群部署时才会发挥作用,需要另外部署ZooKeeper集群,但是单集群部署时可以和--zookeeper参数设置一样,只需要包含ZooKeeper集群中的一个节点即可

> - web-service-url

>  集群Web服务的URL+端口,URL是一个标准的DNS名称,默认端口8080,不建议修改。

> - web-service-url-tls

>  集群Web提供TLS服务的URL+端口,端口默认8443,不建议修改。

> - broker-service-url

>  集群brokers服务URL,URL中DNS的名称和Web服务保持一致,URL使用pulsar替代http/http,端口默认6650,不建议修改。

> - broker-service-url-tls

>  集群brokers提供TLS服务的URL,默认端口6551,不建议修改。

>

> **ps:如果没有DNS服务器,也可以使用多主机(multi-host)格式的service-url设置web-service-url,web-service-url-tls,broker-service-url,broker-service-url-tls**
  1. bin/pulsar zookeeper-shell进入zk控制台,通过ls /查看所有zk节点。能看到bookies,ledgers等节点,则说明初始化成功了。

如果需要关闭zookeeper,可使用命令

bin/pulsar-daemon stop zookeeper

3.2 bookkeeper部署

  1. 在每个部署bookkeeper的机器上,通过vim conf/bookkeeper.conf来编辑bookkeeper配置文件,修改如下关键配置项:

advertisedAddress=172.21.2.103

zkServers=172.21.2.103:2181,172.21.2.104:2181,172.21.2.105:2181

journalDirectories=/home/kafka/data/bookkeeper/journal

ledgerDirectories=/home/kafka/data/bookkeeper/ledgers

prometheusStatsHttpPort=8100

注意:

  1. prometheusStatsHttpPort默认是8000,但实际上在bookkeeper.conf中,httpServerPort默认也是8000,会导致端口被占用。
  2. 上面的advertisedAddress需要设置为对应机器的ip,而不是全设置为同一个
    参数说明:
    advertisedAddress:指定当前节点的主机名或IP地址
    zkServers:指定zookeeper集群,用来将bookkeeper节点的元数据存放在zookeeper集群
    journalDirectories:当前bookkeeper节点的journal数据存放目录。
    如果需要提高磁盘写入性能,可以指定多个目录用来存放journal数据,关键是每一个目录必须在不同的磁盘,不然反而会影响写入性能
    ledgerDirectories:当前bookkeeper节点的ledger存放目录
  1. 在每个部署bookkeeper的机器上,创建bookie所需要目录:

mkdir -pv /home/kafka/data/bookkeeper/

mkdir -pv /home/kafka/data/bookkeeper/journal

mkdir -pv /home/kafka/data/bookkeeper/ledgers

  1. 执行初始化元数据命令,若出现提示,输入Y继续(<font color=#FF0000 >该步骤只需在一个bookie节点执行一次,总共只需执行一次</font>):

bin/bookkeeper shell metaformat

  1. 在三台机器上,分别输入以下命令来以后台进程启动bookie:

bin/pulsar-daemon start bookie

  1. 验证是否启动成功:

bin/bookkeeper shell bookiesanity

出现Bookie sanity test succeeded则代表启动成功。

如果需要关闭bookkeeper,可使用命令

bin/pulsar-daemon stop bookie

3.3 Broker集群部署

  1. 在每个部署Broker的机器上,通过vim conf/broker.conf来编辑Broker配置文件,修改如下关键配置项:

zookeeperServers=172.21.2.103:2181,172.21.2.104:2181,172.21.2.105:2181

configurationStoreServers=172.21.2.103:2181,172.21.2.104:2181,172.21.2.105:2181

advertisedAddress=172.21.2.103

#clusterName与前面zookeeper初始化的cluster一致

clusterName=pulsar-cluster-zk

注意:
上面的advertisedAddress需要设置为对应机器的ip,而不是全设置为同一个
参数说明:
zookeeperServers:指定zookeeper集群,用来将broker节点的元数据存放在zookeeper集群
configurationStoreServers:多集群部署时管理多个pulsar集群元数据的zookeeper集群地址,单集群部署时可以和zookeeperServers设置一样
advertisedAddress:指定当前节点的主机名或IP地址
clusterName:指定pulsar集群名称

  1. 在每个部署Broker的机器上,以后台进程启动broker

bin/pulsar-daemon start broker

​ 如果需要关闭broker,可使用命令

bin/pulsar-daemon stop broker

  1. 查看集群 brokers 节点情况

bin/pulsar-admin brokers list pulsar-cluster

我们的示例部署正常的话,这一步会显示如下结果;


  172.21.2.103:8080

  172.21.2.104:8080

  172.21.2.105:8080

代表此时集群内有存活的节点: 172.21.2.103、172.21.2.104、172.21.2.105,端口号都是8080。到这一步,Pulsar的部署就完成了。

Java Demo示例

pom.xml文件


<!-- in your <properties> block -->

<pulsar.version>2.6.1</pulsar.version>

<!-- in your <dependencies> block -->

<dependency>

  <groupId>org.apache.pulsar</groupId>

  <artifactId>pulsar-client</artifactId>

  <version>${pulsar.version}</version>

</dependency>

Producer demo


import com.google.common.collect.Lists;

import org.apache.pulsar.client.api.*;

import java.util.List;

import java.util.concurrent.CompletableFuture;

import java.util.concurrent.TimeUnit;

public class ProducerDemo {

    /*

    *

    * @param null

    * @return

    * @exception

    * @author guojiangtao

    * @date 2020/11/6 13:44

    */

    //Pulsar集群中broker的serviceurl

    private static final String brokerServiceurl = "pulsar://172.21.2.103:6650,172.21.2.104:6650,172.21.2.105:6650";

    //指定topic name

    private static final String topicName = "persistent://public/default/my-topic20201102";

    public static void main(String[] args) throws PulsarClientException {

        //构造Pulsar client

        PulsarClient client = PulsarClient.builder()

                .serviceUrl(brokerServiceurl)

                .build();

        //创建producer

        Producer<byte[]> producer = client.newProducer()

                .topic(topicName)

                .enableBatching(true)//是否开启批量处理消息,默认true,需要注意的是enableBatching只在异步发送sendAsync生效,同步发送send失效。因此建议生产环境若想使用批处理,则需使用异步发送,或者多线程同步发送

                .compressionType(CompressionType.LZ4)//消息压缩(四种压缩方式:LZ4,ZLIB,ZSTD,SNAPPY),consumer端不用做改动就能消费,开启后大约可以降低3/4带宽消耗和存储(官方测试)

                .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS) //设置将对发送的消息进行批处理的时间段,10ms;可以理解为若该时间段内批处理成功,则一个batch中的消息数量不会被该参数所影响。

                .sendTimeout(0, TimeUnit.SECONDS)//设置发送超时0s;如果在sendTimeout过期之前服务器没有确认消息,则会发生错误。默认30s,设置为0代表无限制,建议配置为0

                .batchingMaxMessages(1000)//批处理中允许的最大消息数。默认1000

                .maxPendingMessages(1000)//设置等待接受来自broker确认消息的队列的最大大小,默认1000

                .blockIfQueueFull(true)//设置当消息队列中等待的消息已满时,Producer.send 和 Producer.sendAsync 是否应该block阻塞。默认为false,达到maxPendingMessages后send操作会报错,设置为true后,send操作阻塞但是不报错。建议设置为true

                .roundRobinRouterBatchingPartitionSwitchFrequency(10)//向不同partition分发消息的切换频率,默认10ms,可根据batch情况灵活调整

                .batcherBuilder(BatcherBuilder.DEFAULT)//key_Shared模式要用KEY_BASED,才能保证同一个key的message在一个batch里

                .create();

        ProducerDemo pro = new ProducerDemo();

        //异步发送100条消息

        pro.AsyncSend(client, producer);

        //同步发送100条消息

//        pro.SyncSend(client, producer);

    }

    public void AsyncSend(PulsarClient client, Producer producer) throws PulsarClientException {

        /*

        * 异步发送

        * @param client

        * @param producer

        * @return void

        * @exception

        * @author guojiangtao

        * @date 2020/11/6 13:36

        */

        List<CompletableFuture<MessageId>> futures = Lists.newArrayList();

        for (int i = 0; i < 100; i++) {

            final String content = "my-AsyncSend-message-" + i;

            CompletableFuture<MessageId> future = producer.sendAsync(content.getBytes());//异步发送

            future.handle((v, ex) -> {

                if (ex == null) {

                    System.out.println("Message persisted: " + content);

                } else {

                    System.out.println("Error persisting message: " + content + ex);

                }

                return null;

            });

            futures.add(future);

        }

        System.out.println("Waiting for async ops to complete");

        for (CompletableFuture<MessageId> future : futures) {

            future.join();

        }

        System.out.println("All operations completed");

        producer.close();//关闭producer

        client.close();//关闭client

    }

    public void SyncSend(PulsarClient client, Producer producer) throws PulsarClientException {

        /*

        * 同步发送

        * @param client

        * @param producer

        * @return void

        * @exception

        * @author guojiangtao

        * @date 2020/11/6 13:39

        */

        for (int i = 0; i < 100; i++) {

            final String content = "my-SyncSend-message-" + i;

            producer.send((content).getBytes());//同步发送

            System.out.println("Send message: " + content);

        }

        producer.close();//关闭producer

        client.close();//关闭client

    }

}

Consumer demo


import org.apache.pulsar.client.api.*;

import java.util.concurrent.TimeUnit;

/*

*

* @param null

* @return

* @exception

* @author guojiangtao

* @date 2020/11/2 17:10

*/

public class ConsumerDemo {

    //Pulsar集群中broker的serviceurl

    private static final String brokerServiceurl = "pulsar://172.21.2.103:6650,172.21.2.104:6650,172.21.2.105:6650";

    //需要订阅的topic name

    private static final String topicName = "persistent://public/default/my-topic20201102";

    //订阅名

    private static final String subscriptionName = "my-sub";

    public static void main(String[] args) throws PulsarClientException {

        //构造Pulsar client

        PulsarClient client = PulsarClient.builder()

                .serviceUrl(brokerServiceurl)

                .build();

        //创建consumer

        Consumer consumer = client.newConsumer()

                .topic(topicName)

                .subscriptionName(subscriptionName)

                .subscriptionType(SubscriptionType.Exclusive)//指定消费模式,包含:Exclusive,Failover,Shared,Key_Shared。默认Exclusive模式

                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)//指定从哪里开始消费还有Latest,valueof可选,默认Latest

                .negativeAckRedeliveryDelay(60, TimeUnit.SECONDS)//指定消费失败后延迟多久broker重新发送消息给consumer,默认60s

                .subscribe();

        //消费消息

        while (true) {

            Message message = consumer.receive();

            try {

                System.out.printf("Message received: %s%n", new String(message.getData()));

                consumer.acknowledge(message);

            } catch (Exception e) {

                e.printStackTrace();

                consumer.negativeAcknowledge(message);

            }

        }

    }

}

©著作权归作者所有,转载或内容合作请联系作者
禁止转载,如需转载请通过简信或评论联系作者。
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,222评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,455评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,720评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,568评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,696评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,879评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,028评论 3 409
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,773评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,220评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,550评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,697评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,360评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,002评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,782评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,010评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,433评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,587评论 2 350

推荐阅读更多精彩内容