import happybase
import pandas as pd
def get_hbase_pool(host, size=5):
    """Create a happybase connection pool for an HBase host.

    :param host: IP address of the HBase host
    :param size: number of connections in the pool; coerced with ``int``
        because callers such as R may pass a non-int value
    :return: the connection pool
    """
    pool_size = int(size)
    return happybase.ConnectionPool(size=pool_size, host=host)
def read_hbase_data(pool, table_name, col_filter, col, row_prefix=None):
    """Scan an HBase table through a pooled connection and return a DataFrame.

    Each name in ``col`` is read from column family ``f`` (``f:<name>``) and
    each cell value is decoded from UTF-8 bytes to ``str``.

    :param pool: connection pool whose ``connection()`` is a context manager
    :param table_name: name of the HBase table to read
    :param col_filter: filter string forwarded to the scan (data selection)
    :param col: column qualifiers to read; also used as the DataFrame columns
    :param row_prefix: optional row-key prefix (``str`` or ``bytes``);
        ``None`` scans without a prefix restriction
    :return: DataFrame with one row per scanned HBase row, columns ``col`` and
        a 0..n-1 index. A cell absent from a row is stored as a missing value.
        If the scan fails, the error is printed and the rows read so far are
        returned.
    """
    # Fully qualified column names as bytes, in the same order as ``col``.
    read_col = ['f:{}'.format(name).encode() for name in col]
    # Only encode when a text prefix was given; the default None must pass
    # through untouched instead of crashing on ``None.encode()``.
    if isinstance(row_prefix, str):
        row_prefix = row_prefix.encode()

    rows = []
    with pool.connection() as connection:
        try:
            tab = connection.table(table_name)
            for _, value in tab.scan(row_prefix=row_prefix, columns=read_col,
                                     filter=col_filter):
                # Look every cell up by its key rather than relying on the
                # ordering of the returned dict, so values cannot end up
                # under the wrong column and missing cells do not abort.
                record = {}
                for name, key in zip(col, read_col):
                    cell = value.get(key)
                    record[name] = None if cell is None else str(cell, 'utf-8')
                rows.append(record)
        except Exception as e:
            # Best-effort read: report the problem and keep what was read.
            print('Error:', e)
        finally:
            connection.close()
    # Build the frame once; appending a frame per row is quadratic and
    # DataFrame.append no longer exists in pandas 2.x.
    return pd.DataFrame(rows, columns=col)
def read_hbase_data_nopool(host, table_name, col_filter, col, row_prefix=None):
    """Scan an HBase table over a dedicated connection and return a DataFrame.

    Each name in ``col`` is read from column family ``f`` (``f:<name>``) and
    each cell value is decoded from UTF-8 bytes to ``str``.

    :param host: IP address of the HBase host
    :param table_name: name of the HBase table to read
    :param col_filter: filter string forwarded to the scan (data selection)
    :param col: column qualifiers to read; also used as the DataFrame columns
    :param row_prefix: optional row-key prefix (``str`` or ``bytes``);
        ``None`` scans without a prefix restriction
    :return: DataFrame with one row per scanned HBase row, columns ``col`` and
        a 0..n-1 index. A cell absent from a row is stored as a missing value.
        If the scan fails, the error is printed and the rows read so far are
        returned. Errors raised while opening the connection propagate.
    """
    # Fully qualified column names as bytes, in the same order as ``col``.
    read_col = ['f:{}'.format(name).encode() for name in col]
    # Only encode when a text prefix was given; the default None must pass
    # through untouched instead of crashing on ``None.encode()``.
    if isinstance(row_prefix, str):
        row_prefix = row_prefix.encode()

    rows = []
    connection = happybase.Connection(host, autoconnect=False)
    connection.open()
    try:
        tab = connection.table(table_name)
        for _, value in tab.scan(row_prefix=row_prefix, columns=read_col,
                                 filter=col_filter):
            # Look every cell up by its key rather than relying on the
            # ordering of the returned dict, so values cannot end up under
            # the wrong column and missing cells do not abort the scan.
            record = {}
            for name, key in zip(col, read_col):
                cell = value.get(key)
                record[name] = None if cell is None else str(cell, 'utf-8')
            rows.append(record)
    except Exception as e:
        # Best-effort read: report the problem and keep what was read.
        print('Error:', e)
    finally:
        connection.close()
    # Build the frame once; appending a frame per row is quadratic and
    # DataFrame.append no longer exists in pandas 2.x.
    return pd.DataFrame(rows, columns=col)
if __name__ == "__main__":
    # Example usage: read columns id/x/y of rows where f:x == 'a'.
    host, table_name = 'ip', '表名'
    col = ['id', 'x', 'y']  # DataFrame column names
    col_filter = "SingleColumnValueFilter('f', 'x', =, 'binary:a')"  # keep rows with x = a
    row_prefix = None  # row-key prefix
    pool = get_hbase_pool(host, size=5)
    result = read_hbase_data(
        pool=pool,
        table_name=table_name,
        col_filter=col_filter,
        col=col,
        row_prefix=row_prefix,
    )
Python 读取 HBase
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。
推荐阅读更多精彩内容
- 1.Hbase安装: 可以参考这篇文章,写的很详细:https://blog.csdn.net/wuruijie3...
- 准备数据: 上传到hdfs 编写mapper: 编写reducer: 编写driver: 打包运行主类: yarn...
- 一、版本信息和环境 1、版本信息(全是Apache版本): hadoop-2.6.0 hbase-1.2.6.1 ...
- 1 通过 scan 读取 hbase 表 应用场景: 读取方法: 直到读取数据的inputformat是 Tabl...