介绍
Notification是亚马逊卖家服务提供的API,Notification为事件驱动,依托于AWS SQS服务(Simple Queue Service) 基于MQ实现通知,可减少接口轮询次数,并且拥有比轮询更高的实时性
本文主要介绍在SP-API 中,如何使用SQS 接入Notification,正确消费消息参考文档
STEP 1 创建SQS
1.1 创建队列
1.2 设置基本属性
根据需求配置队列属性
1.3 设置默认访问策略
访问策略选择默认,创建结束后再修改
STEP 2 配置SQS (允许amazon 发送消息)
2.1 编辑队列
在队列创建完成后, 记录arn,并点击编辑
2.2 修改策略
记录当前队列的root Principal,点击策略生成器(此时,我们已经拿到了自身的ARN, 和自身的 Principal)
2.3 自身拥有队列的所有权限
2.4 Amazon拥有 GetQueueAttributes 和 SendMessage 权限
Amazon的为Principal (固定的): arn:aws:iam::437568002678:root
2.5 生成配置
2.6 复制配置JSON
2.7 替换配置JSON
2.8 点击保存
至此,我们拥有了一个队列,并且,该队列允许亚马逊发送消息,开发者账号拥有该队列所有权限
STEP 3 监听队列
推荐依赖Spring boot,使用JMS 监听队列
3.1 pom依赖
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>amazon-sqs-java-messaging-lib</artifactId>
<version>1.0.0</version>
<type>jar</type>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sqs</artifactId>
<version>1.11.943</version>
</dependency>
3.2 监听代码
@Component
@Slf4j
public class SqsConsumer {
@JmsListener(destination = "SQS名称"
, containerFactory = "listenerContainerFactory")
public void consumer(SQSTextMessage sqsTestMessage) throws Exception {
String text = sqsTestMessage.getText();
log.info("接收到SQS通知:{}", text);
}
}
3.3 listenerContainerFactory
@Bean
public SQSConnectionFactory connectionFactory() {
SQSConnectionFactory sqsConnectionFactory = SQSConnectionFactory.builder()
//amazonPropertiesConfig 为 AWSCredentialsProvider的实现类实例, 提供 AWSCredentials 即可
.withAWSCredentialsProvider(amazonPropertiesConfig)
// sqs所在地区 在endpoint中可以找到,如: us-west-1
.withRegionName(amazonPropertiesConfig.getSqsRegion())
//SQS地址 配置页中的地址 https://sqs.us-west-1.amazonaws.com/手动马赛克/TEST
.withEndpoint(amazonPropertiesConfig.getSqsEndpoint())
.build();
return sqsConnectionFactory;
}
@Bean
public DefaultJmsListenerContainerFactory listenerContainerFactory(SQSConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory =
new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setDestinationResolver(new DynamicDestinationResolver());
factory.setConcurrency("10-20"); //开启多少线程
factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE); //开启自动确认
return factory;
}
STEP 4 订阅消息
4.1 创建 createDestination
注意:
- 在请求参数中: resourceSpecification下的 sqs 和 eventBridge 只能二选一,并且必须选一个,在SQS中,我们填写SQS就行
- 访问createDestination时,由于Destination是开发者的内容,与卖家没有关系,因此,在ApiClient对象的创建上,需要使用scope模式,scope = "sellingpartnerapi::notifications"
LWAClientScopes lwaClientScopes = new LWAClientScopes(new HashSet<>());
lwaClientScopes.getScopes().add("sellingpartnerapi::notifications");
LWAAuthorizationCredentials lwaAuthorizationCredentials = LWAAuthorizationCredentials.builder()
.clientId(clientId())
.clientSecret(clientSecret())
.scopes(lwaClientScopes)
.endpoint(authUrl())
.build();
createDestination 示例代码:
public CreateDestinationResponse createDestination(String marketplaceId, CreateDestinationDTO createDestinationDTO) throws ApiException {
NotificationsApi notificationsApi = getNotificationsApi(marketplaceId);
DestinationResourceSpecification destinationResourceSpecification = new DestinationResourceSpecification();
SqsResource sqsResource = new SqsResource();
sqsResource.setArn(createDestinationDTO.getArn());
destinationResourceSpecification.setSqs(sqsResource);
CreateDestinationRequest body = new CreateDestinationRequest();
body.setResourceSpecification(destinationResourceSpecification);
body.setName(createDestinationDTO.getDestinationName());
return notificationsApi.createDestination(body);
}
@Data
public class CreateDestinationDTO {
private String arn; //sqs详情页中可以找到改值
private String destinationName; //
private String sqsName; //sqs详情页中可以找到改值
}
请求参数示例:
body
{
"resourceSpecification": {
"sqs": {
"arn": "arn:aws:sqs:us-west-1:手动马赛克:TEST"
}
},
"name": "testDestination"
}
返回值示例:
{
"payload": {
"name": "testDestination",
"destinationId": "2d6eee67-23e9-40fe-destinationId",
"resource": {
"sqs": {
"arn": "arn:aws:sqs:us-west-1:手动马赛克:TEST"
}
}
}
}
需要记录下返回结果中的: destinationId
4.2 根据Destination创建订阅
注意
- 一个卖家的一种消息类型,只能订阅一个 Destination
- 此时,操作与卖家有关,需要使用卖家的授权进行访问
LWAAuthorizationCredentials lwaAuthorizationCredentials = LWAAuthorizationCredentials.builder()
.clientId(clientId)
.clientSecret(clientSecret)
.refreshToken(spAuthToken)
.endpoint(authUrl)
.build();
createSubscription 示例代码:
public CreateSubscriptionResponse createSubscription(String destinationId, NotificationTypeEnum notificationType,String spAuthToken) throws ApiException {
NotificationsApi notificationsApi = getNotificationsApi(spAuthToken);
CreateSubscriptionRequest request = new CreateSubscriptionRequest();
request.setDestinationId(destinationId);
request.setPayloadVersion("1.0");
return notificationsApi.createSubscription(request, notificationType.name());
}
请求参数示例:
url:
/notifications/v1/subscriptions/REPORT_PROCESSING_FINISHED
body:
{
"payloadVersion": "1.0",
"destinationId": "2d6eee67-23e9-40fe-destinationId"
}
返回值示例:
{
"payload": {
"subscriptionId":"7ca3572b-2130-41eb-subscriptionId",
"payloadVersion":"1.0",
"destinationId": "2d6eee67-23e9-40fe-destinationId",
}
}
至此, 已经完成了所有准备工作,耐心等待Amazon 触发相关类型的消息就可以啦!!!
5 备注
5.1 NotificationType
5.2 通知数据接口
REPORT_PROCESSING_FINISHED notification 示例:
{
"notificationVersion": "2020-09-04",
"notificationType": "REPORT_PROCESSING_FINISHED",
"payloadVersion": "1.0",
"eventTime": "2022-02-18T03:05:00.197Z",
"payload": {
"reportProcessingFinishedNotification": {
"sellerId": "A2BAZU3******",
"reportId": "34997*********",
"reportType": "GET_FLAT_FILE_VAT_INVOICE_DATA_REPORT",
"processingStatus": "DONE",
"reportDocumentId": "amzn1.tortuga.********************************************"
}
},
"notificationMetadata": {
"applicationId": "amzn1.sellerapps.app.********************",
"subscriptionId": "36df6577-33c7-********************",
"publishTime": "2022-02-18T03:05:00.230Z",
"notificationId": "0cbae32d-222f-********************"
}
}
FEED_PROCESSING_FINISHED notification 示例:
{
"notificationVersion": "2020-09-04",
"notificationType": "FEED_PROCESSING_FINISHED",
"payloadVersion": "1.0",
"eventTime": "2022-02-18T08:46:38.498Z",
"payload": {
"feedProcessingFinishedNotification": {
"sellerId": "A2EW6******",
"feedId": "625004******",
"feedType": "POST_ORDER_FULFILLMENT_DATA",
"processingStatus": "DONE",
"resultFeedDocumentId": "amzn1.tortuga.********************************************"
}
},
"notificationMetadata": {
"applicationId": "amzn1.sellerapps.app.**************************",
"subscriptionId": "4d5b53b4-66ad-4ee0-***************",
"publishTime": "2022-02-18T08:46:39.950Z",
"notificationId": "188fea65-74da-4fce-***************"
}
}