Kafka实操:整合spring开发操作

Kafka + Spring 实操

之前曾经写过关于Kafka的介绍和单机安装,这里我们来看下如何在实际代码中使用Kafka

还不知道如何安装的伙计们可以点击此处查看

业务

在我的小说项目中有一个这样的功能:

在功能小说管理中, 如果我们删除了其中一本小说,我们需要删除这本小说的 章节, 章节内容, 书评等等和小说相关的数据, 如果直接写到删除小说的代码中的话,首先是代码比较冗余不优雅, 二是对效率有一定的影响,

所以这里我采用Kafka消息通知的方式, 通过异步来进行操作

下面来看具体实现

发送端

项目采用分布式架构来进行开发的, 所以我这边新建一个模块专门处理kafka消息发送端的代码

maven 这是需要的jar

<!--kafka-->
<kafka.version>2.1.0</kafka.version>
<integration.verion>3.1.0.RELEASE</integration.verion>
<spring-kafka.verion>2.2.2.RELEASE</spring-kafka.verion>

<!--kafka start-->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>${kafka.version}</version>
    <scope>compile</scope>
    <exclusions>
        <exclusion>
            <artifactId>jmxri</artifactId>
            <groupId>com.sun.jmx</groupId>
        </exclusion>
        <exclusion>
            <artifactId>jms</artifactId>
            <groupId>javax.jms</groupId>
        </exclusion>
        <exclusion>
            <artifactId>jmxtools</artifactId>
            <groupId>com.sun.jdmk</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>${kafka.version}</version>
</dependency>

<!--spring kafka-->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-kafka</artifactId>
    <version>${integration.verion}</version>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>${spring-kafka.verion}</version>
</dependency>
<!--kafka end-->

接下来是spring-producer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
         http://www.springframework.org/schema/context
         http://www.springframework.org/schema/context/spring-context.xsd">

  <!-- 定义producer的参数 -->
  <bean id="producerProperties" class="java.util.HashMap">
      <constructor-arg>
          <map>
              <!--这里配置kafka ip和端口-->
              <!--有关配置中的详细描述, 单独讲解-->
              <entry key="bootstrap.servers" value="${bootstrap.servers}" />
              <entry key="group.id" value="0" />
              <entry key="retries" value="0" />
              <entry key="batch.size" value="16384" />
              <entry key="linger.ms" value="1" />
              <entry key="buffer.memory" value="33554432" />
              <entry key="acks" value="1" />
              <entry key="key.serializer"
                      value="org.apache.kafka.common.serialization.StringSerializer" />
              <entry key="value.serializer"
                      value="org.apache.kafka.common.serialization.StringSerializer" />
          </map>
      </constructor-arg>
  </bean>

  <!-- 创建kafkatemplate需要使用的producerfactory -->
  <bean id="producerFactory"
        class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
      <constructor-arg>
          <ref bean="producerProperties" />
      </constructor-arg>
  </bean>

  <!-- 创建kafkatemplate bean,使用的时候,只需要注入这个bean,即可使用template的send消息方法 -->
  <bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
      <constructor-arg ref="producerFactory" />
      <constructor-arg name="autoFlush" value="true" />
      <property name="defaultTopic" value="default" />
  </bean>
</beans>

接下来我们来发送消息吧
Kafka 基于 kafkaTemplate发送消息, 我们对其进行封装一下, 以防止我们换了MQ需要整个项目的替换

BooksMQService

public interface BooksMQService {
    void deleteByBooksId(String booksId);
}

对该接口进行实现
BooksMQServiceImpl

@Service("booksMQService")
public class BooksMQServiceImpl implements BooksMQService {

  @Resource
  private KafkaTemplate<String, Object> kafkaTemplate;

  //这里是topic, 写在properties中
  @Value("${kafka.books.del}")
  public String kafkaBooksDelTopic;

  @Override
  public void deleteByBooksId(String booksId) {
    //这是具体发送消息
    kafkaTemplate.send(kafkaBooksDelTopic, booksId);
  }
}

到此发送端就已经完成了,我们只要这样操作

@Resource
private BooksMQService booksMQService;

//发送消息
booksMQService.deleteByBooksId(booksVo.getId());

接收端

这里我将消息接收端已一个web项目单独进行运行 jar包没有改变, 和发送端是一样的, 我们来看下具体的配置

spring-consumer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xmlns:context="http://www.springframework.org/schema/context"
      xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context.xsd">

  <!-- 定义producer的参数 -->
  <bean id="consumerProperties" class="java.util.HashMap">
      <constructor-arg>
          <map>
              <!--这里链接配置-->
              <entry key="bootstrap.servers" value="${bootstrap.servers}"/>
              <entry key="group.id" value="0"/>
              <entry key="enable.auto.commit" value="true"/>
              <entry key="session.timeout.ms" value="30000"/>
              <entry key="auto.commit.interval.ms" value="1000"/>
              <entry key="retry.backoff.ms" value="100"/>
              <entry key="key.deserializer"
                      value="org.apache.kafka.common.serialization.StringDeserializer"/>
              <entry key="value.deserializer"
                      value="org.apache.kafka.common.serialization.StringDeserializer"/>
          </map>
      </constructor-arg>
  </bean>

  <!-- 2.创建consumerFactory bean -->
  <bean id="consumerFactory"
        class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
      <constructor-arg>
          <ref bean="consumerProperties"/>
      </constructor-arg>
  </bean>

  <!-- 4.消费者容器配置信息 -->
  <bean id="containerProperties" class="org.springframework.kafka.listener.ContainerProperties">
      <!-- topic -->
      <constructor-arg name="topics">
          <list>
              <!--对topic集中配置-->
              <value>${kafka.books.del}</value>
          </list>
      </constructor-arg>
      <property name="messageListener" ref="messageListener"/>
  </bean>

  <bean id="messageListener" class="com.sanq.product.books.kafka.producer.KafkaMessageListenerImpl">
  </bean>
  <!-- 5.消费者并发消息监听容器,执行doStart()方法 -->
  <bean id="messageListenerContainer" class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer"
        init-method="doStart">
      <constructor-arg ref="containerProperties"/>
      <constructor-arg ref="consumerFactory"/>
      <property name="concurrency" value="3"/>
  </bean>
</beans>

针对不同的topic, 在KafkaMessageListenerImpl中具体的处理方式有两种

  1. 实现MessageListener接口, 通过if判断是那个topic
  2. 通过@KafkaListener(topics = "${kafka.books.del}")这种注解的方式

在这里我采用第一种方式, 第二种方式等随后研究研究发出来(或者谁来给我发个demo看看啊) -_-!!

public class KafkaMessageListenerImpl implements MessageListener<String, Object> {

    @Override
    public void onMessage(ConsumerRecord<String, Object> record) {
        //在这里进行判断
        System.out.println(String.format("%s:::%s", record.topic(), record.value()));
    }
}

注意事项

在properties配置bootstrap.servers的时候, 地址需要和kafka中server.properties配置的advertised.listeners一致

如果没有配置advertised.listeners, 那么需要和listeners一致

完结

这样我们就实现了通过整合Kafka + Spring来发送一些简单的消息

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。