Apache Pulsar集群部署手册
1 集群组成
- 搭建
Pulsar
集群至少需要 3 个组件:ZooKeeper
集群、BookKeeper
集群和broker
集群(Broker
是Pulsar
的自身实例)。这三个集群组件如下:
ZooKeeper
集群(多 个ZooKeeper
节点组成)bookie
集群(也称为BookKeeper
集群,多个BookKeeper
节点组成)broker
集群(多 个Pulsar
节点组成)
- Pulsar 的安装包已包含了搭建集群所需的各个组件库。无需单独下载 ZooKeeper 安装包和 BookKeeper 安装包。
2 准备工作
-
必须条件
在所需服务器上安装JDK(要求版本不低于 JDK 8),安装略过
将pulsar安装包上传至所需服务器,目前安装包可以在https://pulsar.apache.org/en/download/中下载
解压安装包,以/home/kafka路径为例,$
tar -zxvf apache-pulsar-2.6.1-bin.tar.gz -C /home/kafka/
-
安装建议
官方建议需要6台机器:
3 台用于运行
ZooKeeper
集群,建议使用性能较弱的机器,Pulsar
仅将ZooKeeper
用于与协调有关的定期任务和与配置有关的任务,而不用于基本操作。3 用于运行
bookie
集群和broker
集群,建议使用性能强劲的机器。
但是也可以在一台机器上同时部署
ZooKeeper
,bookie
,broker
,也就是最少需要三台机器就可以部署一个Pulsar 集群。其实也还可以在3台机器上部署
ZooKeeper
集群,另外3台机器部署bookie
集群,另3台机器部署broker
集群,也就是共需要消耗9台机器。总结,部署一个Pulsar 集群(包含一个ZooKeeper 集群(3 个 ZooKeeper 节点组成),一个bookie 集群(也称为 BookKeeper 集群,3 个 BookKeeper 节点组成),一个broker 集群(3 个 Pulsar 节点组成)),最少需要3台机器,官方建议6台机器,最多需要9台机器。
下文将以3台机器为例介绍部署方法,三台机器IP地址分别为:
172.21.2.103
172.21.2.104
172.21.2.105
3 部署流程
3.1 zookeeper安装
Pulsar安装包内包含了zookeeper,也可以自建zookeeper,自建zookeeper的方式略。安装Pulsar安装包内的zookeeper方法如下:
通过
cd /home/kafka/apache-pulsar-2.6.1-bin
进入到Pulsar根目录通过
vim ./conf/zookeeper.conf
修改配置文件(三个节点上都需执行此操作),新增或修改如下关键配置项:
#dataDir是修改,其他都是新增
dataDir=/home/kafka/data/zookeeper/data
dataLogDir=/home/kafka/data/zookeeper/log
server.1=172.21.2.103:2888:3888
server.2=172.21.2.104:2888:3888
server.3=172.21.2.105:2888:3888
参数说明:
dataDir:当前zookeeper节点的数据存放目录
dataLogDir:当前zookeeper节点的日志存放目录
server.1~3:为zookeeper集群的各节点指定编号
- 在每个zookeeper节点的机器上,新建如下文件目录:
-
data:ZooKeeper使用的数据存储目录
mkdir -pv /home/kafka/data/zookeeper/data
-
log:ZooKeeper使用的日志存储目录
mkdir -pv /home/kafka/data/zookeeper/log
- 为每个zookeeper节点新建myid,分别在指定的sever上写入配置文件中指定的编号:
-
在server.1服务器上执行bash命令:
echo 1 > /home/kafka/data/zookeeper/data/myid
-
在server.2服务器上执行bash命令:
echo 2 > /home/kafka/data/zookeeper/data/myid
-
在server.3服务器上执行bash命令:
echo 3 > /home/kafka/data/zookeeper/data/myid
- 执行后台运行命令,这个命令是启动zookeeper:
bin/pulsar-daemon start zookeeper
- 执行zookeeper客户端连接命令:
bin/pulsar zookeeper-shell
客户端正常连接,就算zookeeper启动好了
- 在另外两台服务器上也执行
bin/pulsar-daemon start zookeeper
之后,在其中一个zookeeper节点的机器上,初始化集群元数(总共只需执行一次):
例如在172.21.2.103上:
```bash
bin/pulsar initialize-cluster-metadata \
--cluster pulsar-cluster-zk \
--zookeeper 172.21.2.103:2181 \
--configuration-store 172.21.2.103:2181 \
--web-service-url http://172.21.2.103:8080,172.21.2.104:8080,172.21.2.105:8080 \
--web-service-url-tls https://172.21.2.103:8443,172.21.2.104:8443,172.21.2.105:8443 \
--broker-service-url pulsar://172.21.2.103:6650,172.21.2.104:6650,172.21.2.105:6650 \
--broker-service-url-tls pulsar+ssl://172.21.2.103:6651,172.21.2.104:6651,172.21.2.105:6651
```
> ##### 集群元数据说明
> - cluster
> 集群名称
> - zookeeper
> ZooKeeper集群连接参数,仅需要包含ZooKeeper集群中的一个节点即可
> - configuration-store
> Pulsar实例的配置存储集群(ZooKeeper),多集群部署时才会发挥作用,需要另外部署ZooKeeper集群,但是单集群部署时可以和--zookeeper参数设置一样,只需要包含ZooKeeper集群中的一个节点即可
> - web-service-url
> 集群Web服务的URL+端口,URL是一个标准的DNS名称,默认端口8080,不建议修改。
> - web-service-url-tls
> 集群Web提供TLS服务的URL+端口,端口默认8443,不建议修改。
> - broker-service-url
> 集群brokers服务URL,URL中DNS的名称和Web服务保持一致,URL使用pulsar替代http/http,端口默认6650,不建议修改。
> - broker-service-url-tls
> 集群brokers提供TLS服务的URL,默认端口6551,不建议修改。
>
> **ps:如果没有DNS服务器,也可以使用多主机(multi-host)格式的service-url设置web-service-url,web-service-url-tls,broker-service-url,broker-service-url-tls**
-
bin/pulsar zookeeper-shell
进入zk控制台,通过ls /查看所有zk节点。能看到bookies,ledgers等节点,则说明初始化成功了。
如果需要关闭zookeeper,可使用命令
bin/pulsar-daemon stop zookeeper
3.2 bookkeeper部署
- 在每个部署bookkeeper的机器上,通过vim conf/bookkeeper.conf来编辑bookkeeper配置文件,修改如下关键配置项:
advertisedAddress=172.21.2.103
zkServers=172.21.2.103:2181,172.21.2.104:2181,172.21.2.105:2181
journalDirectories=/home/kafka/data/bookkeeper/journal
ledgerDirectories=/home/kafka/data/bookkeeper/ledgers
prometheusStatsHttpPort=8100
注意:
- prometheusStatsHttpPort默认是8000,但实际上在bookkeeper.conf中,httpServerPort默认也是8000,会导致端口被占用。
- 上面的
advertisedAddress
需要设置为对应机器的ip,而不是全设置为同一个
参数说明:
advertisedAddress:指定当前节点的主机名或IP地址
zkServers:指定zookeeper集群,用来将bookkeeper节点的元数据存放在zookeeper集群
journalDirectories:当前bookkeeper节点的journal数据存放目录。
如果需要提高磁盘写入性能,可以指定多个目录用来存放journal数据,关键是每一个目录必须在不同的磁盘,不然反而会影响写入性能
ledgerDirectories:当前bookkeeper节点的ledger存放目录
- 在每个部署bookkeeper的机器上,创建bookie所需要目录:
mkdir -pv /home/kafka/data/bookkeeper/
mkdir -pv /home/kafka/data/bookkeeper/journal
mkdir -pv /home/kafka/data/bookkeeper/ledgers
- 执行初始化元数据命令,若出现提示,输入Y继续(<font color=#FF0000 >该步骤只需在一个bookie节点执行一次,总共只需执行一次</font>):
bin/bookkeeper shell metaformat
- 在三台机器上,分别输入以下命令来以后台进程启动bookie:
bin/pulsar-daemon start bookie
- 验证是否启动成功:
bin/bookkeeper shell bookiesanity
出现Bookie sanity test succeeded
则代表启动成功。
如果需要关闭bookkeeper,可使用命令
bin/pulsar-daemon stop bookie
3.3 Broker集群部署
- 在每个部署Broker的机器上,通过
vim conf/broker.conf
来编辑Broker配置文件,修改如下关键配置项:
zookeeperServers=172.21.2.103:2181,172.21.2.104:2181,172.21.2.105:2181
configurationStoreServers=172.21.2.103:2181,172.21.2.104:2181,172.21.2.105:2181
advertisedAddress=172.21.2.103
#clusterName与前面zookeeper初始化的cluster一致
clusterName=pulsar-cluster-zk
注意:
上面的advertisedAddress需要设置为对应机器的ip,而不是全设置为同一个
参数说明:
zookeeperServers:指定zookeeper集群,用来将broker节点的元数据存放在zookeeper集群
configurationStoreServers:多集群部署时管理多个pulsar集群元数据的zookeeper集群地址,单集群部署时可以和zookeeperServers设置一样
advertisedAddress:指定当前节点的主机名或IP地址
clusterName:指定pulsar集群名称
- 在每个部署Broker的机器上,以后台进程启动broker
bin/pulsar-daemon start broker
如果需要关闭broker,可使用命令
bin/pulsar-daemon stop broker
- 查看集群 brokers 节点情况
bin/pulsar-admin brokers list pulsar-cluster
我们的示例部署正常的话,这一步会显示如下结果;
172.21.2.103:8080
172.21.2.104:8080
172.21.2.105:8080
代表此时集群内有存活的节点: 172.21.2.103、172.21.2.104、172.21.2.105,端口号都是8080。到这一步,Pulsar的部署就完成了。
Java Demo示例
pom.xml文件
<!-- in your <properties> block -->
<pulsar.version>2.6.1</pulsar.version>
<!-- in your <dependencies> block -->
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>${pulsar.version}</version>
</dependency>
Producer demo
import com.google.common.collect.Lists;
import org.apache.pulsar.client.api.*;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
public class ProducerDemo {
/*
*
* @param null
* @return
* @exception
* @author guojiangtao
* @date 2020/11/6 13:44
*/
//Pulsar集群中broker的serviceurl
private static final String brokerServiceurl = "pulsar://172.21.2.103:6650,172.21.2.104:6650,172.21.2.105:6650";
//指定topic name
private static final String topicName = "persistent://public/default/my-topic20201102";
public static void main(String[] args) throws PulsarClientException {
//构造Pulsar client
PulsarClient client = PulsarClient.builder()
.serviceUrl(brokerServiceurl)
.build();
//创建producer
Producer<byte[]> producer = client.newProducer()
.topic(topicName)
.enableBatching(true)//是否开启批量处理消息,默认true,需要注意的是enableBatching只在异步发送sendAsync生效,同步发送send失效。因此建议生产环境若想使用批处理,则需使用异步发送,或者多线程同步发送
.compressionType(CompressionType.LZ4)//消息压缩(四种压缩方式:LZ4,ZLIB,ZSTD,SNAPPY),consumer端不用做改动就能消费,开启后大约可以降低3/4带宽消耗和存储(官方测试)
.batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS) //设置将对发送的消息进行批处理的时间段,10ms;可以理解为若该时间段内批处理成功,则一个batch中的消息数量不会被该参数所影响。
.sendTimeout(0, TimeUnit.SECONDS)//设置发送超时0s;如果在sendTimeout过期之前服务器没有确认消息,则会发生错误。默认30s,设置为0代表无限制,建议配置为0
.batchingMaxMessages(1000)//批处理中允许的最大消息数。默认1000
.maxPendingMessages(1000)//设置等待接受来自broker确认消息的队列的最大大小,默认1000
.blockIfQueueFull(true)//设置当消息队列中等待的消息已满时,Producer.send 和 Producer.sendAsync 是否应该block阻塞。默认为false,达到maxPendingMessages后send操作会报错,设置为true后,send操作阻塞但是不报错。建议设置为true
.roundRobinRouterBatchingPartitionSwitchFrequency(10)//向不同partition分发消息的切换频率,默认10ms,可根据batch情况灵活调整
.batcherBuilder(BatcherBuilder.DEFAULT)//key_Shared模式要用KEY_BASED,才能保证同一个key的message在一个batch里
.create();
ProducerDemo pro = new ProducerDemo();
//异步发送100条消息
pro.AsyncSend(client, producer);
//同步发送100条消息
// pro.SyncSend(client, producer);
}
public void AsyncSend(PulsarClient client, Producer producer) throws PulsarClientException {
/*
* 异步发送
* @param client
* @param producer
* @return void
* @exception
* @author guojiangtao
* @date 2020/11/6 13:36
*/
List<CompletableFuture<MessageId>> futures = Lists.newArrayList();
for (int i = 0; i < 100; i++) {
final String content = "my-AsyncSend-message-" + i;
CompletableFuture<MessageId> future = producer.sendAsync(content.getBytes());//异步发送
future.handle((v, ex) -> {
if (ex == null) {
System.out.println("Message persisted: " + content);
} else {
System.out.println("Error persisting message: " + content + ex);
}
return null;
});
futures.add(future);
}
System.out.println("Waiting for async ops to complete");
for (CompletableFuture<MessageId> future : futures) {
future.join();
}
System.out.println("All operations completed");
producer.close();//关闭producer
client.close();//关闭client
}
public void SyncSend(PulsarClient client, Producer producer) throws PulsarClientException {
/*
* 同步发送
* @param client
* @param producer
* @return void
* @exception
* @author guojiangtao
* @date 2020/11/6 13:39
*/
for (int i = 0; i < 100; i++) {
final String content = "my-SyncSend-message-" + i;
producer.send((content).getBytes());//同步发送
System.out.println("Send message: " + content);
}
producer.close();//关闭producer
client.close();//关闭client
}
}
Consumer demo
import org.apache.pulsar.client.api.*;
import java.util.concurrent.TimeUnit;
/*
*
* @param null
* @return
* @exception
* @author guojiangtao
* @date 2020/11/2 17:10
*/
public class ConsumerDemo {
//Pulsar集群中broker的serviceurl
private static final String brokerServiceurl = "pulsar://172.21.2.103:6650,172.21.2.104:6650,172.21.2.105:6650";
//需要订阅的topic name
private static final String topicName = "persistent://public/default/my-topic20201102";
//订阅名
private static final String subscriptionName = "my-sub";
public static void main(String[] args) throws PulsarClientException {
//构造Pulsar client
PulsarClient client = PulsarClient.builder()
.serviceUrl(brokerServiceurl)
.build();
//创建consumer
Consumer consumer = client.newConsumer()
.topic(topicName)
.subscriptionName(subscriptionName)
.subscriptionType(SubscriptionType.Exclusive)//指定消费模式,包含:Exclusive,Failover,Shared,Key_Shared。默认Exclusive模式
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)//指定从哪里开始消费还有Latest,valueof可选,默认Latest
.negativeAckRedeliveryDelay(60, TimeUnit.SECONDS)//指定消费失败后延迟多久broker重新发送消息给consumer,默认60s
.subscribe();
//消费消息
while (true) {
Message message = consumer.receive();
try {
System.out.printf("Message received: %s%n", new String(message.getData()));
consumer.acknowledge(message);
} catch (Exception e) {
e.printStackTrace();
consumer.negativeAcknowledge(message);
}
}
}
}