Elasticsearch趟坑记——后台数据篇

image

背景:

项目隶属于用户画像服务,为了更好的支持前端查询服务,决定将原有的HBase数据源切换成Elasticsearch(以下简称ES),从k-v型数据库切到nosql。本以为是个比较简单的任务,做起来才发现有各种问题,简单记录下。

本文将从数据写入,索引设置两方面简述遇到的一些问题。
基本集群情况如下:三台ES,32G内存,其中16G内存分给ES,单台500G SSD。

数据写入

  • hive to ES
    有专门的工具包,elasticsearch-hadoop-6.3.2.jar,需要注意的是后边的版本号需要和ES的版本一致。将hive表建成外部表,向外部表中写入数据即可。基本语句:
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES(
'es.nodes' = '192.168.0.1',
'es.port'='9200',
'es.resource' = 'user_profile/user_profile',//表示ES的index/type
'es.mapping.id' = 'user_id',//指定_id的取值
'es.mapping.names' = 'name:NAME'//hive字段与ES字段的对应
)

有两个问题需要注意:

  1. 'es.mapping.id'手动设置,同一index/type的_id具有唯一性,可以进行数据去重,保证数据恢复时的幂等性等。但_id设计到ES的底层存储,需要足够离散,尽量不要用有规律的数字(类比HBase的Rowkey设计),以保证ES的性能,推荐用MD5之后的值作为_id,或用ES自己生成UUID。
  2. hive表名与字段名称不敏感,而ES是敏感的,如果不指定es.mapping.names,导入的ES index的字段将会全部都为小写。这里推荐字段的命名采用下划线的方式以减少可能遇到的问题。
  • spark写入ES
    简单的同步可以用外部表,但如果有一些复杂的逻辑计算,先将结果写入hive再导入ES肯定不是一个好方案,还有需求时从ES查询全量数据后将数据经过计算结果写回ES。对于上述任务,只能自己写代码了。ES提供了spark的访问的工具包。
 libraryDependencies += "org.elasticsearch" % "elasticsearch-hadoop" % "6.3.2"

ES的节点配置写入到sparkconf中:

  val conf = new SparkConf().setAppName("UserProfile")
  conf.set("es.nodes", "192.168.0.01")
  conf.set("es.port", "9200")
  conf.set("es.index.auto.create", "true")
  conf.set("es.nodes.wan.only", "true")

读取写入如下:

  val result_rdd = sc.esRDD("user_profile/user_profile","{\"query\": {\"bool\": {\"must\": [{\"match\": {\"name\": \"test\"}}]}}}")

  EsSpark.saveToEs(result_rdd, "user_profile_new/user_profile_new")

上述代码为rdd与ES的交互,dataframe类似,不在赘述。

这几行代码也带来了本项目前期遇到的一个较大问题:性能,最初观察大概每10s左右有1K数据写入ES,这个效率是完全无法满足线上服务的要求的。经过排查发现是sc.esRDD这个读取的瓶颈,如果spark的并行读设置的足够大,则很容易将ES的cpu打满,如果不大则性能无法满足要求。应该esRDD这个底层实现机制导致,目前没去了解源码,有时间可以仔细排查下具体原因。

最终本项目的解决方案为用http请求替换sc.esRDD这个读入数据的方法,并通过游标的方式,获取全量的结果数据,最终代码如下:

  val url = "http://192.168.0.1:9200/user_profile/_search?_source=user_id&&scroll=1m"
  val response: HttpResponse[String] = Http(url).method("PUT").postData(x._2).header("Content-Type", "application/json;charset=utf-8").timeout(connTimeoutMs = 1000, readTimeoutMs = 600000).asString
  val parser = new JsonParser()
  val jsonstr = parser.parse(response.body).asInstanceOf[JsonObject]
  val json_array = jsonstr.getAsJsonObject("hits").get("hits").getAsJsonArray()
  val scroll_id = jsonstr.getAsJsonPrimitive("_scroll_id")//通过_scroll_id循环获取全量数据

循环内部代码:

  val parser = new JsonParser()
  val scroll_url = "http://192.168.0.1:9200/_search/scroll"
  val scroll_response: HttpResponse[String] = Http(scroll_url).method("PUT").postData("{ \"scroll\": \"1m\",\"scroll_id\":" + scroll_id + " }").header("Content-Type", "application/json;charset=utf-8").timeout(connTimeoutMs = 1000, readTimeoutMs = 600000).asString
  val scrol_jsonstr = parser.parse(scroll_response.body).asInstanceOf[JsonObject]
  val scroll_json_array = scrol_jsonstr.getAsJsonObject("hits").get("hits").getAsJsonArray()

最终测试结果,每分钟大约10W数据写入,初步满足要求。

索引设置

  • 写入性能
    上文简单说了下spark写入ES的性能问题,其实hive外部表导入时也会遇到性能瓶颈,简单介绍下我对索引做了哪些设置:
    a、分片与副本:分片数最好为机器的整倍数。为了提高写入速度,可以先将副本数关闭,写入完成后再打开。这样写入时只会写入一个节点,可以大幅提高写入性能。打开副本数时,ES只会消耗网络与IO资源,不会占用cpu。副本数可以提高一定的搜索性能,理性设置。
curl -H "Content-Type: application/json" -XPUT ${str} -d'
{
    "settings" : {
        "index" : {
            "number_of_shards" : 3,//分片数
            "number_of_replicas" : 0.//副本数
        }
    }
}'

b、索引字段类型:hive外部表写入ES时,如果不指定索引字段的类型,hive中int会自动转成long,double会转成float,最好按需要先建索引,然后再建外部表。关于字段是否需要被索引也需要提前设置,默认是全加索引,这也非常影响写入性能与存储空间。
c、还有一些索引的参数配置,也会一定程度上影响写入速度。

"refresh_interval": "300s"//刷新间隔,默认1s,如果不要求写入数据实时被检索出来,加大之。
"translog.flush_threshold_size": "4096mb" //刷新的文件大小阈值
"index.translog.flush_threshold_ops:":1000000//日志多少条进行flush
"indices.store.throttle.max_bytes_per_sec" : "100mb"//加大merge速度
……

总的来说,分片与副本最影响性能,其次是字段是否需要索引。由于ES的研发思路就是开箱即用,默认配置以满足绝大多数场景,如果不是对ES有一定了解,尽量减少对参数的修改,有时会获得更好的性能。

  • 在线服务
    由于需要对外提供在线服务,如何保证数据更新时的服务也是一个问题。本项目采用了ES自带的别名机制,对外提供的永远是一个确定的名称。每天的数据写入为带时间戳的版本,当数据验证完成时,将新的表的别名设成提供服务的表名。
curl -H "Content-Type: application/json" -XPOST ${str} -d'
{
    "actions": [
        { "remove": { "index": "user_profile_'${1}'", "alias": "user_profile" }},
        { "add":    { "index": "user_profile_'${2}'", "alias": "user_profile" }}
    ]
}'//更改别名

ES默认提供了原子性的操作,保证了同时成功与失败。

本文简述了后台数据写入ES时遇到的一些问题与解决方案,后续为业务方提供基础的RPC服务也遇到了较多问题,有时间继续总结。

PS:不得不佩服ES的更新速度,在本项目进行的过程中ES就升级到了6.4.1,让我这种只喜欢用最新版本的,很难受。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,558评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,002评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,036评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,024评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,144评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,255评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,295评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,068评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,478评论 1 305
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,789评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,965评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,649评论 4 336
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,267评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,982评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,223评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,800评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,847评论 2 351

推荐阅读更多精彩内容