前言
因为需要在MLSQL里开发一个图片处理模块(以及配套数据源),使用上大概是这样子的:
-- 通过SQL抓取一张图片,imageBytes字段是一个二进制数组
select crawler_request_image("https://tpc.googlesyndication.com/simgad/10310202961328364833") as imageBytes
as images;
-- 也可以加载一个图片数据集 该表只有一个字段image,但是image是一个复杂字段:
load image.`/training_set`
options
-- 递归目录查找图片
recursive="true"
-- 丢弃解析失败的图片
and dropImageFailures="true"
-- 采样率
and sampleRatio="1.0"
-- 读取图片线程数
and numPartitions="8"
-- 处理图片线程数
and repartitionNum="4"
-- 单张图片最大限制
and filterByteSize="2048576"
-- 可以禁止对图片进行解析,避免占用过大资源,让后续的OpenCVImage来解析
and enableDecode = "true"
as images;
-- 比如 选择origin,width字段
-- select image.origin,image.width from images
-- as newimages;
-- 通过OpenCVImage对图片进行处理,设置为100*100大小的图片。
train images as OpenCVImage.`/tmp/word2vecinplace`
where inputCol="imagePath"
and filterByteSize="2048576"
-- 宽度和高度重新设置为100
and shape="100,100,4"
;
从上面的脚本来看,大致需要能加载一个图片目录,或者通过url地址抓取到图片,然后传递给后面的OpenCVImage模块进行处理。那么如何将图片进行传递和存储呢?
Spark 2.3 解决方案
Spark 在2.3.0 开始支持图片格式字段,使用下面的格式描述一张图片:
StructType(
StructField("origin", StringType, true) ::
StructField("height", IntegerType, false) ::
StructField("width", IntegerType, false) ::
StructField("nChannels", IntegerType, false) ::
StructField("mode", StringType, false) :: //OpenCV-compatible type: CV_8UC3 in most cases
StructField("data", BinaryType, false) :: Nil) //bytes in OpenCV-compatible order: row-wise BGR in most cases
实际的解析代码大概是这样的
private[spark] def decode(origin: String, bytes: Array[Byte]): Option[Row] = {
val img = ImageIO.read(new ByteArrayInputStream(bytes))
if (img == null) {
None
} else {
val isGray = img.getColorModel.getColorSpace.getType == ColorSpace.TYPE_GRAY
val hasAlpha = img.getColorModel.hasAlpha
val height = img.getHeight
val width = img.getWidth
val (nChannels, mode) = if (isGray) {
......
val imageSize = height * width * nChannels
assert(imageSize < 1e9, "image is too large")
val decoded = Array.ofDim[Byte](imageSize)
.....
var offset = 0
for (h <- 0 until height) {
for (w <- 0 until width) {
val color = new Color(img.getRGB(w, h), hasAlpha)
decoded(offset) = color.getBlue.toByte
decoded(offset + 1) = color.getGreen.toByte
decoded(offset + 2) = color.getRed.toByte
if (hasAlpha) {
decoded(offset + 3) = color.getAlpha.toByte
}
offset += nChannels
.......
// the internal "Row" is needed, because the image is a single DataFrame column
Some(Row(Row(origin, height, width, nChannels, mode, decoded)))
}
}
这样读取每一张图片之后通过这个decode方法解析成一个Row字段。
实际的问题
图片解析完成后是十分庞大的,对于一张分辨率在10241024的图片,其存储成image字段byte字节就至少为 10241024*4 = 4m 左右。如果你要存储成parquet格式,因为parquet默认使用snappy 压缩,而snappy使用了direct buffer 做buffer,这会导致非堆内存不足而异常。解决办法是把XX:MaxDirectMemorySize 设置大些,比如我只是处理不到一万张图片w我把MaxDirectMemorySize 设置为10G了。读取的时候也是类似的问题。这是第一个问题。
第二个问题就是,因为Spark是并行度概念,如果一个JVM里同时解析图片的线程比较多,很可能就JVM GC 挂了。 这是第二个问题。
解决这个问题除了增加资源(堆外堆内内存以外),更有效的方式是控制图片的大小。在load image的时候,只是拿到PortableStream(其实就是路路径),之后到OpenCVImage处理的时候才真正的解析图片,解析前根据图片的原信息(width,height,channel)计算需要占用的byte数,如果超过了,直接丢弃,如果没有则进一步解析成byte数组,然后进行resize,最后完成存储(比如resize 成100*100之后图片会小非常多)。
所以至少我们要做如下几点改造:
- 在spark 中,要么解析成功,要么解析失败(undefinedImageType),其实需要添加一个第三种状态(undecodedImageType),未解析状态,可以将解析延后到后面去做。
- 解析的时候,现有的实现里无法做大小过滤,必须解析完成后,对data字段进行大小判定(但这个时候data字段可能已经让内存挂掉了),所以需要能够允许用户做大小过滤判断,否则用户用起来肯定过去不的场景非常多,类似这样:
if (filterSize > 0 && height * width * nChannels > filterSize) {
None
} else {
val decoded = Array.ofDim[Byte](height * width * nChannels)
- parquet等格式似乎还没有做好对大图片的的支持,那么是不是需要一种新的格式解决这个问题呢?