Official Kafka documentation (Chinese): https://kafka.apachecn.org/
This chapter summarizes message-queue middleware through a concrete example: integrating the Kafka message queue with jfinal.
Features of Kafka
When choosing a message queue, most companies still reach for a traditional MQ; middleware such as RabbitMQ and RocketMQ is very popular. So what advantages does Kafka offer over these well-known brokers?
Kafka is a distributed publish-subscribe messaging system. It was originally designed for log aggregation and analysis, and it targets active streaming data and high-volume data processing. Its throughput is higher than that of the other MQ middleware, in large part because it batches messages internally.
For example, our company built a project that pushes real-time data from garbage trucks: every truck sends weighing data to the server every few seconds, and several hundred trucks are on the road at the same time each day. This kind of scenario is an excellent fit for Kafka.
How Kafka works
Common Kafka terms:
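- Broker: a Kafka server node; a cluster consists of one or more brokers.
- Topic: a named category to which messages are published.
- Partition: each topic is split into ordered, append-only partitions, Kafka's unit of parallelism.
- Producer: a client that publishes messages to a topic.
- Consumer: a client that reads messages from a topic.
- Consumer group: a set of consumers sharing a group.id; Kafka assigns each partition to exactly one consumer in the group.
- Offset: a message's position within a partition; consumers track their progress by committing offsets.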
Kafka concept diagram: (figure omitted)
On to the code
The implementation below targets Kafka 2.0 and is adapted from:
https://blog.csdn.net/yitian_z/article/details/87909862
- Data-receiving side: integrating Kafka as the producer (Spring Boot):
Configuration file application-dev.properties
message.topic=shenzhou_topic
producer.bootstrapServers=47.101.42.23:9092
producer.acks=all
producer.retries=0
producer.batchSize=16384
producer.lingerMs=1
producer.bufferMemory=65536
producer.keySerializer=org.apache.kafka.common.serialization.StringSerializer
producer.valueSerializer=org.apache.kafka.common.serialization.StringSerializer
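A few of these settings deserve a note: acks=all makes the broker acknowledge a write only after all in-sync replicas have received it (safest, but slowest); retries=0 means transient send failures are not retried; batch.size (bytes) and linger.ms control how long the producer waits to batch records before sending; buffer.memory caps the memory used to buffer unsent records (the 64 KB configured here is far below the 32 MB default, which only matters under heavy load).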
Producer configuration bean: KafkaProducerConfig
package com.diasit.bean;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
@Component
@ConfigurationProperties(prefix="producer")
public class KafkaProducerConfig {
private String bootstrapServers;
private String acks;
private Integer retries;
private Integer batchSize;
private Integer lingerMs;
private Integer bufferMemory;
private String keySerializer;
private String valueSerializer;
public String getBootstrapServers() {
return bootstrapServers;
}
public void setBootstrapServers(String bootstrapServers) {
this.bootstrapServers = bootstrapServers;
}
public String getAcks() {
return acks;
}
public void setAcks(String acks) {
this.acks = acks;
}
public Integer getRetries() {
return retries;
}
public void setRetries(Integer retries) {
this.retries = retries;
}
public Integer getBatchSize() {
return batchSize;
}
public void setBatchSize(Integer batchSize) {
this.batchSize = batchSize;
}
public Integer getLingerMs() {
return lingerMs;
}
public void setLingerMs(Integer lingerMs) {
this.lingerMs = lingerMs;
}
public Integer getBufferMemory() {
return bufferMemory;
}
public void setBufferMemory(Integer bufferMemory) {
this.bufferMemory = bufferMemory;
}
public String getKeySerializer() {
return keySerializer;
}
public void setKeySerializer(String keySerializer) {
this.keySerializer = keySerializer;
}
public String getValueSerializer() {
return valueSerializer;
}
public void setValueSerializer(String valueSerializer) {
this.valueSerializer = valueSerializer;
}
}
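Spring Boot's relaxed binding maps keys such as producer.batchSize (or producer.batch-size) onto the camelCase fields above; the class only needs to be registered as a bean, which @Component handles here.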
Producer factory: KafkaProducerFactory
package com.diasit.pool;
import com.diasit.bean.KafkaProducerConfig;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import java.util.Properties;
/**
 * Kafka producer factory for commons-pool2: creates producers
 * and wraps them for the object pool.
 */
public class KafkaProducerFactory extends BasePooledObjectFactory<Producer<String, String>> {
private KafkaProducerConfig config;
public KafkaProducerFactory(KafkaProducerConfig config) {
super();
this.config = config;
}
@Override
public Producer<String, String> create() throws Exception {
Properties props = new Properties();
props.put("bootstrap.servers", config.getBootstrapServers());
props.put("acks", config.getAcks());
props.put("retries", config.getRetries());
props.put("batch.size", config.getBatchSize());
props.put("linger.ms", config.getLingerMs());
props.put("buffer.memory", config.getBufferMemory());
props.put("key.serializer", config.getKeySerializer());
props.put("value.serializer", config.getValueSerializer());
return new KafkaProducer<>(props);
}
@Override
public PooledObject<Producer<String, String>> wrap(Producer<String, String> producer) {
return new DefaultPooledObject<>(producer);
}
@Override
public void destroyObject(PooledObject<Producer<String, String>> p) throws Exception {
// close the producer when the pool evicts it; otherwise its network
// threads and buffers leak
p.getObject().close();
}
}
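A side note on this design: KafkaProducer is thread-safe, and the Kafka documentation points out that sharing a single instance across threads is generally faster than maintaining several, so the pool here is a convenience rather than a necessity.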
Producer object pool: KafkaProducerPool
package com.diasit.pool;
import com.diasit.bean.KafkaProducerConfig;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.kafka.clients.producer.Producer;
public class KafkaProducerPool extends GenericObjectPool<Producer<String, String>> {
public KafkaProducerPool(KafkaProducerConfig config) {
// the default GenericObjectPoolConfig allows at most 8 pooled producers
super(new KafkaProducerFactory(config), new GenericObjectPoolConfig<>());
}
}
Kafka producer utility: KafkaProducerKit
package com.diasit.socket.kafka;
import com.diasit.bean.KafkaProducerConfig;
import com.diasit.pool.KafkaProducerPool;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Component
public class KafkaProducerKit {
// static so callers can use KafkaProducerKit.send(...) without injection;
// populated from the Spring-managed singleton in init()
private static KafkaProducerPool pool;
private static String TOPIC;
@Autowired
private KafkaProducerConfig config;
@Value("${message.topic}")
private String topic;
@PostConstruct
public void init() {
TOPIC = topic;
pool = new KafkaProducerPool(config);
}
/**
 * Send a message, borrowing a producer from the pool and returning it when done.
 * @param key   record key (here the vehicle id)
 * @param value record value (here the device IMEI)
 */
public static void send(String key, String value) {
Producer<String, String> producer = null;
try {
producer = pool.borrowObject();
producer.send(new ProducerRecord<>(TOPIC, key, value), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
System.err.println("producer send failed: " + e.getMessage());
}
}
});
// flush() forces immediate delivery; note that flushing on every send
// largely gives up the producer's batching benefit
producer.flush();
} catch (Exception e) {
e.printStackTrace();
} finally {
if (producer != null) {
pool.returnObject(producer);
}
}
}
}
Usage: send to the message queue (each message's vehicle id and device IMEI are pushed to the Kafka server):
KafkaProducerKit.send(data.getId(), data.getImei());
- Platform side: integrating Kafka and consuming messages (jfinal):
Configuration file config-dev.properties
# Kafka settings
kafka.bootstrap.servers=47.101.42.23:9092
kafka.enable.auto.commit=true
kafka.auto.commit.interval.ms=1000
kafka.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
kafka.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
kafka.topic=shenzhou_topic
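Note that group.id is deliberately absent from this file: it is passed to each consumer at construction time (see KafkaMsgConsumer below).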
Kafka configuration class: KafkaConfig
package com.diasit.shenzhou.kafka;
import com.jfinal.kit.PropKit;
/**
 * Kafka configuration, lazily loaded from config-dev.properties
 * through jfinal's PropKit (thread-safe singleton).
 */
public class KafkaConfig {
private static volatile KafkaConfig config; // volatile is required for safe double-checked locking
private static final Object LOCK = new Object();
private KafkaConfig() {}
private String bootstrapServers;
private boolean autoCommit;
private Integer intervalMs;
private String keyDeserializer;
private String valueDeserializer;
private String topic;
public String getBootstrapServers() {
return bootstrapServers;
}
public void setBootstrapServers(String bootstrapServers) {
this.bootstrapServers = bootstrapServers;
}
public boolean isAutoCommit() {
return autoCommit;
}
public void setAutoCommit(boolean autoCommit) {
this.autoCommit = autoCommit;
}
public Integer getIntervalMs() {
return intervalMs;
}
public void setIntervalMs(Integer intervalMs) {
this.intervalMs = intervalMs;
}
public String getKeyDeserializer() {
return keyDeserializer;
}
public void setKeyDeserializer(String keyDeserializer) {
this.keyDeserializer = keyDeserializer;
}
public String getValueDeserializer() {
return valueDeserializer;
}
public void setValueDeserializer(String valueDeserializer) {
this.valueDeserializer = valueDeserializer;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public static KafkaConfig getInstance() {
if (config != null) {
return config;
}
synchronized (LOCK){
if (config != null) {
return config;
}
config = new KafkaConfig();
config.setAutoCommit(PropKit.getBoolean("kafka.enable.auto.commit", true));
config.setBootstrapServers(PropKit.get("kafka.bootstrap.servers"));
config.setIntervalMs(PropKit.getInt("kafka.auto.commit.interval.ms", 1000));
config.setKeyDeserializer(PropKit.get("kafka.key.deserializer"));
config.setValueDeserializer(PropKit.get("kafka.value.deserializer"));
config.setTopic(PropKit.get("kafka.topic"));
return config;
}
}
}
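This assumes the properties file has already been loaded, e.g. with PropKit.use("config-dev.properties") during jfinal startup; otherwise PropKit.get will throw.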
Consumer thread: KafkaMsgConsumer
package com.diasit.shenzhou.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class KafkaMsgConsumer implements Runnable {
// volatile so that stop(), called from another thread, is seen by the poll loop
private volatile boolean isContinue = true;
private String groupId;
private KafkaConfig config;
public KafkaMsgConsumer(String groupId, KafkaConfig config) {
this.groupId = groupId;
this.config = config;
}
public Properties getProperties() {
Properties props = new Properties();
props.put("bootstrap.servers", config.getBootstrapServers());
props.put("enable.auto.commit", config.isAutoCommit());
props.put("auto.commit.interval.ms", config.getIntervalMs());
props.put("key.deserializer", config.getKeyDeserializer());
props.put("value.deserializer", config.getValueDeserializer());
props.put("group.id", groupId);
return props;
}
/**
 * Poll the topic in a loop until stop() is called.
 */
@Override
public void run() {
Properties props = getProperties();
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(config.getTopic()));
while (isContinue) {
// poll(long) is deprecated since Kafka 2.0; use the Duration overload
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
if (records == null || records.isEmpty()) {
continue;
}
for (ConsumerRecord<String, String> record : records) {
String rowKey = record.key();
String imei = record.value();
// handle the record
process(rowKey, imei);
}
}
consumer.close();
}
/**
 * Signal the poll loop to exit; run() closes the consumer on the way out.
 */
public void stop() {
isContinue = false;
}
private void process(String rowKey, String imei) {
// processing rules omitted
}
}
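Since processing here happens synchronously between polls, auto-commit yields at-least-once delivery: after a crash, records processed since the last committed offset are redelivered. For tighter control, disable auto-commit and call commitSync() after each batch is processed.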
Startup helper that launches the consumer threads: KafkaMsgKit
package com.diasit.shenzhou.kafka;
import java.util.ArrayList;
import java.util.List;
public class KafkaMsgKit {
private static int count = 3; // number of consumer threads
private static List<KafkaMsgConsumer> list;
/**
 * Start listening on the topic: one consumer thread per group id.
 */
public static void start() {
list = new ArrayList<>();
for( int i = 0; i < count; ++i) {
list.add(new KafkaMsgConsumer("group_" + i, KafkaConfig.getInstance()));
}
// launch one thread per consumer
list.forEach(x -> new Thread(x).start());
}
/**
 * Ask every consumer thread to stop polling.
 */
public static void stop() {
if (list == null) {
return;
}
list.forEach(KafkaMsgConsumer::stop);
}
}
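One caveat about this setup: because each consumer gets a distinct group id (group_0, group_1, group_2), Kafka treats them as independent subscribers, so all three threads receive every message. If the intent is to spread the load instead, give all the consumers the same group id and Kafka will assign each partition of the topic to exactly one consumer in the group.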
In jfinal's post-start callback, kick off the Kafka message processing:
KafkaMsgKit.start();
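For completeness, a minimal sketch of where this call could live. The class name AppConfig is hypothetical, and the exact hook names depend on the jfinal version (newer releases use onStart()/onStop(), older ones afterJFinalStart()/beforeJFinalStop()):
package com.diasit.shenzhou.kafka;
import com.jfinal.config.*;
import com.jfinal.template.Engine;
// Hypothetical jfinal config class wiring KafkaMsgKit into the app lifecycle.
public class AppConfig extends JFinalConfig {
@Override
public void onStart() {
// begin consuming once the container is up
KafkaMsgKit.start();
}
@Override
public void onStop() {
// signal the consumer threads to exit cleanly
KafkaMsgKit.stop();
}
// the remaining hooks are required by JFinalConfig but unused here
@Override
public void configConstant(Constants me) {}
@Override
public void configRoute(Routes me) {}
@Override
public void configEngine(Engine me) {}
@Override
public void configPlugin(Plugins me) {}
@Override
public void configInterceptor(Interceptors me) {}
@Override
public void configHandler(Handlers me) {}
}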