Apache Pulsar 单机模式docker部署

Apache Pulsar 是一个存储和队列分层的消息中间件,因为无状态所以可以支持动态扩展,毫无疑问是目前最适合云原生的消息中间件,并且支持支持多租户,跨地域复制,零数据丢等,目前最新版本是2.11.

使用docker部署单机版docker安装pulsar非常简单一行命令就行

docker run -it \
-p 6650:6650 \
-p 8080:8080 \
--mount source=pulsardata,target=/pulsar/data \
--mount source=pulsarconf,target=/pulsar/conf \
apachepulsar/pulsar:2.11.0 \
bin/pulsar standalone

安装manager可视化管理界面

docker run -it \
   -p 9527:9527 -p 7750:7750 \
   -e SPRING_CONFIGURATION_FILE=/pulsar-manager/pulsar-manager/application.properties \
   apachepulsar/pulsar-manager:v0.3.0

给管理后台添加一个默认账号

 CSRF_TOKEN=$(curl http://localhost:7750/pulsar-manager/csrf-token)
curl \
  -H 'X-XSRF-TOKEN: $CSRF_TOKEN' \
  -H 'Cookie: XSRF-TOKEN=$CSRF_TOKEN;' \
  -H "Content-Type: application/json" \
  -X PUT http://localhost:7750/pulsar-manager/users/superuser \
  -d '{"name": "admin", "password": "pulsar123", "description": "test", "email": "mingongge@test.org"}'

安装完成后访问http://192.168.30.130:9527
输入上面设置的admin/pulsar123 后就能进入后台页面

image.png

image.png
image.png
image.png

使用java测试Pulsar

maven 添加配置如下

  <dependency>
      <groupId>org.apache.pulsar</groupId>
      <artifactId>pulsar-client-all</artifactId>
      <version>2.11.0</version>
    </dependency>

Java代码如下

     //服务器地址
    static String SERVICE_URL = "pulsar://192.168.75.130:6650";
    static String serviceHttpUrl = "http://192.168.75.130:8080";

1.发送消息


    public static void send() throws PulsarClientException {
        PulsarClient client = PulsarClient.builder().serviceUrl(SERVICE_URL).build();
        /**
         * 发送方式
         * 默认是字节
         */
        Producer<byte[]> producer = client.newProducer().topic("my-tenant/my-namespace/my-topic").create();
        /**
         * 可以设置成字符串简化
         */
        Producer<String> stringProducer = client.newProducer(Schema.STRING).topic("my-tenant/my-namespace/my-topic").create();

        producer.send("这是一条测试的字节消息".getBytes());
        stringProducer.send("这是一条测试的字符串消息");
        //  producer.close();
        //  stringProducer.close();
        producer.closeAsync().thenRun(() -> System.out.println("Producer 关闭")).exceptionally((ex) -> {
            System.err.println("关闭Producer失败: " + ex);
            return null;
        });
        stringProducer.closeAsync().thenRun(() -> System.out.println("Producer 关闭")).exceptionally((ex) -> {
            System.err.println("关闭stringProducer失败: " + ex);
            return null;
        });
    }

2.接受消息


  public static void customer() throws PulsarClientException {
      PulsarClient client = PulsarClient.builder().serviceUrl(SERVICE_URL).build();
      Consumer consumer = client.newConsumer().topic("my-tenant/my-namespace/my-topic").subscriptionName("my-subscription").subscribe();
      while (true) {
          // Wait for a message
          Message msg = consumer.receive();
          try {
              // Do something with the message
              System.out.printf("接受到消息: %s", new String(msg.getData()));
              System.out.println();
              // Acknowledge the message so that it can be deleted by the message broker
              consumer.acknowledge(msg);
          } catch (Exception e) {
              // Message failed to process, redeliver later
              consumer.negativeAcknowledge(msg);
          }
      }
  }

3.API的方式操作新增/删除租户


  public static void addTant() throws PulsarAdminException, PulsarClientException {


      // 1.创建pulsar的Admin管理对象
      PulsarAdmin pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(serviceHttpUrl).build();
      // 2.2 查看当前有那些租户
      List<String> tenants = pulsarAdmin.tenants().getTenants();
      tenants.forEach(System.out::println);
      // 2.基于pulsar的Admin对象进行相关的操作

      // 2.1 创建租户
      Set<String> allowedClusters = new HashSet<>();
      allowedClusters.add("standalone");
      TenantInfo tenantInfo = TenantInfo.builder().allowedClusters(allowedClusters).build();
      pulsarAdmin.tenants().createTenant("my-tenant", tenantInfo);

      // 2.2 查看当前有那些租户
      // List<String> tenants = pulsarAdmin.tenants().getTenants();
      tenants.forEach(System.out::println);

      // 2.3 删除租户操作
      pulsarAdmin.tenants().deleteTenant("my-tenant");

      // 3.关闭管理对象
      pulsarAdmin.close();
  }

4.API方式操作新增/输出Namespace


    public static void addNameSpace() throws PulsarAdminException, PulsarClientException {

        // 1.创建pulsar的Admin管理对象
        PulsarAdmin pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(serviceHttpUrl).build();

        // 2.基于pulsar的Admin对象进行相关的操作

        // 2.1 创建名称空间
        String tenant = "my-tenant";
        String namespace = tenant + "/my-namespace";
        pulsarAdmin.namespaces().createNamespace(namespace);

        // 2.2 获取租户下的名称空间列表
        List<String> namespaces = pulsarAdmin.namespaces().getNamespaces(tenant);
        namespaces.forEach(System.out::println);

        // 2.3 删除名称空间
        pulsarAdmin.namespaces().deleteNamespace(namespace);

        // 3.关闭管理对象
        pulsarAdmin.close();
    }

5.API方式操作TOPIC


    public static void topicTest() throws PulsarAdminException, PulsarClientException {

        // 1.创建pulsar的Admin管理对象
        PulsarAdmin pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(serviceHttpUrl).build();

        // 2.基于pulsar的Admin对象进行相关的操作

        // 2.1 创建topic相关操作: 有分区和没有分区, 以及持久化和非持久化
        String tenant = "my-tenant";
        String namespace = tenant + "/my-namespace";
        String nonPartitionedTopicName = "non-persistent://my-tenant/my-namespace/my-non-partitioned-topic";
        String partitionedTopicName = "persistent://my-tenant/my-namespace/my-partitioned-topic";

        // 2.2 创建无分区的topic
        pulsarAdmin.topics().createNonPartitionedTopic(nonPartitionedTopicName);

        // 2.3 创建有分区的topic
        pulsarAdmin.topics().createPartitionedTopic(partitionedTopicName, 3);

        // 2.4 修改有分区的Topic的分区数量
        pulsarAdmin.topics().updatePartitionedTopic(partitionedTopicName, 6);

        // 2.5 查询当前有那些topic。如果一个topic有3个分区,则返回3个带-partition-N后缀的topic
        List<String> topics = pulsarAdmin.topics().getList(namespace);
        System.out.println("topic列表");
        topics.forEach(System.out::println);

        // 2.6 查询当前有分区的topic列表
        List<String> partitionedTopicList = pulsarAdmin.topics().getPartitionedTopicList(namespace);
        System.out.print("有分区的topic");
        partitionedTopicList.forEach(System.out::println);

        // 2.7 查询有分区的Topic,有多少个分区
        int partitions = pulsarAdmin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions;
        System.out.println(partitions);

        // 2.8 删除无分区的Topic
        pulsarAdmin.topics().delete(nonPartitionedTopicName);

        // 2.9 删除有分区的Topic
        pulsarAdmin.topics().deletePartitionedTopic(partitionedTopicName);


        // 3.关闭管理对象
        pulsarAdmin.close();
    }

至此Pulsar单机部署测试完成

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容