一、添加pom.xml依赖
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>2.10.0</version>
</dependency>
二、tenant租户的Java API
public class PulsarTenant {
public static void main(String[] args) throws PulsarClientException, PulsarAdminException {
String serviceHttpUrl = "http://192.168.23.111:8080,192.168.23.112:8080,192.168.23.113:8080";
// 1.创建pulsar的Admin管理对象
PulsarAdmin pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(serviceHttpUrl).build();
// 2.基于pulsar的Admin对象进行相关的操作
// 2.1 创建租户
Set<String> allowedClusters = new HashSet<>();
allowedClusters.add("pulsar-cluster");
TenantInfo tenantInfo = TenantInfo.builder().allowedClusters(allowedClusters).build();
pulsarAdmin.tenants().createTenant("my-tenant", tenantInfo);
// 2.2 查看当前有那些租户
List<String> tenants = pulsarAdmin.tenants().getTenants();
tenants.forEach(System.out::println);
// 2.3 删除租户操作
pulsarAdmin.tenants().deleteTenant("my-tenant");
// 3.关闭管理对象
pulsarAdmin.close();
}
}
三、namespace命令空间的Java API
public class PulsarNamespace {
public static void main(String[] args) throws PulsarAdminException, PulsarClientException {
String serviceHttpUrl = "http://192.168.23.111:8080,192.168.23.112:8080,192.168.23.113:8080";
// 1.创建pulsar的Admin管理对象
PulsarAdmin pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(serviceHttpUrl).build();
// 2.基于pulsar的Admin对象进行相关的操作
// 2.1 创建名称空间
String tenant = "my-tenant";
String namespace = tenant + "/my-ns";
pulsarAdmin.namespaces().createNamespace(namespace);
// 2.2 获取租户下的名称空间列表
List<String> namespaces = pulsarAdmin.namespaces().getNamespaces(tenant);
namespaces.forEach(System.out::println);
// 2.3 删除名称空间
pulsarAdmin.namespaces().deleteNamespace(namespace);
// 3.关闭管理对象
pulsarAdmin.close();
}
}
四、topic的Java API
public class PulsarTopic {
public static void main(String[] args) throws PulsarClientException, PulsarAdminException {
String serviceHttpUrl = "http://192.168.23.111:8080,192.168.23.112:8080,192.168.23.113:8080";
// 1.创建pulsar的Admin管理对象
PulsarAdmin pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(serviceHttpUrl).build();
// 2.基于pulsar的Admin对象进行相关的操作
// 2.1 创建topic相关操作: 有分区和没有分区, 以及持久化和非持久化
String tenant = "my-tenant";
String namespace = tenant + "/my-ns";
String nonPartitionedTopicName = "non-persistent://my-tenant/my-ns/my-non-partitioned-topic";
String partitionedTopicName = "persistent://my-tenant/my-ns/my-partitioned-topic";
// 2.2 创建无分区的topic
pulsarAdmin.topics().createNonPartitionedTopic(nonPartitionedTopicName);
// 2.3 创建有分区的topic
pulsarAdmin.topics().createPartitionedTopic(partitionedTopicName,3);
// 2.4 修改有分区的Topic的分区数量
pulsarAdmin.topics().updatePartitionedTopic(partitionedTopicName,6);
// 2.5 查询当前有那些topic。如果一个topic有3个分区,则返回3个带-partition-N后缀的topic
List<String> topics = pulsarAdmin.topics().getList(namespace);
topics.forEach(System.out::println);
// 2.6 查询当前有分区的topic列表
List<String> partitionedTopicList = pulsarAdmin.topics().getPartitionedTopicList(namespace);
partitionedTopicList.forEach(System.out::println);
// 2.7 查询有分区的Topic,有多少个分区
int partitions = pulsarAdmin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions;
System.out.println(partitions);
// 2.8 删除无分区的Topic
pulsarAdmin.topics().delete(nonPartitionedTopicName);
// 2.9 删除有分区的Topic
pulsarAdmin.topics().deletePartitionedTopic(partitionedTopicName);
// 3.关闭管理对象
pulsarAdmin.close();
}
}