看来您正在尝试从 Hive 表读入 pandas 数据帧并进行一些转换并将其保存回某个 Hive 外部表。请参考下面的代码作为示例。在这里,我已从 Hive 表读取到 pandas 数据框中,并向其中添加了一些日期列。后来我使用 subprocess 模块来执行我的 shell,它将数据加载到按某些日期列分区的 Hive 表中。
from pyhive import hive
import pandas as pd
import sqlalchemy
from sqlalchemy.engine import create_engine
import datetime
from subprocess import PIPE, Popen
import subprocess
import sys
conn = hive.Connection(host="yourhost.com", port=10000, username="vikct001")
cursor = conn.cursor()
query="select user_id,country from test_dev_db.test_data"
start_time= datetime.datetime.now()
output_file='/home/vikct001/user/vikrant/python/test_data.csv'
data=pd.read_sql(query,conn)
data['current_date'] = pd.datetime.today().strftime("%Y-%m-%d")
print(data)
data.to_csv(output_file, sep='|', encoding='utf-8',index=None)
hivequery=""" hive --hivevar loaded_date=$(date +"%Y-%m-%d") hive -e 'LOAD DATA LOCAL INPATH "/home/vikct001/user/vikrant/python/test_data.csv" INTO TABLE test_dev_db.test_data_external PARTITION (loaded_date="${hivevar:loaded_date}")';"""
def save_to_hdfs(output_file):
print("I am here")
p=subprocess.Popen(hivequery,shell=True,stderr=subprocess.PIPE)
stdout,stderr = p.communicate()
if p.returncode != 0:
print stderr
sys.exit(1)
save_to_hdfs(output_file)
end_time=datetime.datetime.now()
print 'processing ends', (start_time-end_time).seconds/60.0,' minutes'
表说明:
hive (test_dev_db)> desc test_dev_db.test_data_external;
OK
id int
country string
input_date date
loaded_date string
# Partition Information
# col_name data_type comment
loaded_date string
您可以看到数据已加载并创建了具有当前日期的分区。
hive (test_dev_db)> show partitions test_dev_db.test_data_external;
OK
loaded_date=2019-08-21
hive (test_dev_db)> select * from test_dev_db.test_data_external;
OK
1 India 2019-08-21 2019-08-21
2 Ukraine 2019-08-21 2019-08-21
1 India 2019-08-21 2019-08-21
2 Ukraine 2019-08-21 2019-08-21
1 India 2019-08-21 2019-08-21
2 Ukraine 2019-08-21 2019-08-21
1 India 2019-08-21 2019-08-21