Pulsar认证和授权使用demo

创建token

有两种方式创建token:

1.通过命令,例如:

bin/pulsar tokens create --secret-key file:///pulsar/conf/secret.key --subject  test_user

2.通过java API接口调用:

pom文件中添加pulsar相关依赖:

<!-- https://mvnrepository.com/artifact/org.apache.pulsar/pulsar-common -->

<dependency>

    <groupId>org.apache.pulsar</groupId>

    <artifactId>pulsar-common</artifactId>

    <version>2.8.1</version>

</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.pulsar/pulsar-client -->

<dependency>

    <groupId>org.apache.pulsar</groupId>

    <artifactId>pulsar-client</artifactId>

    <version>2.8.1</version>

</dependency>

调用接口创建token demo:

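// Demo fragment: `privateKey`, `algorithm`, `subject` and `expiryTime` are expected to be
// declared by the surrounding code; they are not defined in this snippet.
Key signingKey;

// Location of the signing key. Anyone who can read this file can mint a token for any
// subject, so the file should be readable only by the service that issues tokens.
privateKey = "file:///data/tdbank/pulsar/conf/secret.key";
byte[] encodedKey = AuthTokenUtils.readKeyFromUrl(privateKey);

// NOTE(review): decodePrivateKey expects an asymmetric private key for `algorithm`.
// If the file holds a symmetric secret key (the CLI example on this page uses --secret-key),
// AuthTokenUtils.decodeSecretKey(encodedKey) is the matching call. Confirm the key type.
signingKey = AuthTokenUtils.decodePrivateKey(encodedKey, algorithm);

// An empty Optional produces a token without an expiry claim; such a token stays usable
// for as long as the signing key is trusted, so prefer passing an expiry time.
Optional<Date> optExpiryTime = Optional.empty();
if (expiryTime != null) {
    // Use the requested lifetime rather than a hard-coded "30s".
    // Assumes expiryTime is a relative-time string such as "30s"; confirm against the caller.
    long relativeTimeMillis = TimeUnit.SECONDS
            .toMillis(RelativeTimeUtil.parseRelativeTimeInSeconds(expiryTime));
    optExpiryTime = Optional.of(new Date(System.currentTimeMillis() + relativeTimeMillis));
}

String token = AuthTokenUtils.createToken(signingKey, subject, optExpiryTime);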


建议将token创建逻辑放到pulsar manager内部,由manager提供一个http接口给用户调用:用户只需要发送申请的用户名,manager返回对应token即可。注意该接口必须先对调用方做身份认证,并校验调用方是否有权申请该用户名对应的token,否则任何人都可以申请到任意角色(包括超级管理员)的token。

授权

namespace维度权限申请

1.申请写权限,例如,为用户test_user1申请namespace=public/default的写权限:

sh bin/pulsar-admin namespaces grant-permission --actions produce --role test_user1  public/default

2.申请读权限:

sh bin/pulsar-admin namespaces grant-permission --actions consume  --role test_user1  public/default

3.同时申请读写权限:

sh bin/pulsar-admin namespaces grant-permission --actions produce,consume  --role test_user1  public/default

4.查看权限:

sh bin/pulsar-admin namespaces  permissions public/default

5.删除该namespace下,具体某个角色的所有权限:

sh bin/pulsar-admin namespaces  revoke-permission  public/default  --role test_user1

6.分配某个角色具体某个订阅组权限:

sh bin/pulsar-admin namespaces  grant-subscription-permission public/default --role test_user1  --subscription  sub1

7.删除某个角色具体某个订阅组的权限:

sh bin/pulsar-admin namespaces  revoke-subscription-permission public/default --role test_user1  --subscription  sub1

对应的API接口调用demo代码:

import com.beust.jcommander.ParameterException;

import org.apache.pulsar.client.admin.PulsarAdmin;

import org.apache.pulsar.client.admin.PulsarAdminBuilder;

import org.apache.pulsar.client.admin.PulsarAdminException;

import org.apache.pulsar.client.api.PulsarClientException;

import org.apache.pulsar.common.policies.data.AuthAction;

import java.util.*;

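/**
 * Demo: grant, revoke and list namespace-level permissions through the Pulsar admin API.
 */
public class Test1 {

    /**
     * Converts action names into {@link AuthAction} values.
     *
     * @param actions action names; each must match an {@link AuthAction} constant exactly
     * @return the parsed actions as a sorted set
     * @throws ParameterException if a name is not a valid {@link AuthAction}
     */
    static Set<AuthAction> getAuthActions(List<String> actions) {
        Set<AuthAction> res = new TreeSet<>();
        for (String action : actions) {
            try {
                res.add(AuthAction.valueOf(action));
            } catch (IllegalArgumentException exception) {
                throw new ParameterException(String.format("Illegal auth action '%s'. Possible values: %s",
                        action, Arrays.toString(AuthAction.values())));
            }
        }
        return res;
    }

    /**
     * Grants produce/consume on a namespace to a role, revokes the role's permissions again,
     * then prints the permissions that remain on the namespace.
     *
     * @throws PulsarClientException if the admin client cannot be built
     * @throws PulsarAdminException  if an admin call fails
     */
    public static void main(String[] args) throws PulsarClientException, PulsarAdminException {
        String authPlugin = "org.apache.pulsar.client.impl.auth.AuthenticationToken";

        // This is the token of the cluster's admin superuser. It can change any permission in
        // the cluster, so it is read from the environment instead of being embedded in source.
        // PULSAR_ADMIN_TOKEN is a variable name constructed for this demo.
        String authParams = System.getenv("PULSAR_ADMIN_TOKEN");
        if (authParams == null || authParams.isEmpty()) {
            throw new IllegalStateException("PULSAR_ADMIN_TOKEN is not set");
        }

        PulsarAdminBuilder adminBuilder = PulsarAdmin.builder()
                .allowTlsInsecureConnection(false)
                // Keep hostname verification on so the setting is already correct
                // once the service URL is switched to https.
                .enableTlsHostnameVerification(true)
                .useKeyStoreTls(false)
                .tlsTrustStoreType("JKS")
                .authentication(authPlugin, authParams)
                // NOTE(review): this URL is plain http, so the superuser token travels
                // unencrypted and the TLS options above have no effect. Use the cluster's
                // https endpoint where one is available.
                .serviceHttpUrl("http://gz-csigshare-pulsar-discovery-1.tianqiong.woa.com:8080/");

        String namespace = "public/default";
        String role = "test_user1";

        List<String> actions = new ArrayList<>();
        actions.add("produce");
        actions.add("consume");

        // PulsarAdmin holds an HTTP client; close it when the demo is done.
        try (PulsarAdmin pulsaradmin = adminBuilder.build()) {
            pulsaradmin.namespaces().grantPermissionOnNamespace(namespace, role, getAuthActions(actions));

            // Removes every permission the role holds on this namespace, including the grant above.
            pulsaradmin.namespaces().revokePermissionsOnNamespace(namespace, role);

            Map<String, Set<AuthAction>> ret = pulsaradmin.namespaces().getPermissions(namespace);
            System.out.println(ret);
        }
    }
}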


topic维度权限申请

如果要支持topic维度的权限控制,服务端需要开启配置:topicLevelPoliciesEnabled=true

1.申请写权限,例如,为用户test_user1申请topic=public/default/test1的写权限:

sh bin/pulsar-admin topics grant-permission --actions produce --role test_user1 public/default/test1

2.申请读权限:

sh bin/pulsar-admin topics grant-permission --actions consume --role test_user1 public/default/test1

3.同时申请读写权限:

sh bin/pulsar-admin topics grant-permission --actions produce,consume --role test_user1 public/default/test1

4.查看权限:

sh bin/pulsar-admin topics permissions public/default/test1

5.删除该topic的某个角色的所有权限:

sh bin/pulsar-admin topics revoke-permission public/default/test1 --role test_user1

对应的API接口调用demo代码:

import com.beust.jcommander.ParameterException;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.policies.data.AuthAction;
import java.util.*;

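/**
 * Demo: grant, revoke and list topic-level permissions through the Pulsar admin API.
 */
public class Test2 {

    /**
     * Converts action names into {@link AuthAction} values.
     *
     * @param actions action names; each must match an {@link AuthAction} constant exactly
     * @return the parsed actions as a sorted set
     * @throws ParameterException if a name is not a valid {@link AuthAction}
     */
    static Set<AuthAction> getAuthActions(List<String> actions) {
        Set<AuthAction> res = new TreeSet<>();
        for (String action : actions) {
            try {
                res.add(AuthAction.valueOf(action));
            } catch (IllegalArgumentException exception) {
                throw new ParameterException(String.format("Illegal auth action '%s'. Possible values: %s",
                        action, Arrays.toString(AuthAction.values())));
            }
        }
        return res;
    }

    /**
     * Grants produce/consume on a topic to a role, revokes the role's permissions again,
     * then prints the permissions that remain on the topic.
     *
     * @throws PulsarClientException if the admin client cannot be built
     * @throws PulsarAdminException  if an admin call fails
     */
    public static void main(String[] args) throws PulsarClientException, PulsarAdminException {
        String topic = "public/default/test1";
        String authPlugin = "org.apache.pulsar.client.impl.auth.AuthenticationToken";

        // This is the token of the cluster's admin superuser. It can change any permission in
        // the cluster, so it is read from the environment instead of being embedded in source.
        // PULSAR_ADMIN_TOKEN is a variable name constructed for this demo.
        String authParams = System.getenv("PULSAR_ADMIN_TOKEN");
        if (authParams == null || authParams.isEmpty()) {
            throw new IllegalStateException("PULSAR_ADMIN_TOKEN is not set");
        }

        PulsarAdminBuilder adminBuilder = PulsarAdmin.builder()
                .allowTlsInsecureConnection(false)
                // Keep hostname verification on so the setting is already correct
                // once the service URL is switched to https.
                .enableTlsHostnameVerification(true)
                .useKeyStoreTls(false)
                .tlsTrustStoreType("JKS")
                .authentication(authPlugin, authParams)
                // NOTE(review): this URL is plain http, so the superuser token travels
                // unencrypted and the TLS options above have no effect. Use the cluster's
                // https endpoint where one is available.
                .serviceHttpUrl("http://gz-csigshare-pulsar-discovery-1.tianqiong.woa.com:8080/");

        String role = "test_user1";

        List<String> actions = new ArrayList<>();
        actions.add("produce");
        actions.add("consume");

        // PulsarAdmin holds an HTTP client; close it when the demo is done.
        try (PulsarAdmin pulsaradmin = adminBuilder.build()) {
            pulsaradmin.topics().grantPermission(topic, role, getAuthActions(actions));

            // Removes every permission the role holds on this topic, including the grant above.
            pulsaradmin.topics().revokePermissions(topic, role);

            Map<String, Set<AuthAction>> ret = pulsaradmin.topics().getPermissions(topic);
            System.out.println(ret);
        }
    }
}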


对应的pom依赖:

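<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-client-tools</artifactId>
    <version>2.8.1</version>
</dependency>

<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-common</artifactId>
    <version>2.8.1</version>
</dependency>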
