Apache Pulsar 单机模式docker部署

Apache Pulsar 是一个存储和队列分层的消息中间件,因为无状态所以可以支持动态扩展,毫无疑问是目前最适合云原生的消息中间件,并且支持支持多租户,跨地域复制,零数据丢等,目前最新版本是2.11.

使用docker部署单机版docker安装pulsar非常简单一行命令就行

docker run -it \-p 6650:6650 \-p 8080:8080 \--mount source=pulsardata,target=/pulsar/data \--mount source=pulsarconf,target=/pulsar/conf \apachepulsar/pulsar:2.11.0 \bin/pulsar standalone

安装manager可视化管理界面

docker run-it \-p9527:9527-p7750:7750\-eSPRING_CONFIGURATION_FILE=/pulsar-manager/pulsar-manager/application.properties \  apachepulsar/pulsar-manager:v0.3.0

给管理后台添加一个默认账号

CSRF_TOKEN=$(curl http://localhost:7750/pulsar-manager/csrf-token)curl \-H'X-XSRF-TOKEN:$CSRF_TOKEN' \-H'Cookie:XSRF-TOKEN=$CSRF_TOKEN;' \-H"Content-Type: application/json"\-X PUT http://localhost:7750/pulsar-manager/users/superuser \-d'{"name":"admin","password":"pulsar123","description":"test","email":"mingongge@test.org"}'

安装完成后访问http://192.168.30.130:9527

输入上面设置的admin/pulsar123 后就能进入后台页面

image.png

image.png

image.png

image.png

使用java测试Pulsar

maven 添加配置如下

<dependency><groupId>org.apache.pulsar</groupId><artifactId>pulsar-client-all</artifactId><version>2.11.0</version></dependency>

Java代码如下

//服务器地址staticString SERVICE_URL="pulsar://192.168.75.130:6650";staticString serviceHttpUrl="http://192.168.75.130:8080";

1.发送消息

publicstaticvoidsend()throws PulsarClientException{PulsarClientclient=PulsarClient.builder().serviceUrl(SERVICE_URL).build();/**

        * 发送方式

        * 默认是字节

        */Producer<byte[]>producer=client.newProducer().topic("my-tenant/my-namespace/my-topic").create();/**

        * 可以设置成字符串简化

        */Producer<String>stringProducer=client.newProducer(Schema.STRING).topic("my-tenant/my-namespace/my-topic").create();producer.send("这是一条测试的字节消息".getBytes());stringProducer.send("这是一条测试的字符串消息");//  producer.close();//  stringProducer.close();producer.closeAsync().thenRun(()->System.out.println("Producer 关闭")).exceptionally((ex)->{System.err.println("关闭Producer失败: "+ex);returnnull;});stringProducer.closeAsync().thenRun(()->System.out.println("Producer 关闭")).exceptionally((ex)->{System.err.println("关闭stringProducer失败: "+ex);returnnull;});}

2.接受消息

publicstaticvoidcustomer()throws PulsarClientException{PulsarClientclient=PulsarClient.builder().serviceUrl(SERVICE_URL).build();Consumerconsumer=client.newConsumer().topic("my-tenant/my-namespace/my-topic").subscriptionName("my-subscription").subscribe();while(true){// Wait for a messageMessagemsg=consumer.receive();try{// Do something with the messageSystem.out.printf("接受到消息: %s",newString(msg.getData()));System.out.println();// Acknowledge the message so that it can be deleted by the message brokerconsumer.acknowledge(msg);}catch(Exceptione){// Message failed to process, redeliver laterconsumer.negativeAcknowledge(msg);}}}

3.API的方式操作新增/删除租户

publicstaticvoidaddTant()throws PulsarAdminException,PulsarClientException{// 1.创建pulsar的Admin管理对象PulsarAdmin pulsarAdmin=PulsarAdmin.builder().serviceHttpUrl(serviceHttpUrl).build();// 2.2 查看当前有那些租户List<String>tenants=pulsarAdmin.tenants().getTenants();tenants.forEach(System.out::println);// 2.基于pulsar的Admin对象进行相关的操作// 2.1 创建租户Set<String>allowedClusters=newHashSet<>();allowedClusters.add("standalone");TenantInfo tenantInfo=TenantInfo.builder().allowedClusters(allowedClusters).build();pulsarAdmin.tenants().createTenant("my-tenant",tenantInfo);// 2.2 查看当前有那些租户// List<String> tenants = pulsarAdmin.tenants().getTenants();tenants.forEach(System.out::println);// 2.3 删除租户操作pulsarAdmin.tenants().deleteTenant("my-tenant");// 3.关闭管理对象pulsarAdmin.close();}

4.API方式操作新增/输出Namespace

publicstaticvoidaddNameSpace()throws PulsarAdminException,PulsarClientException{// 1.创建pulsar的Admin管理对象PulsarAdmin pulsarAdmin=PulsarAdmin.builder().serviceHttpUrl(serviceHttpUrl).build();// 2.基于pulsar的Admin对象进行相关的操作// 2.1 创建名称空间String tenant="my-tenant";Stringnamespace=tenant+"/my-namespace";pulsarAdmin.namespaces().createNamespace(namespace);// 2.2 获取租户下的名称空间列表List<String>namespaces=pulsarAdmin.namespaces().getNamespaces(tenant);namespaces.forEach(System.out::println);// 2.3 删除名称空间pulsarAdmin.namespaces().deleteNamespace(namespace);// 3.关闭管理对象pulsarAdmin.close();}

5.API方式操作TOPIC

publicstaticvoidtopicTest()throws PulsarAdminException,PulsarClientException{// 1.创建pulsar的Admin管理对象PulsarAdmin pulsarAdmin=PulsarAdmin.builder().serviceHttpUrl(serviceHttpUrl).build();// 2.基于pulsar的Admin对象进行相关的操作// 2.1 创建topic相关操作: 有分区和没有分区, 以及持久化和非持久化String tenant="my-tenant";Stringnamespace=tenant+"/my-namespace";String nonPartitionedTopicName="non-persistent://my-tenant/my-namespace/my-non-partitioned-topic";String partitionedTopicName="persistent://my-tenant/my-namespace/my-partitioned-topic";// 2.2 创建无分区的topicpulsarAdmin.topics().createNonPartitionedTopic(nonPartitionedTopicName);// 2.3 创建有分区的topicpulsarAdmin.topics().createPartitionedTopic(partitionedTopicName,3);// 2.4 修改有分区的Topic的分区数量pulsarAdmin.topics().updatePartitionedTopic(partitionedTopicName,6);// 2.5 查询当前有那些topic。如果一个topic有3个分区,则返回3个带-partition-N后缀的topicList<String>topics=pulsarAdmin.topics().getList(namespace);System.out.println("topic列表");topics.forEach(System.out::println);// 2.6 查询当前有分区的topic列表List<String>partitionedTopicList=pulsarAdmin.topics().getPartitionedTopicList(namespace);System.out.print("有分区的topic");partitionedTopicList.forEach(System.out::println);// 2.7 查询有分区的Topic,有多少个分区int partitions=pulsarAdmin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions;System.out.println(partitions);// 2.8 删除无分区的TopicpulsarAdmin.topics().delete(nonPartitionedTopicName);// 2.9 删除有分区的TopicpulsarAdmin.topics().deletePartitionedTopic(partitionedTopicName);// 3.关闭管理对象pulsarAdmin.close();}

至此Pulsar单机部署测试完成

作者:StrangenessWind

链接:https://www.jianshu.com/p/641689047028

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容