大量数据导出导致系统内存溢出的解决办法
在web开发中,我们经常可能会遇到导出报表等统计功能,常规做法就是会从数据库中拉取大量数据,甚至有可能还会统计所有数据的一个总量。当我们一次性读取所有数据到内存中时,就极可能导致系统OOM。因为我的后台系统使用的是NodeJS的Nest框架,数据库ORM使用的是Sequelize,下面就这种问题我总结一下处理的方法。
方法一、修改V8内存大小
从《深入浅出NodeJS》中我们得知64位系统内存限制约为1.4GB,所以我们一次性讲数据加载进内存中,就有可能超过这个限制,导致OOM。但是NodeJS提供了一个程序运行参数 --max-old-space-size
,可以通过该参数指定V8所占用的内存空间,这样可在一定程度上便面程序内存溢出
方法二、使用非V8内存
这也是在项目中采用的方法。Buffer是一个NodeJS的扩展对象,使用底层的系统内存,不占用V8内存空间。与之相关的文件系统fs和流Stream流操作,都不会占用V8内存。下面我就流操作的解决方法,详细梳理一遍。
MySQL数据库有流式读取方式,不是一次性读取所有数据到内存,而是根据使用者的需要,部分的读取数据。但是Sequelize这个ORM模型又不支持流式读取,所以在系统中我们额外引入了支持流式查询的Knex查询构造器和Mysql2驱动程序。
1、构建数据库服务
import { Injectable } from "@nestjs/common";
import * as config from "config";
import { getLogger } from "log4js";
import * as Knex from "knex";
const logger = getLogger("Knex");
interface MysqlConfig {
name: string;
read: [
{
host: string,
port: number,
username: string,
password: string
}
];
}
const { name: database, read: [conf] } = config.get<MysqlConfig>("mysql");
const knex = Knex({
client: "mysql2",
pool: {
min: 1,
max: 10
},
connection: {
database,
port: conf.port,
host: conf.host,
user: conf.username,
password: conf.password,
decimalNumbers: true
}
});
@Injectable()
export class ReadService {
async* exec(sql, params, limit = 5000) {
// 读取limit条数据,通过yield返回数据
// xxx...
}
}
2、创建Csv相关服务
import * as path from "path";
import * as convert from "iconv-lite";
import * as Achiver from "archiver";
import { getLogger } from "log4js";
import { createReadStream, promises as fs, writeFileSync } from "fs";
const logger = getLogger("ExportCsv");
export class Csv {
private readonly name: string;
private readonly path: string;
// 是否压缩
private readonly compress: boolean;
constructor(name: string, columns: string[], rows: any[] = [], compress: boolean = false) {
this.compress = compress;
this.name = path.basename(name, ".csv");
this.path = path.format({
ext: ".csv",
root: process.cwd(),
name: Date.now() + this.name
});
let txt = columns.join(",");
if (Array.isArray(rows)) {
txt += Csv.parse(rows);
}
// 同步创建一个只包含字段名的文件
writeFileSync(this.path, convert.encode(txt, "GBK", { addBOM: true }));
}
// 转换数据中一些特殊符号
private static parse(rows): string {
return "\r\n" + rows.map(r => {
return r
.map(x => {
if (typeof x === "string") {
x = x.replace(/,/g, ",").replace(/\n/g, "-");
}
return x;
})
.join(",");
}
).join("\r\n");
}
// 在同一个文件中,异步新增数据
async append(rows: any[]) {
return fs.appendFile(this.path, convert.encode(Csv.parse(rows), "GBK", { addBOM: true }));
}
// 根据传入的compress参数判断,是否压缩文件
// res也是一个流对象,所有可以进行流的相关操作
async send(res) {
if (this.compress) {
// 返回压缩后的文件
} else {
// 返回普通文件
}
}
// 监控当文件下载完毕后,则删除服务器生成的文件
private observe(res) {
return new Promise((resolve, reject) => {
res.on("end", () => {
resolve();
fs.unlink(this.path).catch(logger.error.bind(logger));
});
res.on("error", err => reject(err));
});
}
}
3、在控制器函数调用服务
// 用于统计总量的数据
const total = {
money: 0,
deposit: 0
};
// 查询大量数据的SQL
const sql = `xxx`;
// 实例化Csv对象
const csv = new Csv(`订单结算数据${start}至${end}.csv`, title, null, true);
// 执行SQL,一次性读取10000条数据,返回的是一个可遍历的生成器
const reader = this.reader.exec(sql, params, 10000);
// 遍历生成器,不停的向文件中写数据
// 如果我们直接导出数据,那么在csv.append方法中,直接返回[[1, 22, 33], [2, 44, 55]]
// 如果我们要根据所有数据,进行一个统计计算,那么我们可以新写一个parseRows方法来进行统计
for await (const x of reader) {
await csv.append(await this.parseRows(x, { ems, sites, total }));
}
// 总量数据
const row = [
_.round(total.money, 2),
_.round(total.deposit, 2),
];
// 写入文件
await csv.append([row]);
// 返回csv对象,在拦截器中处理
return csv;
4、拦截器中处理返回形式
import { Injectable, NestInterceptor, ExecutionContext, CallHandler } from "@nestjs/common";
import { map } from "rxjs/operators";
import { Csv } from "@shared/commons";
export interface Response<T> {
code: number;
data: T;
}
@Injectable()
export class FormatInterceptor<T> implements NestInterceptor<T, Response<T>> {
intercept(context: ExecutionContext, next: CallHandler) {
return next.handle().pipe(map(data => {
if (data instanceof Csv) {
// 处理数据导出数据格式
const res = context.switchToHttp().getResponse();
// send方法就是csv服务中的导出方法
return data.send(res);
}
// 处理常规API数据格式
const result: any = { code: 0, data: "success" };
if (data) {
if (data.rows) {
data.meta = data.rows;
data.total = data.count;
delete data.rows;
delete data.count;
}
result.data = data;
}
return result;
}));
}
}
自此整个流程:流式读取数据->异步写入文件->压缩文件->返回文件前端下载->删除生成的文件就结束了。这样我们分步骤处理数据,就不会因为数据量过大导致内存溢出了。部分方法中的代码我没有写上来,如果有需要可以互相探讨一下。