对于支付功能 第三方支付(如:支付宝、微信 )系统采用的是消息同步返回、异步通知+主动补偿查询的补偿机制。
以前都是我作为用户去调用支付宝或者微信支付,让他们来异步回调我的接口,
而现在公司要做开放平台,要让我们平台去异步通知用户的接口,如果异步请求未成功接收则要进行重新发送
间隔频率一般是平台交易处理成功后的 5s、30s、1m
最终打算使用rabbitmq的延迟队列+死信队列来实现。消息模型如下:
producer发布消息,通过exchangeA的消息会被分发到QueueA,Consumer监听queueA,一旦有消息到来就被消费,这边的消费业务就是开放平台给客户发送请求,如果失败,就创建一个延迟队列declareQueue(设置了ttl的) ,设置每个消息的ttl然后通过declare_exchange将消息分发到declare_queue,因为declare_queue没有consumer并且declare_queue中的消息设置了ttl,当ttl到期后,将通过DEX路由到queueA,被重新消费。
为此我写了个demo 代码如下:
1.先创建个配置文件:
/**
* RabbitMqConfig类配置队列交换机
* @author baojl
*
*/
@Configuration
public class RabbitMqConfig {
/**
* 订单消息实际消费队列所绑定的交换机
*/
@Bean
DirectExchange orderDirect() {
return (DirectExchange) ExchangeBuilder
.directExchange(QueueEnum.QUEUE_THIRD_POST.getExchange())
.durable(true)
.build();
}
/**
* 订单延迟队列队列所绑定的交换机
*/
@Bean
DirectExchange orderTtlDirect() {
return (DirectExchange) ExchangeBuilder
.directExchange(QueueEnum.QUEUE_TTL_THIRD_POST.getExchange())
.durable(true)
.build();
}
/**
* 订单实际消费队列
*/
@Bean
public Queue orderQueue() {
return new Queue(QueueEnum.QUEUE_THIRD_POST.getName());
}
/**
* 订单延迟队列(死信队列)
*/
@Bean
public Queue orderTtlQueue() {
return QueueBuilder
.durable(QueueEnum.QUEUE_TTL_THIRD_POST.getName())
.withArgument("x-dead-letter-exchange", QueueEnum.QUEUE_THIRD_POST.getExchange())//到期后转发的交换机
.withArgument("x-dead-letter-routing-key", QueueEnum.QUEUE_THIRD_POST.getRouteKey())//到期后转发的路由键
.build();
}
/**
* 将订单队列绑定到交换机
*/
@Bean
Binding orderBinding(DirectExchange orderDirect, Queue orderQueue){
return BindingBuilder
.bind(orderQueue)
.to(orderDirect)
.with(QueueEnum.QUEUE_THIRD_POST.getRouteKey());
}
/**
* 将订单延迟队列绑定到交换机
*/
@Bean
Binding orderTtlBinding(DirectExchange orderTtlDirect, Queue orderTtlQueue){
return BindingBuilder
.bind(orderTtlQueue)
.to(orderTtlDirect)
.with(QueueEnum.QUEUE_TTL_THIRD_POST.getRouteKey());
}
}
2.消息枚举类
public enum QueueEnum {
/**
* 消息通知队列
*/
QUEUE_THIRD_POST("my.thirdpost.direct.exchange", "my.thirdpost.direct.queue", "my.thirdpost.direct.route"),
/**
* 消息通知ttl队列
*/
QUEUE_TTL_THIRD_POST("my.thirdpost.direct.ttl.exchange", "my.thirdpost.direct.ttl.queue", "my.thirdpost.direct.ttl.route");
/**
* 交换名称
*/
private String exchange;
/**
* 队列名称
*/
private String name;
/**
* 路由键
*/
private String routeKey;
QueueEnum(String exchange, String name, String routeKey) {
this.exchange = exchange;
this.name = name;
this.routeKey = routeKey;
}
public String getExchange() {
return exchange;
}
public String getName() {
return name;
}
public String getRouteKey() {
return routeKey;
}
}
3.发送方 我暂时设置的延迟时间为1分钟 1000*60毫秒 可以根据不同的次数设置不同的时间
次数我是在redis里记录的
@Component
public class MySender {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private StringRedisTemplate stringRedisTemplate;
/**
* 发送死信队列
* @param user
*/
public void sendDeadMessage(User user) {
System.out.println("++++++++++++++++++++++++++++++++++++++++++++");
System.out.println("【sendMessage 发送的消息 死信队列】 - 【发送时间】 - ["
+new Date()+"]- 【消息内容】 - ["+user.getName()+"]");
System.out.println("++++++++++++++++++++++++++++++++++++++++++++");
//给延迟队列发送消息
rabbitTemplate.convertAndSend(QueueEnum.QUEUE_TTL_THIRD_POST.getExchange(), QueueEnum.QUEUE_TTL_THIRD_POST.getRouteKey(), user, message -> {
message.getMessageProperties().setExpiration("60000");
return message;
});
}
/**
* 发送普通队列
* @param user
*/
public void sendMessage(User user){
System.out.println("++++++++++++++++++++++++++++++++++++++++++++");
System.out.println("【sendMessage 发送的消息 非死信队列】 - 【发送时间】 - ["
+new Date()+"]- 【消息内容】 - ["+user.getName()+"]");
System.out.println("++++++++++++++++++++++++++++++++++++++++++++");
stringRedisTemplate.opsForValue().set(user.getId(), "0");
//给延迟队列发送消息
rabbitTemplate.convertAndSend(QueueEnum.QUEUE_THIRD_POST.getExchange(), QueueEnum.QUEUE_THIRD_POST.getRouteKey(), user);
}
}
4.接收方:
@Component
@RabbitListener(queues = "my.thirdpost.direct.queue")
public class MyReceiver {
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Autowired
private MySender mySender;
@RabbitHandler
public void handle(User user/*, Channel channel, Message message*/){
System.out.println("###########################################");
System.out.println("【MyReceiver 监听的消息】 - 【接收时间】 - ["
+new Date()+"]- 【消息内容】 - ["+user.getName()+"]");
System.out.println("###########################################");
//TODO 发送POST请求
this.sendHTTPRequest(user);
}
/**
* 发get请求
* @param user
*/
private void sendHTTPRequest(User user) {
Map<String, String> map = new HashMap<String, String>();
map.put("account", "132131");
map.put("password", "abc");
HttpClient client = HttpClientUtils.getConnection();
HttpUriRequest post = HttpClientUtils.getRequestMethod(map, "http://localhost:8088/return", "get");
HttpResponse response = null;
try {
response = client.execute(post);
if (response.getStatusLine().getStatusCode() == 200) {
//成功
HttpEntity entity = response.getEntity();
String message = EntityUtils.toString(entity, "utf-8");
//返回success请求成功
System.out.println(message);
if ("success".equals(message) || "SUCCESS".equals(message)){
//TODO redis清除key
stringRedisTemplate.delete(user.getId());
}else {
//失败 fail
//TODO redis key value+1 并且重新发送请求
this.handleError(user);
System.out.println("请求失败");
}
} else {
//失败
//TODO redis key value+1 并且重新发送请求
System.out.println("请求失败");
this.handleError(user);
}
} catch (IOException e) {
//TODO redis key value+1 并且重新发送请求
this.handleError(user);
e.printStackTrace();
}
}
private void handleError(User user) {
Integer value = Integer.parseInt(stringRedisTemplate.opsForValue().get(user.getId()));
if(null != value){
if (value <= 3){
if (value == 3){
//已经到了第三次 删除key??还是保存
stringRedisTemplate.delete(user.getId());
}else {
Integer val = value+1;
stringRedisTemplate.opsForValue().set(user.getId(),val.toString());
}
mySender.sendDeadMessage(user);
}
}
}
}
实体类
public class User implements Serializable {
private String id;
private String name;
private String password;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
}
HttpClientUtils
public class HttpClientUtils {
private static PoolingHttpClientConnectionManager connectionManager = null;
private static HttpClientBuilder httpBuilder = null;
private static RequestConfig requestConfig = null;
private static int MAXCONNECTION = 500;
private static int DEFAULTMAXCONNECTION = 10;
static {
//设置http的状态参数
requestConfig = RequestConfig.custom()
.setSocketTimeout(5000)
.setConnectTimeout(5000)
.setConnectionRequestTimeout(5000)
.build();
//HttpHost target = new HttpHost(IP, PORT);
connectionManager = new PoolingHttpClientConnectionManager();
connectionManager.setMaxTotal(MAXCONNECTION);//客户端总并行链接最大数
connectionManager.setDefaultMaxPerRoute(DEFAULTMAXCONNECTION);//每个主机的最大并行链接数
//connectionManager.setMaxPerRoute(new HttpRoute(target), 20);
httpBuilder = HttpClients.custom();
httpBuilder.setConnectionManager(connectionManager);
}
public static CloseableHttpClient getConnection() {
CloseableHttpClient httpClient = httpBuilder.build();
return httpClient;
}
public static HttpUriRequest getRequestMethod(Map<String, String> map, String url, String method) {
List<NameValuePair> params = new ArrayList<NameValuePair>();
Set<Map.Entry<String, String>> entrySet = map.entrySet();
for (Map.Entry<String, String> e : entrySet) {
String name = e.getKey();
String value = e.getValue();
NameValuePair pair = new BasicNameValuePair(name, value);
params.add(pair);
}
HttpUriRequest reqMethod = null;
if ("post".equals(method)) {
reqMethod = RequestBuilder.post().setUri(url)
.addParameters(params.toArray(new BasicNameValuePair[params.size()]))
.setConfig(requestConfig).build();
} else if ("get".equals(method)) {
reqMethod = RequestBuilder.get().setUri(url)
.addParameters(params.toArray(new BasicNameValuePair[params.size()]))
.setConfig(requestConfig).build();
}
return reqMethod;
}
public static void main(String args[]) throws IOException {
Map<String, String> map = new HashMap<String, String>();
map.put("account", "132131");
map.put("password", "abc");
HttpClient client = getConnection();
//HttpUriRequest post = getRequestMethod(map, "http://cnivi.com.cn/login", "post");
//HttpUriRequest post = getRequestMethod(map, "http://cnivi.com.cn/login", "post");
HttpUriRequest post = getRequestMethod(map, "http://localhost:8088/return", "get");
HttpResponse response = client.execute(post);
if (response.getStatusLine().getStatusCode() == 200) {
//成功
HttpEntity entity = response.getEntity();
String message = EntityUtils.toString(entity, "utf-8");
//success
System.out.println(message);
} else {
System.out.println("请求失败");
}
}
}
测试一下
@RestController
public class TestController {
@Autowired
private MySender mySender;
/**
* 发送普通请求
* @return
*/
@GetMapping("/test1")
private String sendMessage1(){
User user = new User();
user.setId("111111");
user.setName("测试普通队列");
user.setPassword("123456");
mySender.sendMessage(user);
//0次
return "success";
}
/**
* 测试回调
* @param request
* @param out
*/
@RequestMapping(value = "/return")
private void testPost(HttpServletRequest request, PrintWriter out){
Map<String, String> params = new HashMap<String, String>();
Map requestParams = request.getParameterMap();
for (Iterator iter = requestParams.keySet().iterator(); iter.hasNext(); ) {
String name = (String) iter.next();
String[] values = (String[]) requestParams.get(name);
String valueStr = "";
for (int i = 0; i < values.length; i++) {
valueStr = (i == values.length - 1) ? valueStr + values[i] : valueStr + values[i] + ",";
}
params.put(name, valueStr);
}
System.out.println("======================================================");
for(String key:params.keySet()){
String value = params.get(key).toString();
System.out.println("key:"+key+" vlaue:"+value);
}
//返给平台 sucesss 成功 fail模拟失败 测试是否会进入延迟队列死信队列
//out.print("success");
out.print("fail");
}
}
测试模拟请求第三方回调失败 利用死信队列延迟队列进行重试 结果:
配置文件:
server:
port: 8088
spring:
application:
name: rabbitmq-dead-jianshu
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
publisher-confirms: true
publisher-returns: true
listener:
direct:
acknowledge-mode: manual
simple:
acknowledge-mode: manual
redis:
host: 127.0.0.1
port: 6379
timeout: 10s
password: 123456
lettuce:
pool:
min-idle: 5
max-idle: 10
max-active: 1000
max-wait: 1ms
shutdown-timeout: 100ms
database: 2
pom.xml
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.3.5</version>
</dependency>
<!--redis-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!--spring2.0集成redis所需common-pool2-->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.4.2</version>
</dependency>
</dependencies>
如有错误或者用的不妥的地方欢迎大家帮我指出