1 、消息发送方法:Producer.send(new ProducerRecord())
2 、Producer 生产者对象,这个对象是线程安全的,所以可以在多线程下用来发送消息,
3、ProducerRecord 为具体的消息对象,构造方法有很多,多使用new ProducerRecord("topic","message") 其他构造方法如下:
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers)
public ProducerRecord(String topic, Integer partition, Long timestamp ,K key , V value)
public ProducerRecord(String topic , Integer partition, K key, V value, Iterable<Header> headers))
public ProducerRecord(String topic, Integer partition, K key, V value)
public ProducerRecord(String topic, K key, V value)
public ProducerRecord(String topic, V value)
描述:(topic:主题 value :消息 key: 消息主键 partition:分区 headers: 头部信息 timestamp:时间)
在发送消息时ProducerRecord 会被频繁的创建,每条消息都会创建一个ProducerRecord对象。
4、创建了生产者和消息之后就可以发送消息了,发送消息有三种模式,发后既忘(fire-and-forget) ,同步(sync), 异步(async)三种。
KafkaProduct的send()方法,返回值并不是void,而是Feature<RecordMetadata>类型,send() 有两个重载方法
同步方法 : public Future send(ProducerRecord record)
异步方法:public Future<RecordMetadata> send(ProducerRecord<K , V> record ,Callback callback)
发后即忘:
同步:利用返回的Future实现同步 Product.send(ProducerRecord).get()
实际上,send()方法本来就是异步的,返回的Future对象可以使调用方获得发送结果
如上述 send()方法获取的Future<RecordMetadata> 对象, 调用get()方法可以阻塞等待kafka的响应,直到发送成功或者发生异常。
RecordMetadata 中包含了一些元数据信息 比如分区、主题、分区中的偏移量、时间戳等。如果不需要这些信息 使用send().get()方法更方便。
异步:
异步使用Callback 方式返回,要么成功,要么返回错误信息,RecordMetadata 和Exception 是互斥的 Exception == null 时,发送成功RecordMetadata 有信息, Exception !=null的时候 RecordMetadata就为空。
5消息顺序性、
producer .send(record1, callback1) ;producer .send(record2, callback2) ; 相对于同一个分区而言,record1 在record2 之前发送,那么callback1 一定在callback2 之前调用,也就是说相同的分区,回调函数也可以保准有序性返回。