Using kafka-node

KafkaClient

Concept: a client that connects directly to the Kafka brokers.
Initialization: const client = new kafka.KafkaClient({kafkaHost: '10.3.100.196:9092'}); if kafkaHost is omitted, it defaults to localhost:9092.
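A minimal sketch of creating a client and waiting for it to connect (the broker address is the one from above; the handlers are illustrative):

const kafka = require('kafka-node');

const client = new kafka.KafkaClient({ kafkaHost: '10.3.100.196:9092' });

client.on('ready', function () {
  console.log('connected to Kafka');
});

client.on('error', function (err) {
  console.error('client error', err);
});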

Producer

Initialization: Producer(KafkaClient, [options], [customPartitioner])
Example:

var kafka = require('kafka-node'),
    Producer = kafka.Producer,
    client = new kafka.KafkaClient(),
    producer = new Producer(client);

Sending messages:
send(payloads, cb)

payloads: an array; each item is an object of the form:
{
   topic: 'topicName',
   messages: ['message body'], // multiple messages should be an array; a single message can be just a string or a KeyedMessage instance
   key: 'theKey', // string or buffer, only needed when using keyed partitioner
   partition: 0, // default 0
   attributes: 2, // default: 0
   timestamp: Date.now() // <-- defaults to Date.now() (only available with kafka v0.10+)
}
cb: callback invoked with the result on success or with an error on failure
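A minimal send sketch, assuming a broker at localhost:9092 and that the topic 'topicName' already exists (names are illustrative):

const kafka = require('kafka-node');

const client = new kafka.KafkaClient();      // defaults to localhost:9092
const producer = new kafka.Producer(client);

producer.on('ready', function () {
  const payloads = [
    { topic: 'topicName', messages: ['message body'], partition: 0 }
  ];
  producer.send(payloads, function (err, data) {
    if (err) return console.error('send failed', err);
    console.log('send result', data); // e.g. { topicName: { '0': <offset> } }
  });
});

producer.on('error', function (err) {
  console.error('producer error', err);
});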

Creating topics
createTopics(topics, cb)
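A hedged sketch of the producer-side form, assuming the broker allows topic auto-creation; note that the signature has varied across kafka-node versions (some take an extra async flag, and newer releases also expose createTopics on the KafkaClient). Topic names are illustrative:

const kafka = require('kafka-node');

const client = new kafka.KafkaClient();
const producer = new kafka.Producer(client);

producer.on('ready', function () {
  producer.createTopics(['topic1', 'topic2'], true, function (err, data) {
    if (err) return console.error('createTopics failed', err);
    console.log('createTopics result', data);
  });
});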

HighLevelProducer

HighLevelProducer(KafkaClient, [options], [customPartitioner])
send(payloads, cb)
createTopics(topics, async, cb)
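The surface is the same as Producer, except that the producer chooses the partition for each message itself, so payload items normally omit partition (and may carry a key instead). A minimal sketch (topic name is illustrative):

const kafka = require('kafka-node');

const client = new kafka.KafkaClient();
const producer = new kafka.HighLevelProducer(client);

producer.on('ready', function () {
  producer.send([{ topic: 'topicName', messages: ['message body'] }], function (err, data) {
    if (err) return console.error('send failed', err);
    console.log('send result', data);
  });
});

producer.on('error', function (err) {
  console.error('producer error', err);
});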

ProducerStream

ProducerStream(options)

Example:
Use a Transform stream to reshape the data before it is written to Kafka (see the sketch below).
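A minimal sketch of that pattern, loosely following the kafka-node README: each chunk read from stdin is reshaped by a Transform into a { topic, messages } object and piped into a ProducerStream (the topic name is illustrative):

const { Transform } = require('stream');
const ProducerStream = require('kafka-node').ProducerStream;

const producerStream = new ProducerStream(); // default client options (localhost:9092)

// reshape each chunk of stdin into a produce request object
const toProduceRequest = new Transform({
  objectMode: true,
  decodeStrings: true,
  transform (text, encoding, callback) {
    callback(null, { topic: 'ExampleTopic', messages: text.toString().trim() });
  }
});

process.stdin.setEncoding('utf8');
process.stdin.pipe(toProduceRequest).pipe(producerStream);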

Consumer

Consumer(client, payloads, options) (a usage sketch follows the method list below)
on('message', function (message) {})
on('error', function (err) {})
on('offsetOutOfRange', function (err) {})

addTopics(topics, cb, fromOffset)
removeTopics(topics, cb)
commit(cb)
setOffset(topic, partition, offset)
pause()
resume()
pauseTopics(topics)
resumeTopics(topics)
close(force, cb)
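A minimal sketch, reusing the broker and topic from the producer examples; the groupId and autoCommit settings are illustrative:

const kafka = require('kafka-node');

const client = new kafka.KafkaClient({ kafkaHost: '10.3.100.196:9092' });

const consumer = new kafka.Consumer(
  client,
  [{ topic: 'topicName', partition: 0 }],           // payloads
  { groupId: 'kafka-node-group', autoCommit: true }  // options
);

consumer.on('message', function (message) {
  console.log(message); // { topic, value, offset, partition, ... }
});

consumer.on('error', function (err) {
  console.error('consumer error', err);
});

consumer.on('offsetOutOfRange', function (err) {
  console.error('offset out of range', err);
});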

ConsumerStream

ConsumerStream(client, payloads, options)
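A hedged sketch, assuming ConsumerStream behaves as a readable object stream that emits one message per 'data' event (topic and group names are illustrative):

const kafka = require('kafka-node');

const client = new kafka.KafkaClient({ kafkaHost: '10.3.100.196:9092' });

const consumerStream = new kafka.ConsumerStream(
  client,
  [{ topic: 'ExampleTopic', partition: 0 }],
  { groupId: 'kafka-node-group', autoCommit: true }
);

consumerStream.on('data', function (message) {
  console.log(message);
});

consumerStream.on('error', function (err) {
  console.error('consumer stream error', err);
});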

ConsumerGroup

ConsumerGroup(options, topics)
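A minimal sketch; kafkaHost, groupId and fromOffset are illustrative values, and the second argument is the list of topics to subscribe to:

const kafka = require('kafka-node');

const options = {
  kafkaHost: '10.3.100.196:9092',
  groupId: 'ExampleConsumerGroup',
  fromOffset: 'latest'
};

const consumerGroup = new kafka.ConsumerGroup(options, ['topicName']);

consumerGroup.on('message', function (message) {
  console.log(message);
});

consumerGroup.on('error', function (err) {
  console.error('consumer group error', err);
});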
