【Kafka源码】处理请求

[TOC]


在KafkaServer中的入口在:

apis = new KafkaApis(socketServer.requestChannel, replicaManager, groupCoordinator,
        kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer)
requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)

首先根据相关参数,实例化KafkaApis,然后实例化KafkaRequestHandlerPool。下面我们首先看下KafkaRequestHandlerPool。

一、KafkaRequestHandlerPool

class KafkaRequestHandlerPool(val brokerId: Int,
                              val requestChannel: RequestChannel,
                              val apis: KafkaApis,
                              numThreads: Int) extends Logging with KafkaMetricsGroup {

  /* a meter to track the average free capacity of the request handlers */
  private val aggregateIdleMeter = newMeter("RequestHandlerAvgIdlePercent", "percent", TimeUnit.NANOSECONDS)

  this.logIdent = "[Kafka Request Handler on Broker " + brokerId + "], "
  val threads = new Array[Thread](numThreads)
  val runnables = new Array[KafkaRequestHandler](numThreads)
  for(i <- 0 until numThreads) {
    runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis)
    threads(i) = Utils.daemonThread("kafka-request-handler-" + i, runnables(i))
    threads(i).start()
  }
//...
}

主要是启动了numThreads个数的线程,然后线程中执行的内容是KafkaRequestHandler。

/**
 * 响应kafka请求的线程
 */
class KafkaRequestHandler(id: Int,
                          brokerId: Int,
                          val aggregateIdleMeter: Meter,
                          val totalHandlerThreads: Int,
                          val requestChannel: RequestChannel,
                          apis: KafkaApis) extends Runnable with Logging {
  this.logIdent = "[Kafka Request Handler " + id + " on Broker " + brokerId + "], "

  def run() {
    while(true) {
      try {
        var req : RequestChannel.Request = null
        while (req == null) {
          // We use a single meter for aggregate idle percentage for the thread pool.
          // Since meter is calculated as total_recorded_value / time_window and
          // time_window is independent of the number of threads, each recorded idle
          // time should be discounted by # threads.
          val startSelectTime = SystemTime.nanoseconds
          req = requestChannel.receiveRequest(300)
          val idleTime = SystemTime.nanoseconds - startSelectTime
          aggregateIdleMeter.mark(idleTime / totalHandlerThreads)
        }

        if(req eq RequestChannel.AllDone) {
          debug("Kafka request handler %d on broker %d received shut down command".format(
            id, brokerId))
          return
        }
        req.requestDequeueTimeMs = SystemTime.milliseconds
        trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, req))
        apis.handle(req)//这边是如何处理请求的重点
      } catch {
        case e: Throwable => error("Exception when handling request", e)
      }
    }
  }
    //shutdown。。
}

在run方法中,我们可以看到,主要处理消息的地方是api.handle(req)。下面我们主要看下这块的内容。

二、KafkaApis.handle

直接看代码:

/**
 * Top-level method that handles all requests and multiplexes to the right api
 */
def handle(request: RequestChannel.Request) {
  try {
    trace("Handling request:%s from connection %s;securityProtocol:%s,principal:%s".
    format(request.requestDesc(true), request.connectionId, request.securityProtocol, request.session.principal))
      ApiKeys.forId(request.requestId) match {//根据requestId,调用不同的方法,处理不同的请求
        case ApiKeys.PRODUCE => handleProducerRequest(request)
        case ApiKeys.FETCH => handleFetchRequest(request)
        case ApiKeys.LIST_OFFSETS => handleOffsetRequest(request)
        case ApiKeys.METADATA => handleTopicMetadataRequest(request)
        case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
        case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
        case ApiKeys.UPDATE_METADATA_KEY => handleUpdateMetadataRequest(request)
        case ApiKeys.CONTROLLED_SHUTDOWN_KEY => handleControlledShutdownRequest(request)
        case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)
        case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
        case ApiKeys.GROUP_COORDINATOR => handleGroupCoordinatorRequest(request)
        case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request)
        case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
        case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
        case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request)
        case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
        case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
        case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
        case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
        case requestId => throw new KafkaException("Unknown api code " + requestId)
      }
    } catch {
      case e: Throwable =>
        if (request.requestObj != null) {
          request.requestObj.handleError(e, requestChannel, request)
          error("Error when handling request %s".format(request.requestObj), e)
        } else {
          val response = request.body.getErrorResponse(request.header.apiVersion, e)
          val respHeader = new ResponseHeader(request.header.correlationId)

          /* If request doesn't have a default error response, we just close the connection.
             For example, when produce request has acks set to 0 */
          if (response == null)
            requestChannel.closeConnection(request.processor, request)
          else
            requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, respHeader, response)))

          error("Error when handling request %s".format(request.body), e)
     }
  } finally
    request.apiLocalCompleteTimeMs = SystemTime.milliseconds
}

2.1 ApiKeys枚举类

PRODUCE(0, "Produce"),//生产者消息
FETCH(1, "Fetch"),//消费者获取消息
LIST_OFFSETS(2, "Offsets"),//获取偏移量
METADATA(3, "Metadata"),//获取topic源数据
LEADER_AND_ISR(4, "LeaderAndIsr"),
STOP_REPLICA(5, "StopReplica"),//停止副本复制
UPDATE_METADATA_KEY(6, "UpdateMetadata"),//更新源数据
CONTROLLED_SHUTDOWN_KEY(7, "ControlledShutdown"),//controller停止
OFFSET_COMMIT(8, "OffsetCommit"),//提交offset
OFFSET_FETCH(9, "OffsetFetch"),//获取offset
GROUP_COORDINATOR(10, "GroupCoordinator"),//组协调
JOIN_GROUP(11, "JoinGroup"),//加入组
HEARTBEAT(12, "Heartbeat"),//心跳
LEAVE_GROUP(13, "LeaveGroup"),//离开组
SYNC_GROUP(14, "SyncGroup"),//同步组
DESCRIBE_GROUPS(15, "DescribeGroups"),//描述组
LIST_GROUPS(16, "ListGroups"),//列出组
SASL_HANDSHAKE(17, "SaslHandshake"),//加密握手
API_VERSIONS(18, "ApiVersions");//版本

这块比较简单,主要的是Request的数据结构,还有后续的处理方法。下面我们逐步来分析。

三、Request数据结构

所有的请求,最终都会变成这个RequestChannel.Request。所以我们先看下这个Request。

case class Request(processor: Int, connectionId: String, session: Session, private var buffer: ByteBuffer, startTimeMs: Long, securityProtocol: SecurityProtocol) {
    //...
    val requestId = buffer.getShort()

    private val keyToNameAndDeserializerMap: Map[Short, (ByteBuffer) => RequestOrResponse]=
      Map(ApiKeys.FETCH.id -> FetchRequest.readFrom,
        ApiKeys.CONTROLLED_SHUTDOWN_KEY.id -> ControlledShutdownRequest.readFrom
      )

    val requestObj =
      keyToNameAndDeserializerMap.get(requestId).map(readFrom => readFrom(buffer)).orNull

    val header: RequestHeader =
      if (requestObj == null) {
        buffer.rewind
        try RequestHeader.parse(buffer)
        catch {
          case ex: Throwable =>
            throw new InvalidRequestException(s"Error parsing request header. Our best guess of the apiKey is: $requestId", ex)
        }
      } else
        null
    val body: AbstractRequest =
      if (requestObj == null)
        try {
          // For unsupported version of ApiVersionsRequest, create a dummy request to enable an error response to be returned later
          if (header.apiKey == ApiKeys.API_VERSIONS.id && !Protocol.apiVersionSupported(header.apiKey, header.apiVersion))
            new ApiVersionsRequest
          else
            AbstractRequest.getRequest(header.apiKey, header.apiVersion, buffer)
        } catch {
          case ex: Throwable =>
            throw new InvalidRequestException(s"Error getting request for apiKey: ${header.apiKey} and apiVersion: ${header.apiVersion}", ex)
        }
      else
        null

    buffer = null
    private val requestLogger = Logger.getLogger("kafka.request.logger")

    def requestDesc(details: Boolean): String = {
      if (requestObj != null)
        requestObj.describe(details)
      else
        header.toString + " -- " + body.toString
    }
    //...
}

主要有几个部分,

  • 首先是requestId,是一个short类型的值。
  • 然后是header,即消息头,是一个RequestHeader
  • 最后是body,是消息的内容,类型为AbstractRequest

3.1 requestId

这个requestId表示的是api的类型,KafkaApis需要根据这个requestId,来判断调用哪个方法处理消息。

3.2 header

我们看下RequestHeader的结构。

private final short apiKey;
private final short apiVersion;
private final String clientId;
private final int correlationId;

主要是四个变量,apiKey,APIVersion,clientId,correlationId。

3.3 body

消息体,对应的类为AbstractRequest。主要的内容是根据版本号和apiKey来解析出消息的具体内容。

public static AbstractRequest getRequest(int requestId, int versionId, ByteBuffer buffer) {
    ApiKeys apiKey = ApiKeys.forId(requestId);
    switch (apiKey) {
        case PRODUCE:
            return ProduceRequest.parse(buffer, versionId);
        case FETCH:
            return FetchRequest.parse(buffer, versionId);
        case LIST_OFFSETS:
            return ListOffsetRequest.parse(buffer, versionId);
        case METADATA:
            return MetadataRequest.parse(buffer, versionId);
        case OFFSET_COMMIT:
            return OffsetCommitRequest.parse(buffer, versionId);
        case OFFSET_FETCH:
            return OffsetFetchRequest.parse(buffer, versionId);
        case GROUP_COORDINATOR:
            return GroupCoordinatorRequest.parse(buffer, versionId);
        case JOIN_GROUP:
            return JoinGroupRequest.parse(buffer, versionId);
        case HEARTBEAT:
            return HeartbeatRequest.parse(buffer, versionId);
        case LEAVE_GROUP:
            return LeaveGroupRequest.parse(buffer, versionId);
        case SYNC_GROUP:
            return SyncGroupRequest.parse(buffer, versionId);
        case STOP_REPLICA:
            return StopReplicaRequest.parse(buffer, versionId);
        case CONTROLLED_SHUTDOWN_KEY:
            return ControlledShutdownRequest.parse(buffer, versionId);
        case UPDATE_METADATA_KEY:
            return UpdateMetadataRequest.parse(buffer, versionId);
        case LEADER_AND_ISR:
            return LeaderAndIsrRequest.parse(buffer, versionId);
        case DESCRIBE_GROUPS:
                return DescribeGroupsRequest.parse(buffer, versionId);
        case LIST_GROUPS:
            return ListGroupsRequest.parse(buffer, versionId);
        case SASL_HANDSHAKE:
            return SaslHandshakeRequest.parse(buffer, versionId);
        case API_VERSIONS:
            return ApiVersionsRequest.parse(buffer, versionId);
        default:
            throw new AssertionError(String.format("ApiKey %s is not currently handled in `getRequest`, the " +
                    "code should be updated to do so.", apiKey));
    }
}

这块的请求类型很多,想要了解具体结构的,可以到每个类中具体看。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,047评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,807评论 3 386
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,501评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,839评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,951评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,117评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,188评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,929评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,372评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,679评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,837评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,536评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,168评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,886评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,129评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,665评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,739评论 2 351

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,642评论 18 139
  • Android 自定义View的各种姿势1 Activity的显示之ViewRootImpl详解 Activity...
    passiontim阅读 171,889评论 25 707
  • 第一章 Nginx简介 Nginx是什么 没有听过Nginx?那么一定听过它的“同行”Apache吧!Ngi...
    JokerW阅读 32,653评论 24 1,002
  • 抢劫前 时间:8:30a.m 地点:暴雨突袭的街道上,...
    海阔天空_266b阅读 465评论 0 1
  • 刚刚接到了一个电话,手拿起的时候,看到归属地是湖南吉首,犹豫了几秒才接,是我爸爸的电话,是我那还在狱中的爸爸的电话...
    3cdc28ccc838阅读 219评论 0 0