Kafka将EMQX的消息传递给消费者处理(Windows平台)

1、Docker安装Kafka

打开Cmd命令行

docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka

创建docker-compose.yml文件

version: '1'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    depends_on: [ zookeeper ]
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 本机IP
      KAFKA_CREATE_TOPICS: "test:1:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - 本地路径(如D:\)docker.sock:/var/run/docker.sock

在docker-compose.yml文件目录进行服务打包

docker-compose build

启动服务

docker-compose up -d

创建两个Topic为后面的程序使用

kafka-topics.sh --zookeeper 本机IP:2181 --create --replication-factor 1 --partitions 3 --topic first  // 生产者使用 
kafka-topics.sh --zookeeper 本机IP:2181 --create --replication-factor 1 --partitions 3 --topic second // 消费者使用 

2、用Java获取Emq(见之前的文档)的数据,并由生产者发出

pom.xml(同消费者)

<dependencies>
    <!--mqtt-->
    <dependency>
        <groupId>org.eclipse.paho</groupId>
        <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
        <version>1.2.5</version>
    </dependency>
     <!--kafka-->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.6.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>2.6.0</version>
    </dependency>
    <!--日志-->
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-nop</artifactId>
        <version>1.7.22</version>
    </dependency>
</dependencies>

MqttKafkaProducer.java

package cc.hiver.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;

/**
 * MQTT生产者
 */
public class MqttKafkaProducer {
    /**
     * 向Kafka传入数据
     * @param msgData
     */
    public static void pushData(String msgData) {
        Properties props = new Properties();
        // 集群地址,多个服务器用","分隔
        props.put("bootstrap.servers", "本机IP:9092");
        // 重新发送消息次数,到达次数返回错误
        props.put("retries", 0);
        // Producer会尝试去把发往同一个Partition的多个Requests进行合并,batch.size指明了一次Batch合并后Requests总大小的上限。如果这个值设置的太小,可能会导致所有的Request都不进行Batch。
        props.put("batch.size", 163840);
        // Producer默认会把两次发送时间间隔内收集到的所有Requests进行一次聚合然后再发送,以此提高吞吐量,而linger.ms则更进一步,这个参数为每次发送增加一些delay,以此来聚合更多的Message。
        props.put("linger.ms", 1);
        // 在Producer端用来存放尚未发送出去的Message的缓冲区大小
        props.put("buffer.memory", 33554432);
        // key、value的序列化,此处以字符串为例,使用kafka已有的序列化类
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // props.put("partitioner.class", "com.kafka.demo.Partitioner");//分区操作,此处未写
        props.put("acks", "1");
        props.put("request.timeout.ms", "60000");
        props.put("compression.type", "lz4");
        //创建生产者
        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        //通过时间做轮循,均匀分布设置的partition,提高效率。
        int partition = (int) (System.currentTimeMillis() % 3);
        //写入名为"test-partition-1"的topic
        ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("first", partition, UUID.randomUUID().toString(), msgData);
        try {
            producer.send(producerRecord).get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        System.out.println("写入emqtopic到first:" + msgData);
    }
}

OnMessageCallback.java

package cc.hiver.producer;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;

/**
 * 消息回调函数
 */
public class OnMessageCallback implements MqttCallback {
    public void connectionLost(Throwable cause) {
        // 连接丢失后,一般在这里面进行重连
        System.out.println("连接断开,可以做重连");
    }

    public void messageArrived(String topic, MqttMessage message) throws Exception {
        // subscribe后得到的消息会执行到这里面
        System.out.println("接收消息主题:" + topic);
        System.out.println("接收消息Qos:" + message.getQos());
        System.out.println("接收消息内容:" + new String(message.getPayload()));

        //接收到的消息发送到Kafka
        MqttKafkaProducer.pushData(new String(message.getPayload()));
    }

    public void deliveryComplete(IMqttDeliveryToken token) {
        System.out.println("deliveryComplete---------" + token.isComplete());
    }
}

ProducerApp.java

package cc.hiver.producer;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

/**
 * 生产者App
 */
public class ProducerApp {
    /**
     * 主程序
     *
     * @param args
     */
    public static void main(String[] args) {
        // 订阅的主题
        String subTopic = "testtopic/#";
        // Broker服务
        String broker = "tcp://本机IP:1883";
        // 客户端名称
        String clientId = "mqtt_java_hiver";
        MemoryPersistence persistence = new MemoryPersistence();

        try {
            MqttClient client = new MqttClient(broker, clientId, persistence);
            // MQTT 连接选项
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setUserName("admin");    // 用户名
            connOpts.setPassword("public".toCharArray());     // 密码
            // 保留会话
            connOpts.setCleanSession(true);
            // 设置回调
            client.setCallback(new OnMessageCallback());
            // 建立连接
            System.out.println("Connecting to broker: " + broker);
            client.connect(connOpts);
            System.out.println("Connected");
            // 订阅
            client.subscribe(subTopic);
        } catch (MqttException me) {
            // 异常捕捉
            System.out.println("reason " + me.getReasonCode());
            System.out.println("msg " + me.getMessage());
            System.out.println("loc " + me.getLocalizedMessage());
            System.out.println("cause " + me.getCause());
            System.out.println("excep " + me);
            me.printStackTrace();
        }
    }
}

3、消费者

LogProcessor.java

package cc.hiver.consumer;

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

/**
 * 日志清理
 */
public class LogProcessor implements Processor<byte[], byte[]> {

    private ProcessorContext context;

    public void init(ProcessorContext context) {
        this.context = context;
    }

    public void process(byte[] key, byte[] value) {
        String input = new String(value);
        if(input.contains("hello")) {
            System.out.println("logProcessor:" + input.toString());
            context.forward("logProcessor".getBytes(), input.getBytes());
        } else {
            // 这里可以进行数据清理
            // 输出到下一个topic
            context.forward("logProcessor".getBytes(), input.getBytes());
        }
    }

    public void punctuate(long timestamp) {

    }

    public void close() {

    }
}

ConsumerApp.java

package cc.hiver.consumer;

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;

public class ConsumerApp {
    public static void main(String[] args) {
        // 定义输入的topic
        String from = "first";
        // 定义输出的topic
        String to = "second";

        // 设置参数
        Properties settings = new Properties();
        settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "logFilter");
        settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "本机IP:9092");

        StreamsConfig config = new StreamsConfig(settings);

        // 构建拓扑
        Topology builder = new Topology();

        builder.addSource("SOURCE", from)
                .addProcessor("PROCESS", new ProcessorSupplier<byte[], byte[]>() {

                    @Override
                    public Processor<byte[], byte[]> get() {
                        // 具体分析处理
                        return new LogProcessor();
                    }
                }, "SOURCE")
                .addSink("SINK", to, "PROCESS");

        // 创建kafka stream
        KafkaStreams streams = new KafkaStreams(builder, config);
        streams.start();
    }
}

4、测试数据

Emq X Broker消息模拟

接收消息并通过Kafka转发至消费者

消费者接收消息进行处理

附录:kafka集群管理界面

docker run -itd --name=kafka-manager -p 9000:9000 -e ZK_HOSTS="本机IP:2181" sheepkiller/kafka-manager

访问:http://本机IP:9000


管理后台
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,014评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,796评论 3 386
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,484评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,830评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,946评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,114评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,182评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,927评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,369评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,678评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,832评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,533评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,166评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,885评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,128评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,659评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,738评论 2 351

推荐阅读更多精彩内容