Sqoop导入HBase,并借助Coprocessor协处理器同步索引到ES

1.环境

  • Mysql 5.6
  • Sqoop 1.4.6
  • Hadoop 2.5.2
  • HBase 0.98
  • Elasticsearch 2.3.5

2.安装(略过)

3.HBase Coprocessor实现

HBase Observer

package com.gavin.observer;


import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.elasticsearch.client.Client;
//import org.elasticsearch.client.transport.TransportClient;
//import org.elasticsearch.common.settings.ImmutableSettings;
//import org.elasticsearch.common.settings.Settings;
//import org.elasticsearch.common.transport.InetSocketTransportAddress;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
//import java.util.NavigableMap;

public class DataSyncObserver extends BaseRegionObserver {

   private static Client client = null;
   private static final Log LOG = LogFactory.getLog(DataSyncObserver.class);


   /**
    * 读取HBase Shell的指令参数
    *
    * @param env
    */
   private void readConfiguration(CoprocessorEnvironment env) {
       Configuration conf = env.getConfiguration();
       Config.clusterName = conf.get("es_cluster");
       Config.nodeHost = conf.get("es_host");
       Config.nodePort = conf.getInt("es_port", -1);
       Config.indexName = conf.get("es_index");
       Config.typeName = conf.get("es_type");

       LOG.info("observer -- started with config: " + Config.getInfo());
   }


   @Override
   public void start(CoprocessorEnvironment env) throws IOException {
       readConfiguration(env);
//        Settings settings = ImmutableSettings.settingsBuilder()
//                .put("cluster.name", Config.clusterName).build();
//        client = new TransportClient(settings)
//                .addTransportAddress(new InetSocketTransportAddress(
//                        Config.nodeHost, Config.nodePort));
       client = MyTransportClient.client;
   }


   @Override
   public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
       try {
           String indexId = new String(put.getRow());
           Map<byte[], List<Cell>> familyMap = put.getFamilyCellMap();
//            NavigableMap<byte[], List<Cell>> familyMap = put.getFamilyCellMap();
           Map<String, Object> json = new HashMap<String, Object>();
           for (Map.Entry<byte[], List<Cell>> entry : familyMap.entrySet()) {
               for (Cell cell : entry.getValue()) {
                   String key = Bytes.toString(CellUtil.cloneQualifier(cell));
                   String value = Bytes.toString(CellUtil.cloneValue(cell));
                   json.put(key, value);
               }
           }
           System.out.println();
           ElasticSearchOperator.addUpdateBuilderToBulk(client.prepareUpdate(Config.indexName, Config.typeName, indexId).setDoc(json).setUpsert(json));
           LOG.info("observer -- add new doc: " + indexId + " to type: " + Config.typeName);
       } catch (Exception ex) {
           LOG.error(ex);
       }
   }

   @Override
   public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> e, final Delete delete, final WALEdit edit, final Durability durability) throws IOException {
       try {
           String indexId = new String(delete.getRow());
           ElasticSearchOperator.addDeleteBuilderToBulk(client.prepareDelete(Config.indexName, Config.typeName, indexId));
           LOG.info("observer -- delete a doc: " + indexId);
       } catch (Exception ex) {
           LOG.error(ex);
       }
   }

}

ES方法

package com.gavin.observer;


import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.client.Client;
//import org.elasticsearch.client.transport.TransportClient;
//import org.elasticsearch.common.settings.ImmutableSettings;
//import org.elasticsearch.common.settings.Settings;
//import org.elasticsearch.common.transport.InetSocketTransportAddress;

import java.util.HashMap;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ElasticSearchOperator {

   // 缓冲池容量
   private static final int MAX_BULK_COUNT = 10;
   // 最大提交间隔(秒)
   private static final int MAX_COMMIT_INTERVAL = 60 * 5;

   private static Client client = null;
   private static BulkRequestBuilder bulkRequestBuilder = null;

   private static Lock commitLock = new ReentrantLock();

   static {

       // elasticsearch1.5.0
//        Settings settings = ImmutableSettings.settingsBuilder()
//                .put("cluster.name", Config.clusterName).build();
//        client = new TransportClient(settings)
//                .addTransportAddress(new InetSocketTransportAddress(
//                        Config.nodeHost, Config.nodePort));

       // 2.3.5
       client = MyTransportClient.client;

       bulkRequestBuilder = client.prepareBulk();
       bulkRequestBuilder.setRefresh(true);

       Timer timer = new Timer();
       timer.schedule(new CommitTimer(), 10 * 1000, MAX_COMMIT_INTERVAL * 1000);
   }

   /**
    * 判断缓存池是否已满,批量提交
    *
    * @param threshold
    */
   private static void bulkRequest(int threshold) {
       if (bulkRequestBuilder.numberOfActions() > threshold) {
           BulkResponse bulkResponse = bulkRequestBuilder.execute().actionGet();
           if (!bulkResponse.hasFailures()) {
               bulkRequestBuilder = client.prepareBulk();
           }
       }
   }

   /**
    * 加入索引请求到缓冲池
    *
    * @param builder
    */
   public static void addUpdateBuilderToBulk(UpdateRequestBuilder builder) {
       commitLock.lock();
       try {
           bulkRequestBuilder.add(builder);
           bulkRequest(MAX_BULK_COUNT);
       } catch (Exception ex) {
           ex.printStackTrace();
       } finally {
           commitLock.unlock();
       }
   }

   /**
    * 加入删除请求到缓冲池
    *
    * @param builder
    */
   public static void addDeleteBuilderToBulk(DeleteRequestBuilder builder) {
       commitLock.lock();
       try {
           bulkRequestBuilder.add(builder);
           bulkRequest(MAX_BULK_COUNT);
       } catch (Exception ex) {
           ex.printStackTrace();
       } finally {
           commitLock.unlock();
       }
   }

   /**
    * 定时任务,避免RegionServer迟迟无数据更新,导致ElasticSearch没有与HBase同步
    */
   static class CommitTimer extends TimerTask {
       @Override
       public void run() {
           commitLock.lock();
           try {
               bulkRequest(0);
           } catch (Exception ex) {
               ex.printStackTrace();
           } finally {
               commitLock.unlock();
           }
       }
   }

}

打包并上传到hdfs

mvn clean compile assembly:single
mv observer-1.0-SNAPSHOT-jar-with-dependencies.jar observer-hb0.98-es2.3.5.jar
hdfs dfs -put observer-hb0.98-es2.3.5.jar /hbase/lib/

4.创建HBase表,并启用Coprocessor

mysql

hbase shell
create 'region','data'
disable 'region'
alter 'region', METHOD => 'table_att', 'coprocessor' => 'hdfs:///hbase/lib/observer-hb0.98-es2.3.5.jar|com.gavin.observer.DataSyncObserver|1001|es_cluster=elas2.3.4,es_type=mysql_region,es_index=hbase,es_port=9300,es_host=localhost'
enable 'region'

oracle

create 'sp','data'
disable 'sp'
alter 'sp', METHOD => 'table_att', 'coprocessor' => 'hdfs:///hbase/lib/observer-hb0.98-es2.3.5.jar|com.gavin.observer.DataSyncObserver|1001|es_cluster=elas2.3.4,es_type=oracle_sp,es_index=hbase,es_port=9300,es_host=localhost'
enable 'sp'

查看

hbase(main):007:0* describe 'ora_test'

Table ora_test is ENABLED                                            
ora_test, {TABLE_ATTRIBUTES => {coprocessor$1 => 'hdfs:///appdt/hbase
/lib/observer-hb1.2.2-es2.3.5.jar|com.gavin.observer.DataSyncObserver
|1001|es_cluster=elas2.3.4,es_type=ora_test,es_index=hbase,es_port=93
00,es_host=localhost'}                                               
COLUMN FAMILIES DESCRIPTION                                          
{NAME => 'data', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'ROW',
 REPLICATION_SCOPE => '0', VERSIONS => '1', COMPRESSION => 'NONE', MI
N_VERSIONS => '0', TTL => 'FOREVER', KEEP_DELETED_CELLS => 'FALSE', B
LOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'}     
1 row(s) in 0.0260 seconds

删除Coprocessor

disable 'ora_test' 
alter 'ora_test',METHOD => 'table_att_unset',NAME =>'coprocessor$1' 
enable 'ora_test'

查看删除效果

hbase(main):011:0> describe 'ora_test'

Table ora_test is ENABLED                                           
ora_test                                                             
COLUMN FAMILIES DESCRIPTION                                          
{NAME => 'data', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'ROW',
 REPLICATION_SCOPE => '0', VERSIONS => '1', COMPRESSION => 'NONE', MI
N_VERSIONS => '0', TTL => 'FOREVER', KEEP_DELETED_CELLS => 'FALSE', B
LOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'}     
1 row(s) in 0.0200 seconds

5.使用sqoop上传数据

mysql

bin/sqoop import --connect jdbc:mysql://192.168.1.187:3306/trade_dev --username mysql --password 111111 --table TB_REGION --hbase-table region --hbase-row-key REGION_ID --column-family data

oracle

bin/sqoop import --connect jdbc:oracle:thin:@192.168.16.223:1521/orcl --username sitts --password password --table SITTS.ESB_SERVICE_PARAM --split-by PARAM_ID --hbase-table sp --hbase-row-key PARAM_ID --column-family data

6.校验

HBase

scan 'region'

ES

ES Head

7.参考

HBase Observer同步数据到ElasticSearch

8.注意

  • 同一个Coprocessor用一个index,不同表可以设置不同type,不然index会乱
  • 修改Java代码后,上传到HDFS的jar包文件必须和之前不一样,否则就算卸载掉原有的coprocessor再重新安装也不能生效
  • 如果你有多个表对多个索引/类型的映射,每个表所加载Observer对应的jar包路径不能相同,否则ElasticSearch会串数据
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,294评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,780评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,001评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,593评论 1 289
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,687评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,679评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,667评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,426评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,872评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,180评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,346评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,019评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,658评论 3 323
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,268评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,495评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,275评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,207评论 2 352

推荐阅读更多精彩内容