Apache Pulsar 单机模式docker部署

Apache Pulsar 是一个存储和队列分层的消息中间件,因为无状态所以可以支持动态扩展,毫无疑问是目前最适合云原生的消息中间件,并且支持支持多租户,跨地域复制,零数据丢等,目前最新版本是2.11.

使用docker部署单机版docker安装pulsar非常简单一行命令就行

docker run -it \
-p 6650:6650 \
-p 8080:8080 \
--mount source=pulsardata,target=/pulsar/data \
--mount source=pulsarconf,target=/pulsar/conf \
apachepulsar/pulsar:2.11.0 \
bin/pulsar standalone

安装manager可视化管理界面

docker run -it \
   -p 9527:9527 -p 7750:7750 \
   -e SPRING_CONFIGURATION_FILE=/pulsar-manager/pulsar-manager/application.properties \
   apachepulsar/pulsar-manager:v0.3.0

给管理后台添加一个默认账号

 CSRF_TOKEN=$(curl http://localhost:7750/pulsar-manager/csrf-token)
curl \
  -H 'X-XSRF-TOKEN: $CSRF_TOKEN' \
  -H 'Cookie: XSRF-TOKEN=$CSRF_TOKEN;' \
  -H "Content-Type: application/json" \
  -X PUT http://localhost:7750/pulsar-manager/users/superuser \
  -d '{"name": "admin", "password": "pulsar123", "description": "test", "email": "mingongge@test.org"}'

安装完成后访问http://192.168.30.130:9527
输入上面设置的admin/pulsar123 后就能进入后台页面

image.png

image.png
image.png
image.png

使用java测试Pulsar

maven 添加配置如下

  <dependency>
      <groupId>org.apache.pulsar</groupId>
      <artifactId>pulsar-client-all</artifactId>
      <version>2.11.0</version>
    </dependency>

Java代码如下

     //服务器地址
    static String SERVICE_URL = "pulsar://192.168.75.130:6650";
    static String serviceHttpUrl = "http://192.168.75.130:8080";

1.发送消息


    public static void send() throws PulsarClientException {
        PulsarClient client = PulsarClient.builder().serviceUrl(SERVICE_URL).build();
        /**
         * 发送方式
         * 默认是字节
         */
        Producer<byte[]> producer = client.newProducer().topic("my-tenant/my-namespace/my-topic").create();
        /**
         * 可以设置成字符串简化
         */
        Producer<String> stringProducer = client.newProducer(Schema.STRING).topic("my-tenant/my-namespace/my-topic").create();

        producer.send("这是一条测试的字节消息".getBytes());
        stringProducer.send("这是一条测试的字符串消息");
        //  producer.close();
        //  stringProducer.close();
        producer.closeAsync().thenRun(() -> System.out.println("Producer 关闭")).exceptionally((ex) -> {
            System.err.println("关闭Producer失败: " + ex);
            return null;
        });
        stringProducer.closeAsync().thenRun(() -> System.out.println("Producer 关闭")).exceptionally((ex) -> {
            System.err.println("关闭stringProducer失败: " + ex);
            return null;
        });
    }

2.接受消息


  public static void customer() throws PulsarClientException {
      PulsarClient client = PulsarClient.builder().serviceUrl(SERVICE_URL).build();
      Consumer consumer = client.newConsumer().topic("my-tenant/my-namespace/my-topic").subscriptionName("my-subscription").subscribe();
      while (true) {
          // Wait for a message
          Message msg = consumer.receive();
          try {
              // Do something with the message
              System.out.printf("接受到消息: %s", new String(msg.getData()));
              System.out.println();
              // Acknowledge the message so that it can be deleted by the message broker
              consumer.acknowledge(msg);
          } catch (Exception e) {
              // Message failed to process, redeliver later
              consumer.negativeAcknowledge(msg);
          }
      }
  }

3.API的方式操作新增/删除租户


  public static void addTant() throws PulsarAdminException, PulsarClientException {


      // 1.创建pulsar的Admin管理对象
      PulsarAdmin pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(serviceHttpUrl).build();
      // 2.2 查看当前有那些租户
      List<String> tenants = pulsarAdmin.tenants().getTenants();
      tenants.forEach(System.out::println);
      // 2.基于pulsar的Admin对象进行相关的操作

      // 2.1 创建租户
      Set<String> allowedClusters = new HashSet<>();
      allowedClusters.add("standalone");
      TenantInfo tenantInfo = TenantInfo.builder().allowedClusters(allowedClusters).build();
      pulsarAdmin.tenants().createTenant("my-tenant", tenantInfo);

      // 2.2 查看当前有那些租户
      // List<String> tenants = pulsarAdmin.tenants().getTenants();
      tenants.forEach(System.out::println);

      // 2.3 删除租户操作
      pulsarAdmin.tenants().deleteTenant("my-tenant");

      // 3.关闭管理对象
      pulsarAdmin.close();
  }

4.API方式操作新增/输出Namespace


    public static void addNameSpace() throws PulsarAdminException, PulsarClientException {

        // 1.创建pulsar的Admin管理对象
        PulsarAdmin pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(serviceHttpUrl).build();

        // 2.基于pulsar的Admin对象进行相关的操作

        // 2.1 创建名称空间
        String tenant = "my-tenant";
        String namespace = tenant + "/my-namespace";
        pulsarAdmin.namespaces().createNamespace(namespace);

        // 2.2 获取租户下的名称空间列表
        List<String> namespaces = pulsarAdmin.namespaces().getNamespaces(tenant);
        namespaces.forEach(System.out::println);

        // 2.3 删除名称空间
        pulsarAdmin.namespaces().deleteNamespace(namespace);

        // 3.关闭管理对象
        pulsarAdmin.close();
    }

5.API方式操作TOPIC


    public static void topicTest() throws PulsarAdminException, PulsarClientException {

        // 1.创建pulsar的Admin管理对象
        PulsarAdmin pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(serviceHttpUrl).build();

        // 2.基于pulsar的Admin对象进行相关的操作

        // 2.1 创建topic相关操作: 有分区和没有分区, 以及持久化和非持久化
        String tenant = "my-tenant";
        String namespace = tenant + "/my-namespace";
        String nonPartitionedTopicName = "non-persistent://my-tenant/my-namespace/my-non-partitioned-topic";
        String partitionedTopicName = "persistent://my-tenant/my-namespace/my-partitioned-topic";

        // 2.2 创建无分区的topic
        pulsarAdmin.topics().createNonPartitionedTopic(nonPartitionedTopicName);

        // 2.3 创建有分区的topic
        pulsarAdmin.topics().createPartitionedTopic(partitionedTopicName, 3);

        // 2.4 修改有分区的Topic的分区数量
        pulsarAdmin.topics().updatePartitionedTopic(partitionedTopicName, 6);

        // 2.5 查询当前有那些topic。如果一个topic有3个分区,则返回3个带-partition-N后缀的topic
        List<String> topics = pulsarAdmin.topics().getList(namespace);
        System.out.println("topic列表");
        topics.forEach(System.out::println);

        // 2.6 查询当前有分区的topic列表
        List<String> partitionedTopicList = pulsarAdmin.topics().getPartitionedTopicList(namespace);
        System.out.print("有分区的topic");
        partitionedTopicList.forEach(System.out::println);

        // 2.7 查询有分区的Topic,有多少个分区
        int partitions = pulsarAdmin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions;
        System.out.println(partitions);

        // 2.8 删除无分区的Topic
        pulsarAdmin.topics().delete(nonPartitionedTopicName);

        // 2.9 删除有分区的Topic
        pulsarAdmin.topics().deletePartitionedTopic(partitionedTopicName);


        // 3.关闭管理对象
        pulsarAdmin.close();
    }

至此Pulsar单机部署测试完成

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,142评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,298评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,068评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,081评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,099评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,071评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,990评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,832评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,274评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,488评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,649评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,378评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,979评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,625评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,643评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,545评论 2 352

推荐阅读更多精彩内容