Python 读写 hbase 数据的正确姿势(一)


title: Python 读写 hbase 数据的正确姿势(一)

tags:

  • hbase
  • happybase
  • python

categories:

  • �Hbase

comments: true
date: 2017-09-09 19:00:00


之前操作 hbase 大都是用 java 写,或者偶尔用 python 写几个一些简单的 put、get 操作。最近在使用 happybase 库批量向 hbase 导入数据,并通过 java 实现查询的一些复杂的搜索时(scan+filter),遇到了一些有趣的问题。

实验版本

Hbase 版本:1.0.0
Happybase 版本:1.1.0
Python 版本:2.7.13

问题1:filter 过滤失败

问题重现

hbase 的使用场景大概是这样的:

有一个 hbase table,存储一些文章的基本信息,包括创建时间、文章ID、文章类别ID等,同属于一个column family,"article"。

查询的场景则是查找"指定的时间范围","文章类型ID为N" 的所有文章数据。

根据以上场景,设计如下 table:

  1. hbase table 为 article 。
  2. rowkey 是 "ARTICLE" + 微秒级时间戳(类似OpenTSDB 的rowkey,便于按时间序列查到某一段时间创建的 articles),即 "ARTICLE1504939752000000"。
  3. family 为 "basic",包含 "ArticleID", "ArticleTypeID", "Created", 三个 column。

查询时通过指定 rowkey start 和 rowkey stop,可以 scan 某一个时间段的数据(因为 rowkey 中包含数值型的时间戳),通过 hbase filter 实现"ArticleTypeID" == N 的过滤条件。

开始导入数据、准备查询,以下是导入数据部分代码 demo:

def save_batch_events(datas, table=None):
    with get_connetion_pool().connection() as conn:
        if table is not None:
            t = conn.table(table)
        else:
            t = conn.table(TABLE)
        b = t.batch(transaction=False)
        for row, data in datas.items():
            b.put(row, data)
        b.send()

def save_main_v1():
    datas = dict()
    for i in range(100):
        article_type_id = i % 2
        timestamp = time.time() + i
        rowkey = "ARTICLE" + str(timestamp * 1000000)
        data = {
            "basic:" + "ArticleID": str(i),
            "basic:" + "ArticleTypeID": str(article_type_id),
            "basic:" + "Created": str(timestamp),
        }
        datas[rowkey] = data
    save_batch_events(datas)

查看一下 hbase 的数据,100 条数据全部正常导入,其中50条数据 "ArticleTypeID" 为0,50条为1

图 1:python-happyhbase 写入的数据

接下来就是用 hbase filter 过滤的过程了,假设查询 "ArticleTypeID" 为 0 的数据,使用 java 客户端实现查询:

    public static void test_hbase_filter() throws IOException {
        TableName tableName = TableName.valueOf("test_article_1");
        Configuration conf = HBaseConfiguration.create();
        Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(tableName);

        // Scan python table `test_article_1`
        System.out.println("Prepare to scan !");
        FilterList list = new FilterList(FilterList.Operator.MUST_PASS_ONE);
        SingleColumnValueFilter filter1 = new SingleColumnValueFilter(Bytes.toBytes("basic"),
                Bytes.toBytes("ArticleTypeID"), CompareOp.EQUAL, Bytes.toBytes(1L));
        list.addFilter(filter1);
        Scan s = new Scan();
        s.addFamily(Bytes.toBytes("basic"));
        s.setFilter(list);
        ResultScanner scanner = table.getScanner(s);
        int num = 0;
        for (Result rr = scanner.next(); rr != null; rr = scanner.next()) {
            num++;
        }
        System.out.println("Found row: " + num);// 预期 50,结果为 0

问题出现:使用 java 期望的查询结果为 50 条,但是查出的结果却是 0 条!

使用 python 查询却可以得到正确的结果:

def recent_events_v1(start, end, table=None, filter_str=None, limit=2000):
    with get_connetion_pool().connection() as conn:
        if table is not None:
            t = conn.table(table)
        else:
            t = conn.table(TABLE)
        start_row = 'ARTICLE' + str(start * 1000000)
        end_row = 'ARTICLE' + str(end * 1000000)
        return t.scan(row_start=start_row, row_stop=end_row, filter=filter_str, limit=limit)

if __name__ == '__main__':
    filter_str = "SingleColumnValueFilter('basic', 'ArticleTypeID', =, 'binary:1')"
    results = recent_events_v1(start=0, end=1505023900, filter_str=filter_str)
    print len([i for i in results])  # 期望值为50, 实际值为 50,正确

寻找原因

经过 N 次确认,java 的读操作是没有问题的,python 实现的读写也得到了预期的效果。进一步探究,特意用 java 完整的实现的数据的导入和查询:

public static void test_hbase_filter1() throws IOException {        
        tableName = TableName.valueOf("test_article_java_1");
        table = conn.getTable(tableName);
        System.out.println("Prepare create table !");
        Admin admin = conn.getAdmin();
        if (!admin.tableExists(tableName)) {
            HTableDescriptor td = new HTableDescriptor(tableName);
            HColumnDescriptor basic = new HColumnDescriptor("basic");
            td.addFamily(basic);
            admin.createTable(td);
            System.out.println("Created !");
        }

        // Put value to test_article_java_1
        System.out.println("Prepare to write data to: " + table.getName().toString());
        for (int i = 0; i < 100; i++) {
            Put p = new Put(Bytes.toBytes("ARTICLE" + (System.currentTimeMillis() + 1000) * 1000));
            p.addColumn(Bytes.toBytes("basic"), Bytes.toBytes("ArticleTypeID"), Bytes.toBytes(Long.valueOf(i % 2)));
            table.put(p);
        }

        // scan test_article_java_1
        scanner = table.getScanner(s);
        num = 0;
        for (Result rr = scanner.next(); rr != null; rr = scanner.next()) {
            num++;
        }
        System.out.println("Found row: " + num);// 预期 50,结果为 50
}

可见,用 java 写的数据,用 java 读是没问题的,用 python 写的数据用 python 读也没问题。但 java 读 python 写的数据就存在异常,难道是 python 写的数据和 java 写的数据不一样?为此分别对比一下 python 和 java 写入 hbase 的数据:

图 2:java 写入的数据

仔细观察图 1 和图 2 中的数据可以发现,python 写入的数据中对应的 ArticleTypeID 值为 01,而 java 则是一串 bytes。突然意识到一个问题,hbase 读写的时候要求传入的数据类型为 bytes,而使用 python 传输的过程中这种整形数据是直接通过 str() 方法转成字符串存储到 hbase 中的,并不是以 bytes 的形式存于 hbase,所以使用 java 用转化成 bytes 的 filter 读才没能得到预期的结果。

正确的 filter 姿势

既然找到了原因,解决问题就比较简单了,存储的时候将整型数据全部都通过 struct.pack 方法转成 bytes 存入,这样就可以被通用的查询了,同时 使用 python 查询的时候也将 filter 中的整型数值替换成 bytes 格式。

使用 struct.pack 方法将整型转成 bytes 时,注意选择使用 big-endian 的 Byte order,即 pack 方法的第一个参数使用 >。因为 java 官方 client 采用这种字节序,下面是 Bytes.toBytes 的实现源码,可见采用的是 big-endian

  /**
   * Convert a long value to a byte array using big-endian.
   *
   * @param val value to convert
   * @return the byte array
   */
  public static byte[] toBytes(long val) {
    byte [] b = new byte[8];
    for (int i = 7; i > 0; i--) {
      b[i] = (byte) val;
      val >>>= 8;
    }
    b[0] = (byte) val;
    return b;

写入的代码:

def save_main_v2():
    datas = dict()
    for i in range(100):
        article_type_id = i % 2
        timestamp = time.time() + i
        rowkey = "ARTICLE" + str(timestamp * 1000000)
        data = {
            "basic:" + "ArticleID": str(i),
            "basic:" + "ArticleTypeID": struct.pack('>q', article_type_id),
            "basic:" + "Created": str(timestamp),
        }
        datas[rowkey] = data
    save_batch_events(datas, table="test_article_2")

查询是的filter:

filter_str = "SingleColumnValueFilter('basic', 'ArticleTypeID', =, 'binary:{value}')".format(value=struct.pack('>q', 1))

这样就没有问题了~

总结

使用 python 读写 hbase 数据,直接传输整型参数时,hbase 的 thrift 接口会抛出 TDecodeException: Field 'value(3)' of 'Mutation' needs type 'STRING' 异常,被告知只接受 string 类型的数据。这时注意将整型数据转化成 bytes 形式的 str,而不要直接使用 str() 方法强转,否则难以避免的会出现一些非预期的结果。

以为这样就没问题了? 请关注看下文~

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,324评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,303评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,192评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,555评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,569评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,566评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,927评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,583评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,827评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,590评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,669评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,365评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,941评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,928评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,159评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,880评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,399评论 2 342

推荐阅读更多精彩内容