借鉴水塘抽样算法的一种解决思想

1背景

关于水塘抽样的算法原理此处不再说明了, 本文重点是针对它的一种应用场景, 具体算法原理可参考水塘抽样算法原理

2问题:

在编写Spark程序时, 鉴于内存等资源不够, 然而Hbase数据量又十分巨大(100亿数据, 申请资源Spark核数以及内存较小), 此时在Spark应用程序中调用了repartition进行重新分区, 导致了大量Shuffle 网络IO, 很有可能使得Spark应用程序瘫痪. 因此想到一种委婉的解决方式, 使用水塘抽样算法, 它可以在一开始不知道数据总量的情况下进行抽样, 最终得到接近均匀的分区. 具体解决原理是先进行扫描Hbase进行水塘抽样,抽样出rowKey, 然后分别针对这些rowKey进行单独scan扫描处理, 由于处理的数据量小了就可以保证应用程序不崩溃. 此处是保证可用性牺牲了性能.
解决方式:

  • 使用水塘抽样算法
  • Hbase预分region,并配置合适的region 分隔算法, 保证每个region数据量不要太大, 按region扫描

3代码

3.1水塘抽样解决代码

此处给出一个基本的Demo,

首先创建一个字节数组封装类, 用于比较字节数组(rowKey)

package hbase;

import org.apache.hadoop.hbase.util.Bytes;

/**
 * Created by zhangkai12 on 2018/6/27.
 */
public class MyBytes implements Comparable<MyBytes> {
    private byte[] bytes;

    public MyBytes(byte[] bytes) {
        this.bytes = bytes;
    }

    public byte[] getBytes() {
        return bytes;
    }

    public void setBytes(byte[] bytes) {
        this.bytes = bytes;
    }

    @Override
    public int compareTo(MyBytes o) {
        return Bytes.compareTo(this.bytes, o.getBytes());
    }
}

其次创建一个Hbase基本处理类

package hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import java.io.IOException;

public class HbaseUtil {

    static Configuration conf = HBaseConfiguration.create();
    private static Connection connection;

    public HbaseUtil(String zkUrl) {
        conf.set("hbase.zookeeper.quorum", zkUrl);
        conf.set("hbase.client.operation.timeout", "60000");
    }

    public static synchronized Connection getConn() {
        if (connection == null) {
            try {
                connection = ConnectionFactory.createConnection(conf);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return connection;
    }

    public static Table getTable(String tableName) throws IOException {
        Table table = getConn().getTable(TableName.valueOf(tableName));
        return table;
    }

    public static void closeTable(Table table) {
        if (table == null) {
            return;
        }
        try {
            table.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

最后就是最终的抽样算法类, 此处的抽样函数直接将Spark的scala代码移植成java版本的.

package hbase;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * Created by zhangkai12 on 2018/6/27.
 */
public class Sample {
    private static final byte[] ROW_END = new byte[]{-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1};
    private static final byte[] ROW_START = {0};
    public static void main(String[] args) {
        Table table = null;
        ResultScanner scanner = null;
        try {
            table = HbaseUtil.getTable("myedge");
            Scan scan = new Scan();
            scan.addFamily(Bytes.toBytes("g"));
            scanner = table.getScanner(scan);
            Iterator<Result> iterator = scanner.iterator();
            byte[][] rowKeys = reservoirSampleAndCount(iterator, 20, 100);

            System.out.println("----------------------sort before----------------------");
            Arrays.stream(rowKeys).forEach(res -> {
                System.out.println(" value " + Arrays.toString(res));
            });

            System.out.println("----------------------sort after----------------------");
            Stream<MyBytes> sorted = Arrays.stream(rowKeys).map(res -> new MyBytes(res)).sorted();
            Arrays.stream(rowKeys).map(res -> new MyBytes(res)).sorted().forEach(res -> {
                System.out.println("****" + Arrays.toString(res.getBytes()));
            });
            List<MyBytes> collect = sorted.collect(Collectors.toList());
            byte[] start = ROW_START;
            byte[] end;
            int sumNum = 0;
            if (null != collect) {
                for (int i = 0; i < collect.size() ; i++) {
                    end = collect.get(i).getBytes();
                    sumNum += scanWithStartAndEndRow(start,end);
                    start = end ;
                }
                start = collect.get(collect.size()-1).getBytes();
                end = ROW_END;
                sumNum += scanWithStartAndEndRow(start,end);
            }


            /**
             * 不使用抽样方法计算所有的sum
             */
            int numNotSample = getSumWithNOSampleMethod();
            System.out.println(sumNum + "---------------" + numNotSample);
            boolean judge = (sumNum == numNotSample);
            System.out.println(judge);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (null != scanner) {
                scanner.close();
            }
            if (null != table) {
                HbaseUtil.closeTable(table);
            }
        }
    }


    /**
     *  算法描述
     *  从S中抽取首k项放入「水塘」中
     对于每一个S[j]项(j ≥ k):
     随机产生一个范围0到j的整数r
     若 r < k 则把水塘中的第r项换成S[j]项
     */
    public static byte[][] reservoirSampleAndCount(Iterator<Result> input, int k, long seed) {
        byte[][] reservoir = new byte[k][];
        int i = 0;
        while (i < k && input.hasNext()) {
            Result next = input.next();
            reservoir[i] = next.getRow();
            i ++;
        }
        if (i < k) {
            byte[][] trimReservoir = new byte[i][];
            System.arraycopy(reservoir, 0, trimReservoir, 0, i);
            return trimReservoir;
        } else {
            Random random = new Random(seed);
            while (input.hasNext()) {
                byte[] item = input.next().getRow();
                int replacementIndex = random.nextInt(i);
                if (replacementIndex < k) {
                    reservoir[replacementIndex] = item;
                }
                i ++;
            }
            return reservoir;
        }
    }


    private static int scanWithStartAndEndRow(byte[] start, byte[] end) {
        int num = 0;
        Table table = null;
        ResultScanner scanner = null;
        try {
            table = HbaseUtil.getTable("myedge");
            Scan scan = new Scan();
            scan.addFamily(Bytes.toBytes("g"));
            scan.setStartRow(start);
            scan.setStopRow(end);
            scanner = table.getScanner(scan);
            Iterator<Result> iterator = scanner.iterator();
            while (iterator.hasNext()) {
                Result next = iterator.next();
                byte[] row = next.getRow();
                String s = Bytes.toString(row);
                System.out.println(Arrays.toString(row) + "---" + s);
                num ++;
            }
            System.out.println("---num-----: " + num);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (null != scanner) {
                scanner.close();
            }
            if (null != table) {
                HbaseUtil.closeTable(table);
            }
        }
        return num;
    }

    private static int getSumWithNOSampleMethod() {
        int num = 0;
        Table table = null;
        ResultScanner scanner = null;
        try {
            table = HbaseUtil.getTable("myedge");
            Scan scan = new Scan();
            scan.addFamily(Bytes.toBytes("g"));
            scanner = table.getScanner(scan);
            Iterator<Result> iterator = scanner.iterator();
            while (iterator.hasNext()) {
                Result next = iterator.next();
                num ++;
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (null != scanner) {
                scanner.close();
            }
            if (null != table) {
                HbaseUtil.closeTable(table);
            }
        }
        return num;
    }
}

3.2 按Region进行数据处理

package hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/**
 * Created by zhangkai12 on 2018/6/27.
 */
public class HbaseRegion {

    public static void main(String[] args) {
        try {
            checkTable("Janus321");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    private static void checkTable(String tabName) throws Exception {
        TableName tn = TableName.valueOf(tabName);
        Configuration config = HBaseConfiguration.create();
        HRegionInfo regionInfo;
        Connection connection = null;
        Admin admin = null;
        Table table = null;
        try  {
            connection = ConnectionFactory.createConnection(config);
            admin = connection.getAdmin();
            table = connection.getTable(tn);

            if (!admin.tableExists(TableName.valueOf(tabName))) {
                return;
            }
            List<HRegionInfo> lr = admin.getTableRegions(tn);
            Result r = null;
            if (lr == null) {
                System.out.print("No region found for table " + tabName);
            }
            // 遍历表的每个region
            Iterator<HRegionInfo> ir = lr.iterator();
            int i = 1;
            while (ir.hasNext()) {
                regionInfo = ir.next();
                ResultScanner scanner = null;
                byte[] startRowkey = regionInfo.getStartKey();
                System.out.println("----start----" + Bytes.toString(startRowkey));
                byte[] endKey = regionInfo.getEndKey();
                System.out.println("----end----" + Bytes.toString(endKey));
                Scan sc = new Scan();
                sc.setBatch(1);
                sc.setStartRow(startRowkey);
                sc.setStopRow(endKey);
                try {
                    scanner = table.getScanner(sc);
                    Iterator<Result> iterator = scanner.iterator();
                    while (iterator.hasNext()) {
                        Result next = iterator.next();
                        byte[] row = next.getRow();
                        System.out.println("第" + i + " 批 " + Arrays.toString(row));
                    }
                } finally {
                    if (null != scanner) {
                        scanner.close();
                    }
                }
                i ++;
            }
        }catch (Exception e) {

        } finally {
            if (null != table) {
                table.close();
            }
            if (null != admin) {
                admin.close();
            }
            if (null != connection) {
                connection.close();
            }
        }
    }
}

4水塘抽样结果图

水塘抽样结果图

根据水塘抽样结果图可以发现, 我们通过抽样算法处理的数据总量和不使用抽样算法的数据总量是一致的,也就保证了算法的准确性.

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,014评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,796评论 3 386
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,484评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,830评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,946评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,114评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,182评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,927评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,369评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,678评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,832评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,533评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,166评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,885评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,128评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,659评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,738评论 2 351

推荐阅读更多精彩内容