生产者向Kafka发送消息,需要创建一个ProducerRecord对象,包含目标主题和要发送的内容,还可指定键或分区。(Topic,Partition,Key,Value),发送对象要把键和值对象序列化成字节数组。接着数据传给分区器,记录被添加到一个批次中,这个批次中的所有消息会被发送到相同的主题和分区上,有一个独立的线程负责把这些记录批次发送到相应的broker上。服务器收到这些消息会返回一个响应。如果消息写入成功,返回一个RecordMetaData对象,包含主题和分区信息,以及记录在分区里的偏移量。
发送消息的方式
发送并忘记:并不关心是否正常到达
同步发送:使用send发送消息,返回一个Future对象,调用get等待判断消息是否发送成功。
异步发送:调用send方法,指定一个回调函数,服务器响应时调用。
消息同步级别ack
ack指定必须有多少个分区副本收到消息,生产者才认为消息写是成功的。该参数对消息丢失的可能性有重要影响。
ack=0,生产者在成功写入消息之前不会等待任何来自服务器的响应,可以达到很高的吞吐量。
ack=1,只要集群首领节点收到消息,生产者就收到一个成功响应。此时吞吐量取决于是同步发送还是异步发送。
ack=all,只有所有参与复制的节点全部收到消息,生产者才会收到一个来自服务器的成功响应,该模式最安全。
对顺序严格要求的配置
retries:生产者收到错误,可重试的次数。
max.in.flight.requests.per.connection:指定生产者收到服务器响应之前可发送多少个消息。设为1可保证消息是按发送顺序写入服务器的,即使发生重试。
序列化Avro
为了保证通用性,建议用Avro序列化对象。Avro数据通过schema来定义。序列化器和反序列化器分别负责处理schema的注册和拉取。