1.kafka的搭建需要依赖zookeeper,现官网最新版已经内置了zookeeper,此处使用内置的ZK.
先去官网下载kafka,
kafka参考:https://blog.csdn.net/woshixiazaizhe/article/details/80610432
,下载后解压目录格式如图
其中config是配置文件,复制一下这个文件夹,放到bin/windows下
2.修改config目录下的zookeeper.properties
新建一个文件夹用于存放ZK的日志
回到bin/windows目录下,在此目录打开cmd(shift+鼠标右键),输入:
zookeeper-server-start.bat config\zookeeper.properties
特别注意,此目录层级不能过深和带空格,否则会无法启动
3.此时ZK服务就已经启动了,接下来安装ZK可视化工具ZK-UI,下载地址
https://github.com/DeemOpen/zkui
这是一个java工程,依赖maven搭建,mvn install出jar包
复制出这个jar包,和config文件,放到同一个目录
输入
java -jar [你的jar包名字]
例如
java -jar zkui-2.0-SNAPSHOT-jar-with-dependencies.jar
ZKui启动成功,参考:https://www.jianshu.com/p/8320a6c52f15
网页输入
账号admin
密码manager
4.启动kafka
修改config下server.properties文件,日志改为你需要的目录,
去kafka的bin/windows目录下
kafka-server-start.bat config\server.properties
,kafka启动成功,此时ZKUI界面可以看到节点了
5.kafka可视化管理软件kafkatool,安装后连接到节点即可管理
收到的信息默认是byte类型,可以改成string就可以显示为正常字符串.
6.使用java发送消息
此处使用的springboot版本是2.1.3,查看依赖的spring版本是5.1.5,因此spring_kafuka包需要对应的版本否则会依赖冲突
去maven仓库中找到2.2.4刚好版本符合,引入2.2.4版本
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.4.RELEASE</version>
</dependency>
配置配置文件,注意有
factory.setAutoStartup(autoMark);
方法,为kafka是否随系统自动启动,不需要的时候关闭即可,这个还挺好用的
@Configuration
@EnableKafka
public class KafkaConfiguration {
//写入配置文件中,控制项目启动时是否启用kafka
private String kafkaListenerFlag = PropertiesUtil.getProperty("application", "datacenter.kafkaListenerFlag", "false");
//写入配置文件中,控制kafka的IP端口连接
private String kafkaIpPort = PropertiesUtil.getProperty("application", "datacenter.kafkaIpPort", "localhost:9092");
//ConcurrentKafkaListenerContainerFactory为创建Kafka监听器的工程类,这里只配置了消费者
@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
boolean autoMark = Boolean.parseBoolean(kafkaListenerFlag);
System.out.println("是否启动kafka:" + autoMark);
factory.setAutoStartup(autoMark);
return factory;
}
//根据consumerProps填写的参数创建消费者工厂
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerProps());
}
//根据senderProps填写的参数创建生产者工厂
@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(senderProps());
}
//kafkaTemplate实现了Kafka发送接收等功能
@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
KafkaTemplate template = new KafkaTemplate<Integer, String>(producerFactory());
return template;
}
//消费者配置参数
private Map<String, Object> consumerProps() {
Map<String, Object> props = new HashMap<>();
//连接地址
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaIpPort);
//GroupID
props.put(ConsumerConfig.GROUP_ID_CONFIG, "bootKafka");
//是否自动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
//自动提交的频率
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
//Session超时设置
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
//键的反序列化方式
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
//值的反序列化方式
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
//生产者配置
private Map<String, Object> senderProps() {
Map<String, Object> props = new HashMap<>();
//连接地址
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaIpPort);
//重试,0为不启用重试机制
props.put(ProducerConfig.RETRIES_CONFIG, 1);
//控制批处理大小,单位为字节
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
//批量发送,延迟为1毫秒,启用该功能能有效减少生产者发送消息次数,从而提高并发量
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
//生产者可以使用的总内存字节来缓冲等待发送到服务器的记录
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 1024000);
//键的序列化方式
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
//值的序列化方式
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
}
配置监听者
@Component
public class DemoListener {
//声明consumerID为demo,监听topicName为topic.quick.demo的Topic
@KafkaListener(id = "demo", topics = "topic.quick.demo")
public void listen(String msgData) {
System.out.println("测试接收kafka: " + msgData + " 时间:" + new Date());
}
}
模拟发送信息
@RestController
@RequestMapping("/test/kafka")
public class KafKaAction extends BaseAction {
@Autowired
private KafkaTemplate<Integer, String> kafkaTemplate;
@GetMapping(value = "/testKafKa")
public ModelAndResult getDataBySocket() {
for (int i = 0; i < 20; i++) {
kafkaTemplate.send("topic.quick.demo", "测试kafka:" + i);
}
return new ModelAndResult(new Date());
}
}
效率还是挺高的
注意:
使用远程连接的时候,有时候kafka会去解析写在ZK里面的远程计算机的名字,而不是IP,如果没解析到正确的IP,就会报出 Can't resolve address [计算机:9092]的解析错误,网上很多方法都是改本地hosts来解析,个人觉得不靠谱,总不能每台机器都去改本机hosts吧?
这里可以通过修改zk里面的配置来正确解析,登录到ZKUI界面,找到kafka,进入brokers/ids,将无法解析的计算机名字换成对应的IP地址即可正常解析,不需要改hosts文件
此时的kafka只有一个分区,即使你的消费者开启了多个实例,也只会有一个消费者能接收到信息,我们一般是需要多个消费者功能消费信息吧,这就要增加partitions节点了,就是加分区
输入命令
windows的话在win下输入
kafka-topics.bat --zookeeper [zk服务地址]:2181 --alter --partitions [需要加入的节点数] --topic [需要加入的topic]
例如
kafka-topics.bat --zookeeper localhost:2181 --alter --partitions 3 --topic topic.quick.demo
加完后,注意此处分区数应该大于消费者数量,不然又要有消费者接不到
再次发送效果则被多消费者消费到
消费者1:
消费者2:
暂时就这么多,有新内容再更新
2019年11月16日10:59:30