保留 Spark Streaming 输出

2024-01-13

我正在从消息传递应用程序收集数据,我目前正在使用 Flume,它每天发送大约 5000 万条记录

我想用卡夫卡, 使用 Spark Streaming 从 Kafka 消费 并将其保存到 hadoop 并使用 impala 进行查询

我尝试过的每种方法都遇到问题..

方法 1 - 将 RDD 保存为 parquet,将外部 hive parquet 表指向 parquet 目录

// scala
val ssc =  new StreamingContext(sparkConf, Seconds(bucketsize.toInt))
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
lines.foreachRDD(rdd => {

    // 1 - Create a SchemaRDD object from the rdd and specify the schema
    val SchemaRDD1 = sqlContext.jsonRDD(rdd, schema)

    // 2 - register it as a spark sql table
    SchemaRDD1.registerTempTable("sparktable")

    // 3 - qry sparktable to produce another SchemaRDD object of the data needed 'finalParquet'. and persist this as parquet files
    val finalParquet = sqlContext.sql(sql)
    finalParquet.saveAsParquetFile(dir)

问题在于finalParquet。另存为Parquet文件输出大量文件,从 Kafka 接收的 Dstream 在 1 分钟批量大小下输出超过 200 个文件。 它输出许多文件的原因是因为计算是分布式的,如另一篇文章中所解释的那样如何使 saveAsTextFile 不将输出拆分为多个文件? https://stackoverflow.com/questions/24371259/how-to-make-saveastextfile-not-split-output-into-multiple-file/24378808#24378808

然而,所提出的解决方案对我来说似乎并不是最佳的,例如正如一位用户所说 - 如果您的数据很少,那么只有一个输出文件才是一个好主意。

方法 2 - 使用HiveContext。直接将RDD数据插入到Hive表中

# python
sqlContext = HiveContext(sc)
ssc = StreamingContext(sc, int(batch_interval))
kvs = KafkaUtils.createStream(ssc, zkQuorum, group, {topics: 1})
lines = kvs.map(lambda x: x[1]).persist(StorageLevel.MEMORY_AND_DISK_SER)
lines.foreachRDD(sendRecord)

def sendRecord(rdd):

  sql = "INSERT INTO TABLE table select * from beacon_sparktable"

  # 1 - Apply the schema to the RDD creating a data frame 'beaconDF'
  beaconDF = sqlContext.jsonRDD(rdd,schema)

  # 2- Register the DataFrame as a spark sql table.
  beaconDF.registerTempTable("beacon_sparktable")

  # 3 - insert to hive directly from a qry on the spark sql table
  sqlContext.sql(sql);

这工作正常,它直接插入到镶木地板表中,但由于处理时间超过批处理间隔时间,因此批处理会出现调度延迟。 消费者无法跟上正在生产的产品,并且要处理的批次开始排队。

看来写入 hive 很慢。我尝试调整批处理间隔大小,运行更多消费者实例。

总之

考虑到存在多个文件的问题以及写入 hive 的潜在延迟,保存 Spark Streaming 中的大数据的最佳方法是什么? 其他人在做什么?

这里已经提出了类似的问题,但他对目录有一个问题,而不是太多文件如何让 Spark Streaming 写入其输出以便 Impala 可以读取它? https://stackoverflow.com/questions/24204656/how-to-make-spark-streaming-write-its-output-so-that-impala-can-read-it

非常感谢您的帮助


在解决方案#2中,创建的文件数量可以通过每个RDD的分区数量来控制。

看这个例子:

// create a Hive table (assume it's already existing)
sqlContext.sql("CREATE TABLE test (id int, txt string) STORED AS PARQUET")

// create a RDD with 2 records and only 1 partition
val rdd = sc.parallelize(List( List(1, "hello"), List(2, "world") ), 1)

// create a DataFrame from the RDD
val schema = StructType(Seq(
 StructField("id", IntegerType, nullable = false),
 StructField("txt", StringType, nullable = false)
))
val df = sqlContext.createDataFrame(rdd.map( Row(_:_*) ), schema)

// this creates a single file, because the RDD has 1 partition
df.write.mode("append").saveAsTable("test")

现在,我想您可以考虑从 Kafka 提取数据的频率,以及每个 RDD 的分区数量(默认情况下,您的 Kafka 主题的分区,您可以通过重新分区来减少)。

我正在使用 CDH 5.5.1 中的 Spark 1.5,并且使用以下任一方法都得到相同的结果df.write.mode("append").saveAsTable("test")或者你的 SQL 字符串。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

保留 Spark Streaming 输出 的相关文章

随机推荐

  • 嵌入存档中的动态链接器依赖信息

    动态库很好 其中嵌入了信息 帮助运行时链接器确定最终可执行文件需要加载哪些其他库 它还告诉可执行文件将加载哪些符号 然而 静态库却是一个令人头疼的问题 链接器不会自动链接存档的依赖项 当静态链接一个中等复杂或具有深度依赖图的库时 这会变得令
  • 在选择不同行时按一个字段中的最小值进行分组

    这就是我正在尝试做的事情 假设我有这张表 key id id record date other cols 1 18 2011 04 03 x 2 18 2012 05 19 y 3 18 2012 08 09 z 4 19 2009 06
  • 如何让 Chrome 扩展为每个添加的新 Iframe 运行?

    我创建了一个 Chrome 扩展程序作为覆盖 SalesForce 控制台页面中的 helpText 气泡的解决方案 helpText 气泡显示文本 但无法链接 URL 它看起来像这样 该扩展程序采用 helpText 气泡 在 Sales
  • 如何使用 R 中 e1071 包的“svm”执行多类分类

    我想使用执行多类分类svm的函数e1071包裹 但据我从文档中了解到svm 只能进行二元分类 插图文档讲述了多类分类的情况 为了允许多类分类 libsvm通过拟合所有二元子分类器并通过投票机制找到正确的类 使用一对一技术 我仍然不明白的是我
  • 对抗锯齿文本进行 OCR

    我必须从 PDF 文档中 OCR 表格 我编写了简单的 Python opencv 脚本来获取单个单元格 之后新的问题又出现了 文本已抗锯齿且质量不佳 tesseract 的识别率很低 我尝试过使用自适应阈值来预处理图像 但结果并没有好多少
  • 比较 Excel 中的两列并排除

    I want to compare values in two columns in Excel as depicted in the image below 使用该公式 我想将值放入 B 中不存在的 A 的值 和 A 中不存在的 B 的值
  • 在 ConfigParser 解析的请求模块上使用 URL 时,InvalidSchema("未找到 {!r}".format(url)) 的 URL

    我在配置文件中有一个 URL 我使用 ConfigParser 解析该 URL 以获取请求 配置文件 default root url https reqres in api users page 2 FetchFeeds py impor
  • 如何在 Windows shell 中向文件类型添加辅助动词?

    Windows shell 编程的基本思想是 您可以将给定的文件类型 扩展名 与 MS 当前调用的 progid 例如 Company Type Ver 相关联 HKCR txt Acme Text 1 HKCR Acme Text 1 这
  • Javascript 中触摸屏事件的文档

    在哪里可以找到 Javascript 中触摸屏事件的文档或参考 例如 触摸开始 我发现这个有用的链接http ross posterous com 2008 08 19 iphone touch events in javascript h
  • PHP中将单个数字分成一组唯一的随机数

    我想从一个预先确定的单个数字开始 然后有多个随机数字 当它们相加时 它们的总数就是我开始的数字 例如 我有 100 个 但想要 10 个随机数 将它们加在一起时等于 100 以我有限的知识 我写下了这样的内容
  • Elixir 变量真的是不可变的吗?

    在 Dave Thomas 的 Programming Elixir 一书中 他指出 Elixir 强制执行不可变数据 并接着说道 在 Elixir 中 一旦变量引用了诸如 1 2 3 之类的列表 您就知道它将始终引用相同的值 直到您重新绑
  • 如何从字符串转换为 XElement 对象

    我有一个像这样的字符串
  • STL Vector默认使用“new”和“delete”进行内存分配吗?

    我正在为应用程序开发一个插件 其中内存应该由应用程序分配并跟踪它 因此 内存句柄应该以缓冲区的形式从主机应用程序获取 然后将它们返回给应用程序 现在 我计划使用 STL Vectors 我想知道它内部使用什么样的内存分配 它在内部使用 新建
  • jquery悬停事件无法正常工作

    我有一个简单的水平菜单 当我将鼠标悬停在每个项目上时 子菜单会向下滑动 所以基本上它是我们经常看到的典型导航菜单 我希望当鼠标悬停时子菜单会在鼠标移出时向下和向上滑动 我的问题是 如果我在项目中快速移动鼠标 就会有多个子菜单保持可见 我猜这
  • RSS 是否跟踪保留或提交的内存?

    我正在 java 8 上使用不同的 jvm 选项进行实验 以降低 RSS 用于 Rss 跟踪的脚本 ps o rss o vsz o pid pid 用于设置 java 进程的 JVM 参数 XX PrintNMTStatistics XX
  • Laravel 5 中用于管理或身份验证的 Laravel 中间件

    我是 Laravel 的新手 不了解 Laravel 限制机制 我读过有关中间件的内容 但很困惑如何使用它 为什么使用它以及它如何工作 所以请指导我如何实现它以达到限制目的 即对于 auth sa 用户路由 确保您在数据库用户表中有角色列或
  • Sitecore 中子布局的多变量测试

    我过去曾尝试过这个概念 现在对在我公司的 Sitecore 网站上使用多变量测试感兴趣 我认为在很多地方我们绝对可以通过使用 A B 测试来提高销量 运行两个完全不同的模板 看看哪种布局更适合用户 在网站上运行许多不同的子布局 表单 以查看
  • PHP 中的重音符号 (`)(不是单引号)代表什么?

    在下面的示例中 第二行中的重音符号是什么意思 cmd ffmpeg i video deinterlace an ss second t 00 00 01 r 1 y vcodec mjpeg f mjpeg image 2 gt 1 re
  • 将版本放入我的 java 应用程序 - Netbeans

    有什么方法可以在 netbeans 中为我的应用程序提供版本号 然后在我的代码中访问该版本号 类似于我们在 Net 中使用的程序集号 在 java 或 netbeans 中是否有类似的东西 定义一个Implementation Versio
  • 保留 Spark Streaming 输出

    我正在从消息传递应用程序收集数据 我目前正在使用 Flume 它每天发送大约 5000 万条记录 我想用卡夫卡 使用 Spark Streaming 从 Kafka 消费 并将其保存到 hadoop 并使用 impala 进行查询 我尝试过