如何将行合并到spark数据帧的列中作为有效的json以将其写入mysql

2024-02-09

我正在尝试将多行合并为一列,作为 Spark 数据帧(spark 1.6.1)中的有效 json 格式。然后我希望它存储在mysql表中。

我的原始 Spark 数据框如下所示:

|user_id   |product_id|price       | 
|A         |p1        |3000        |
|A         |p2        |1500        |
|B         |P1        |3000        |
|B         |P3        |2000        |

我想像这样转换上面的表:

|user_id   |contents_json 
|A         |{(product_id:p1, price:3000), (product_id:p2, price:1500)} 
|B         |{{product_id:p1, price:3000), (product_id:p3, price:2000)} 

然后将上面的表放入mysql表中。

这与爆炸的方式完全相反,但我找不到正确的方法。


我假设您正在寻找下面显示的 JSON 输出。

from pyspark.sql.functions import col, collect_list, struct

df = sc.parallelize([('A','P1',3000), ('A','P2',1500),
                     ('B','P1',3000), ('B','P3',2000)]).toDF(["user_id", "product_id","price"])

> Spark2.0

df1 = df.\
    groupBy("user_id").agg(collect_list(struct(col("product_id"),col("price"))).alias("contents_json"))
df1.show()

Spark1.6

zipCols = psf.udf(
  lambda x, y: list(zip(x, y)),
  ArrayType(StructType([
      # Adjust types to reflect data types
      StructField("product_id", StringType()),
      StructField("price", IntegerType())
  ]))
)

df1 = df.\
    groupBy("user_id").agg(
        zipCols(
            collect_list(col("product_id")), 
            collect_list(col("price"))
        ).alias("contents_json")
    )

for row in df1.toJSON().collect():
    print row

输出是:

{"user_id":"B","contents_json":[{"product_id":"P1","price":3000},{"product_id":"P3","price":2000}]}
{"user_id":"A","contents_json":[{"product_id":"P1","price":3000},{"product_id":"P2","price":1500}]}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何将行合并到spark数据帧的列中作为有效的json以将其写入mysql 的相关文章

  • 如何在 IntelliJ IDEA 中按 JSON 中的路径搜索?

    我有很长的 JSON 文件 例如 a b c keyC 和路径 a b c 如何使用路径在 JSON 中搜索 转到行 问题类似于如何在 IntelliJ IDEA 中复制 JSON 中的路径 https stackoverflow com
  • 有序 JSON 对象

    我有一个 servlet 它与数据库通信 然后返回有序 按时间排序 对象的列表 在servlet部分 我有 access DB returns a list of User objects ordered ArrayList users M
  • 使用 lift-json 反序列化具有 Map[String,Any] 属性的案例类

    几天来我一直在努力解决一些通过 lift json 应该很简单的事情 将映射序列化为 JSON 我知道 我知道 根对象还不能是 List 或 Map 但我愿意暂时包装在一个案例类中 但我仍然无法让它工作 感谢一些堆栈溢出帮助 我已经可以进行
  • 如何使用多个 { 'not find' } 来干燥方法?

    我正在尝试优雅地处理以下错误的 JSON 其中Hash fetch似乎不是一个选择 使用 Hash fetch 优雅地处理错误的 JSON https stackoverflow com questions 25193627 handle
  • 如何使用 Jackson 将对象附加到现有 JSON 文件

    如何使用 Jackson 将对象附加到现有 JSON 文件 File file new File test json if file exists file createNewFile ObjectMapper mapper new Obj
  • 插入 Mysql 表时防止 Json 排序

    在发送 AJAX 请求时 Json Content 的重新排序已经是一个已知问题 但我不知道在将 Json content 插入 JSON 类型的 Mysql 表时也会发生同样的情况 在这种情况下 mysql 服务器在保存之前也会对其内容进
  • sed 替换 json 对象中键的值

    我想做什么 给定一个 json 事件文件 我想通过关键字定位特定事件 然后将该事件中的键值替换为 这必须使用 sed 来完成 Splunk 转发问题 我不会用细节来烦你 事件示例 message we have a response fro
  • 检查给定字符串中是否存在回车符

    我正在从文件中读取一些行 并检查每行是否具有 Windows 类型的 CRLF 如果任何行中缺少 n 或 r 则必须报告错误 我尝试使用下面的代码 即使该行没有 r 它也不会报告任何错误 Open file open File Name r
  • Rails 可以自动解析从表单 text_field 收到的日期时间吗

    Rails 可以自动解析从表单的文本字段接收到的日期时间吗 in view div class field br div in controller params product updated at yesterday 目前我收到以下错误
  • Pandas HD5-查询,其中表达式失败

    我想查询 HDF5 文件 我愿意 df to hdf pfad df format table 将数据帧写入光盘 为了阅读我使用 hdf pandas HDFStore pfad 我有一个列表 其中包含numpy datetime64值称为
  • Ajax 调用后使用 Django 模板呈现 JSON 对象

    我一直在尝试了解什么是最佳方法Ajax http en wikipedia org wiki Ajax 28programming 29 in Django http en wikipedia org wiki Django 28web f
  • toJSON() 和 JSON.Stringify() 之间的区别

    如果您需要读取或克隆模型的所有数据属性 请使用其 toJSON 方法 此方法返回属性的副本作为 对象 尽管有其名称 但不是 JSON 字符串 当 JSON stringify 为 使用 toJSON 方法传递一个对象 它将返回的字符串化 t
  • 如何在 PySpark 中使用 foreach 或 foreachBatch 写入数据库?

    我想使用 Python PySpark 从 Kafka 源到 MariaDB 进行 Spark 结构化流处理 Spark 2 4 x 我想使用流式 Spark 数据帧 而不是静态数据帧或 Pandas 数据帧 看来必须要用foreach o
  • 为什么我无法解开根节点并反序列化对象数组?

    为什么我无法通过展开根节点来反序列化对象数组 import java io IOException import java util Arrays import java util List import org codehaus jack
  • 如何避免连续“重置偏移量”和“寻找最新偏移量”?

    我正在尝试遵循本指南 https spark apache org docs latest structed streaming kafka integration html https spark apache org docs late
  • 如何对 JSON 对象进行加密哈希?

    下面的问题比乍看起来更复杂 假设我有一个任意 JSON 对象 该对象可能包含任意数量的数据 包括其他嵌套的 JSON 对象 我想要的是 JSON 数据的加密哈希 摘要 而不考虑实际的 JSON 格式本身 例如 忽略换行符和 JSON 令牌之
  • 将spark.local.dir设置为不同的驱动器

    我正在尝试在 Windows 10 上设置独立 Spark 我想设置spark local dir to D spark tmp tmp 目前它似乎正在使用C Users
  • WCF 自定义序列化器

    我正在 WCF 中创建一个返回 JSON 的 Web 服务 但 DataContractJsonSerializer 对某些循环引用犹豫不决 在这种特殊情况下我无法删除这些引用 相反 我想使用 Newtonsoft json 库 在 WCF
  • Web API 复杂参数属性均为 null

    我有一个 Web API 服务调用可以更新用户的首选项 不幸的是 当我从 jQuery ajax 调用中调用此 POST 方法时 请求参数对象的属性始终为 null 或默认值 而不是传入的值 如果我使用 REST 客户端调用相同的方法 我使
  • 将Json字符串映射到java中的map或hashmap字段

    假设我从服务器返回了以下 JSON 字符串 response imageInstances one id 1 url ONE two id 2 url TWO 杰克逊代码大厦 JsonProperty 我怎样才能得到HashMap对象出来了

随机推荐