-
引入依赖
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.66</version> </dependency> </dependencies>
-
加入配置
spring.application.name=springboot_rabbitmq
spring.rabbitmq.host=192.168.233.102
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=root
spring.rabbitmq.password=123456
spring.rabbitmq.port=5672
-
队列配置
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.amqp.core.*; import java.util.HashMap; import java.util.Map; /** * 定义订单的队列、交换机、队列绑定交换机、设置路由key */ @Configuration public class OrderRabbitConfig { @Bean public Queue orderQueueTTL(){ Map<String, Object> arguments = new HashMap<>(); // ttl = 30s arguments.put("x-message-ttl", 30 * 1000); // 设置关联的死信队列 if (true) { arguments.put("x-dead-letter-exchange", "ex.order.dlx"); arguments.put("x-dead-letter-routing-key", "order.pay.dlx"); } return new Queue("q.order.ttl", false, false, false, arguments); } @Bean public Exchange orderExchangeTTL() { return new DirectExchange("ex.order.ttl", false, false); } @Bean public Binding orderBindingTTL(){ return BindingBuilder.bind(orderQueueTTL()).to(orderExchangeTTL()).with("order.pay.ttl").noargs(); } }
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * Declares the dead-letter side of the order flow: the dead-letter queue, the
 * dead-letter exchange, and the binding (with its routing key) between them.
 */
@Configuration
public class DLXRabbitConfig {

    /**
     * Declares the dead-letter queue {@code q.order.dlx}.
     *
     * @return the queue definition (non-durable, non-exclusive, not auto-deleted,
     *         no extra arguments)
     */
    @Bean
    public Queue orderQueueDLX() {
        return new Queue("q.order.dlx", false, false, false, null);
    }

    /**
     * Declares the direct exchange {@code ex.order.dlx}.
     *
     * @return the exchange definition (non-durable, not auto-deleted)
     */
    @Bean
    public Exchange orderExchangeDLX() {
        return new DirectExchange("ex.order.dlx", false, false);
    }

    /**
     * Binds the dead-letter queue to the dead-letter exchange under the routing
     * key {@code order.pay.dlx}.
     *
     * @return the binding definition
     */
    @Bean
    public Binding orderBindingDLX() {
        final Queue deadLetterQueue = orderQueueDLX();
        final Exchange deadLetterExchange = orderExchangeDLX();
        return BindingBuilder.bind(deadLetterQueue)
                .to(deadLetterExchange)
                .with("order.pay.dlx")
                .noargs();
    }
}
-
订单实现
/** * 订单控制器 */ @Controller @RequestMapping("/order") public class OrderController { @Autowired private AmqpTemplate rabbitTemplate; /** * 提交订单 * @return */ @RequestMapping("/submit") @ResponseBody public String submit(){ String orderNo = UUID.randomUUID().toString(); final Order order = new Order(orderNo, "测试商品", new BigDecimal(1000.00), new Date(), Order.STATUS_WAIT_PAY); OrderDB.orderMap.put(orderNo, order); rabbitTemplate.convertAndSend("ex.order.ttl", "order.pay.ttl", JSONObject.toJSON(order).toString()); return "下单成功待支付:" + orderNo; } /** * 订单列表 * @return */ @RequestMapping("/list") @ResponseBody public Object[] list(){ return OrderDB.orderMap.values().toArray(); } }
-
支付实现
/** * 支付控制器 */ @Controller @RequestMapping("/pay") public class PayController { @Autowired CachingConnectionFactory factory; @RequestMapping("/{orderNo}") @ResponseBody public String pay(@PathVariable("orderNo") String orderNo){ final Order order = OrderDB.orderMap.get(orderNo); // 从mq中拉取支付信息,确认支付状态 if (order.getOrderStatus().equals(Order.STATUS_WAIT_PAY)) { final Connection connection = factory.createConnection(); try(final Channel channel = connection.createChannel(true)){ // 主动拉取消息: 从队列; 指定是否自动确认消息 final GetResponse getResponse = channel.basicGet("q.order.ttl", true); // 需要注意的 basicAck 方法需要传递两个参数 // deliveryTag(唯一标识 ID):当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,RabbitMQ 会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag, 它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel // multiple:为了减少网络流量,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息 channel.basicAck(getResponse.getEnvelope().getDeliveryTag(),false); // 更新订单状态 order.setOrderStatus(Order.STATUS_PAY_SUCCESS); OrderDB.orderMap.put(orderNo, order); } catch (Exception e){ e.printStackTrace(); } finally { if (connection.isOpen()){ connection.close(); } } return "支付成功"; } else if (order.getOrderStatus().equals(Order.STATUS_PAY_SUCCESS)) { return "支付已支付成功,请勿重复支付"; } else { return "订单已超时"; } } /** * 监听订单死信队列消息 */ @RabbitListener(queues = "q.order.dlx") public void dixLinseed(String message){ System.out.println(message); try { final Order order = JSONObject.parseObject(message, Order.class); if (order.getOrderStatus().equals(Order.STATUS_WAIT_PAY)) { order.setOrderStatus(Order.STATUS_TIMEOUT); OrderDB.orderMap.put(order.getOrderNo(), order); System.out.println("订单支付超时:" + order.getOrderNo()); } else if (order.getOrderStatus().equals(Order.STATUS_PAY_SUCCESS)) { System.out.println("订单已支付成功,死信队列监听处理。。。跳过"); } } catch (Exception e){ e.printStackTrace(); } } }
操作地址
提交订单:
http://localhost:8080/order/submit
订单列表:
http://localhost:8080/order/list
订单支付:
http://localhost:8080/pay/{orderNo}