1、Docker安装Kafka
打开Cmd命令行
docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka
创建docker-compose.yml文件
# Compose file for a single-node ZooKeeper + Kafka development setup.
# The top-level `services:` key requires Compose file format 2 or newer;
# legacy docker-compose rejects version '1' as unsupported.
version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    depends_on: [ zookeeper ]
    ports:
      - "9092:9092"
    environment:
      # Replace with the host machine's IP address.
      KAFKA_ADVERTISED_HOST_NAME: 本机IP
      # Format is topic:partitions:replicas.
      KAFKA_CREATE_TOPICS: "test:1:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      # Replace the left-hand side with a local path.
      - 本地路径(如D:\)docker.sock:/var/run/docker.sock
在docker-compose.yml文件所在目录构建服务(两个服务都直接使用现成镜像,没有需要构建的内容,此步骤可以省略)
docker-compose build
启动服务
docker-compose up -d
创建两个Topic供后面的程序使用(kafka-topics.sh位于Kafka容器内,需先执行 docker exec -it <Kafka容器名> bash 进入容器后再运行)
kafka-topics.sh --zookeeper 本机IP:2181 --create --replication-factor 1 --partitions 3 --topic first // 生产者使用
kafka-topics.sh --zookeeper 本机IP:2181 --create --replication-factor 1 --partitions 3 --topic second // 消费者使用
2、用Java获取Emq(见之前的文档)的数据,并由生产者发出
pom.xml(同消费者)
<dependencies>
<!-- MQTT client (Eclipse Paho), used to subscribe to the MQTT broker -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
<!-- Kafka producer/consumer client -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.6.0</version>
</dependency>
<!-- Kafka Streams, used by the consumer application; keep the version in step with kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.6.0</version>
</dependency>
<!-- Logging -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<!-- NOTE(review): slf4j-nop is the no-operation SLF4J binding, so SLF4J log output is presumably discarded; confirm this is intended -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-nop</artifactId>
<version>1.7.22</version>
</dependency>
</dependencies>
MqttKafkaProducer.java
package cc.hiver.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
/**
 * Publishes MQTT payloads to the Kafka topic {@code first}.
 *
 * <p>One {@link KafkaProducer} is created lazily on the first call and shared by all
 * later calls (KafkaProducer is documented as thread-safe). Creating a producer per
 * message, as this class used to do, leaks its I/O thread, sockets and send buffer
 * because the instance was never closed. The shared producer is closed by a JVM
 * shutdown hook.
 */
public class MqttKafkaProducer {

    /** Topic that receives every forwarded message. */
    private static final String TOPIC = "first";

    /** Shared producer; created on first use, guarded by the class monitor. */
    private static Producer<String, String> producer;

    /**
     * Sends one message to Kafka and blocks until the broker acknowledges it.
     *
     * <p>The record key is a random UUID and no partition is specified, so Kafka's
     * partitioner spreads records over however many partitions the topic has.
     * The success line is printed only when the send actually succeeded.
     *
     * @param msgData the message value to publish
     */
    public static void pushData(String msgData) {
        ProducerRecord<String, String> producerRecord =
                new ProducerRecord<String, String>(TOPIC, UUID.randomUUID().toString(), msgData);
        try {
            getProducer().send(producerRecord).get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the calling thread can react to it.
            Thread.currentThread().interrupt();
            System.err.println("Interrupted while sending to topic " + TOPIC);
            return;
        } catch (ExecutionException e) {
            System.err.println("Failed to send to topic " + TOPIC);
            e.printStackTrace();
            return;
        }
        System.out.println("写入emqtopic到first:" + msgData);
    }

    /** Returns the shared producer, creating it and registering its shutdown hook on first use. */
    private static synchronized Producer<String, String> getProducer() {
        if (producer == null) {
            final Producer<String, String> created =
                    new KafkaProducer<String, String>(buildProperties());
            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
                @Override
                public void run() {
                    // Flushes pending records and releases network resources.
                    created.close();
                }
            }, "kafka-producer-shutdown"));
            producer = created;
        }
        return producer;
    }

    /** Builds the producer configuration. */
    private static Properties buildProperties() {
        Properties props = new Properties();
        // Broker addresses; separate multiple servers with ","
        props.put("bootstrap.servers", "本机IP:9092");
        // Number of resend attempts before an error is returned
        props.put("retries", 0);
        // Upper bound, in bytes, of one batch of requests destined for the same partition
        props.put("batch.size", 163840);
        // Extra delay in milliseconds added before sending, so more messages can be batched
        props.put("linger.ms", 1);
        // Size of the buffer holding messages that have not been sent yet
        props.put("buffer.memory", 33554432);
        // Keys and values are strings, serialized with Kafka's built-in serializer
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "1");
        props.put("request.timeout.ms", "60000");
        props.put("compression.type", "lz4");
        return props;
    }
}
OnMessageCallback.java
package cc.hiver.producer;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
 * MQTT callback that logs every received message and forwards its payload to Kafka
 * through {@link MqttKafkaProducer#pushData(String)}.
 */
public class OnMessageCallback implements MqttCallback {

    /**
     * Called when the connection to the MQTT broker is lost. Only logs the event;
     * reconnecting is left to be implemented here.
     *
     * @param cause the reason the connection was lost
     */
    public void connectionLost(Throwable cause) {
        System.out.println("连接断开,可以做重连");
        if (cause != null) {
            // Keep the reason visible instead of discarding it.
            System.err.println("MQTT connection lost: " + cause);
        }
    }

    /**
     * Called for every message received on a subscribed topic.
     *
     * @param topic   the topic the message was published to
     * @param message the received message
     * @throws Exception if forwarding the payload fails
     */
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        // Decode once, with an explicit charset, so the result does not depend on the
        // platform default encoding.
        String payload = new String(message.getPayload(), java.nio.charset.StandardCharsets.UTF_8);
        System.out.println("接收消息主题:" + topic);
        System.out.println("接收消息Qos:" + message.getQos());
        System.out.println("接收消息内容:" + payload);
        // Forward the received message to Kafka
        MqttKafkaProducer.pushData(payload);
    }

    /**
     * Called when delivery of a message published by this client has completed.
     *
     * @param token the delivery token of the published message
     */
    public void deliveryComplete(IMqttDeliveryToken token) {
        System.out.println("deliveryComplete---------" + token.isComplete());
    }
}
ProducerApp.java
package cc.hiver.producer;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
/**
 * Entry point of the producer side: connects to the MQTT broker, installs
 * {@link OnMessageCallback} and subscribes to {@code testtopic/#}.
 */
public class ProducerApp {

    /**
     * Connects, subscribes and returns; received messages are handled by the callback.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // MQTT broker URI
        final String brokerUri = "tcp://本机IP:1883";
        // Topic filter to subscribe to
        final String topicFilter = "testtopic/#";
        // Client identifier presented to the broker
        final String id = "mqtt_java_hiver";
        MemoryPersistence store = new MemoryPersistence();
        try {
            MqttClient mqtt = new MqttClient(brokerUri, id, store);
            // The callback must be installed before connecting
            mqtt.setCallback(new OnMessageCallback());
            System.out.println("Connecting to broker: " + brokerUri);
            mqtt.connect(connectOptions());
            System.out.println("Connected");
            mqtt.subscribe(topicFilter);
        } catch (MqttException failure) {
            report(failure);
        }
    }

    /** Builds the connection options: credentials plus a clean (non-persistent) session. */
    private static MqttConnectOptions connectOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName("admin");
        options.setPassword("public".toCharArray());
        // true requests a clean session, i.e. session state is NOT kept across connections
        options.setCleanSession(true);
        return options;
    }

    /** Prints every detail of an MQTT failure, followed by its stack trace. */
    private static void report(MqttException failure) {
        System.out.println("reason " + failure.getReasonCode());
        System.out.println("msg " + failure.getMessage());
        System.out.println("loc " + failure.getLocalizedMessage());
        System.out.println("cause " + failure.getCause());
        System.out.println("excep " + failure);
        failure.printStackTrace();
    }
}
3、消费者
LogProcessor.java
package cc.hiver.consumer;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
/**
 * Stream processor that inspects each record value and forwards it downstream
 * under the fixed key {@code logProcessor}.
 */
public class LogProcessor implements Processor<byte[], byte[]> {

    /** Key attached to every forwarded record. */
    private static final String OUTPUT_KEY = "logProcessor";

    private ProcessorContext context;

    /**
     * Stores the context used later to forward records.
     *
     * @param context the processor context supplied by Kafka Streams
     */
    public void init(ProcessorContext context) {
        this.context = context;
    }

    /**
     * Logs values containing {@code hello} and forwards every record downstream.
     *
     * @param key   the incoming record key (ignored)
     * @param value the incoming record value; may be {@code null}
     */
    public void process(byte[] key, byte[] value) {
        java.nio.charset.Charset utf8 = java.nio.charset.StandardCharsets.UTF_8;
        byte[] outputKey = OUTPUT_KEY.getBytes(utf8);
        if (value == null) {
            // A record without a value cannot be decoded; pass it on unchanged
            // instead of failing with a NullPointerException.
            context.forward(outputKey, null);
            return;
        }
        // Explicit charset so the result does not depend on the platform default.
        String input = new String(value, utf8);
        if (input.contains("hello")) {
            System.out.println("logProcessor:" + input);
        }
        // Data cleaning can be applied to `input` here before it goes to the next topic.
        context.forward(outputKey, input.getBytes(utf8));
    }

    /**
     * No-op.
     * NOTE(review): looks like a leftover from an older Processor API; confirm
     * whether anything still calls it.
     *
     * @param timestamp unused
     */
    public void punctuate(long timestamp) {
    }

    /** Nothing to release. */
    public void close() {
    }
}
ConsumerApp.java
package cc.hiver.consumer;
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
/**
 * Kafka Streams application that reads topic {@code first}, passes every record
 * through {@link LogProcessor} and writes the result to topic {@code second}.
 */
public class ConsumerApp {

    /**
     * Builds the topology, starts the streams instance and registers a shutdown
     * hook that closes it.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // Input topic
        String from = "first";
        // Output topic
        String to = "second";
        // Streams configuration
        Properties settings = new Properties();
        settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "logFilter");
        settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "本机IP:9092");
        // Topology: SOURCE -> PROCESS -> SINK
        Topology builder = new Topology();
        builder.addSource("SOURCE", from)
                .addProcessor("PROCESS", new ProcessorSupplier<byte[], byte[]>() {
                    @Override
                    public Processor<byte[], byte[]> get() {
                        // One processor instance per stream task
                        return new LogProcessor();
                    }
                }, "SOURCE")
                .addSink("SINK", to, "PROCESS");
        // Pass the Properties directly; the constructor taking a StreamsConfig is deprecated.
        final KafkaStreams streams = new KafkaStreams(builder, settings);
        // Close the instance on JVM shutdown so its threads and clients are released.
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                streams.close();
            }
        }, "streams-shutdown-hook"));
        streams.start();
    }
}
4、测试数据
附录:kafka集群管理界面
docker run -itd --name=kafka-manager -p 9000:9000 -e ZK_HOSTS="本机IP:2181" sheepkiller/kafka-manager
访问:http://本机IP:9000