PulsarClient 解析(一)

PulsarClient

PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650")
        .build();

让我们看一下这个类的主要方法


image-20210104211849635.png
  • 创建producer/consumer/reader

  • 元数据信息相关

  • transaction相关

  • close方法


ClientBuilder

这里有一个builder方法用来传递一些PulsarClient的配置

支持的配置项

  1. 连接配置相关:

    • 连接地址:serviceUrl / serviceUrlProvider / listener / proxyServiceUrl

    • operation超时时间: operationTimeout

    • tcp配置:

      • tcpNoDelay

      • keepAliveinterval

      • 建立连接超时:connectionTimeout

      • 一个broker创建多少连接

    • 请求重试策略(请求出错后backOff时间是多少)

  2. lookup请求配置:

    • lookup请求并发

    • 最大重定向次数

    • 连接最大拒绝的请求数目

  3. 线程数目:

    • ioThreads

    • listenerThreads

  4. TLS + 鉴权相关

  5. 事务相关

  6. metric相关

这里面Builder.build就直接配置参数传入了PulsarClientImpl的构造函数了

我们看下这里面做了什么操作

PulsarClientImpl

package org.apache.pulsar.client.impl;

public class PulsarClientImpl implements PulsarClient {

        // 查找服务
    private LookupService lookup;
    
    // 连接池
    private final ConnectionPool cnxPool;
    
    // netty 里面的HashedWheelTimer,用来调度一些延迟操作
    private final Timer timer;
    private final ExecutorProvider externalExecutorProvider;
    private final ExecutorProvider internalExecutorService;

        // 当前PulsarClient的状态
    private AtomicReference<State> state = new AtomicReference<>();
    
    // 所有的业务处理单元(客户端逻辑)
    private final Set<ProducerBase<?>> producers;
    private final Set<ConsumerBase<?>> consumers;

        // id发号器
    private final AtomicLong producerIdGenerator = new AtomicLong();
    private final AtomicLong consumerIdGenerator = new AtomicLong();
    private final AtomicLong requestIdGenerator = new AtomicLong();

      // 这里面的EventLoopGroup好像只被当成线程池来用了
    // 0. ConnectionPool 里面初始化作为连接的io线程池(netty客户端常规用法)
    // 1. 在Consumer里面用来定时flush PersistentAcknowledgmentsGroupingTracker
    // 2. Producer 里面用来定时生成加密的key
    // 3. 作为AsyncHttpClient的构造参数
    private final EventLoopGroup eventLoopGroup;

        // Schema 的cache
    private final LoadingCache<String, SchemaInfoProvider> schemaProviderLoadingCache;

    // producer 用来生成PublishTime
    private final Clock clientClock;

    @Getter
    private TransactionCoordinatorClientImpl tcClient;

这个类的构造函数主要就是初始化这几个关键变量,没有特殊操作

LookUpService根据配置参数会选择HttpLookupService 或者是BinaryProtoLookupService


ConnectionPool

我们先看一下ConnectionPool

package org.apache.pulsar.client.impl;

public class ConnectionPool implements Closeable {
  
    // 连接池,保存连接
    // 地址 -> 第x个连接 -> 连接
    // 如果配置maxConnectionsPerHosts=0 则把pooling关闭了
    protected final ConcurrentHashMap<InetSocketAddress, ConcurrentMap<Integer, CompletableFuture<ClientCnx>>> pool;
  
    // netty 相关
    // PulsarClient 传递过来的
    private final EventLoopGroup eventLoopGroup;
    private final Bootstrap bootstrap;
    private final PulsarChannelInitializer channelInitializerHandler;
    protected final DnsNameResolver dnsResolver;
  
    // 配置
    private final ClientConfigurationData clientConfig;
    private final int maxConnectionsPerHosts;
  
    // 是否是Server Name Indication 代理,TLS 相关,先忽略
    private final boolean isSniProxy;
 

构造函数主要是按照netty 网络客户端方式初始化相关成员变量

        bootstrap = new Bootstrap();
        // 绑定io线程池
        bootstrap.group(eventLoopGroup);
        // 配置了channel类型,如果支持Epoll的话会变成Epoll的channel
        bootstrap.channel(EventLoopUtil.getClientSocketChannelClass(eventLoopGroup));
        // 设置tcp的连接超时时间
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.getConnectionTimeoutMs());
        // 设置tcp no delay
        bootstrap.option(ChannelOption.TCP_NODELAY, conf.isUseTcpNoDelay());
        // 配置allocator
        bootstrap.option(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT);
        // 绑定channelInitializer
            channelInitializerHandler = new PulsarChannelInitializer(conf, clientCnxSupplier);
            bootstrap.handler(channelInitializerHandler);
        // 这个类是netty提供的,用来解析DNS,后面专门会说
        this.dnsResolver = new DnsNameResolverBuilder(eventLoopGroup.next()).traceEnabled(true)
                .channelType(EventLoopUtil.getDatagramChannelClass(eventLoopGroup)).build();
    }

这里面传入的BufferPool是一个自定义的

这个连接池的主要功能

  1. 创建并cache连接

  2. 归还连接

  3. 按照配置的maxConnectionsPerHosts限制连接数目

具体使用方式可以参照org.apache.pulsar.client.impl.ConnectionPoolTest 这个类

ConnectionPool pool;
InetSocketAddress brokerAddress = ....;

// 获取连接,如果之前没有的话,会创建一个
CompletableFuture<ClientCnx> conn = pool.getConnection(brokerAddress);
ClientCnx cnx = conn.get();

// 使用连接做事情
...
  
// 归还给连接池
pool.releaseConnection(cnx);
          
pool.closeAllConnections();
pool.close();

我们先看一下这个类PulsarChannelInitializer用来初始化和pulsar broker 端的连接。

public void initChannel(SocketChannel ch) throws Exception {

    // tls相关
    ch.pipeline().addLast("ByteBufPairEncoder", tlsEnabled ? ByteBufPair.COPYING_ENCODER : ByteBufPair.ENCODER);

    // 定长解码器
    ch.pipeline().addLast("frameDecoder", 
                          new LengthFieldBasedFrameDecoder(
            Commands.DEFAULT_MAX_MESSAGE_SIZE + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
  
    // 到这里可以拿到了RPC协议反序列化后的对象,进行客户端逻辑处理
    // 实际在这个类ClientCnx里面处理所有逻辑
    ch.pipeline().addLast("handler", clientCnxSupplier.get());
}
创建连接逻辑 (connectToAddress)

netty 的bootstrap.connect(忽略tls)


ClientCnx

我们看一下这个类的层次结构

public class ClientCnx extends PulsarHandler;
public abstract class PulsarHandler extends PulsarDecoder;
public abstract class PulsarDecoder extends ChannelInboundHandlerAdapter;

PulsarDecoder

PulsarDecoder 这个类前面在初始化连接的时候还加入了一个LengthFieldBasedFrameDecoder.

所以到这里的channelRead就可以直接反序列化RPC就可以,之后会调用相应的RPC处理方法(handleXXXXXX)

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ...
          
        // Get a buffer that contains the full frame
        ByteBuf buffer = (ByteBuf) msg;
        BaseCommand cmd = null;
        BaseCommand.Builder cmdBuilder = null;
        try {
            // De-serialize the command
            int cmdSize = (int) buffer.readUnsignedInt();
            int writerIndex = buffer.writerIndex();
            buffer.writerIndex(buffer.readerIndex() + cmdSize);
          
            // 从对象池里拿到一个ByteBufCodedInputStream
            ByteBufCodedInputStream cmdInputStream = ByteBufCodedInputStream.get(buffer);
            cmdBuilder = BaseCommand.newBuilder();
            // 反序列化
            cmd = cmdBuilder.mergeFrom(cmdInputStream, null).build();
            buffer.writerIndex(writerIndex);

            cmdInputStream.recycle();

            ...
            // 下面按照不同的RPC类型调用不用的方法进行处理
            switch (cmd.getType()) {
            case PARTITIONED_METADATA:
                checkArgument(cmd.hasPartitionMetadata());
                try {
                    interceptCommand(cmd);
                    handlePartitionMetadataRequest(cmd.getPartitionMetadata());
                } catch (InterceptException e) {
                    ctx.writeAndFlush(Commands.newPartitionMetadataResponse(getServerError(e.getErrorCode()),
                            e.getMessage(), cmd.getPartitionMetadata().getRequestId()));
                } finally {
                    cmd.getPartitionMetadata().recycle();
                }
                break;
            ...
              // 省略其他RPC方法,都是正常handleXXXXX
        } finally {
               // 清理方法
            if (cmdBuilder != null) {
                cmdBuilder.recycle();
            }

            if (cmd != null) {
                cmd.recycle();
            }

            buffer.release();
        }
    }

PulsarHandler

这个类实际里面主要增加了KeepAlive逻辑的实现。

具体查看相应方法即可,比较容易


ClientCnx

这里主要负责和服务端交互的逻辑。

package org.apache.pulsar.client.impl;


public class ClientCnx extends PulsarHandler {


    // 连接状态
    enum State {
        None, SentConnectFrame, Ready, Failed, Connecting
    }
    private State state;

   //----------------------------------------------------------------------
  
    // 临时的请求队列
    // requestId -> 请求
    private final ConcurrentLongHashMap<CompletableFuture<? extends Object>> pendingRequests =
        new ConcurrentLongHashMap<>(16, 1);
  
    // Lookup 请求队列
    private final Queue<Pair<Long, Pair<ByteBuf, CompletableFuture<LookupDataResult>>>> waitingLookupRequests;

   //----------------------------------------------------------------------
  
    // 一些业务逻辑单元
    private final ConcurrentLongHashMap<ProducerImpl<?>> producers = new ConcurrentLongHashMap<>(16, 1);
    private final ConcurrentLongHashMap<ConsumerImpl<?>> consumers = new ConcurrentLongHashMap<>(16, 1);
    private final ConcurrentLongHashMap<TransactionMetaStoreHandler> transactionMetaStoreHandlers = new ConcurrentLongHashMap<>(16, 1);
  
   //----------------------------------------------------------------------
  
    // 异步新建连接的handle
    private final CompletableFuture<Void> connectionFuture = new CompletableFuture<Void>();
  
   //----------------------------------------------------------------------
   
    // PulsarClient 构造时传递进来的线程池
    private final EventLoopGroup eventLoopGroup;

   //----------------------------------------------------------------------
  
    // 限流(和lookup有关)
    private final Semaphore pendingLookupRequestSemaphore;
    private final Semaphore maxLookupRequestSemaphore;
  
    // 连接拒绝相关的成员(和lookup有关)
    private final int maxNumberOfRejectedRequestPerConnection;
    private final int rejectedRequestResetTimeSec = 60;
    // 被拒绝的请求数目(和lookup有关)
    private static final AtomicIntegerFieldUpdater<ClientCnx> NUMBER_OF_REJECTED_REQUESTS_UPDATER = AtomicIntegerFieldUpdater
            .newUpdater(ClientCnx.class, "numberOfRejectRequests");
    @SuppressWarnings("unused")
    private volatile int numberOfRejectRequests = 0;

    //----------------------------------------------------------------------
    // 用来检查请求是否超时的数据结构
    private static class RequestTime {
        final long creationTimeMs;
        final long requestId;
        final RequestType requestType;

        RequestTime(long creationTime, long requestId, RequestType requestType) {
            this.creationTimeMs = creationTime;
            this.requestId = requestId;
            this.requestType = requestType;
        }
    }
  
    // 超时的请求队列
    private final ConcurrentLinkedQueue<RequestTime> requestTimeoutQueue = new ConcurrentLinkedQueue<>();
    
   //----------------------------------------------------------------------
  
    // 消息的最大大小
    @Getter
    private static int maxMessageSize = Commands.DEFAULT_MAX_MESSAGE_SIZE;

    // RPC协议版本
    private final int protocolVersion;
  
    // operation超时时间
    private final long operationTimeoutMs;
    // 用来检查operation超时时间的handle
    private ScheduledFuture<?> timeoutTask;
  
   //----------------------------------------------------------------------
  
    // 一些记录是否从proxy连接的信息
    protected String proxyToTargetBrokerAddress = null;
    protected String remoteHostName = null;
  
    // TLS 相关
    private boolean isTlsHostnameVerificationEnable;
    private static final TlsHostnameVerifier HOSTNAME_VERIFIER = new TlsHostnameVerifier();
    protected final Authentication authentication;
    protected AuthenticationDataProvider authenticationDataProvider;
  
   //----------------------------------------------------------------------
  
    // 事务相关
    private TransactionBufferHandler transactionBufferHandler;
    

    private enum RequestType {
        Command,
        GetLastMessageId,
        GetTopics,
        GetSchema,
        GetOrCreateSchema;

        String getDescription() {
            if (this == Command) {
                return "request";
            } else {
                return name() + " request";
            }
        }
    }

这里临时回到ConnectionPool的逻辑中,之前创建连接的时候实际调用Bootstrap.connect这里返回的实际是一个Netty的Channel对象,但是ConnectionPool里面返回的ClientCnx对象。

ConnectionPool

private CompletableFuture<ClientCnx> createConnection(InetSocketAddress logicalAddress,
            InetSocketAddress physicalAddress, int connectionKey) {
     
        final CompletableFuture<ClientCnx> cnxFuture = new CompletableFuture<ClientCnx>();

        // Trigger async connect to broker
        createConnection(physicalAddress).thenAccept(channel -> {
            ....
            // 这里面ClientCnx对象实际是从这个已经成功连接的Channel的pipeline里拿到的
            final ClientCnx cnx = (ClientCnx) channel.pipeline().get("handler");
            ....

            if (!logicalAddress.equals(physicalAddress)) {
                // We are connecting through a proxy. We need to set the target broker in the ClientCnx object so that
                // it can be specified when sending the CommandConnect.
                // That phase will happen in the ClientCnx.connectionActive() which will be invoked immediately after
                // this method.
                cnx.setTargetBroker(logicalAddress);
            }
            
            // 保存了远端连接的地址
            cnx.setRemoteHostName(physicalAddress.getHostName());

            cnx.connectionFuture().thenRun(() -> {
                ... 
                // 连接成功则返回
                cnxFuture.complete(cnx);
            }).exceptionally(exception -> {
               
                cnxFuture.completeExceptionally(exception);
                cleanupConnection(logicalAddress, connectionKey, cnxFuture);
                cnx.ctx().close();
                return null;
            });
              
           ...

ClientCnx的主要方法(功能)
  • 连接生命周期管理(netty Handler里面的方法)

    • channelActive

    • channelInActive

    • exceptionCaught

    • ......

  • 发送request:主动发送RPC的方法,并按照业务逻辑处理

    • Lookup请求

    • getLastMessageId

    • getSchema

    • .....

  • 处理response:继承自PulsarDecoder 的handleXXXXX RPC 处理逻辑

  • 主动发送RPC方法获得原始的response

CompletableFuture<T> sendRequestAndHandleTimeout(ByteBuf requestMessage,long requestId,RequestType requestType)
  • 检查请求是否超时checkRequestTimeout

  • 注册/ 删除业务逻辑对象(业务逻辑对象后面单出文章说)

    • consumer

    • producer

    • transactionMetaStoreHandler

    • transactionBufferHandler


sendRequestAndHandleTimeout方法
private <T> CompletableFuture<T> sendRequestAndHandleTimeout(ByteBuf requestMessage, long requestId, RequestType requestType) {
  
        // 放入到pending请求队列里面,用来等待response
        CompletableFuture<T> future = new CompletableFuture<>();
        pendingRequests.put(requestId, future);
  
        // 直接发送RPC body
        ctx.writeAndFlush(requestMessage).addListener(writeFuture -> {
            if (!writeFuture.isSuccess()) {
                log.warn("{} Failed to send {} to broker: {}", ctx.channel(), requestType.getDescription(), writeFuture.cause().getMessage());
                pendingRequests.remove(requestId);
                future.completeExceptionally(writeFuture.cause());
            }
        });
        // 在超时队列里面增加一个数据结构用来记录超时
        requestTimeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId, requestType));
  
        return future;
    }

chanelActive 方法

这个方法逻辑比较简单

  • PulsarHandler.channelActive方法里面开启了KeepAlive逻辑的调度任务

  • ClientCnx.channelActive 方法里面开启了requestTimeout逻辑的调度任务

  • 发送一个ConnectCommand请求给服务端(服务端处理逻辑到后面会说)


请求超时的处理

这个逻辑也比较容易。

使用了EventLoopGroup调度了一个定时任务,每次去查看requestTimeoutQueue里面的请求是否有超时的

有的话就把这个请求的response设置成TimeoutException

这里的请求超时检查时间间隔是operationTimeoutMs决定的


PulsarClient 功能回顾

这样让我们回顾一下PulsarClient的总体功能

  • 包含了一个连接池用来创建ClientCnx和服务端进行沟通

  • 保存了一些自定义业务处理单元(consumer,producer, tcClient)

  • LookupService

  • 一些周期check的动作

  • Schema 的LoadingCache

业务单元通过注册到ClientCnx上面,可以使用这个连接发送RPC,获得response,这样传递回业务逻辑单元里面

PulsarClient这个类对使用者来说提供了一个RPC层面的抽象,其他类使用RPC完成自己的逻辑

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,607评论 6 507
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,239评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,960评论 0 355
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,750评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,764评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,604评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,347评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,253评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,702评论 1 315
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,893评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,015评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,734评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,352评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,934评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,052评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,216评论 3 371
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,969评论 2 355

推荐阅读更多精彩内容