Spark 2.3.0 如何处理图片以及存在的一些问题

前言

因为需要在MLSQL里开发一个图片处理模块(以及配套数据源),使用上大概是这样子的:

-- 通过SQL抓取一张图片,imageBytes字段是一个二进制数组
select crawler_request_image("https://tpc.googlesyndication.com/simgad/10310202961328364833") as imageBytes
as  images;

-- 也可以加载一个图片数据集 该表只有一个字段image,但是image是一个复杂字段:

load image.`/training_set`
options 
-- 递归目录查找图片
recursive="true"
-- 丢弃解析失败的图片
and dropImageFailures="true"
-- 采样率
and sampleRatio="1.0"
-- 读取图片线程数
and numPartitions="8"
-- 处理图片线程数
and repartitionNum="4"
-- 单张图片最大限制
and filterByteSize="2048576"
-- 可以禁止对图片进行解析,避免占用过大资源,让后续的OpenCVImage来解析
and enableDecode = "true"
as images;  


-- 比如 选择origin,width字段
-- select image.origin,image.width from images 
-- as newimages;
 
-- 通过OpenCVImage对图片进行处理,设置为100*100大小的图片。
train images as OpenCVImage.`/tmp/word2vecinplace`
where inputCol="imagePath"
and filterByteSize="2048576"
-- 宽度和高度重新设置为100
and shape="100,100,4"
;

从上面的脚本来看,大致需要能加载一个图片目录,或者通过url地址抓取到图片,然后传递给后面的OpenCVImage模块进行处理。那么如何将图片进行传递和存储呢?

Spark 2.3 解决方案

Spark 在2.3.0 开始支持图片格式字段,使用下面的格式描述一张图片:

StructType(
    StructField("origin", StringType, true) ::
      StructField("height", IntegerType, false) ::
      StructField("width", IntegerType, false) ::
      StructField("nChannels", IntegerType, false) ::
      StructField("mode", StringType, false) :: //OpenCV-compatible type: CV_8UC3 in most cases
      StructField("data", BinaryType, false) :: Nil) //bytes in OpenCV-compatible order: row-wise BGR in most cases

实际的解析代码大概是这样的

 private[spark] def decode(origin: String, bytes: Array[Byte]): Option[Row] = {

    val img = ImageIO.read(new ByteArrayInputStream(bytes))

    if (img == null) {
      None
    } else {
      val isGray = img.getColorModel.getColorSpace.getType == ColorSpace.TYPE_GRAY
      val hasAlpha = img.getColorModel.hasAlpha

      val height = img.getHeight
      val width = img.getWidth
      val (nChannels, mode) = if (isGray) {
      ......
      val imageSize = height * width * nChannels
      assert(imageSize < 1e9, "image is too large")
      val decoded = Array.ofDim[Byte](imageSize)
    .....
       var offset = 0
        for (h <- 0 until height) {
          for (w <- 0 until width) {
            val color = new Color(img.getRGB(w, h), hasAlpha)
            decoded(offset) = color.getBlue.toByte
            decoded(offset + 1) = color.getGreen.toByte
            decoded(offset + 2) = color.getRed.toByte
            if (hasAlpha) {
              decoded(offset + 3) = color.getAlpha.toByte
            }
            offset += nChannels
       .......

      // the internal "Row" is needed, because the image is a single DataFrame column
      Some(Row(Row(origin, height, width, nChannels, mode, decoded)))
    }
  }

这样读取每一张图片之后通过这个decode方法解析成一个Row字段。

实际的问题

图片解析完成后是十分庞大的,对于一张分辨率在10241024的图片,其存储成image字段byte字节就至少为 10241024*4 = 4m 左右。如果你要存储成parquet格式,因为parquet默认使用snappy 压缩,而snappy使用了direct buffer 做buffer,这会导致非堆内存不足而异常。解决办法是把XX:MaxDirectMemorySize 设置大些,比如我只是处理不到一万张图片w我把MaxDirectMemorySize 设置为10G了。读取的时候也是类似的问题。这是第一个问题。

第二个问题就是,因为Spark是并行度概念,如果一个JVM里同时解析图片的线程比较多,很可能就JVM GC 挂了。 这是第二个问题。

解决这个问题除了增加资源(堆外堆内内存以外),更有效的方式是控制图片的大小。在load image的时候,只是拿到PortableStream(其实就是路路径),之后到OpenCVImage处理的时候才真正的解析图片,解析前根据图片的原信息(width,height,channel)计算需要占用的byte数,如果超过了,直接丢弃,如果没有则进一步解析成byte数组,然后进行resize,最后完成存储(比如resize 成100*100之后图片会小非常多)。

所以至少我们要做如下几点改造:

  1. 在spark 中,要么解析成功,要么解析失败(undefinedImageType),其实需要添加一个第三种状态(undecodedImageType),未解析状态,可以将解析延后到后面去做。
  2. 解析的时候,现有的实现里无法做大小过滤,必须解析完成后,对data字段进行大小判定(但这个时候data字段可能已经让内存挂掉了),所以需要能够允许用户做大小过滤判断,否则用户用起来肯定过去不的场景非常多,类似这样:
if (filterSize > 0 && height * width * nChannels > filterSize) {
        None
      } else {
        val decoded = Array.ofDim[Byte](height * width * nChannels)
  1. parquet等格式似乎还没有做好对大图片的的支持,那么是不是需要一种新的格式解决这个问题呢?
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,530评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 86,403评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,120评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,770评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,758评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,649评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,021评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,675评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,931评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,659评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,751评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,410评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,004评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,969评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,042评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,493评论 2 343