pulsar伪分布式安装

0 缘起
Pulsar是一个支持多租户的、高性能的消息中间件。
2018年11月中旬开始初步在线上生产环境使用。
pulsar官网,部署教程文档还不是很详细,网络上的教程基本都是官网的翻译版,对于一个没有丰富经验的开发者还是会踩一些坑,本文记录一下相对详细测试集群搭建。
1 准备资源
一台主机(本文以macOS为例)
java8运行环境
2 搭建集群的组成
zk集群(3个ZooKeeper节点组成)
bookie集群(3个BookKeeper节点组成)
broker集群(3个Pulsar节点组成)
3 zk集群-搭建
ZooKeeper 版本 3.4.12。
使用一台机器,在该台机器上运行多个ZooKeeper 服务进程,搭建zk集群。

(3.1)下载zk,解压。
Zookeeper官网下载地址(zookeeper-3.4.12.tar.gz)
https://archive.apache.org/dist/zookeeper/zookeeper-3.4.12/

(3.2)将解压好的zookeeper-3.4.12复制到新建文件zookeepers目录下,重名为server1。用server1复制出server2和server3,目录结构如图所示。

(3.3)对server1进行配置(先对zk的一个节点进行配置)。
(3.3.1)在server1目录下,新建data和dataLog两个文件夹。目录结构如下。

(3.3.2)将conf目录下的zoo_sample.cfg文件重命名为zoo.cfg。如图所示。

(3.3.3)修改 zoo.cfg 文件内容,主要修改以下5个配置参数,以及添加集群节点信息。
*修改如下5个参数
dataDir
dataLogDir
clientPort
admin.enableServer
admin.serverPort

*添加如下集群节点信息
server.1=127.0.0.1:2888:3888
server.2=127.0.0.1:2889:3889
server.3=127.0.0.1:2890:3890

我的server1的zoo.cfg 文件内容如下:

The number of milliseconds of each tick

tickTime=2000

The number of ticks that the initial

synchronization phase can take

initLimit=10

The number of ticks that can pass between

sending a request and getting an acknowledgement

syncLimit=10

the directory where the snapshot is stored.

do not use /tmp for storage, /tmp here is just

example sakes.

dataDir=/Users/gaotianci/Softwares/zookeepers/server1/data
dataLogDir=/Users/gaotianci/Softwares/zookeepers/server1/dataLog

the port at which the clients will connect

clientPort=2181

the maximum number of client connections.

increase this if you need to handle more clients

maxClientCnxns=60

admin.enableServer=true
admin.serverPort=9181

server.1=127.0.0.1:2888:3888
server.2=127.0.0.1:2889:3889
server.3=127.0.0.1:2890:3890

Be sure to read the maintenance section of the

administrator guide before turning on autopurge.

http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance

The number of snapshots to retain in dataDir

autopurge.snapRetainCount=3

Purge task interval in hours

Set to "0" to disable auto purge feature

autopurge.purgeInterval=1

对zoo.cfg 配置参数解释:
tickTime:zookeeper中使用的基本时间单位, 毫秒值,默认2000ms。
initLimit:用来配置Zookeeper服务器集群中Follower服务器初始化连接Leader服务器时最长能忍受多少个tickTime。这里设置为5表示最长容忍时间为10秒。
syncLimit:用来配置Leader与Follower之间发送消息、请求和应答时间最长能忍受多少个tickTime。这里设置为2表示最长容忍时间为4秒。
dataDir:数据文件目录。
dataLogDir:日志文件目录。
clientPort:监听client连接的端口号。
server.{myid}={ip}:{leader服务器交换信息的端口}:{当leader服务器挂了后, 选举leader的端口}
maxClientCnxns:对于一个客户端的连接数限制,默认是60。
admin.enableServer:是否启用zk管理后台。
admin.serverPort:管理后台端口号。


在一台服务器上,部署多个实例,需要指定不同的端口号。
[1]clientPort: 3个zk节点中分别配置为:2181,2182,2183
[2]admin.enableServer:3个zk节点中都配置为 true
[3]admin.serverPort:3个zk节点中分别配置为:9181,9182,9183
[4]集群节点信息:3个zk节点中都配置为:
server.1=127.0.0.1:2888:3888
server.2=127.0.0.1:2889:3889
server.3=127.0.0.1:2890:3890

(3.3.4)在server1/data/目录下创建名字为myid文件。
向myid文件中写入内容 1

对myid文件的解释:

每一个zk节点的的myid内容都不能一样,它是不同节点的唯一标识。
myid文件内容,3个zk节点中分别为1,2,3

(3.3.5)server1配置完毕,对server2,server3做类似配置。具体不再赘述。
(3.3.6)启动zk集群,在终端分别用命令启动zk节点。
./bin/zkServer.sh start zk启动命令(zkServer.sh start)
./bin/zkServer.sh status zk状态查看命令(zkServer.sh status)
./bin/zkServer.sh stop zk关闭命令(zkServer.sh stop)

(3.3.7)./bin/zkServer.sh status 命令查看节点状态。

(3.3.8)用zk客户端命令(./bin/zkCli.sh -timeout 5000 -server 127.0.0.1:2181)连接zk节点,在这里连接server1。
(3.3.9)查看(ls /)zk集群当前内容。

(3.3.10)向zk集群写入元数据内容。用如下命令写入,后面解释写入内容。
create /--cluster pulsar-cluster
create /--zookeeper 127.0.0.1:2181
create /--configuration-store 127.0.0.1:2181
create /--web-service-url http://pulsar.cluster.com:8080
create /--web-service-url-tls https://pulsar.cluster.com:8443
create /--broker-service-url pulsar://pulsar.cluster.com:6650
create /--broker-service-url-tls pulsar+ssl://pulsar.cluster.com:6651

zk集群写入元数据内容解释:
--cluster
集群名称

--zookeeper
ZooKeeper集群连接参数,仅需要包含集群中的一个节点即可

--configuration-store
Pulsar实例的配置存储集群(ZooKeeper),和-zookeeper参数一样只需要包含集群中的一个节点即可

--web-service-url
集群Web服务的URL+端口,URL必须是一个i标准的DNS名称,默认端口8080,不建议修改。

--web-service-url-tls
集群Web提供TLS服务的URL+端口,端口默认8443,不建议修改。

--broker-service-url
集群brokers服务URL,URL中DNS的名称和Web服务保持一致,URL使用pulsar替代http/http,端口默认6650,不建议修改。

--broker-service-url-tls
集群brokers提供TLS服务的URL,默认端口6551,不建议修改。

4 bookie集群-搭建
bookie 是bookkeeper 的别称。版本 4.7.2。
使用一台机器,在该台机器上运行多个bookie 服务进程,搭建bookie集群。

(4.1)下载zk,解压。
bookkeeper官网下载地址(bookkeeper-server-4.7.2-bin.tar.gz)
https://bookkeeper.apache.org/releases/

(4.2)将解压好的bookkeeper-server-4.7.2复制到新建文件bookkeepers目录下,重名为bookie1。用bookie1复制出bookie2和bookie3,目录结构如图所示。

(4.3)对bookie1进行配置(先对bookkeeper的一个节点进行配置)。
(4.3.1)修改 bk_server.conf 文件内容,主要修改以下3个端口号,以防在一台主机上造成端口号冲突,以及添加zk集群节点信息。
*修改如下3个参数
bookiePort
httpServerPort
storageserver.grpc.port

*添加如下zk集群节点信息
zkServers=localhost:2181,localhost:2182,localhost:2183

在一台服务器上,部署多个实例,需要指定不同的端口号。
[1]bookiePort: 3个bookie节点中分别配置为:3181,3182,3183
[2]httpServerPort: 3个bookie节点中都配置为:8050,8060,8070
[3]storageserver.grpc.port: 3个bookie节点中都配置为: 4181,4182,4183
[4]zk集群节点信息:3个bookie中都配置为:
zkServers=localhost:2181,localhost:2182,localhost:2183

(4.3.2)bookie1配置完毕,对bookie2,bookie3做类似配置。具体不再赘述。
(4.3.3)执行初始化集群元数据命令(在一个bookie上执行即可),命令如下。
./bin/bookkeeper shell metaformat

(4.3.4)在终端,3个bookie下,分别执行命令启动bookie,命令如下。
./bin/bookkeeper bookie

(4.3.5)检查bookie启动状态,集群启动成功会有"Bookie sanity test succeeded"日志输出。在一个bookie 实例下执行如下命令。
./bin/bookkeeper shell bookiesanity

5 broker集群-搭建
broker 是pulsar实例别称。版本 2.2.0。
使用一台机器,在该台机器上运行多个broker 服务进程,搭建broker集群。

(5.1)下载pulsar,解压。
pulsar官网下载地址(apache-pulsar-2.2.0-bin-tar.gz)
http://pulsar.apache.org/zh-CN/download/

(5.2)将解压好的apache-pulsar-2.2.0复制到新建文件brokers目录下,重名为broker1。用broker1复制出broker2和broker3,目录结构如图所示。

(5.3)对broker1进行配置(先对broker的一个节点进行配置)。
(5.3.1)修改 broker.conf 文件内容,主要修改以下4个端口号,以防在一台主机上造成端口号冲突,以及添加zk集群节点信息。
*修改如下4个参数
brokerServicePort
brokerServicePortTls
webServicePort
webServicePortTls

*添加如下zk集群节点信息
zookeeperServers=localhost:2181,localhost:2182,localhost:2183
configurationStoreServers=localhost:2181,localhost:2182,localhost:2183

在一台服务器上,部署多个实例,需要指定不同的端口号。
[1]brokerServicePort: 3个broker节点中分别配置为:6650,6660,6670
[2]brokerServicePortTls: 3个broker节点中分别配置为:6651,6661,6671
[3]webServicePort: 3个broker节点中分别配置为:8080,8081,8082
[4]webServicePortTls: 3个broker节点中分别配置为:8443,8444,8445
[5]zk集群节点信息:3个broker中都配置为:
zookeeperServers=localhost:2181,localhost:2182,localhost:2183
configurationStoreServers=localhost:2181,localhost:2182,localhost:2183

(5.3.2)broker1配置完毕,对broker2,broker3做类似配置。具体不再赘述。
(5.3.3)在终端,3个broker下,分别执行命令启动broker,启动成功后会有日志“PulsarService started”输出,命令如下。
./bin/pulsar broker

6 pulsar集群启动完毕,现用命令创建集群名,租户名,命名空间,topic并行给出测试demo。
(6.1)依次创建集群,租户,命名空间,分区topic,并为命名空间指定集群。
创建集群(集群名:pulsar-cluster)
./bin/pulsar-admin clusters create --url http://pulsar.cluster.com:8080 pulsar-cluster
创建租户(租户名:my-tenant)
./bin/pulsar-admin tenants create my-tenant
创建命名空间(命名空间名,指定了租户my-tenant:my-tenant/my-namespace)
./bin/pulsar-admin namespaces create my-tenant/my-namespace
创建持久性分区topic(topic全名:persistent://my-tenant/my-namespace/my-topic)
./bin/pulsar-admin topics create-partitioned-topic persistent://my-tenant/my-namespace/my-topic -p 3
更新命名空间为其指定集群
./bin/pulsar-admin namespaces set-clusters my-tenant/my-namespace --clusters pulsar-cluster

(6.2)生产者,消费者测试demo。
(6.2.1)maven依赖
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>2.2.0</version>
</dependency>

(6.2.2)生产者
public class PulsarProducerDemo {
private static String localClusterUrl = "pulsar://localhost:6650";

public static void main(String[] args) {
    try {
        Producer<byte[]> producer = getProducer();
        String msg = "hello world pulsar!";

        Long start = System.currentTimeMillis();
        MessageId msgId = producer.send(msg.getBytes());
        System.out.println("spend=" + (System.currentTimeMillis() - start) + ";send a message msgId = " + msgId.toString());
    } catch (Exception e) {
        System.err.println(e);
    }
}

public static Producer<byte[]> getProducer() throws Exception {
    PulsarClient client;
    client = PulsarClient.builder().serviceUrl(localClusterUrl).build();
    Producer<byte[]> producer = client.newProducer().topic("persistent://my-tenant/my-namespace/my-topic").producerName("producerName").create();
    return producer;
}

}

(6.2.3)消费者
public class PulsarConsumerDemo {
private static String localClusterUrl = "pulsar://localhost:6650";
public static void main(String[] args) {
try {
//将订阅消费者指定的主题和订阅
Consumer<byte[]> consumer = getClient().newConsumer()
.topic("persistent://my-tenant/my-namespace/my-topic")
.subscriptionName("my-subscription")
.subscribe();
while (true) {
Message msg = consumer.receive();
System.out.printf("consumer-Message received: %s. \n", new String(msg.getData()));
// 确认消息,以便broker删除消息
consumer.acknowledge(msg);
}
} catch (Exception e) {
System.out.println(e);
}
}

public static PulsarClient getClient() throws Exception {
    PulsarClient client;
    client = PulsarClient.builder().serviceUrl(localClusterUrl).build();
    return client;
}

}

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,457评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,837评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,696评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,183评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,057评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,105评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,520评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,211评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,482评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,574评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,353评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,213评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,576评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,897评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,174评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,489评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,683评论 2 335