Apache Pulsar 源码走读(三)TopicLookup 请求处理(一)

简单逻辑说明

  1. 通过topic名字确定namespace
  2. 查找这个namespace的bundle分配信息
  3. 根据bundle分配信息来确认这个topic属于哪个bundle
  4. 根据bundle信息来确认哪个broker负责这个bundle,返回broker的地址。

CommandLookup 主要用来查找Topic在被哪个broker负责。
一般客户端可以通过http协议或者二进制协议来查询。

message CommandLookupTopic {
    // topic 名字
    required string topic            = 1;
    // 网络层请求id
    required uint64 request_id       = 2;
    optional bool authoritative      = 3 [default = false];

    // TODO - Remove original_principal, original_auth_data, original_auth_method
    // Original principal that was verified by
    // a Pulsar proxy.
    optional string original_principal = 4;

    // Original auth role and auth Method that was passed
    // to the proxy.
    optional string original_auth_data = 5;
    optional string original_auth_method = 6;
    
   // 从哪个指定的连接点进行连接
    optional string advertised_listener_name = 7;
}

这里直接看服务端的代码ServerCnx

protected void handleLookup(CommandLookupTopic lookup) {
        final long requestId = lookup.getRequestId();
        final boolean authoritative = lookup.isAuthoritative();
        final String advertisedListenerName = lookup.hasAdvertisedListenerName() ? lookup.getAdvertisedListenerName()
                : null;
       // 校验topic名字
        TopicName topicName = validateTopicName(lookup.getTopic(), requestId, lookup);
        if (topicName == null) {
            return;
        }
       // 这里的Semaphore 是服务端Lookup请求的限流器
        final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();

        if (lookupSemaphore.tryAcquire()) {
            ....

            isTopicOperationAllowed(topicName, TopicOperation.LOOKUP)
            .thenApply(isAuthorized -> {

                // 通过鉴权
                if (isAuthorized) {
                    lookupTopicAsync(getBrokerService().pulsar(),
                            topicName,
                            authoritative,
                            getPrincipal(),
                            getAuthenticationData(),
                            requestId,
                            advertisedListenerName)
                            .handle((lookupResponse, ex) -> {
                                if (ex == null) {
                                    ctx.writeAndFlush(lookupResponse);
                                } else {
                                    ....
                                }
                                lookupSemaphore.release();
                                return null;
                            });
                } else {
                    ....
            }).exceptionally(ex -> {
                ....
            });
        } else {
            // 如果有异常是发送的`CommandLookupTopicResponse`
            // 这里已经是新的定义二进制消息的方式了
            // / Wire format
            // [TOTAL_SIZE] [CMD_SIZE][CMD]
ctx.writeAndFlush(newLookupErrorResponse(ServerError.TooManyRequests,
                    "Failed due to too many pending lookup requests", requestId));
        }
    }

TopicLookupBase.lookupTopicAsync

org.apache.pulsar.broker.lookup.TopicLookupBase#lookupTopicAsync
这个是一个静态方法
主要

  1. validation 校验集群,topic名字等(这里面有跨集群检查的逻辑,先略过)
  2. lookup逻辑

这里校验的逻辑先略过了,实际核心的逻辑在下面这2行上。

LookupOptions options = LookupOptions.builder()
                        .authoritative(authoritative)
                        .advertisedListenerName(advertisedListenerName)
                        .loadTopicsInBundle(true)    // 这里这个条件是true
                        .build();
                
pulsarService.getNamespaceService().getBrokerServiceUrlAsync(topicName, options)

这里面的主要逻辑在NamespaceService里面,PulsarService 可以认为是一个全局对象,pulsar需要的任何核心逻辑对象
(比如说NamspaceService,BrokerService,ConfigurationCacheService等)你都可以从这个对象里面拿到。

NamespaceService.getBrokerServiceUrlAsync

这里面的主要逻辑是
根据传递过来的topic名字定位namespace
之后确认这个topic属于哪个NamespaceBundle。
之后根据这个NamespaceBundle 来找到这个bundle 的owner broker的地址。


public CompletableFuture<Optional<LookupResult>> getBrokerServiceUrlAsync(TopicName topic, LookupOptions options) {
      ....

CompletableFuture<Optional<LookupResult>> future = getBundleAsync(topic)
                .thenCompose(bundle -> findBrokerServiceUrl(bundle, options));
      ....
}

public CompletableFuture<NamespaceBundle> getBundleAsync(TopicName topic) {
        return bundleFactory.getBundlesAsync(topic.getNamespaceObject())
                .thenApply(bundles -> bundles.findBundle(topic));
}

这里面的bundleFactory实际上是一个异步加载的cache。

我们看一下定义

// org.apache.pulsar.common.naming.NamespaceBundleFactory
private final AsyncLoadingCache<NamespaceName, NamespaceBundles> bundlesCache;

// 构造函数里面
public NamespaceBundleFactory(PulsarService pulsar, HashFunction hashFunc) {
    // .....
this.bundlesCache = Caffeine.newBuilder()
                .recordStats()   // 记录metric
                .buildAsync(
// 加载cache 的逻辑
(NamespaceName namespace, Executor executor) -> {
            String path = AdminResource.joinPath(LOCAL_POLICIES_ROOT, namespace.toString());
            
             ....

            CompletableFuture<NamespaceBundles> future = new CompletableFuture<>();
            // Read the static bundle data from the policies
            pulsar
                  .getLocalZkCacheService()   // 获取LocalZooKeeperCacheService
                  .policiesCache()  
                  .getWithStatAsync(path)
                  .thenAccept(result -> {

                // 这里实际是去找有没有单独为这个namespace配置bundle数量
                BundlesData bundlesData = result.map(Entry::getKey).map(p -> p.bundles).orElse(null);

                 // 通过namespace拿到namespaceBundle
                NamespaceBundles namespaceBundles = getBundles(
                    namespace, bundlesData, result.map(Entry::getValue).map(s -> s.getVersion()).orElse(-1));

                ....

                future.complete(namespaceBundles);
            }).exceptionally(ex -> {
                future.completeExceptionally(ex);
                return null;
            });
            return future;
        });
      // .....
}

这里简单说一下NamespaceBundles 这个类,这个类会保存这个Namespace的所有NamespaceBundle,提供一个聚合的视图。

这个类表示一个hash环,这个环按照配置的分片个数,会被分成几个片段,
每个broker会按照一定算法来确定这个环上的哪一部分属于他自己。
topic也会按照一定的算法分配到这个hash环上。
这样broker就能确定自己负责哪些topic。
就可以返回lookup请求了,这个流程也会触发topic的加载流程。

NamespaceBundles.findBundle

这个函数就是确定这个topic属于哪个NamespaceBundle


 // 映射topic到hash环上的一段, 这一段就被NamespaceBundle 标识
public NamespaceBundle findBundle(TopicName topicName) {
        checkArgument(this.nsname.equals(topicName.getNamespaceObject()));
        long hashCode = factory.getLongHashCode(topicName.toString());
        NamespaceBundle bundle = getBundle(hashCode);
        if (topicName.getDomain().equals(TopicDomain.non_persistent)) {
            bundle.setHasNonPersistentTopic(true);
        }
        return bundle;
    }

到这一步我们就能确定这个namespace的信息了,namespce被分为多少个bundle。
而且可以确定这个topic属于哪个namespacebundle。
下一步是根据namespaceBundle查找负责的broker。

NamespaceService.findBrokerServiceUrl

到这里是根据namespacebundle 确定broker

// 这个记录的是一个broker的元数据信息
public class NamespaceEphemeralData {
    private String nativeUrl;
    private String nativeUrlTls;
    private String httpUrl;
    private String httpUrlTls;
    private boolean disabled;
    private Map<String, AdvertisedListener> advertisedListeners;
}


private CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(
            NamespaceBundle bundle, LookupOptions options) {
       

        ConcurrentOpenHashMap<NamespaceBundle, CompletableFuture<Optional<LookupResult>>> targetMap;
        
        return targetMap.computeIfAbsent(bundle, (k) -> {
            CompletableFuture<Optional<LookupResult>> future = new CompletableFuture<>();

            
            // First check if we or someone else already owns the bundle
            ownershipCache.getOwnerAsync(bundle)
                 
                    .thenAccept(nsData -> {
               // nsData : Optional<NamespaceEphemeralData>
                if (!nsData.isPresent()) {
                    // 如果没找到这个信息
                    if (options.isReadOnly()) {
                        // Do not attempt to acquire ownership
                        future.complete(Optional.empty());
                    } else {
                        // 目前还没有人负责这个bundle 尝试查找这个bundle的owner
                        pulsar.getExecutor().execute(() -> {
                            searchForCandidateBroker(bundle, future, options);
                        });
                    }
                } else if (nsData.get().isDisabled()) {
                    // namespce 正在unload
                    future.completeExceptionally(
                            new IllegalStateException(String.format("Namespace bundle %s is being unloaded", bundle)));
                } else {
                    // 到这里是找到了的逻辑,直接拼接正常的response就行了
                    ... 
                    // find the target
                    future.complete(Optional.of(new LookupResult(nsData.get())));
                }
            }).exceptionally(exception -> {
                ... 
            });

            // 这里实际上是使用这个targetMap来做一个锁的结构避免多次加载。  
            //  https://github.com/apache/pulsar/pull/1527
            future.whenComplete((r, t) -> pulsar.getExecutor().execute(
                () -> targetMap.remove(bundle)
            ));

            return future;
        });
    }

这样如果cache中存在这个topic的owner信息,就可以直接返回。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,640评论 6 507
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,254评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,011评论 0 355
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,755评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,774评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,610评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,352评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,257评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,717评论 1 315
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,894评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,021评论 1 350
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,735评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,354评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,936评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,054评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,224评论 3 371
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,974评论 2 355

推荐阅读更多精彩内容