spark 转换tfrecord 成parquet格式

2023-05-16

读取tfrecord 成parquet文件格式
read_tfrecord.py

#coding:utf-8

"""
读取tfrecord生成parquet文件格式
"""
import os
import time
import argparse
# from pyspark.sql import SparkSession
# from pyspark.conf import SparkConf
from pyspark.sql.functions import rand, udf, lit
# from pyspark.sql.functions import xxhash64
from pyspark.sql.functions import hash  as xxhash64
from pyspark.sql.types import FloatType, LongType
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql import SparkSession

if __name__ == "__main__":
  # read
  sc = SparkContext()
  sqlContext = SQLContext(sc)
  spark = SparkSession.builder.getOrCreate()

  hadoop = sc._jvm.org.apache.hadoop
  conf = hadoop.conf.Configuration()
  fs = hadoop.fs.FileSystem.get(conf)
  # 小的测试文件
  path = "viewfs://hadoop/user/hadoop-hdp/dlrm_data/train_record"
  df = spark.read.format("tfrecords").option("recordType", "Example").load(path)
  df.printSchema()
  df.show(n=2)
  
  make_sparse = udf(
            lambda s, i: s[i-1],
            LongType(),
        )
  sparse_cols = [
      make_sparse("spa_fea", lit(i)).alias("C{0}".format(i)) for i in range(1, 27)
  ]
    
  make_dense = udf(
            lambda s, i: s[i-1],
            FloatType(),
        )
  dense_cols = [
      make_dense("den_fea", lit(i)).alias("I{0}".format(i)) for i in range(1, 14)
  ]  
  
  make_label = udf(lambda s: float(s), FloatType())
  label_col = make_label("label").alias("label")
  
  cols = [label_col] + dense_cols + sparse_cols
  
  new_df =  df.select(cols)
  
  new_df.show(n=2)
  
  
  part_num = 1024
  new_df = new_df.repartition(part_num)
  # 小的测试文件
  train_output_dir = "viewfs://hadoop/user/hadoop-hdp/dlrm_data/train"
  
  new_df.write.mode("overwrite").parquet(train_output_dir)
  num_examples = sqlContext.read.parquet(train_output_dir).count()
  print(train_output_dir, num_examples)

提交spark 到集群

queue="root.test"
master="yarn-cluster"
num_executors="2"
driver_memory="40g"
executor_cores=4
executor_memory="40g"

/opt/meituan/spark-2.2/bin/spark-submit --queue $queue --conf spark.job.owner=${myusername} \
  --executor-cores "$executor_cores" \
  --executor-memory "$executor_memory" \
  --master yarn \
  --deploy-mode cluster \
  --num-executors "$num_executors" \
  --driver-memory "$driver_memory" \
  --conf spark.driver.maxResultSize=0  \
  --jars viewfs:///user/hadoop/jars/spark-tensorflow-connector_2.11-1.15.0.jar \
  read_tfrecord.py
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

spark 转换tfrecord 成parquet格式 的相关文章

随机推荐

  • 计算机网络 -- RS232接口 ----OSI物理层协议----RS232接口

    个人计算机上的通讯接口之一 xff0c 由电子工业协会 Electronic Industries Association xff0c EIA 所制定的异步传输标准接口 是目前使用最广泛的串行物理接口 xff0c 必须理解 xff1a 1
  • 来自一个前端大神转产品经理后的聊天感悟

    给的学习建议 xff1a 1 推荐给我一本书 锋利的jQuery 2 学会使用思维导图工具 3 课余时间学习理财 4 研发过程中 xff0c 多多留心一些交互 xff0c 自己完善反复琢磨自己的思路 xff08 保证是最简的 xff09 5
  • 算法竞赛入门经典(第二版)-刘汝佳-第三章 数组与字符串 例题+习题(17/18)

    文章目录 说明例题例3 1 UVA 272 TeX 中的引号例3 2 UVA 10082 WERTYU例3 3 UVA 401 回文词例3 4 UVA 340 猜数字游戏的提示例3 5 UVA 1583 生成元例3 6 UVA 1584 环
  • http_parse使用与学习

    最近学习了下http parse解析库 xff0c 是nginx的一个解析http库 xff0c 在解析的过程中 xff0c 它不会调用任何系统调用 xff0c 不会在HEAP上申请内存 xff0c 不会缓存数据 xff0c 并且可以在任意
  • 编译安装googletest

    googleTest git clone https github com google googletest cd googletest mkdir build cd build cmake DBUILD SHARED LIBS 61 O
  • 通过容器打印出容器的启动命令

    使用 runlike sudo apt get install python3 pip sudo apt get install python3 安装runlike pip install runlike 使用 runlike 容器名 例如
  • 使用 supervisor 管理进程

    Supervisor 是一个用 Python 写的进程管理工具 xff0c 可以很方便的用来启动 重启 关闭进程 xff08 不仅仅是 Python 进程 xff09 除了对单个进程的控制 xff0c 还可以同时启动 关闭多个进程 xff0
  • ERRO[0000] unable to determine runtime API version: rpc error: code = Unavailable desc = connection

    遇到错误 xff1a ERRO 0000 unable to determine runtime API version rpc error code 61 Unavailable desc 61 connection error desc
  • kubelet Container runtime network not ready“ networkReady=“NetworkReady=false reason:NetworkPluginNo

    执行 xff1a systemctl status kubelet 报错信息 xff1a E1006 17 36 42 438319 433223 kubelet go 2373 34 Container runtime network n
  • 磁盘划分和磁盘格式化

    文章目录 列出装置的 UUID 等参数parted 列出磁盘的分区表类型与分区信息磁盘分区 xff1a gdisk fdisk用 gdisk 新增分区槽用 gdisk 删除一个分区槽 磁盘格式化 xff08 建立文件系统 xff09 XFS
  • tmux 最新版本安装

    tmux centos系统 安装依赖 yum install openssl devel wget https github com libevent libevent releases download release 2 1 12 st
  • 自动保存恢复tmux会话 关机重启再也不怕

    整个解决方案由三个tmux插件组成 需要注意的是 xff0c 使用这两个 Tmux 插件要求 Tmux 是 1 9 及以上版本 xff0c 如果不符合要求 xff0c 赶紧升级吧 可以参考 xff1a tmux安装最新版本 tpm xff1
  • parse error on line 1, column 4: bare “ in non-quoted-field

    golang报错 xff1a parse error on line 1 column 4 bare 34 in non quoted field 可能的原因是csv是windowns 导出的 xff0c 编码方式是UTF 8 BOM 方式
  • CPU和GPU性能指标收集

    这里的CPU是AMD的芯片 总的性能指标 Total mem Bw Total mem RdBw Total mem WrBw uperf PCIE 上行带宽 上行 网卡到内存 通过 uperf 查看 XGMI uperf NVLink 带
  • PX4 Bootloader解析

    1 引言 半年前入手了Pixhawk V2全套硬件 xff0c 编译好的开源固件也下了 xff0c 四轴也飞了 xff0c 一直想对这套开源飞控进行一个系统地解析 xff0c 由于工作原因一直没时间 最近翻开了PX4飞控源代码 xff0c
  • 进程内存使用

    查看进程内存使用 28028 是进程的pid top p 28028 还可以查看进程的status文件 xff1a cat proc 28028 status VmRSS对应的值就是物理内存占用 内存占用比较多的程序 ps aux sort
  • perf使用

    perf可记录高达700多种events事件 sudo perf list 可以查看可以perf的事件 sudo perf record F 999 e cpu clock faults a g p 28544 sleep 60 制定进程7
  • gperftools

    gperftools 实现了更高性能的多线程的malloc 实现 增加了极好的性能分析工具 gperftools 的前身是 pprof https github com google pprof sudo apt get install a
  • RDMA 设备查看

    1硬件检测 1 1检查硬件是否安装 确认硬件已安装 lspci tvm grep Mellanox 查看OFED驱动版本命令 如果没有驱动可以参考 https km sankuai com page 335338645 安装 rpm qa
  • spark 转换tfrecord 成parquet格式

    读取tfrecord 成parquet文件格式 read tfrecord py coding utf 8 34 34 34 读取tfrecord生成parquet文件格式 34 34 34 import os import time im