背景
最近接手了个项目,项目代码不多,但是问题不少,尤其是项目中涉及了服务之间的数据同步。数据不是丢,就是乱 。每天提心吊胆 ,生怕又有数据不一致了,需要手动介入处理 ,偶尔周末还要个”意外惊喜”。
数据同步不一致 ,真是让人头秃的病。下面一起来分析下 ,对症下药,
数据同步
场景重现
项目内有2个服务需要进行数据同步。
服务A是个订单系统, 不断产生订单数据,
服务B是个计费系统,需要拉取到服务A里的订单信息,留存后 ,做些计费等后置处理。
这里涉及到 服务间的数据同步。服务间传递的数据格式如下:
{
id: "ID000001", // message id
timestamp: 1733281956, // message timestamp
data: {
orderId: "ORD000001", // id
orderStatus: "PROCESSING", // status
...
updatetime: 1733281950, // order update time
createtime: 1733281000 // order create time
}
}
服务间数据同步
服务之间的数据同步,服务A 向 服务B同步数据。服务A中包含两类数据 ,
- 一些正在进行的数据(绿色部分),这些数据后续可能还会进行更新。比如正在进行的订单,用户后续可能还会修改。
- 一些已经完结的数据(灰色部分),这些数据已经完结,不会在更新。
以下是个图示,
问题汇总
几类典型问题,
- 数据重复插入:相同数据因为接收到了多次,被重复插入。
- 老数据覆盖新数据:老数据由于网络延迟或者超时重传,接收时是乱序的,后到达接收端,覆盖了先到达的新数据。
- 数据丢失:有些数据可能只发了一次,中途由于网络等原因丢失了。
- 数据更新太慢:有些数据的对时效性比较高。
- 数据不完整:接收到了数据,但是数据本身不完整。
考虑因素
针对于上述问题 ,则抽取出以下要考虑的因素,(这里被同步方的存储方式,使用的是数据库存储。当然如果是其它存储方式,也可按此因素考虑设计)
1)数据防重
唯一id,更新时应该保障id在数据库中的唯一性。
2)数据防乱序
在数据更新的时候,必须要求拿到的数据比数据库里的数据新,才会去数据库里更新。
3)数据防丢
同步的数据不应该丢失,或者 丢失后,也有补偿机制。
4)数据同步时效性
要考虑业务范围内能承受的延时 。
5)数据完整性
被同步方的数据个数,和数据内容应该是一致的。
实践参考
由上述可知,做个数据同步,要考虑如下因素,
1)数据防重
2)数据防乱序
3)数据防丢
4)数据同步时效性
5)数据完整性
废话不多说,上方案,逐点击破。
解决方案
1)数据防重 2)数据防乱序
基于一些常用数据库 mongodb / mysql等,1个sql语句, 直接解决这2个问题
# 示例数据
id = "ID00001"
data = {
"timestamp": 1727891234,
"some_field": "some_value"
}
# 定义过滤器和更新文档
# data timestamp 代表数据的原始时间
filter = {
"id": id,
"updatetime": {"$lte": data["timestamp"]}
}
update = {"$set": data}
# 执行 FindOneAndUpdate 操作
doc = collection.find_one_and_update(
filter,
update,
upsert=True,
return_document=ReturnDocument.AFTER
)
find_one_and_update ,这里重点看下参数 upsert=True
以及 "$lte": data["timestamp"]
。
- upsert :配合主键id,没有则插入,有则更新。 可以保障数据的在库中的唯一性。
- $lte :进行数据筛选,更新的数据比数据库中已有数据时间大,才能更新。 新数据才能存储进来,老数据不可以存进来。
如果id存在但是时间靠后,请求进来,也就是老数据进到这里,是不应该覆盖数据库中已有的数据的。同时这里明确下,补充下所有可能出现的情况,以及对应现象。
- 请求的id 在数据库中不存在
- 同步的数据 第一次进来,这种数据库中之前也没有存储,找不到,直接插入。
- 请求的id 在数据库中已存在
如果请求插入的数据时间是老的,也就是老数据来了。$lte去找数据库中小于老数据的时间,而数据库中已存的时间比老数据的时间一定大, 所以filter中的语句查不出,此时同时因为有upsert=True,则会尝试插入一条数据,又由于id在数据库中唯一,则会插入失败,报错 duplicated key insert error。
如果请求插入的数据时间是新的,也就是新数据来了,根据filter语句 ,可以找到,找到并更新。
聪明的小伙伴看到这里,其实可以发现上述整体的处理思路,其实就是参考了 乐观锁
。
3)数据防丢
数据防丢,最终 同步方 和 被同步方 的数据要一致。
这里需要考虑,数据是否被同步到,如果没有同步到,还有哪些没同步过来。
这里可以按如下进行考虑,
- 如果 能确保数据一定被同步到。
一般是使用包含有ack机制的消息队列服务,比如 Apache Kafka,RabbitMQ。这些消息队列服务可以确保,发送端(同步方)发送数据后, 消费端(被同步方)一定能接收到数据,从而保障数据一定可以被同步到。
- 如果 不能确保数据一定被同步到,则需要使用 补偿机制 。
有时在某些场景下 ,同步的内容不多,引入消息队列服务,会比较重。所以在不引入这种服务情况下,无法确保数据一定会被同步到,这时可以引入 补偿机制。
补偿机制包括
全量同步:天级别把数据全量同步过去,数据量少的时候适用。
-
增量同步: 增量同步 , 举个例子,下次同步数据时,
- 以上一次数据最后更新时间做为起始时间,当前时间做为最后时间,拉取这段时间内的增量数据。
- 也有些业务数据包含类似active或者status字段,那么增量的部分则该是active为true 或者 status为in progress的数据。
这里只是举例,具体怎么做增量补偿要以业务数据本身为准,
4)数据同步时效性
最实时:时效性最高,毫秒ms级别,这种一般要求 同步方数据变化时,主动推送数据 。
次实时:时效性一般,秒s级别,这种 被同步方 可以定时轮询拉取同步方的数据。
不实时:时效性最低,要求也较宽松, 这个定期同步 ,甚至天级别的同步即可。
最实时的需要 同步方主动推送数据,可以借助消息队列 发布-订阅模型,有新的数据产生也往队列里放一份,需要实时更新的接收端订阅这个队列即可。当然也可以自己写推送和接收代码实现。
次实时和不实时的, 可以考虑常规做法,定频拉取同步。拉取频率根据业务场景的承受范围定即可。
5)数据完整性
完整性可以从数据总数和数据内容两方面考虑。
- 数据总数:可能一条带条件的count SQL语句就搞定了
- 数据内容:这个比对起来比较麻烦, 可以对数据的关键字段的key和value值提取,排序后 ,进一步进行hash,然后比对两边的hash值。
数据完整性,更像是数据同步后的验证,可以按需做即可。
结语
至此,针对数据同步的典型问题,给出对应方案,绘制到我们最开始的图上。
数据同步,稳了!