如何在spark结构化流连接中选择最新记录

2023-12-12

我使用的是spark-sql 2.4.x版本,datastax-spark-cassandra-connector用于Cassandra-3.x版本。和卡夫卡一起。

我有货币样本的汇率元数据如下:

val ratesMetaDataDf = Seq(
     ("EUR","5/10/2019","1.130657","USD"),
     ("EUR","5/9/2019","1.13088","USD")
     ).toDF("base_code", "rate_date","rate_value","target_code")
.withColumn("rate_date", to_date($"rate_date" ,"MM/dd/yyyy").cast(DateType))
.withColumn("rate_value", $"rate_value".cast(DoubleType))

我从kafka主题收到的销售记录是,如下(示例) :

val kafkaDf = Seq((15,2016, 4, 100.5,"USD","2021-01-20","EUR",221.4)
                                ).toDF("companyId", "year","quarter","sales","code","calc_date","c_code","prev_sales")

要计算“prev_sales”,我需要获取其“c_code”各自的“rate_value”,它最接近“calc_date”,即rate_date”

我正在做如下

val w2 = Window.orderBy(col("rate_date") desc)
val rateJoinResultDf = kafkaDf.as("k").join(ratesMetaDataDf.as("e"))
                                   .where( ($"k.c_code" === $"e.base_code") &&
                                           ($"rate_date" < $"calc_date")
                                         ).orderBy($"rate_date" desc)
                                  .withColumn("row",row_number.over(w2))
                                  .where($"row" === 1).drop("row")
                                  .withColumn("prev_sales", (col("prev_sales") * col("rate_value")).cast(DoubleType))
                                  .select("companyId", "year","quarter","sales","code","calc_date","prev_sales")

在上面,为了获取给定“rate_date”的最近记录(即来自 ratesMetaDataDf 的“5/10/2019”),我使用 window 和 row_number 函数并按“desc”对记录进行排序。

但在spark-sql流中它导致了如下错误

"
Sorting is not supported on streaming DataFrames/Datasets, unless it is on aggregated DataFrame/Dataset in Complete output mode;;"

那么如何获取第一条记录加入上面呢。


将最后一个代码部分替换为以下代码。这段代码会做left join并计算日期差calc_date & rate_date. Next Window函数我们将选择最近的日期并计算prev_sales通过使用相同的计算。

请注意我添加了一个过滤条件filter(col("diff") >=0), 这将处理以下情况calc_date < rate_date。我加了几个 更多记录,以便更好地了解此案。

scala> ratesMetaDataDf.show
+---------+----------+----------+-----------+
|base_code| rate_date|rate_value|target_code|
+---------+----------+----------+-----------+
|      EUR|2019-05-10|  1.130657|        USD|
|      EUR|2019-05-09|   1.12088|        USD|
|      EUR|2019-12-20|    1.1584|        USD|
+---------+----------+----------+-----------+


scala> kafkaDf.show
+---------+----+-------+-----+----+----------+------+----------+
|companyId|year|quarter|sales|code| calc_date|c_code|prev_sales|
+---------+----+-------+-----+----+----------+------+----------+
|       15|2016|      4|100.5| USD|2021-01-20|   EUR|     221.4|
|       15|2016|      4|100.5| USD|2019-06-20|   EUR|     221.4|
+---------+----+-------+-----+----+----------+------+----------+


scala>  val W = Window.partitionBy("companyId","year","quarter","sales","code","calc_date","c_code","prev_sales").orderBy(col("diff"))

scala>   val rateJoinResultDf= kafkaDf.alias("k").join(ratesMetaDataDf.alias("r"), col("k.c_code") === col("r.base_code"), "left")
                                         .withColumn("diff",datediff(col("calc_date"), col("rate_date")))
                                         .filter(col("diff") >= 0)
                                         .withColumn("closedate", row_number.over(W))
                                         .filter(col("closedate") === 1)
                                         .drop("diff", "closedate")
                                         .withColumn("prev_sales", (col("prev_sales") * col("rate_value")).cast("Decimal(14,5)"))
                                         .select("companyId", "year","quarter","sales","code","calc_date","prev_sales")

scala> rateJoinResultDf.show
+---------+----+-------+-----+----+----------+----------+
|companyId|year|quarter|sales|code| calc_date|prev_sales|
+---------+----+-------+-----+----+----------+----------+
|       15|2016|      4|100.5| USD|2021-01-20| 256.46976|
|       15|2016|      4|100.5| USD|2019-06-20| 250.32746|
+---------+----+-------+-----+----+----------+----------+ 
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何在spark结构化流连接中选择最新记录 的相关文章

随机推荐

  • PHP Http请求

    我需要从 PHP 执行 HTTP GET 更具体地说 从 index php 中 我需要获取 trac 和 svn 的内容 找到 ul 元素 然后渲染 然后内联到 index php 上 trac 和 svn 是相对 URL 而不是文件系统
  • Sendmail插入!电子邮件中间 990 个字符后的符号

    我使用 sendmail v 从 sendmail 发送邮件 电子邮件受保护 https infoeu mytotalconnectcomfort com 我不明白为什么sendmail会插入额外的字符 p Dear Mr Ms scure
  • 如何获取在画布中构成圆的坐标数组?

    因此 如果这是我用来在画布上画圆的代码 ctx beginPath ctx arc centerX centerY radius 0 2 Math PI false ctx lineWidth 3 ctx strokeStyle black
  • 带有特殊字符的Sql查询-如何处理?

    我有几个雇员的名字 比如 john 1 魔鬼的 corn 像这样的东西 现在 当我搜索这些名称时 我正在使用 select from emp where empname like john 1 devil s corn 但我没有得到预期值
  • jQuery AJAX 将 url 作为字符串传递

    我有一个 ajax 函数 它将一串变量传递给我的脚本 但我有一个变量需要包含带有参数的完整 url 发生的情况是 var1 和 var2 成为 POST 变量 但我需要将整个 url 变量保存为字符串 var url http domain
  • 在 Python 3.7+ 中更改 dict 中的键顺序

    Since dict对象在 Python 3 7 中具有本机键顺序 https docs python org 3 whatsnew 3 7 html 应该有一种方法来管理订单 有官方文档可供我阅读吗 在我的具体情况下 我想解决这些问题无需
  • 多列上的数据透视

    我有这样的数据 Product Group Product Level Quatity Sold Trend Group 1 L1 10 up Group 1 L2 20 up Group 1 L3 30 down Group 2 L1 2
  • .Net 2.0 上的 C# 3.0 兼容性

    什么是C 3 0我们可以在面向 Net 2 0 框架的应用程序中使用哪些语言功能 PS 我知道很少有像 Lambda 表达式这样的var keyword 我有一个关于这个主题的文章 简单来说 支持的 自动实现的属性 隐式类型局部变量和数组
  • 泛型类中不明确/冲突的构造函数

    我有一个通用类 public class BaseFieldValue
  • 如何在 Access 2010 中保存 RichTextBox 内容

    我正在创建一个字典系统 其中单词含义可以编辑RichTextBox 供用户定义字体大小 颜色等 那么我怎样才能保存意义RichTextBoxAccess 数据库中的内容是否为 RTF 格式 我怎样才能读到这个 Access 2007 201
  • 如何判断鼠标在 .click() 期间是否移动?

    根据 jQuery 文档 click 只有在这一系列事件之后才会触发事件 当指针位于元素内时按下鼠标按钮 当指针位于元素内时释放鼠标按钮 我面临的问题是 我在项目网格上使用 单击拖动 功能 我想为每个项目注册单击事件 这意味着每次我单击拖动
  • MpChart 在条形图的 X 轴上绘制图标作为标签

    Hi I would like to draw icons in xaxis of the bar chart instead of values Like the chart below 您必须创建自己的自定义渲染器并将其应用到您的图表
  • ViewModel 中的依赖属性有什么例子吗?

    有人可以给出 WPF 中 ViewModel 中的依赖属性作为数据上下文传递给视图的示例吗 这需要从 DependencyObject 继承吗 假设我希望 ListBox SelectedItem 绑定到 ViewModel 中的依赖属性
  • jquery 焦点回到相同的输入字段,错误不适用于所有浏览器

    我有一个包含多个字段的表单 其中既有动态创建的字段又有一些预定义的字段 其中一个字段使用 jquery timepicker 插件 由http jonthornton github io jquery timepicker 现在我的问题是我
  • 在 Swagger 中上传文件并在 Flask 后端接收

    我正在尝试使用 Swagger 和 Flask 上传文件 我对招摇有以下配置 user register post tags user summary Register a new user description operationId
  • “_attribute_((aligned(4)));”的含义是什么在第一行?

    char buf BUF LEN attribute aligned 4 ssize t len i 0 read BUF LEN bytes worth of events len read fd buf BUF LEN loop ove
  • chrome扩展弹出窗口无法通过ID找到元素

    我知道类似的问题已经被问过很多次了 但我还没有找到适合我的解决方案 我的问题很简单 我想做的就是测试 popup html 上的操作 因为在这里 我在弹出窗口上有一个单击按钮 当我单击它时 我想显示警报 但什么也没发生 它没有找到该元素 我
  • Flutter添加ScrollView和背景图片

    您好 我正在尝试将 ScollView 添加到我的应用程序中 但问题是我不能同时拥有 ScrollView 和背景图像 所以如果有人可以帮助我 这里我放置了背景图像 那么我现在如何放置滚动视图 我有两个工作 但不是同时工作 我使用的滚动视图
  • 启动多个线程并重新启动它们

    我正在尝试编写一个系统 在其中创建 x 个工作线程 这些线程将在不同的时间完成它们的工作 当它们中的任何一个完成工作时 我将检查它们的输出并再次重新启动它们 将运行的线程数保持在 x 左右 我将进行多次任意迭代 因此 基本上控制器线程将启动
  • 如何在spark结构化流连接中选择最新记录

    我使用的是spark sql 2 4 x版本 datastax spark cassandra connector用于Cassandra 3 x版本 和卡夫卡一起 我有货币样本的汇率元数据如下 val ratesMetaDataDf Seq