Kafka之Producer篇

Kafka之Producer篇

Kafka生产者案例
本篇我们叙述Kafka是如何将数据发送到服务端的;
首先我们来看下案例,如下代码所示,这是一个简单的将消息发送到服务端的例子,首先配置链接Kafka的参数,然后用生成的配置生成一个KafkaProducer,这个就是kafka的生产者,紧接着生成一个ProducerRecord用于封装要发送的消息,最终调用producer.send(recoder)将消息发送至服务端,这个Demo比较粗糙;
消息发送有两种方式:
1.同步发送消息,producer.send(recoder)会返回一个future对象,然后future.get()会一直等待Kafka服务器的响应;
2.异步发送消息,producer.send(recoder,new DemoProducerCallback());传入一个DemoProducerCallback类,但是这类要实现kafka的Callback,而异步处理便在onCompletion()方法中完成;

  public class ProdectDemo {


    public static void main(String[] args) {

        Properties props = new Properties();
        //broker地址
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        //请求时候需要验证
        props.put("acks", "all");
        //请求失败时候需要重试
        props.put("retries", 0);
        //指定消息key序列化方式
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        //指定消息本身的序列化方式
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");


        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        ProducerRecord<String, String> recoder=new ProducerRecord<String, String>("test","test1","test value");
        //同步发送消息
        try {
            RecordMetadata recordMetadata=producer.send(recoder).get();
        } catch (Exception e) {
            e.printStackTrace();
        }

        //异步发送消息
        try {
            producer.send(recoder,new DemoProducerCallback());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

 class DemoProducerCallback implements Callback {
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e != null) {//如果Kafka返回一个错误,onCompletion方法抛出一个non null异常。
            e.printStackTrace();//对异常进行一些处理,这里只是简单打印出来
        }
    }
}

Kafka生产者常用参数配置

ack机制:
Ack=0:生产者在成功写入消息之前不会等待任何服务器的响应;
Ack=1:只要集群的leader接收到消息,生产者就会收到一个来自服务器的成功响应;
Ack=all:只有当所有参与与复制的服务器全部都收到消息时,生产者才能收到一个来自服务器的成功响应;

Buffer.memory
用来设置生产者内存缓冲区的大小,生产者用这个来缓冲要发送到broker的消息;

Comperession.type
指定消息在发送给broker之前采用哪种压缩方式,snappy,gzip,lz4…..

Retries
生产者有可能会收到服务器的错误,它指定生产者可以重发消息的次数,如果达到这个次数生产者会放弃重试并返回错误;

Batch.size
指定一个批次可以使用的内存大小,按照字节来计算;

Lingers.ms
指定在生产者在发送批次之前等待更多的消息加入批次的时间,以便提高吞吐量;

Client.id
消息id

Max.in.flight.request.per.connection
指定生产者在收到服务器相应之前可以发送多少个消息;设置为1,可以保证消息按照发送的顺序写入服务器,即使发生重试;

Request.timeout.ms
指定生产者再发送数据时等待服务器返回相应的时间;

Metadata.fatch.timeout.ms
指定生产者在获取元数据时等待服务器返回响应的时间;

Timeout.ms
指定broker等待同步副本返回消息确认的时间,与ask搭配使用;

Max.block.ms
指定调用send()或者partitionsFor()方法获取元数据时阻塞的时间;

Max.request.size
控制单条消息的最大值;

Kafka生产者的工作原理
上面介绍了Kafka如何生产数据的,下面我们来看下Kafka的数据生产的工作流程,如下图所示,根据上面的Demo我们可以看的出:
1.我们先构建了一个KafkaProducer,然后就构建出了ProducerRecord;
2.然后就调用producer.send(recoder)进行发送;
3.而KafkaProducer接收到消息后会先对其进行序列化,然后结合本地缓存的元数据信息一起发送给partitioner去确定目标分区;
4.确认分区后追加写入到内存中的消息缓冲池(accumulator),此时的send方法完成;
5.最后发送给服务端Broker,服务器对生产者做出响应,至此消息发送完毕;

image

Kafka生产者的内部原理
下面将详细展开Kafka生产者的工作原理;
ProducerRecord和KafkaProducer的产生就不说了,那就先说下消息的序列化和确定目标分区,如果消息是属于字符串的那么就可以直接使用StringSerializer,如果是对象之类的话可以使用Avro,Thrift,Protobuf或者自定义序列化器;传输的对象序列化后结合KafkaProducerr缓存的元数据共同传递给后面Partitioner实现类进行目标分区的计算,当然这里我们可以使用我们自定义的分区,直接实现Partitioner,重写它的partition方法即可;

image

序列化和计算完分区之后便要向缓冲区追加消息,producer创建时会创建一个默认32MB(由buffer.memory参数指定)的accumulator缓冲区,专门保存待发送的消息。该数据结构中还包含了一个特别重要的集合信息:消息批次信息(batches)。该集合本质上是一个HashMap,里面分别保存了每个topic分区下的batch队列,即前面说的批次是按照topic分区进行分组的。这样发往不同分区的消息保存在对应分区下的batch队列中。举个简单的例子,假设消息M1, M2被发送到test的0分区但属于不同的batch,M3分送到test的1分区,那么batches中包含的信息就是:{"test-0" -> [batch1, batch2], "test-1" -> [batch3]};

单个topic分区下的batch队列中保存的是若干个消息批次。每个batch中最重要的3个组件包括:
compressor: 负责执行追加写入操作
batch缓冲区:由batch.size参数控制,消息被真正追加写入到的地方
thunks:保存消息回调逻辑的集合

image

当消息被追加到缓冲区后就需要将消息发送给Broker了,那么这个动作谁来做呢?其实在KafkaProducer创建完成后会有一个IO线程即Sender线程,它负责将缓冲区的消息发送至Broker;
具体的工作流程:
1.不断轮询缓冲区寻找已做好发送准备的分区;
2.将轮询获得的各个batch按照目标分区所在的leader broker进行分组;
3.将分组后的batch通过底层创建的Socket连接发送给各个broker;
4.等待服务器端发送response回来;

image

如果我们发送消息的时候使用的是同步方法,则会一直等待服务器端的相应,如果是异步的话会在callback中获取到服务器的相应,在上面我们已经将消息发送至服务端,如果Broker处理完毕后会就会做出相应,broker会把响应信息发给Sender线程,然后Sender线程会依次往回传递,直至callback函数,如下图所示;

image

至此Kafka生产者的使用以及工作原理以及叙述完毕!

参考:
《Kafka权威指南》
http://www.cnblogs.com/huxi2b/p/6364613.html

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,254评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,875评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,682评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,896评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,015评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,152评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,208评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,962评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,388评论 1 304
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,700评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,867评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,551评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,186评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,901评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,142评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,689评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,757评论 2 351