Reading TFRecord files into the Parquet format
read_tfrecord.py
#coding:utf-8
"""
Read TFRecord files and write them back out in Parquet format.
"""
from pyspark import SparkContext
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import lit, udf
from pyspark.sql.types import FloatType, LongType

if __name__ == "__main__":
    # Set up the Spark entry points.
    sc = SparkContext()
    sqlContext = SQLContext(sc)
    spark = SparkSession.builder.getOrCreate()

    # Handle to HDFS via the JVM gateway (available for path checks if needed).
    hadoop = sc._jvm.org.apache.hadoop
    conf = hadoop.conf.Configuration()
    fs = hadoop.fs.FileSystem.get(conf)

    # Small test file
    path = "viewfs://hadoop/user/hadoop-hdp/dlrm_data/train_record"
    # Reading TFRecords requires the spark-tensorflow-connector jar
    # (passed via --jars when submitting, see below).
    df = spark.read.format("tfrecords").option("recordType", "Example").load(path)
    df.printSchema()
    df.show(n=2)

    # Expand the sparse-feature array into 26 separate columns C1..C26.
    make_sparse = udf(
        lambda s, i: s[i - 1],
        LongType(),
    )
    sparse_cols = [
        make_sparse("spa_fea", lit(i)).alias("C{0}".format(i)) for i in range(1, 27)
    ]

    # Expand the dense-feature array into 13 separate columns I1..I13.
    make_dense = udf(
        lambda s, i: s[i - 1],
        FloatType(),
    )
    dense_cols = [
        make_dense("den_fea", lit(i)).alias("I{0}".format(i)) for i in range(1, 14)
    ]

    # Cast the label to float.
    make_label = udf(lambda s: float(s), FloatType())
    label_col = make_label("label").alias("label")

    cols = [label_col] + dense_cols + sparse_cols
    new_df = df.select(cols)
    new_df.show(n=2)

    # Repartition so the Parquet output is split into part_num files.
    part_num = 1024
    new_df = new_df.repartition(part_num)

    # Small test output directory
    train_output_dir = "viewfs://hadoop/user/hadoop-hdp/dlrm_data/train"
    new_df.write.mode("overwrite").parquet(train_output_dir)

    # Read the output back and count rows as a sanity check.
    num_examples = sqlContext.read.parquet(train_output_dir).count()
    print(train_output_dir, num_examples)
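Before running the job against the full dataset, a tiny TFRecord file with the same layout can be generated locally and pointed at by path for a smoke test. The sketch below is an assumption: the field names label, den_fea (13 floats), and spa_fea (26 int64s) and their types are inferred from the UDFs above, and the local output path train_record/part-00000 is hypothetical.

# Sketch: write a small TFRecord file matching the assumed schema
# (label: int64, den_fea: 13 floats, spa_fea: 26 int64s) for local testing.
import os
import numpy as np
import tensorflow as tf

def make_example(label, dense, sparse):
    feature = {
        "label": tf.train.Feature(int64_list=tf.train.Int64List(value=[label])),
        "den_fea": tf.train.Feature(float_list=tf.train.FloatList(value=dense)),
        "spa_fea": tf.train.Feature(int64_list=tf.train.Int64List(value=sparse)),
    }
    return tf.train.Example(features=tf.train.Features(feature=feature))

os.makedirs("train_record", exist_ok=True)
with tf.io.TFRecordWriter("train_record/part-00000") as writer:  # hypothetical local path
    for _ in range(100):
        label = int(np.random.randint(0, 2))
        dense = np.random.rand(13).astype("float32").tolist()
        sparse = np.random.randint(0, 1000, size=26).tolist()
        writer.write(make_example(label, dense, sparse).SerializeToString())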
Submitting the Spark job to the cluster
queue="root.test"
master="yarn-cluster"
num_executors="2"
driver_memory="40g"
executor_cores=4
executor_memory="40g"
/opt/meituan/spark-2.2/bin/spark-submit --queue $queue --conf spark.job.owner=${myusername} \
--executor-cores "$executor_cores" \
--executor-memory "$executor_memory" \
--master yarn \
--deploy-mode cluster \
--num-executors "$num_executors" \
--driver-memory "$driver_memory" \
--conf spark.driver.maxResultSize=0 \
--jars viewfs:///user/hadoop/jars/spark-tensorflow-connector_2.11-1.15.0.jar \
read_tfrecord.py
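Once the job completes, the output can be sanity-checked from a pyspark shell on the cluster. A minimal sketch, assuming the output path used above:

# Read the written Parquet back and inspect schema, row count, and a few columns.
train_output_dir = "viewfs://hadoop/user/hadoop-hdp/dlrm_data/train"
check_df = spark.read.parquet(train_output_dir)
check_df.printSchema()
print(check_df.count())
check_df.select("label", "I1", "I13", "C1", "C26").show(5)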