以下只针对Spring项目。
- 消息队列具有低耦合、可靠投递、广播、流量控制、最终一致性等一系列功能,成为异步RPC的主要手段之一。
当今市面上有很多主流的消息中间件,如老牌的ActiveMQ、RabbitMQ,炙手可热的Kafka,阿里巴巴自主开发RocketMQ等。
下面我们先介绍RabbitMQ简单使用,再介绍Kafka的简单使用。
1.RabbitMQ
- RabbitMQ基础知识不介绍,如果有兴趣可以去RabbitMQ官网了解。
引入依赖
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
</dependency>
1.1 配置文件
@Configuration
public class RabbitMQConfig {
//定义交换机名称
public static final String GOODS_UP_EXCHANGE="交换机名称";
//定义队列名称
public static final String AD_UPDATE_QUEUE="队列名称";
//声明队列
@Bean
public Queue queue(){
return new Queue(AD_UPDATE_QUEUE);
}
//声明交换机
@Bean(GOODS_UP_EXCHANGE)
public Exchange GOODS_UP_EXCHANGE(){
return ExchangeBuilder.fanoutExchange(GOODS_UP_EXCHANGE).durable(true).build();
}
//队列与交换机的绑定
@Bean
public Binding GOODS_UP_EXCHANGE_BINDING(@Qualifier(SEARCH_ADD_QUEUE)Queue queue,@Qualifier(GOODS_UP_EXCHANGE)Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("").noargs();
}
}
1.2 使用
- 首先在你的项目注入:
@Autowired
private RabbitTemplate rabbitTemplate;
- 生产方
//发送消息
rabbitTemplate.convertAndSend("", RabbitMQConfig.AD_UPDATE_QUEUE,"你的信息");
- 消费方
@RabbitListener(queues = "队列名称")
public void receiveMsg(String message){
System.out.println("接收到的消息为:"+message);
//TODO 你的业务逻辑
}
2.Kafka
- Kafka基础知识暂不介绍,如果有兴趣可以去Kafka官网了解。
引入依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
2.1 配置文件
消费者
spring.kafka:
consumer:
#你的卡发卡集群
bootstrap-servers: 192.168.18.114:9092,192.168.18.115:9092,192.168.18.116:9092
#你的groupId
group-id: groupId
fetch-min-size: 100
auto-offset-reset: latest
enable-auto-commit: true
auto-commit-interval: 100
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
生产者
spring:
kafka:
#你的kafka集群
bootstrap-servers: 172.31.64.10:9092,172.31.64.11:9092,172.31.64.12:9092
#你的groupId
group-id: id
enable-auto-commit: true
auto-commit-interval: 100
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
2.2 使用
- 首先在你的项目中注入
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
- 生产者
kafkaTemplate.send(topic, data);
- 消费者
@KafkaListener(topics = {topic})
public void consumer(ConsumerRecord<?, ?> record){
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
log.info("----------------- record =" + record);
log.info("------------------ message =" + message);
}
}
3.总结
上面仅仅是对RabbitMQ和Kafka做了简单的使用介绍,若用于企业开发,还需进一步开发和优化。