Apache Geode with Spark

在一些特定场景,例如streamingRDD需要和历史数据进行join从而获得一些profile信息,此时形成较小的新数据RDD和很大的历史RDD的join。
Spark中直接join实际上效率不高:

  • RDD没有索引,join操作实际上是相互join的RDD进行hash然后shuffle到一起;

实际上,如果历史数据的RDD有索引,我们可以循环遍历streaming中的每一条数据,并向历史数据发送point query,即loop + indexed get。Streaming的数据是小数据,这样坐的性能会高很多。(这种小数据和大量历史数据的join模式在物联网/互联网场景下很常见)

另外,

  • spark中的RDD是只读的,增量信息无法直接更新到历史RDD中

虽然我们可以使用streaming的窗口操作来缓存一定量的历史数据,但这会增加业务逻辑的复杂度。

IndexedRDD能够解决上述的两个问题,即对RDD内存数据建立索引,并且可以更新RDD。但是IndexRDD不支持事务,如果需要对同一个key做更新就存在数据更新冲突,导致数据不一致。另外,IndexRDD单纯是RDD的数据结构和接口的增强,不支持Spark之外的组件对其的访问。

本文将介绍基于Apache Geode和Spark相结合:

  • 基于Geode的RDD借助Geode的内存数据存储和数据索引,其join操作是loop + indexed get方式,可以提高流数据和历史数据相join的效率;
  • Geode 是目前性能和生产可用性最高的IMDG之一,基本满足ACID;
  • Spark 中通过GeodeRDD的写操作实际上是将数据写入Geode,我们还可以通过JDBC等方式访问数据,甚至进行OLAP操作。

Geode和spark版本选择

Geode-Spark connector编译

需要手动编译spark-connector,参照GitHub上的流程操作即可。
https://github.com/apache/geode/blob/rel/v1.1.1/geode-spark-connector/doc/1_building.md
最终会编译三个文件:
The following jar files will be created:

  • geode-spark-connector/target/scala-2.10/geode-spark-connector_2.10-0.5.0.jar
  • geode-functions/target/scala-2.10/geode-functions_2.10-0.5.0.jar
  • geode-spark-demos/target/scala-2.10/geode-spark-demos_2.10-0.5.0.jar

启动geode并创建region

Start Geode cluster with 1 locator and 2 servers:

gfsh
gfsh>start locator --name=locator1 --port=55221
gfsh>start server --name=server1 --locators=localhost[55221] --server-port=0
gfsh>start server --name=server2 --locators=localhost[55221] --server-port=0

Then create two demo regions:

gfsh>create region --name=str_str_region --type=PARTITION --key-constraint=java.lang.String --value-constraint=java.lang.String
gfsh>create region --name=int_str_region --type=PARTITION --key-constraint=java.lang.Integer --value-constraint=java.lang.String

Deploy Spark Geode Connector's geode-function jar (geode-functions_2.10-0.5.0.jar):

gfsh>deploy --jar=<path to connector project>/geode-functions/target/scala-2.10/geode-functions_2.10-0.5.0.jar

Spark 启动

官网下载spark1.6.0-bin-hadoop2.6。解压后运行./sbin/start-all。

进入spark-shell并引入Geode包

export GEDE=<path to geode>/apache-geode-1.1.1/

spark-shell --master spark://Dings-MacBook-Pro.local:7077 --jars /Users/dingbingbing/hon/geode/geode/geode-spark-connector/geode-spark-connector/target/scala-2.10/geode-spark-connector_2.10-0.5.0.jar,/Users/dingbingbing/hon/geode/geode/geode-spark-connector/geode-functions/target/scala-2.10/geode-functions_2.10-0.5.0.jar,$GEDE/lib/activation-1.1.jar,$GEDE/lib/antlr-2.7.7.jar,$GEDE/lib/commons-beanutils-1.8.3.jar,$GEDE/lib/commons-io-2.4.jar,$GEDE/lib/commons-lang-2.5.jar,$GEDE/lib/commons-logging-1.2.jar,$GEDE/lib/commons-modeler-2.0.jar,$GEDE/lib/fastutil-7.0.2.jar,$GEDE/lib/findbugs-annotations-1.3.9-1.jar,$GEDE/lib/geode-common-1.1.1.jar,$GEDE/lib/geode-core-1.1.1.jar,$GEDE/lib/geode-cq-1.1.1.jar,$GEDE/lib/geode-dependencies.jar,$GEDE/lib/geode-json-1.1.1.jar,$GEDE/lib/geode-lucene-1.1.1.jar,$GEDE/lib/geode-old-client-support-1.1.1.jar,$GEDE/lib/geode-rebalancer-1.1.1.jar,$GEDE/lib/geode-wan-1.1.1.jar,$GEDE/lib/geode-web-1.1.1.jar,$GEDE/lib/gfsh-dependencies.jar,$GEDE/lib/jackson-annotations-2.8.0.jar,$GEDE/lib/jackson-core-2.8.2.jar,$GEDE/lib/jackson-databind-2.8.2.jar,$GEDE/lib/jansi-1.8.jar,$GEDE/lib/javax.mail-api-1.4.5.jar,$GEDE/lib/javax.resource-api-1.7.jar,$GEDE/lib/javax.servlet-api-3.1.0.jar,$GEDE/lib/javax.transaction-api-1.2.jar,$GEDE/lib/jetty-http-9.3.6.v20151106.jar,$GEDE/lib/jetty-io-9.3.6.v20151106.jar,$GEDE/lib/jetty-security-9.3.6.v20151106.jar,$GEDE/lib/jetty-server-9.3.6.v20151106.jar,$GEDE/lib/jetty-servlet-9.3.6.v20151106.jar,$GEDE/lib/jetty-util-9.3.6.v20151106.jar,$GEDE/lib/jetty-webapp-9.3.6.v20151106.jar,$GEDE/lib/jetty-xml-9.3.6.v20151106.jar,$GEDE/lib/jgroups-3.6.10.Final.jar,$GEDE/lib/jline-2.12.jar,$GEDE/lib/jna-4.0.0.jar,$GEDE/lib/jopt-simple-5.0.1.jar,$GEDE/lib/log4j-api-2.6.1.jar,$GEDE/lib/log4j-core-2.6.1.jar,$GEDE/lib/log4j-jcl-2.6.1.jar,$GEDE/lib/log4j-jul-2.6.1.jar,$GEDE/lib/log4j-slf4j-impl-2.6.1.jar,$GEDE/lib/lucene-analyzers-common-6.0.0.jar,$GEDE/lib/lucene-core-6.0.0.jar,$GEDE/lib/lucene-queries-6.0.0.jar,$GEDE/lib/lucene-queryparser-6.0.0.jar,$GEDE/lib/mx4j-3.0.1.jar,$GEDE/lib/mx4j-remote-3.0.1.jar,$GEDE/lib/mx4j-tools-3.0.1.jar,$GEDE/lib/netty-all-4.0.4.Final.jar,$GEDE/lib/ra.jar,$GEDE/lib/shiro-core-1.3.1.jar,$GEDE/lib/slf4j-api-1.7.21.jar,$GEDE/lib/snappy-0.4.jar,$GEDE/lib/spring-aop-4.3.2.RELEASE.jar,$GEDE/lib/spring-beans-4.3.2.RELEASE.jar,$GEDE/lib/spring-context-4.3.2.RELEASE.jar,$GEDE/lib/spring-core-4.3.2.RELEASE.jar,$GEDE/lib/spring-expression-4.3.2.RELEASE.jar,$GEDE/lib/spring-shell-1.2.0.RELEASE.jar,$GEDE/lib/spring-web-4.3.2.RELEASE.jar --conf spark.geode.locators=localhost[55221]

Check Geode locator property in the Spark shell:

scala> sc.getConf.get("spark.geode.locators")
res0: String = localhost[55221]

测试代码及原理简介

Geode可以认为是类似hdfs/hbase的数据集,不同的是:

  • 基于Geode数据形成的RDD可以被修改;
  • 普通的RDD可以和Geode Region数据快速join;

使用Geode Spark Connector的代码中首先import一下org.apache.geode.spark.connector._。引入所有的implicit函数。

scala> import org.apache.geode.spark.connector._
import org.apache.geode.spark.connector._

Save Pair RDD to Geode

In the Spark shell, create a simple pair RDD and save it to Geode:

scala> val data = Array(("1", "one"), ("2", "two"), ("3", "three"))
data: Array[(String, String)] = Array((1,one), (2,two), (3,three))

scala> val distData = sc.parallelize(data)
distData: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[0] at parallelize at <console>:14

scala> distData.saveToGemfire("str_str_region")
15/02/17 07:11:54 INFO DAGScheduler: Job 0 finished: runJob at GemFireRDDFunctions.scala:29, took 0.341288 s

此时Geode中相应region就有了刚才save的数据了gfsh:

gfsh>query --query="select key,value from /str_str_region.entries"

Result     : true
startCount : 0
endCount   : 20
Rows       : 3

key | value
--- | -----
1   | one
3   | three
2   | two

NEXT_STEP_NAME : END

Save Non-Pair RDD to Geode

Saving non-pair RDD to Geode requires an extra function that converts each
element of RDD to a key-value pair. Here's sample session in Spark shell:

scala> val data2 = Array("a","ab","abc")
data2: Array[String] = Array(a, ab, abc)

scala> val distData2 = sc.parallelize(data2)
distData2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:17

scala> distData2.saveToGemfire("int_str_region", e => (e.length, e))
[info 2015/02/17 12:43:21.174 PST <main> tid=0x1]
...
15/02/17 12:43:21 INFO DAGScheduler: Job 0 finished: runJob at GemFireRDDFunctions.scala:52, took 0.251194 s

Verify the result with gfsh:

gfsh>query --query="select key,value from /int_str_region.entrySet"

Result     : true
startCount : 0
endCount   : 20
Rows       : 3

key | value
--- | -----
2   | ab
3   | abc
1   | a

NEXT_STEP_NAME : END

Expose Geode Region As RDD

The same API is used to expose both replicated and partitioned region as RDDs.

scala> val rdd = sc.geodeRegion[String, String]("str_str_region")
rdd: org.apache.geode.spark.connector.rdd.GemFireRDD[String,String] = GemFireRDD[2] at RDD at GemFireRDD.scala:19

scala> rdd.foreach(println)
(1,one)
(3,three)
(2,two)

scala> val rdd2 = sc.geodeRegion[Int, String]("int_str_region")
rdd2: org.apache.geode.spark.connector.rdd.GemFireRDD[Int,String] = GemFireRDD[3] at RDD at GemFireRDD.scala:19

scala> rdd2.foreach(println)
(2,ab)
(1,a)
(3,abc)

Join性能测试(极简单案例)

// 10万条数据
val device_id = sc.parallelize((1 to 100000).map(i => ("device_"+i, "device_id = "+ i + ", value="+(new scala.util.Random().nextInt()))))
// save to Geode
device_id.saveToGeode("str_str_region")
// 1000条数据作为新增数据
val new_rdd = sc.parallelize((4000 to 5000).map(i => ("device_"+i, "device_id = "+ i + ", value="+(new scala.util.Random().nextInt()))))
// 新数据和Geode中十万条join
new_rdd.joinGeodeRegion("str_str_region", p => p._1).count()
// 新增数据和十万条数据的RDD join
new_rdd.join(device_id).count()
两种join操作的时间对比

10万条数据的性能差别有将近10倍。

具体来说,RDD跟Geode Regioin的join是循环+get操作,类似于map-only 的join。具体代码参见GeodeJoinRdd.scala

private def computeWithoutFunc(split: Partition, context: TaskContext, region: Region[K, V]): Iterator[(T, V)] = {
    val leftPairs = left.iterator(split, context).toList.asInstanceOf[List[(K, _)]]
    val leftKeys = leftPairs.map { case (k, v) => k}.toSet
    // Note: get all will return (key, null) for non-exist entry, so remove those entries
    val rightPairs = region.getAll(leftKeys).filter { case (k, v) => v != null}
    leftPairs.filter{case (k, v) => rightPairs.contains(k)}
             .map {case (k, v) => ((k, v).asInstanceOf[T], rightPairs.get(k).get)}.toIterator
  }

而RDD跟RDD的普通join操作需要数据的shuffle,会带来很多额外的开销。如下图所示。


普通RDD的join形成shuffle

可以推断一下,在一些特定场景,例如streamingRDD需要和历史数据进行join从而获得一些profile信息,此时形成较小的新数据RDD和很大的历史RDD的join。此时loop + index get的性能会高很多。这种小数据和大量历史数据的join模式在物联网/互联网场景下很常见。

此外IndexedRdd也可以作为一个备选方案。但是IndexedRdd无法向Geode这样能够被Spark世界之外访问,只能作为提高spark计算的一种方案.

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,384评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,845评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,148评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,640评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,731评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,712评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,703评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,473评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,915评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,227评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,384评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,063评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,706评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,302评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,531评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,321评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,248评论 2 352

推荐阅读更多精彩内容