Message Queue: Kafka

Official Kafka documentation (Chinese translation): https://kafka.apachecn.org/

This chapter summarizes message-queue middleware through a worked example: integrating the Kafka message queue with jfinal.

Characteristics of Kafka

When it comes to message queues, most companies reach for an MQ middleware, and products like RabbitMQ and RocketMQ are very popular. So what advantages does Kafka offer over these better-known brokers?
Kafka is a distributed publish-subscribe messaging system. It was originally designed for log collection and analysis, and it is aimed at active streaming data and large data volumes. Its throughput is higher than that of the other MQ middleware mentioned above, partly because it batches messages internally.
For example, our company recently built a real-time data-push project for garbage trucks: each truck sends weighing data to the server every few seconds, and several hundred trucks are on the road at the same time every day. This kind of high-volume telemetry stream is exactly the scenario Kafka suits.

How Kafka Works

Common Kafka terms (the original figure is unavailable; these are the standard definitions):

  • Broker: a single Kafka server; a cluster is made up of one or more brokers.
  • Topic: a named category to which messages are published.
  • Partition: an ordered, append-only log; every topic is split into one or more partitions, which is what enables parallel consumption.
  • Producer: a client that publishes messages to a topic.
  • Consumer: a client that reads messages from a topic.
  • Consumer Group: a set of consumers sharing a group id; within a group, each partition is read by exactly one consumer.
  • Offset: a message's position within a partition, tracked per consumer group.

Kafka concept diagram:

(original image not available)

On to the code

The implementation below is adapted from a Kafka 2.0 example:
https://blog.csdn.net/yitian_z/article/details/87909862

  • Integrating Kafka on the data-receiving side (Spring Boot):

Configuration file application-dev.properties:

message.topic=shenzhou_topic

# Kafka producer settings
producer.bootstrapServers=47.101.42.23:9092
# acks=all: wait until the full set of in-sync replicas acknowledges each record
producer.acks=all
producer.retries=0
# batchSize/lingerMs control internal batching: up to 16 KB per batch, waiting at most 1 ms
producer.batchSize=16384
producer.lingerMs=1
# total memory (bytes) the producer may use to buffer unsent records
producer.bufferMemory=65536
producer.keySerializer=org.apache.kafka.common.serialization.StringSerializer
producer.valueSerializer=org.apache.kafka.common.serialization.StringSerializer

Producer configuration bean: KafkaProducerConfig

package com.diasit.bean;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@Component
@ConfigurationProperties(prefix="producer")
public class KafkaProducerConfig {

    private String bootstrapServers;

    private String acks;

    private Integer retries;

    private Integer batchSize;

    private Integer lingerMs;

    private Integer bufferMemory;

    private String keySerializer;

    private String valueSerializer;

    public String getBootstrapServers() {
        return bootstrapServers;
    }

    public void setBootstrapServers(String bootstrapServers) {
        this.bootstrapServers = bootstrapServers;
    }

    public String getAcks() {
        return acks;
    }

    public void setAcks(String acks) {
        this.acks = acks;
    }

    public Integer getRetries() {
        return retries;
    }

    public void setRetries(Integer retries) {
        this.retries = retries;
    }

    public Integer getBatchSize() {
        return batchSize;
    }

    public void setBatchSize(Integer batchSize) {
        this.batchSize = batchSize;
    }

    public Integer getLingerMs() {
        return lingerMs;
    }

    public void setLingerMs(Integer lingerMs) {
        this.lingerMs = lingerMs;
    }

    public Integer getBufferMemory() {
        return bufferMemory;
    }

    public void setBufferMemory(Integer bufferMemory) {
        this.bufferMemory = bufferMemory;
    }

    public String getKeySerializer() {
        return keySerializer;
    }

    public void setKeySerializer(String keySerializer) {
        this.keySerializer = keySerializer;
    }

    public String getValueSerializer() {
        return valueSerializer;
    }

    public void setValueSerializer(String valueSerializer) {
        this.valueSerializer = valueSerializer;
    }
}

Producer factory: KafkaProducerFactory

package com.diasit.pool;

import com.diasit.bean.KafkaProducerConfig;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;

import java.util.Properties;

/**
 * Kafka producer factory for commons-pool2: creates and wraps pooled producers.
 */
public class KafkaProducerFactory extends BasePooledObjectFactory<Producer<String, String>> {

    private KafkaProducerConfig config;

    public KafkaProducerFactory(KafkaProducerConfig config) {
        super();
        this.config = config;
    }

    @Override
    public Producer<String, String> create() throws Exception {

        Properties props = new Properties();

        props.put("bootstrap.servers", config.getBootstrapServers());
        props.put("acks", config.getAcks());
        props.put("retries", config.getRetries());
        props.put("batch.size", config.getBatchSize());
        props.put("linger.ms", config.getLingerMs());
        props.put("buffer.memory", config.getBufferMemory());
        props.put("key.serializer", config.getKeySerializer());
        props.put("value.serializer", config.getValueSerializer());

        return new KafkaProducer<>(props);
    }

    @Override
    public PooledObject<Producer<String, String>> wrap(Producer<String, String> producer) {
        return new DefaultPooledObject<>(producer);
    }
}
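
One detail the factory above leaves out: BasePooledObjectFactory's default destroyObject is a no-op, so producers evicted from the pool are never closed and their buffers and connections leak. A minimal sketch of such an override, to be placed inside KafkaProducerFactory:

    @Override
    public void destroyObject(PooledObject<Producer<String, String>> p) throws Exception {
        // close() flushes any buffered records and releases the producer's sockets
        p.getObject().close();
    }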

Producer pool: KafkaProducerPool

package com.diasit.pool;

import com.diasit.bean.KafkaProducerConfig;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.kafka.clients.producer.Producer;

public class KafkaProducerPool extends GenericObjectPool<Producer<String, String>> {

    public KafkaProducerPool(KafkaProducerConfig config) {
        super(new KafkaProducerFactory(config), new GenericObjectPoolConfig<>());
    }

}
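
The pool above uses a default GenericObjectPoolConfig, which in commons-pool2 caps the pool at 8 producers (maxTotal). If that needs tuning, an extra constructor along these lines would work; the values are illustrative, not from the original project:

    public KafkaProducerPool(KafkaProducerConfig config, int maxTotal) {
        super(new KafkaProducerFactory(config), buildPoolConfig(maxTotal));
    }

    private static GenericObjectPoolConfig<Producer<String, String>> buildPoolConfig(int maxTotal) {
        GenericObjectPoolConfig<Producer<String, String>> poolConfig = new GenericObjectPoolConfig<>();
        poolConfig.setMaxTotal(maxTotal); // upper bound on concurrently pooled producers
        poolConfig.setMinIdle(1);         // keep one producer warm between bursts
        return poolConfig;
    }

It is also worth knowing that KafkaProducer is thread-safe, so a single shared instance works as well; a pool like this one mainly bounds concurrency and isolates slow sends.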

Kafka producer utility: KafkaProducerKit

package com.diasit.socket.kafka;

import com.diasit.bean.KafkaProducerConfig;
import com.diasit.pool.KafkaProducerPool;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

@Component
public class KafkaProducerKit {

    private static KafkaProducerPool pool;
    private static String TOPIC;

    @Autowired
    private KafkaProducerConfig config;

    @Value("${message.topic}")
    private String topic;

    // copy the Spring-injected config and topic into statics so send() can be called statically
    @PostConstruct
    public void init() {
        TOPIC = topic;
        pool = new KafkaProducerPool(config);
    }

    /**
     * Send a message to the configured topic.
     *
     * @param key   record key (here, the vehicle id)
     * @param value record value (here, the device IMEI)
     */
    public static void send(String key, String value) {
        Producer<String, String> producer = null;

        try {
            producer = pool.borrowObject();

            producer.send(new ProducerRecord<>(TOPIC, key, value), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e != null) {
                        System.err.println("the producer encountered an error: " + e.getMessage());
                    }
                }
            });
            producer.flush();

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (producer != null) {
                pool.returnObject(producer);
            }
        }
    }
}

Usage: send to the message queue (for every record, push the vehicle id and device IMEI to the Kafka server):

KafkaProducerKit.send(data.getId(), data.getImei());

  • Integrating and consuming Kafka on the platform side (jfinal):

Configuration file config-dev.properties:

# Kafka consumer settings
kafka.bootstrap.servers=47.101.42.23:9092
# commit consumed offsets automatically, once per second
kafka.enable.auto.commit=true
kafka.auto.commit.interval.ms=1000
kafka.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
kafka.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
kafka.topic=shenzhou_topic

Kafka configuration class: KafkaConfig

package com.diasit.shenzhou.kafka;

import com.jfinal.kit.PropKit;

/**
 * Kafka configuration, loaded once from the properties file via PropKit.
 */
public class KafkaConfig {

    // volatile is required for the double-checked locking in getInstance() to be safe
    private static volatile KafkaConfig config;

    private static final Object LOCK = new Object();

    private KafkaConfig() {}

    private String bootstrapServers;

    private boolean autoCommit;

    private Integer intervalMs;

    private String keyDeserializer;

    private String valueDeserializer;

    private String topic;

    public String getBootstrapServers() {
        return bootstrapServers;
    }

    public void setBootstrapServers(String bootstrapServers) {
        this.bootstrapServers = bootstrapServers;
    }

    public boolean isAutoCommit() {
        return autoCommit;
    }

    public void setAutoCommit(boolean autoCommit) {
        this.autoCommit = autoCommit;
    }

    public Integer getIntervalMs() {
        return intervalMs;
    }

    public void setIntervalMs(Integer intervalMs) {
        this.intervalMs = intervalMs;
    }

    public String getKeyDeserializer() {
        return keyDeserializer;
    }

    public void setKeyDeserializer(String keyDeserializer) {
        this.keyDeserializer = keyDeserializer;
    }

    public String getValueDeserializer() {
        return valueDeserializer;
    }

    public void setValueDeserializer(String valueDeserializer) {
        this.valueDeserializer = valueDeserializer;
    }

    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public static KafkaConfig getInstance() {
        if (config != null) {
            return config;
        }

        synchronized (LOCK){
            if (config != null) {
                return config;
            }

            config = new KafkaConfig();

            config.setAutoCommit(PropKit.getBoolean("kafka.enable.auto.commit", true));
            config.setBootstrapServers(PropKit.get("kafka.bootstrap.servers"));
            config.setIntervalMs(PropKit.getInt("kafka.auto.commit.interval.ms", 1000));
            config.setKeyDeserializer(PropKit.get("kafka.key.deserializer"));
            config.setValueDeserializer(PropKit.get("kafka.value.deserializer"));
            config.setTopic(PropKit.get("kafka.topic"));

            return config;
        }
    }
}

Consumer thread: KafkaMsgConsumer

package com.diasit.shenzhou.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class KafkaMsgConsumer implements Runnable {

    // volatile so a stop() call from another thread is seen by the polling loop
    private volatile boolean isContinue = true;

    private String groupId;

    private KafkaConfig config;

    public KafkaMsgConsumer(String groupId, KafkaConfig config) {
        this.groupId = groupId;
        this.config = config;
    }

    public Properties getProperties() {

        Properties props = new Properties();

        props.put("bootstrap.servers", config.getBootstrapServers());
        props.put("enable.auto.commit", config.isAutoCommit());
        props.put("auto.commit.interval.ms", config.getIntervalMs());
        props.put("key.deserializer", config.getKeyDeserializer());
        props.put("value.deserializer", config.getValueDeserializer());
        props.put("group.id", groupId);

        return props;
    }

    /**
     * Poll for messages until stop() is called.
     */
    @Override
    public void run() {
        Properties props = getProperties();
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        consumer.subscribe(Arrays.asList(config.getTopic()));

        while (isContinue) {
            ConsumerRecords<String, String> records = consumer.poll(500);

            if (records == null || records.isEmpty()) {
                continue;
            }

            for (ConsumerRecord<String, String> record : records) {

                String rowKey = record.key();
                String imei = record.value();

                // hand the record to the processing logic
                process(rowKey, imei);
            }
        }

        consumer.close();
    }

    /**
     * Ask the polling loop to exit; the consumer is closed on the way out.
     */
    public void stop() {
        isContinue = false;
    }

    private void process(String rowKey, String imei) {
        // processing rules omitted
    }
}
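
With enable.auto.commit=true, offsets are committed on a timer in the background, so records in flight when a consumer dies may be re-delivered or, if the commit ran first, silently skipped. When at-least-once processing matters, a common variant is to commit manually after each batch. A sketch of the changed pieces, assuming auto-commit is switched off:

        // in getProperties(): take over offset management
        props.put("enable.auto.commit", false);

        // in run(): commit only after the whole batch has been processed
        while (isContinue) {
            ConsumerRecords<String, String> records = consumer.poll(500);

            for (ConsumerRecord<String, String> record : records) {
                process(record.key(), record.value());
            }

            if (!records.isEmpty()) {
                consumer.commitSync(); // blocks until the broker has the offsets
            }
        }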

Startup helper that launches the consumer threads: KafkaMsgKit

package com.diasit.shenzhou.kafka;

import java.util.ArrayList;
import java.util.List;

public class KafkaMsgKit {

    private static int count = 3;

    private static List<KafkaMsgConsumer> list;

    /**
     * Start listening on the topic.
     * Note: each thread gets its own group id ("group_0", "group_1", ...), so every
     * thread receives every message. To load-balance the topic's partitions across
     * the threads instead, give them a shared group id.
     */
    public static void start() {
        list = new ArrayList<>();

        for (int i = 0; i < count; ++i) {
            list.add(new KafkaMsgConsumer("group_" + i, KafkaConfig.getInstance()));
        }

        // start one thread per consumer
        list.forEach(x -> new Thread(x).start());
    }

    /**
     * Signal every consumer thread to stop polling.
     */
    public static void stop() {

        if (list == null) {
           return;
        }

        list.forEach(x -> {
            x.stop();
        });
    }

}

After jfinal starts, a callback kicks off the Kafka message processing:

KafkaMsgKit.start();
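
For completeness, a sketch of where these calls live in a jfinal project; the class name is an assumption, and PropKit.use(...) must run before KafkaConfig.getInstance() reads any keys. (In JFinal 3.x the lifecycle hooks are afterJFinalStart/beforeJFinalStop; 4.x renamed them onStart/onStop.)

package com.diasit.shenzhou;

import com.diasit.shenzhou.kafka.KafkaMsgKit;
import com.jfinal.config.*;
import com.jfinal.kit.PropKit;
import com.jfinal.template.Engine;

public class AppConfig extends JFinalConfig {

    @Override
    public void configConstant(Constants me) {
        // load the properties file so PropKit.get(...) works inside KafkaConfig
        PropKit.use("config-dev.properties");
    }

    @Override
    public void afterJFinalStart() {
        // spin up the consumer threads once the container is ready
        KafkaMsgKit.start();
    }

    @Override
    public void beforeJFinalStop() {
        // flip each consumer's flag so the polling loops exit and close cleanly
        KafkaMsgKit.stop();
    }

    @Override public void configRoute(Routes me) {}
    @Override public void configEngine(Engine me) {}
    @Override public void configPlugin(Plugins me) {}
    @Override public void configInterceptor(Interceptors me) {}
    @Override public void configHandler(Handlers me) {}
}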