1.启动入口
PulsarStandaloneStarter
在standalone模式下,主要启动了以下几个服务
- PulsarService
- PulsarAdmin
- LocalBookeeperEnsemble
- WorkerService
PulsarBrokerStarter.BrokerStarter
在普通模式下,启动了以下几个服务
- PulsarService
- BookieServer
- AutoRecoveryMain
- StatsProvider
- WorkerService
简单说一些这几个服务
- WorkerService: Pulsar function 相关,可以不启动
- PulsarService: 主要的PulsarBroker相关
- BookieServer: Bookeeper相关
- AutoRecoveryMain: Bookeeper autorecovery相关
- StatsProvider: Metric Exporter类似的功能
2. PulsarService
PulsarService.start
ProtocolHandlers
支持不同protocol处理(kafka协议等)localZookeeperConnectionProvider
维护zk session 和zk连接-
startZkCacheService
- LocalZooKeeperCache => LocalZooKeeperCacheService
- GlobalZooKeeperCache => ConfigurationCacheService
BookkeeperClientFactory
创建配置Bookkeeper 客户端managedLedgerClientFactory
维护一个ManagedLedger的客户端,借用BookkeeperClientBrokerService
这个是服务器的主要逻辑了,这个放在后面说loadManager
收集集群机器负载,并根据负载情况均衡负载startNamespaceService
NameSpaceService,管理放置的ResourceBundle,和LoadManager相关schemaStorage
schemaRegistryService
上面2个都是和Schema相关的defaultOffloader
LedgerOffloader,用来将Ledger(Bookkeeper)中的冷数据放到其他存储当中
WebService
webSocketService
http,websocket相关LeaderElectionService
和LoadManager有关,如果是集中方式的话需要选出一个Leader定期根据集群情况进行均衡负载transactionMetadataStoreService
事务相关metricGenerator
metric相关WorkerService
pulsar function 相关
3. BrokerService
public void start() throws Exception {
// producer id 分布式生成器
this.producerNameGenerator = new DistributedIdGenerator(pulsar.getZkClient(), producerNameGeneratorPath,
pulsar.getConfiguration().getClusterName());
// 网络层配置
ServerBootstrap bootstrap = defaultServerBootstrap.clone();
ServiceConfiguration serviceConfig = pulsar.getConfiguration();
bootstrap.childHandler(new PulsarChannelInitializer(pulsar, false));
...
// 绑定端口
listenChannel = bootstrap.bind(addr).sync().channel();
...
// metric
this.startStatsUpdater(
serviceConfig.getStatsUpdateInitialDelayInSecs(),
serviceConfig.getStatsUpdateFrequencyInSecs());
// 启动了一堆需要定期执行的任务
this.startInactivityMonitor();
// 启动3个schedule任务分别检测
// 1. 长时间无效的topic
// 2. 长时间无效的producer(和message去重相关)
// 3. 长时间无效的subscription
this.startMessageExpiryMonitor();
this.startCompactionMonitor();
this.startMessagePublishBufferMonitor();
this.startConsumedLedgersMonitor();
this.startBacklogQuotaChecker();
this.updateBrokerPublisherThrottlingMaxRate();
this.startCheckReplicationPolicies();
// register listener to capture zk-latency
ClientCnxnAspect.addListener(zkStatsListener);
ClientCnxnAspect.registerExecutor(pulsar.getExecutor());
4. PulsarChannelInitializer
顺着netty的初始化方式我们直接看ChannelInitializer,这里应该和Kafka类似进行处理请求的操作。
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
brokerConf.getMaxMessageSize() + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
ch.pipeline().addLast("flowController", new FlowControlHandler());
ServerCnx cnx = new ServerCnx(pulsar);
ch.pipeline().addLast("handler", cnx);
connections.put(ch.remoteAddress(), cnx);
}
5. ServerCnx
这个类的作用可以对标KafkaApis,处理各种Api请求
这个类实际上是一个ChannelHandler
继承了PulsarHandler(主要负责一些连接的keepalive逻辑)
PulsarHandler继承了 PulsarDecoder ( 主要负责序列化,反序列化Api请求)
PulsarDecoder实际上是一个 ChannelInboundHandlerAdapter
而PulsarAPi实际上是通过Pulsar.proto 生成的,这里编写了各种Api的定义