Apache Pulsar 是一个存储和队列分层的消息中间件,因为无状态所以可以支持动态扩展,毫无疑问是目前最适合云原生的消息中间件,并且支持支持多租户,跨地域复制,零数据丢等,目前最新版本是2.11.
使用docker部署单机版docker安装pulsar非常简单一行命令就行
docker run -it \
-p 6650:6650 \
-p 8080:8080 \
--mount source=pulsardata,target=/pulsar/data \
--mount source=pulsarconf,target=/pulsar/conf \
apachepulsar/pulsar:2.11.0 \
bin/pulsar standalone
安装manager可视化管理界面
docker run -it \
-p 9527:9527 -p 7750:7750 \
-e SPRING_CONFIGURATION_FILE=/pulsar-manager/pulsar-manager/application.properties \
apachepulsar/pulsar-manager:v0.3.0
给管理后台添加一个默认账号
CSRF_TOKEN=$(curl http://localhost:7750/pulsar-manager/csrf-token)
curl \
-H 'X-XSRF-TOKEN: $CSRF_TOKEN' \
-H 'Cookie: XSRF-TOKEN=$CSRF_TOKEN;' \
-H "Content-Type: application/json" \
-X PUT http://localhost:7750/pulsar-manager/users/superuser \
-d '{"name": "admin", "password": "pulsar123", "description": "test", "email": "mingongge@test.org"}'
安装完成后访问http://192.168.30.130:9527
输入上面设置的admin/pulsar123 后就能进入后台页面
使用java测试Pulsar
maven 添加配置如下
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client-all</artifactId>
<version>2.11.0</version>
</dependency>
Java代码如下
//服务器地址
static String SERVICE_URL = "pulsar://192.168.75.130:6650";
static String serviceHttpUrl = "http://192.168.75.130:8080";
1.发送消息
public static void send() throws PulsarClientException {
PulsarClient client = PulsarClient.builder().serviceUrl(SERVICE_URL).build();
/**
* 发送方式
* 默认是字节
*/
Producer<byte[]> producer = client.newProducer().topic("my-tenant/my-namespace/my-topic").create();
/**
* 可以设置成字符串简化
*/
Producer<String> stringProducer = client.newProducer(Schema.STRING).topic("my-tenant/my-namespace/my-topic").create();
producer.send("这是一条测试的字节消息".getBytes());
stringProducer.send("这是一条测试的字符串消息");
// producer.close();
// stringProducer.close();
producer.closeAsync().thenRun(() -> System.out.println("Producer 关闭")).exceptionally((ex) -> {
System.err.println("关闭Producer失败: " + ex);
return null;
});
stringProducer.closeAsync().thenRun(() -> System.out.println("Producer 关闭")).exceptionally((ex) -> {
System.err.println("关闭stringProducer失败: " + ex);
return null;
});
}
2.接受消息
public static void customer() throws PulsarClientException {
PulsarClient client = PulsarClient.builder().serviceUrl(SERVICE_URL).build();
Consumer consumer = client.newConsumer().topic("my-tenant/my-namespace/my-topic").subscriptionName("my-subscription").subscribe();
while (true) {
// Wait for a message
Message msg = consumer.receive();
try {
// Do something with the message
System.out.printf("接受到消息: %s", new String(msg.getData()));
System.out.println();
// Acknowledge the message so that it can be deleted by the message broker
consumer.acknowledge(msg);
} catch (Exception e) {
// Message failed to process, redeliver later
consumer.negativeAcknowledge(msg);
}
}
}
3.API的方式操作新增/删除租户
public static void addTant() throws PulsarAdminException, PulsarClientException {
// 1.创建pulsar的Admin管理对象
PulsarAdmin pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(serviceHttpUrl).build();
// 2.2 查看当前有那些租户
List<String> tenants = pulsarAdmin.tenants().getTenants();
tenants.forEach(System.out::println);
// 2.基于pulsar的Admin对象进行相关的操作
// 2.1 创建租户
Set<String> allowedClusters = new HashSet<>();
allowedClusters.add("standalone");
TenantInfo tenantInfo = TenantInfo.builder().allowedClusters(allowedClusters).build();
pulsarAdmin.tenants().createTenant("my-tenant", tenantInfo);
// 2.2 查看当前有那些租户
// List<String> tenants = pulsarAdmin.tenants().getTenants();
tenants.forEach(System.out::println);
// 2.3 删除租户操作
pulsarAdmin.tenants().deleteTenant("my-tenant");
// 3.关闭管理对象
pulsarAdmin.close();
}
4.API方式操作新增/输出Namespace
public static void addNameSpace() throws PulsarAdminException, PulsarClientException {
// 1.创建pulsar的Admin管理对象
PulsarAdmin pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(serviceHttpUrl).build();
// 2.基于pulsar的Admin对象进行相关的操作
// 2.1 创建名称空间
String tenant = "my-tenant";
String namespace = tenant + "/my-namespace";
pulsarAdmin.namespaces().createNamespace(namespace);
// 2.2 获取租户下的名称空间列表
List<String> namespaces = pulsarAdmin.namespaces().getNamespaces(tenant);
namespaces.forEach(System.out::println);
// 2.3 删除名称空间
pulsarAdmin.namespaces().deleteNamespace(namespace);
// 3.关闭管理对象
pulsarAdmin.close();
}
5.API方式操作TOPIC
public static void topicTest() throws PulsarAdminException, PulsarClientException {
// 1.创建pulsar的Admin管理对象
PulsarAdmin pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(serviceHttpUrl).build();
// 2.基于pulsar的Admin对象进行相关的操作
// 2.1 创建topic相关操作: 有分区和没有分区, 以及持久化和非持久化
String tenant = "my-tenant";
String namespace = tenant + "/my-namespace";
String nonPartitionedTopicName = "non-persistent://my-tenant/my-namespace/my-non-partitioned-topic";
String partitionedTopicName = "persistent://my-tenant/my-namespace/my-partitioned-topic";
// 2.2 创建无分区的topic
pulsarAdmin.topics().createNonPartitionedTopic(nonPartitionedTopicName);
// 2.3 创建有分区的topic
pulsarAdmin.topics().createPartitionedTopic(partitionedTopicName, 3);
// 2.4 修改有分区的Topic的分区数量
pulsarAdmin.topics().updatePartitionedTopic(partitionedTopicName, 6);
// 2.5 查询当前有那些topic。如果一个topic有3个分区,则返回3个带-partition-N后缀的topic
List<String> topics = pulsarAdmin.topics().getList(namespace);
System.out.println("topic列表");
topics.forEach(System.out::println);
// 2.6 查询当前有分区的topic列表
List<String> partitionedTopicList = pulsarAdmin.topics().getPartitionedTopicList(namespace);
System.out.print("有分区的topic");
partitionedTopicList.forEach(System.out::println);
// 2.7 查询有分区的Topic,有多少个分区
int partitions = pulsarAdmin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions;
System.out.println(partitions);
// 2.8 删除无分区的Topic
pulsarAdmin.topics().delete(nonPartitionedTopicName);
// 2.9 删除有分区的Topic
pulsarAdmin.topics().deletePartitionedTopic(partitionedTopicName);
// 3.关闭管理对象
pulsarAdmin.close();
}
至此Pulsar单机部署测试完成