RabbitMQ & Kafka

以下只针对Spring项目。

  • 消息队列具有低耦合、可靠投递、广播、流量控制、最终一致性等一系列功能,成为异步RPC的主要手段之一。
    当今市面上有很多主流的消息中间件,如老牌的ActiveMQ、RabbitMQ,炙手可热的Kafka,阿里巴巴自主开发RocketMQ等。
    下面我们先介绍RabbitMQ简单使用,再介绍Kafka的简单使用。

1.RabbitMQ

  • RabbitMQ基础知识不介绍,如果有兴趣可以去RabbitMQ官网了解。

引入依赖

        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
        </dependency>

1.1 配置文件

@Configuration
public class RabbitMQConfig {

    //定义交换机名称
    public static final String GOODS_UP_EXCHANGE="交换机名称";
 
    //定义队列名称
    public static final String AD_UPDATE_QUEUE="队列名称";

    //声明队列
    @Bean
    public Queue queue(){
        return new Queue(AD_UPDATE_QUEUE);
    }
    
    //声明交换机
    @Bean(GOODS_UP_EXCHANGE)
    public Exchange GOODS_UP_EXCHANGE(){
        return ExchangeBuilder.fanoutExchange(GOODS_UP_EXCHANGE).durable(true).build();
    }
  
    //队列与交换机的绑定
    @Bean
    public Binding GOODS_UP_EXCHANGE_BINDING(@Qualifier(SEARCH_ADD_QUEUE)Queue queue,@Qualifier(GOODS_UP_EXCHANGE)Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("").noargs();
    }
}

1.2 使用

  • 首先在你的项目注入:
    @Autowired
    private RabbitTemplate rabbitTemplate;
  • 生产方
    //发送消息
    rabbitTemplate.convertAndSend("", RabbitMQConfig.AD_UPDATE_QUEUE,"你的信息");
  • 消费方
    @RabbitListener(queues = "队列名称")
    public void receiveMsg(String message){
        System.out.println("接收到的消息为:"+message);
        //TODO 你的业务逻辑
    }

2.Kafka

  • Kafka基础知识暂不介绍,如果有兴趣可以去Kafka官网了解。

引入依赖

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>

2.1 配置文件

消费者

spring.kafka:
  consumer:
    #你的卡发卡集群
    bootstrap-servers: 192.168.18.114:9092,192.168.18.115:9092,192.168.18.116:9092
    #你的groupId
    group-id: groupId
    fetch-min-size: 100
    auto-offset-reset: latest
    enable-auto-commit: true
    auto-commit-interval: 100
    key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

生产者

spring:
  kafka:
    #你的kafka集群
    bootstrap-servers: 172.31.64.10:9092,172.31.64.11:9092,172.31.64.12:9092
    #你的groupId
    group-id: id
    enable-auto-commit: true
    auto-commit-interval: 100
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

2.2 使用

  • 首先在你的项目中注入
    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;
  • 生产者
    kafkaTemplate.send(topic, data);
  • 消费者
        @KafkaListener(topics = {topic})
        public void consumer(ConsumerRecord<?, ?> record){
            Optional<?> kafkaMessage = Optional.ofNullable(record.value());
            if (kafkaMessage.isPresent()) {
                Object message = kafkaMessage.get();
                log.info("----------------- record =" + record);
                log.info("------------------ message =" + message);
            }
        }

3.总结

上面仅仅是对RabbitMQ和Kafka做了简单的使用介绍,若用于企业开发,还需进一步开发和优化。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。
禁止转载,如需转载请通过简信或评论联系作者。

相关阅读更多精彩内容

友情链接更多精彩内容