Python导出Hbase数据的思路:
- 使用 `happybase` 连接 Hbase
- 使用 `table.scan()` 扫描数据,将得到的数据整理为 `dataframe` 格式
- 将从 `Hbase` 中得到的 `byte` 类型的数据转为 `str` 类型的数据
示例代码
import happybase
import numpy as np
import pandas as pd
def create_table(table_name):
    """Create a fresh HBase table, dropping any existing table of that name.

    The table is created with three column families: ``col_1``, ``col_2``
    and ``col_3``, each with default options.

    Args:
        table_name: Name of the table to (re)create.
    """
    connection = happybase.Connection()
    try:
        # happybase may report table names as bytes (Python 3), in which case
        # a plain ``str in list`` test never matches; check both forms so an
        # existing table is reliably detected and dropped first.
        if isinstance(table_name, str):
            name_as_bytes = table_name.encode("utf8")
        else:
            name_as_bytes = table_name
        existing_tables = connection.tables()
        if name_as_bytes in existing_tables or table_name in existing_tables:
            # disable=True disables the table first so it can be deleted.
            connection.delete_table(table_name, disable=True)
        connection.create_table(
            table_name,
            {
                "col_1": dict(),  # column family
                "col_2": dict(),  # column family
                "col_3": dict(),  # column family
            },
        )
    finally:
        # Release the connection even if one of the calls above fails.
        connection.close()
def generate_data(table_name):
    """Fill the table with 20000 rows of random sample data.

    Row ``i`` is stored under the key ``row<i>``. For each row a random digit
    ``d`` in ``[0, 10)`` is drawn, and the qualifier ``c<d>`` is written in
    each of the three column families with the value ``"<d>"``.

    Args:
        table_name: Name of an existing table that has the ``col_1``,
            ``col_2`` and ``col_3`` column families.
    """
    connection = happybase.Connection()
    try:
        table = connection.table(table_name)
        # The batch sends its pending mutations every 10 puts and once more
        # when the ``with`` block exits.
        with table.batch(batch_size=10) as batch_table:
            for i in range(20000):
                random_col = np.random.randint(0, 10)
                cell_value = "{}".format(random_col)
                batch_table.put('row{}'.format(i), {
                    'col_1:c{}'.format(random_col): cell_value,
                    'col_2:c{}'.format(random_col): cell_value,
                    'col_3:c{}'.format(random_col): cell_value,
                })
    finally:
        # The original never closed the connection; always release it.
        connection.close()
def convert_string(value):
    """Decode a UTF-8 ``bytes`` value to ``str``.

    Missing values (anything ``pd.isna`` reports as missing, such as ``None``
    or ``NaN``) are returned unchanged.
    """
    # Missing cells have nothing to decode, so hand them back as they are.
    return value if pd.isna(value) else value.decode("utf8")
def change_data_to_dataframe(table_name, limit=2000):
    """Scan an HBase table and return the rows as a pandas DataFrame.

    Row keys become the index and ``family:qualifier`` names become the
    columns. Row keys, column names and cell values are decoded from UTF-8
    bytes to ``str``; cells a row does not have stay as missing values.

    Args:
        table_name: Name of the table to scan.
        limit: Maximum number of rows to fetch from the scan.

    Returns:
        A ``pandas.DataFrame`` with one row per scanned HBase row.
    """
    connection = happybase.Connection()
    try:
        table = connection.table(table_name)
        table_index = []
        table_values = []
        # Consume the scan while the connection is still open; at most
        # ``limit`` rows are returned.
        for key, value in table.scan(limit=limit):
            table_index.append(key.decode("utf8"))
            table_values.append(value)
    finally:
        # The original never closed the connection; always release it.
        connection.close()

    table_df = pd.DataFrame(table_values, index=table_index)
    # DataFrame.applymap is deprecated in favour of DataFrame.map in recent
    # pandas releases; use the new name when this pandas version provides it.
    if hasattr(pd.DataFrame, "map"):
        table_df = table_df.map(convert_string)
    else:
        table_df = table_df.applymap(convert_string)
    table_df.columns = [convert_string(i) for i in table_df.columns]
    return table_df
def main():
    """Run the demo: rebuild the table, load sample rows, and preview them."""
    demo_table = "generate_table"

    # Start from a freshly created table, then load the sample rows into it.
    create_table(demo_table)
    generate_data(demo_table)

    # Read the data back as a DataFrame and print its first rows.
    preview = change_data_to_dataframe(demo_table)
    print(preview.head())


if __name__ == '__main__':
    main()