Kafka + Spring 实操
之前曾经写过关于Kafka的介绍和单机安装,这里我们来看下如何在实际代码中使用Kafka
还不知道如何安装的伙计们可以点击此处查看
业务
在我的小说项目中有一个这样的功能:
在功能小说管理中, 如果我们删除了其中一本小说,我们需要删除这本小说的 章节, 章节内容, 书评等等和小说相关的数据, 如果直接写到删除小说的代码中的话,首先是代码比较冗余不优雅, 二是对效率有一定的影响,
所以这里我采用Kafka消息通知的方式, 通过异步来进行操作
下面来看具体实现
发送端
项目采用分布式架构来进行开发的, 所以我这边新建一个模块专门处理kafka消息发送端的代码
maven 这是需要的jar
<!--kafka-->
<kafka.version>2.1.0</kafka.version>
<integration.verion>3.1.0.RELEASE</integration.verion>
<spring-kafka.verion>2.2.2.RELEASE</spring-kafka.verion>
<!--kafka start-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
<scope>compile</scope>
<exclusions>
<exclusion>
<artifactId>jmxri</artifactId>
<groupId>com.sun.jmx</groupId>
</exclusion>
<exclusion>
<artifactId>jms</artifactId>
<groupId>javax.jms</groupId>
</exclusion>
<exclusion>
<artifactId>jmxtools</artifactId>
<groupId>com.sun.jdmk</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>${kafka.version}</version>
</dependency>
<!--spring kafka-->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-kafka</artifactId>
<version>${integration.verion}</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>${spring-kafka.verion}</version>
</dependency>
<!--kafka end-->
接下来是spring-producer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
<!-- 定义producer的参数 -->
<bean id="producerProperties" class="java.util.HashMap">
<constructor-arg>
<map>
<!--这里配置kafka ip和端口-->
<!--有关配置中的详细描述, 单独讲解-->
<entry key="bootstrap.servers" value="${bootstrap.servers}" />
<entry key="group.id" value="0" />
<entry key="retries" value="0" />
<entry key="batch.size" value="16384" />
<entry key="linger.ms" value="1" />
<entry key="buffer.memory" value="33554432" />
<entry key="acks" value="1" />
<entry key="key.serializer"
value="org.apache.kafka.common.serialization.StringSerializer" />
<entry key="value.serializer"
value="org.apache.kafka.common.serialization.StringSerializer" />
</map>
</constructor-arg>
</bean>
<!-- 创建kafkatemplate需要使用的producerfactory -->
<bean id="producerFactory"
class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
<constructor-arg>
<ref bean="producerProperties" />
</constructor-arg>
</bean>
<!-- 创建kafkatemplate bean,使用的时候,只需要注入这个bean,即可使用template的send消息方法 -->
<bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
<constructor-arg ref="producerFactory" />
<constructor-arg name="autoFlush" value="true" />
<property name="defaultTopic" value="default" />
</bean>
</beans>
接下来我们来发送消息吧
Kafka 基于kafkaTemplate
发送消息, 我们对其进行封装一下, 以防止我们换了MQ需要整个项目的替换
BooksMQService
public interface BooksMQService {
void deleteByBooksId(String booksId);
}
对该接口进行实现
BooksMQServiceImpl
@Service("booksMQService")
public class BooksMQServiceImpl implements BooksMQService {
@Resource
private KafkaTemplate<String, Object> kafkaTemplate;
//这里是topic, 写在properties中
@Value("${kafka.books.del}")
public String kafkaBooksDelTopic;
@Override
public void deleteByBooksId(String booksId) {
//这是具体发送消息
kafkaTemplate.send(kafkaBooksDelTopic, booksId);
}
}
到此发送端就已经完成了,我们只要这样操作
@Resource
private BooksMQService booksMQService;
//发送消息
booksMQService.deleteByBooksId(booksVo.getId());
接收端
这里我将消息接收端已一个web项目单独进行运行 jar包没有改变, 和发送端是一样的, 我们来看下具体的配置
spring-consumer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
<!-- 定义producer的参数 -->
<bean id="consumerProperties" class="java.util.HashMap">
<constructor-arg>
<map>
<!--这里链接配置-->
<entry key="bootstrap.servers" value="${bootstrap.servers}"/>
<entry key="group.id" value="0"/>
<entry key="enable.auto.commit" value="true"/>
<entry key="session.timeout.ms" value="30000"/>
<entry key="auto.commit.interval.ms" value="1000"/>
<entry key="retry.backoff.ms" value="100"/>
<entry key="key.deserializer"
value="org.apache.kafka.common.serialization.StringDeserializer"/>
<entry key="value.deserializer"
value="org.apache.kafka.common.serialization.StringDeserializer"/>
</map>
</constructor-arg>
</bean>
<!-- 2.创建consumerFactory bean -->
<bean id="consumerFactory"
class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
<constructor-arg>
<ref bean="consumerProperties"/>
</constructor-arg>
</bean>
<!-- 4.消费者容器配置信息 -->
<bean id="containerProperties" class="org.springframework.kafka.listener.ContainerProperties">
<!-- topic -->
<constructor-arg name="topics">
<list>
<!--对topic集中配置-->
<value>${kafka.books.del}</value>
</list>
</constructor-arg>
<property name="messageListener" ref="messageListener"/>
</bean>
<bean id="messageListener" class="com.sanq.product.books.kafka.producer.KafkaMessageListenerImpl">
</bean>
<!-- 5.消费者并发消息监听容器,执行doStart()方法 -->
<bean id="messageListenerContainer" class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer"
init-method="doStart">
<constructor-arg ref="containerProperties"/>
<constructor-arg ref="consumerFactory"/>
<property name="concurrency" value="3"/>
</bean>
</beans>
针对不同的topic, 在KafkaMessageListenerImpl中具体的处理方式有两种
- 实现MessageListener接口, 通过if判断是那个topic
- 通过@KafkaListener(topics = "${kafka.books.del}")这种注解的方式
在这里我采用第一种方式, 第二种方式等随后研究研究发出来(或者谁来给我发个demo看看啊) -_-!!
public class KafkaMessageListenerImpl implements MessageListener<String, Object> {
@Override
public void onMessage(ConsumerRecord<String, Object> record) {
//在这里进行判断
System.out.println(String.format("%s:::%s", record.topic(), record.value()));
}
}
注意事项
在properties配置bootstrap.servers的时候, 地址需要和kafka中server.properties配置的advertised.listeners
一致
如果没有配置advertised.listeners
, 那么需要和listeners
一致
完结
这样我们就实现了通过整合Kafka + Spring来发送一些简单的消息