hbase批量提交与批量读取

需求1

大量数据(10W个点,每个点为double类型)实时存入hbase,hbase为单机版

实现核心代码

  1. put
public static long put(String tablename, List<Put> puts) throws Exception {
        long currentTime = System.currentTimeMillis();
        Connection conn = getConnection();
        final BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener() {
            @Override
            public void onException(RetriesExhaustedWithDetailsException e, BufferedMutator mutator) {
                for (int i = 0; i < e.getNumExceptions(); i++) {
                    System.out.println("Failed to sent put " + e.getRow(i) + ".");
                    logger.error("Failed to sent put " + e.getRow(i) + ".");
                }
            }
        };
        BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(tablename))
                .listener(listener);
        params.writeBufferSize(5 * 1024 * 1024);

        final BufferedMutator mutator = conn.getBufferedMutator(params);
        try {
            mutator.mutate(puts);
            mutator.flush();
        } finally {
            mutator.close();
            closeConnect(conn);
        }
        return System.currentTimeMillis() - currentTime;
    }
  1. 多线程操作
 threadPool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    HBaseUtil.put(tableName, puts);
                } catch (Exception e) {
                    logger.error("batchPut failed . ", e);
                }
            }
        });

        if(waiting){
            try {
                threadPool.awaitTermination();
            } catch (InterruptedException e) {
                logger.error("HBase put job thread pool await termination time out.", e);
            }
        }

需求2

批量数据读取及优化

  1. 读取代码
 public List<TimeValue> getPeroidDate(String point, DateTime startTime, DateTime endTime) throws IOException {
        List<TimeValue> series = new ArrayList<>();
        byte[] start = Bytes.toBytes(startTime.getMillis());
        byte[] end = Bytes.toBytes(endTime.getMillis());
        byte[] id = Bytes.toBytes(point);
        Scan scan = new Scan(id, id);
        scan.addFamily(Bytes.toBytes("history"));
        Filter f = new ColumnRangeFilter(start, true, end, false);
        scan.setFilter(f);
        scan.setCaching(5000);
        scan.setBatch(5000);
        scan.setCacheBlocks(false);
        //TODO Add parameters to support the pagination
        ResultScanner r = null;
        try {
            r = table.getScanner(scan);
        } catch (IOException e) {
            e.printStackTrace();
        }
        for (Result result : r) {
            for (Cell cell : result.rawCells()) {
                series.add(new TimeValue(new Timestamp(Bytes.toLong(CellUtil.cloneQualifier(cell))), Bytes.toString(CellUtil.cloneValue(cell))));
            }
        }
        return series;
    }
  1. 优化
    多线程将时间切割为一段一段分别读取并合并
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 关于Mongodb的全面总结 MongoDB的内部构造《MongoDB The Definitive Guide》...
    中v中阅读 32,012评论 2 89
  • Java继承关系初始化顺序 父类的静态变量-->父类的静态代码块-->子类的静态变量-->子类的静态代码快-->父...
    第六象限阅读 2,174评论 0 9
  • 人在小的时候,别人都会觉得你是小孩子,当你孤独的时候,最直接的方式就是哭,就会有人觉得你需要陪伴了,过来逗你玩儿。...
    棠燕霏阅读 513评论 0 2
  • 你牵着一匹马在荒原上行走,马背上驮着成捆的书,你是一名旅者。在这个世界上,旅者的任务就是将书籍从一个地方运送到另一...
    Biobot阅读 317评论 0 0
  • 以前老师说过一个故事,展示音乐和情绪的力量:某一年暑假期间,一个寝室的几个男生,为了恶搞对面寝室的一个很内向的同学...
    又穷又懒阅读 229评论 0 0