故障过程
我司使用AWS云服务,众所周知,DDB的存储费用比较贵,对于一些大表,结合更便宜的S3文件存储做了冷热分离设计。
早期的冷热分离设计,借助DDB TTL,实现过期删除,并使用Lambda对接DDB Stream,监听TTL事件,将过期数据写入S3,写入时还做了gzip压缩,进一步节约了存储费用。
22年对于很多IT公司都不容易,公司进行了降本增效,S3的特点是存储便宜,但是读写比DDB更贵。TTL过期的每一条数据都会读写一次S3,lambda更是需要额外付费,此处有优化空间。好在TTL过期删除是不收费的,于是,我们采用spark任务,每个一段时间,将数据批量写入S3中。
问题来了,在第一次上线处理存量数据时,spark任务启动后4分钟内,S3写入吞吐量前2分钟快速上升,后2分钟快速下降直至停止。
由于生产机器无法直接访问,开始时,尝试过调整单机并发度至1(也就是单线程),review代码,怀疑stream没有正常回收并重写,GC虽然频繁但为发生OOM。
直到拿到了Thread Dump,才确认问题还是出在了压缩部分。
打开Oracle官网文档,该类的说明中明确提到单线程会导致死锁。问题实锤无疑
Typically, data is read from a PipedInputStream object by one thread and data is written to the corresponding PipedOutputStream by some other thread. Attempting to use both objects from a single thread is not recommended, as it may deadlock the thread
解决办法很简单,修改成ByteArrayInputStream和ByteArrayOutputStream后解决。
另外一个问题是,导致BUG的代码从何而来,排查后发现该代码是从原先的lambda程序中拷贝而来,且一直运转良好,如果有问题早就应该触发BUG才对。
分析代码后发现,lambda的逻辑是,将过期的一条数据使用PipedInputStream和PipedOutputStream进行gzip压缩,然后将压缩后的stream和S3原文件的stream使用SequenceInputStream进行合并,再重新写入S3。而新代码的逻辑是,将S3原文件数据读出,和过期的多条数据进行merge,再压缩成Stream,写入S3。
通过源码分析发现,如果压缩后数据大小不超过PipedInputStream的buffer(显示配置4kb),则不会触发死锁,而lambda中原代码的单条数据恰恰不会超过4kb。至此问题真相大白。
详细分析这篇博客写的很清楚 Java之IO(八)PipedIutputStream和PipedOutputStream
知识点
GZIP 的文件格式在设计上其实是可以允许一个文件里有多个压缩数据集(compressed data sets)—— GZIP 压缩后的片段拼接而成的。所以在代码中,我们可以使用SequenceInputStream拼接多个gzip后的stream流,并且可以一次性解压处理啊。详见 www.gzip.org
通常PipedInputStream和PipedOutputStream在不同的线程中使用,如果在同一个线程可能存在死锁的情况。详见 JDK官方文档-Class PipedInputStream