Structured Streaming报错记录:Overloaded method foreachBatch with alternatives

2023-05-16

Structured Streaming报错记录:Overloaded method foreachBatch with alternatives

【文章目录】

  • Structured Streaming报错记录:Overloaded method foreachBatch with alternatives
  • Structured Streaming报错记录:Overloaded method foreachBatch with alternatives
    • 0. 写在前面
    • 1. 报错
    • 2. 代码及报错信息
    • 3. 原因及纠错
    • 4. 参考链接


Structured Streaming报错记录:Overloaded method foreachBatch with alternatives


在这里插入图片描述


0. 写在前面

  • Spark : Spark3.0.0
  • Scala : Scala2.12

1. 报错

overloaded method value foreachBatch with alternatives:

2. 代码及报错信息

Error:(48, 12) overloaded method value foreachBatch with alternatives:

(function:org.apache.spark.api.java.function.VoidFunction2[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row],java.lang.Long])org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row]

(function: (org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], scala.Long) => Unit)org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row]

cannot be applied to ((org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], Any) => org.apache.spark.sql.Dataset[org.apache.spark.sql.Row])

.foreachBatch((df, batchId) => {

import java.util.Properties
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, SparkSession}

object ForeachBatchSink1 {
    def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession
            .builder()
            .master("local[*]")
            .appName("ForeachSink1")
            .getOrCreate()
        import spark.implicits._
        
        val lines: DataFrame = spark.readStream
            .format("socket") // 设置数据源
            .option("host", "cluster01")
            .option("port", 10000)
            .load
        
        val props = new Properties()
        props.setProperty("user", "root")
        props.setProperty("password", "1234")
        
        val query: StreamingQuery = lines.writeStream
            .outputMode("update")
            .foreachBatch((df, batchId) => {
                val result = df.as[String].flatMap(_.split("\\W+")).groupBy("value").count()
               
                result.persist()
              result.write.mode("overwrite").jdbc("jdbc:mysql://cluster01:3306/test","wc", props)
                result.write.mode("overwrite").json("./foreach1")
                result.unpersist()
            })
//            .trigger(Trigger.ProcessingTime(0))
            .trigger(Trigger.Continuous(10))
            .start
        query.awaitTermination()
      
    }
}

/**

  • Error:(43, 12) overloaded method value foreachBatch with alternatives:
  • (function:org.apache.spark.api.java.function.VoidFunction2[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row],java.lang.Long])org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row]
  • (function: (org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], scala.Long) => Unit)org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row]
  • cannot be applied to ((org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], Any) => org.apache.spark.sql.DataFrame)
  • .foreachBatch((df, batchId) => {
    */
import java.util.Properties
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, SparkSession}

object ForeachBatchSink {
    def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession
            .builder()
            .master("local[*]")
            .appName("ForeachSink")
            .getOrCreate()
        import spark.implicits._
        
        val lines: DataFrame = spark.readStream
            .format("socket") // 设置数据源
            .option("host", "cluster01")
            .option("port", 10000)
            .load
        
        val props = new Properties()
        props.setProperty("user", "root")
        props.setProperty("password", "1234")
        
        val query: StreamingQuery = lines.writeStream
            .outputMode("complete")
            .foreachBatch((df, batchId) => {          
                result.persist()
                result.write.mode("overwrite").jdbc("jdbc:mysql://cluster01:3306/test","wc", props)
                result.write.mode("overwrite").json("./foreach")
                result.unpersist()
            })
            .start
        query.awaitTermination()
      
    }
}

3. 原因及纠错

Scala2.12版本和2.11版本的不同,对于foreachBatch()方法的实现不太一样

正确代码如下

import java.util.Properties
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}


object ForeachBatchSink {

    def myFun(df: Dataset[Row], batchId: Long, props: Properties): Unit = {
        println("BatchId" + batchId)
        if (df.count() != 0) {
            df.persist()
            df.write.mode("overwrite").jdbc("jdbc:mysql://cluster01:3306/test","wc", props)
            df.write.mode("overwrite").json("./StructedStreaming_sink-ForeachBatchSink")
            df.unpersist()
        }
    }

    def main(args: Array[String]): Unit = {

        val spark: SparkSession = SparkSession
          .builder()
          .master("local[2]")
          .appName("ForeachBatchSink")
          .getOrCreate()
        import spark.implicits._

        val lines: DataFrame = spark.readStream
          .format("socket") // TODO 设置数据源
          .option("host", "cluster01")
          .option("port", 10000)
          .load

        val wordCount: DataFrame = lines.as[String]
          .flatMap(_.split("\\W+"))
          .groupBy("value")
          .count()  // value count

        val props = new Properties()
        props.setProperty("user", "root")
        props.setProperty("password", "1234")

        val query: StreamingQuery = wordCount.writeStream
          .outputMode("complete")
          .foreachBatch((df : Dataset[Row], batchId : Long) => {
              myFun(df, batchId, props)
          })
          .start

        query.awaitTermination()

    }
}

import java.util.Properties

import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object ForeachBatchSink1 {

    def myFun(df: Dataset[Row], batchId: Long, props: Properties, spark : SparkSession): Unit = {
        import spark.implicits._
        println("BatchId = " + batchId)
        if (df.count() != 0) {
            val result = df.as[String].flatMap(_.split("\\W+")).groupBy("value").count()
            result.persist()
            result.write.mode("overwrite").jdbc("jdbc:mysql://cluster01:3306/test","wc", props)
            result.write.mode("overwrite").json("./StructedStreaming_sink-ForeachBatchSink1")
            result.unpersist()
        }
    }

    def main(args: Array[String]): Unit = {

        val spark: SparkSession = SparkSession
          .builder()
          .master("local[2]")
          .appName("ForeachBatchSink1")
          .getOrCreate()
        import spark.implicits._

        val lines: DataFrame = spark.readStream
          .format("socket") // TODO 设置数据源
          .option("host", "cluster01")
          .option("port", 10000)
          .load

        val props = new Properties()
        props.setProperty("user", "root")
        props.setProperty("password", "1234")

        val query: StreamingQuery = lines.writeStream
          .outputMode("update")
          .foreachBatch((df : Dataset[Row], batchId : Long) => {
                myFun(df, batchId, props, spark)
          })
          .trigger(Trigger.Continuous(10))
          .start
        query.awaitTermination()

    }
}

4. 参考链接

https://blog.csdn.net/Shockang/article/details/120961968

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Structured Streaming报错记录:Overloaded method foreachBatch with alternatives 的相关文章

  • 使用 FFmpeg 从 Mac 进行网络摄像头流传输

    我想使用 FFmpeg 从 Mac 流式传输我的网络摄像头 首先我使用检查了支持的设备ffmpeg f avfoundation list devices true i Output AVFoundation input device 0x
  • 使用 Laravel 为 Android 提供 mp3 流

    这是我的问题 我正在编写一个 laravel 后端 它必须提供一个 mp3 文件 该文件必须使用 android 标准媒体播放器再现 对于 laravel 后端 我需要使用 JWT 来处理身份验证 因此在每个请求标头中 我必须将 授权 字段
  • 如何设置机器人的状态

    所以我试图让我的机器人流媒体与抑郁症 但我已经尝试了多种方法 但它们不起作用 我尝试过这些方法 client user setPresence game name with depression status online bot user
  • 如何在不先读取整个图像的情况下就地缩放流式位图?

    我有一个图像密集型的 Android 应用程序 我目前正在使用Bitmap createScaledBitmap http developer android com reference android graphics Bitmap ht
  • 使用 PHP 进行实时视频流传输

    我有一个 PHP AJAX MYSQL 聊天应用程序 我想将视频聊天添加到我的应用程序中 如何在 PHP 应用程序中创建用于实时视频会议 聊天的实时视频流 如果我想构建这样一个系统 我需要了解哪些关键术语 首先使用 PHP 是个好主意吗 有
  • 如何在网页上嵌入 mjpeg 文件

    我需要将 IP 摄像机的输出显示到网页 以便最终用户可以使用此页面从该摄像机查看实时内容 它有一个为捕获的视频提供 mjpeg 输出的界面 我需要将其嵌入到我的网页上 它至少应该可以在 Firefox Safari 和 IE 上运行 提前致
  • 了解 Python HTTP 流

    我正在努力使用 Python 和请求访问流 API API 内容 我们启用了一个流端点 以便利用持久的 HTTP 套接字连接来请求报价和交易数据 来自 API 的流数据包括发出经过身份验证的 HTTP 请求并保持 HTTP 套接字打开以持续
  • 使用 PHP 分块传输 FTP 上传?

    是否可以使用 PHP 进行 FTP 上传 我有文件需要上传到另一台服务器 但我只能通过 FTP 访问该服务器 不幸的是 我无法增加该服务器上的超时时间 有可能做到这一点吗 基本上 如果有一种方法可以写入文件的一部分 然后附加下一部分 并重复
  • 将 XFDF 与 PDF 表单合并以创建最终的 PDF 服务器端?

    这就是我目前所拥有的 用户提交表单数据并获得 下载 PDF 链接 该链接指向动态生成 XFDF 文件的脚本 并在设置适当的标头等后输出 XFDF 文件 XFDF 文件指向受密码保护的 PDF 这是使用 XFDF 数据填写字段的通用 PDF
  • 将 hadoop 流与 python 组合器一起使用时失败

    我尝试使用 python 的 hadoop 流来计算输入键的平均值 以下是mapper combiner和reducer的代码 mapper import sys def map argv line sys stdin readline t
  • flink kafka生产者在检查点恢复时以一次模式发送重复消息

    我正在写一个案例来测试 flink 两步提交 下面是概述 sink kafka曾经是kafka生产者 sink stepmysql接收器是否扩展two step commit sink comparemysql接收器是否扩展two step
  • Android 上的 RTSP 客户端实现

    我看到很多与此相关的问题 尽管如此 我认为我的答案还没有 我想在 Android 上使用已编码的 RTSP 客户端与 MediaCodec 一起使用 以便捕获 H264 中的 RTSP 流 然后解码并显示它 我使用了 VideoView 和
  • 如何在流式传输之前知道音频歌曲的持续时间?

    我正在制作一个流音频歌曲的应用程序 在自定义媒体播放器中 我必须显示该音频文件的总持续时间 如果一首音频歌曲是 SDCard 我可以使用以下方法知道它的持续时间 MediaPlayer player public double durati
  • python-twitter 流 api 支持/示例

    我正在与python twitter http code google com p python twitter 并意识到 Twitter 提供流媒体api http dev twitter com pages streaming api实
  • 如何通过 YouTube 直播 API 更改我的活动使用的流?

    所以我一直在寻找一种从 YouTube 获取 16 位流名称的方法 我终于通过这行代码找到了它 gt streamName returnedStream getCdn getIngestionInfo getStreamName 流名称只是
  • 为什么 Spark 在字数统计时速度很快? [复制]

    这个问题在这里已经有答案了 测试用例 Spark 在 20 秒以上对 6G 数据进行字数统计 我明白映射减少 FP and stream编程模型 但无法弄清楚字数统计的速度如此惊人 我认为这种情况下是I O密集型计算 不可能在20秒以上扫描
  • 在 Chrome 和 IE11 上流式传输可观看的 .mjpeg 视频

    我在本地托管了一个 mjpeg 文件http 127 0 0 1 web Images Stream somevideo mjpeg http 127 0 0 1 web Images Stream somevideo mjpeg 我在我的
  • 使用 Servlet 启动 VLC HTTP Stream 时出现问题

    我正在为自己开发一个 VLC 项目 我的目标是创建一个 HTML 前端来启动流 我通过使用 Java Servlet 来完成此操作 概述 乌班图13 04 Java 7 21 冰茶 2 3 9 Eclipse JAVAEE IDE 雄猫7
  • 计算流数据的直方图 - 在线直方图计算

    我正在寻找一种算法来生成大量流数据的直方图 最大值和最小值事先未知 但标准差和平均值在特定范围内 我很欣赏你的想法 Cheers 我刚刚找到了一个解决方案 秒 从流式并行决策树算法构建在线直方图 论文的 2 2 该算法由 Hive 项目中的
  • 数据库镜像/Postgres流复制

    我不是 DBA 我是基于企业数据库的应用程序的主要开发人员 我目前正在指定一些新机器来升级我们现有的企业数据库 目前 我们在 DR 站点上运行带有数据库的 Postgres 8 4 该数据库通过前员工执行的一些自定义 rsync 工作定期接

随机推荐