Apache Pulsar 是一个存储和队列分层的消息中间件,因为无状态所以可以支持动态扩展,毫无疑问是目前最适合云原生的消息中间件,并且支持支持多租户,跨地域复制,零数据丢等,目前最新版本是2.11.
使用docker部署单机版docker安装pulsar非常简单一行命令就行
docker run -it \-p 6650:6650 \-p 8080:8080 \--mount source=pulsardata,target=/pulsar/data \--mount source=pulsarconf,target=/pulsar/conf \apachepulsar/pulsar:2.11.0 \bin/pulsar standalone
安装manager可视化管理界面
docker run-it \-p9527:9527-p7750:7750\-eSPRING_CONFIGURATION_FILE=/pulsar-manager/pulsar-manager/application.properties \ apachepulsar/pulsar-manager:v0.3.0
给管理后台添加一个默认账号
CSRF_TOKEN=$(curl http://localhost:7750/pulsar-manager/csrf-token)curl \-H'X-XSRF-TOKEN:$CSRF_TOKEN' \-H'Cookie:XSRF-TOKEN=$CSRF_TOKEN;' \-H"Content-Type: application/json"\-X PUT http://localhost:7750/pulsar-manager/users/superuser \-d'{"name":"admin","password":"pulsar123","description":"test","email":"mingongge@test.org"}'
安装完成后访问http://192.168.30.130:9527
输入上面设置的admin/pulsar123 后就能进入后台页面

image.png

image.png

image.png

image.png
使用java测试Pulsar
maven 添加配置如下
<dependency><groupId>org.apache.pulsar</groupId><artifactId>pulsar-client-all</artifactId><version>2.11.0</version></dependency>
Java代码如下
//服务器地址staticString SERVICE_URL="pulsar://192.168.75.130:6650";staticString serviceHttpUrl="http://192.168.75.130:8080";
1.发送消息
publicstaticvoidsend()throws PulsarClientException{PulsarClientclient=PulsarClient.builder().serviceUrl(SERVICE_URL).build();/**
* 发送方式
* 默认是字节
*/Producer<byte[]>producer=client.newProducer().topic("my-tenant/my-namespace/my-topic").create();/**
* 可以设置成字符串简化
*/Producer<String>stringProducer=client.newProducer(Schema.STRING).topic("my-tenant/my-namespace/my-topic").create();producer.send("这是一条测试的字节消息".getBytes());stringProducer.send("这是一条测试的字符串消息");// producer.close();// stringProducer.close();producer.closeAsync().thenRun(()->System.out.println("Producer 关闭")).exceptionally((ex)->{System.err.println("关闭Producer失败: "+ex);returnnull;});stringProducer.closeAsync().thenRun(()->System.out.println("Producer 关闭")).exceptionally((ex)->{System.err.println("关闭stringProducer失败: "+ex);returnnull;});}
2.接受消息
publicstaticvoidcustomer()throws PulsarClientException{PulsarClientclient=PulsarClient.builder().serviceUrl(SERVICE_URL).build();Consumerconsumer=client.newConsumer().topic("my-tenant/my-namespace/my-topic").subscriptionName("my-subscription").subscribe();while(true){// Wait for a messageMessagemsg=consumer.receive();try{// Do something with the messageSystem.out.printf("接受到消息: %s",newString(msg.getData()));System.out.println();// Acknowledge the message so that it can be deleted by the message brokerconsumer.acknowledge(msg);}catch(Exceptione){// Message failed to process, redeliver laterconsumer.negativeAcknowledge(msg);}}}
3.API的方式操作新增/删除租户
publicstaticvoidaddTant()throws PulsarAdminException,PulsarClientException{// 1.创建pulsar的Admin管理对象PulsarAdmin pulsarAdmin=PulsarAdmin.builder().serviceHttpUrl(serviceHttpUrl).build();// 2.2 查看当前有那些租户List<String>tenants=pulsarAdmin.tenants().getTenants();tenants.forEach(System.out::println);// 2.基于pulsar的Admin对象进行相关的操作// 2.1 创建租户Set<String>allowedClusters=newHashSet<>();allowedClusters.add("standalone");TenantInfo tenantInfo=TenantInfo.builder().allowedClusters(allowedClusters).build();pulsarAdmin.tenants().createTenant("my-tenant",tenantInfo);// 2.2 查看当前有那些租户// List<String> tenants = pulsarAdmin.tenants().getTenants();tenants.forEach(System.out::println);// 2.3 删除租户操作pulsarAdmin.tenants().deleteTenant("my-tenant");// 3.关闭管理对象pulsarAdmin.close();}
4.API方式操作新增/输出Namespace
publicstaticvoidaddNameSpace()throws PulsarAdminException,PulsarClientException{// 1.创建pulsar的Admin管理对象PulsarAdmin pulsarAdmin=PulsarAdmin.builder().serviceHttpUrl(serviceHttpUrl).build();// 2.基于pulsar的Admin对象进行相关的操作// 2.1 创建名称空间String tenant="my-tenant";Stringnamespace=tenant+"/my-namespace";pulsarAdmin.namespaces().createNamespace(namespace);// 2.2 获取租户下的名称空间列表List<String>namespaces=pulsarAdmin.namespaces().getNamespaces(tenant);namespaces.forEach(System.out::println);// 2.3 删除名称空间pulsarAdmin.namespaces().deleteNamespace(namespace);// 3.关闭管理对象pulsarAdmin.close();}
5.API方式操作TOPIC
publicstaticvoidtopicTest()throws PulsarAdminException,PulsarClientException{// 1.创建pulsar的Admin管理对象PulsarAdmin pulsarAdmin=PulsarAdmin.builder().serviceHttpUrl(serviceHttpUrl).build();// 2.基于pulsar的Admin对象进行相关的操作// 2.1 创建topic相关操作: 有分区和没有分区, 以及持久化和非持久化String tenant="my-tenant";Stringnamespace=tenant+"/my-namespace";String nonPartitionedTopicName="non-persistent://my-tenant/my-namespace/my-non-partitioned-topic";String partitionedTopicName="persistent://my-tenant/my-namespace/my-partitioned-topic";// 2.2 创建无分区的topicpulsarAdmin.topics().createNonPartitionedTopic(nonPartitionedTopicName);// 2.3 创建有分区的topicpulsarAdmin.topics().createPartitionedTopic(partitionedTopicName,3);// 2.4 修改有分区的Topic的分区数量pulsarAdmin.topics().updatePartitionedTopic(partitionedTopicName,6);// 2.5 查询当前有那些topic。如果一个topic有3个分区,则返回3个带-partition-N后缀的topicList<String>topics=pulsarAdmin.topics().getList(namespace);System.out.println("topic列表");topics.forEach(System.out::println);// 2.6 查询当前有分区的topic列表List<String>partitionedTopicList=pulsarAdmin.topics().getPartitionedTopicList(namespace);System.out.print("有分区的topic");partitionedTopicList.forEach(System.out::println);// 2.7 查询有分区的Topic,有多少个分区int partitions=pulsarAdmin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions;System.out.println(partitions);// 2.8 删除无分区的TopicpulsarAdmin.topics().delete(nonPartitionedTopicName);// 2.9 删除有分区的TopicpulsarAdmin.topics().deletePartitionedTopic(partitionedTopicName);// 3.关闭管理对象pulsarAdmin.close();}
至此Pulsar单机部署测试完成
作者:StrangenessWind
链接:https://www.jianshu.com/p/641689047028