如何高效更新文件修改频繁的Impala表

2024-03-05

我们有一个基于 Hadoop 的解决方案 (CDH 5.15),我们可以在 HDFS 的某些目录中获取新文件。在这些目录的顶部,我们有 4-5 个 Impala (2.1) 表。在 HDFS 中写入这些文件的过程是 Spark Structured Streaming (2.3.1)

现在,一旦我们将文件写入 HDFS,我们就会运行一些 DDL 查询:

  • ALTER TABLE table1 RECOVER PARTITONS检测添加到表中的新分区(及其 HDFS 目录和文件)。

  • REFRESH table1 PARTITIONS (partition1=X, partition2=Y),使用每个分区的所有键。

目前,此 DDL 花费的时间有点太长,并且它们在我们的系统中排队,从而损害了系统的数据可用性。

所以,我的问题是:有没有一种方法可以更有效地进行数据合并?

我们考虑过:

  • 使用ALTER TABLE .. RECOVER PARTITONS但根据文档 https://docs.cloudera.com/documentation/enterprise/5-15-x/topics/impala_alter_table.html,它只刷新新分区。

  • 尝试使用REFRESH .. PARTITON ...一次具有多个分区,但语句语法不允许这样做。

  • 尝试对查询进行批处理,但 Hive JDBC 驱动器不支持批处理查询。

  • 鉴于系统已经很忙,我们是否应该尝试并行进行这些更新?

  • 您还知道其他什么方式吗?

Thanks!

Victor

注意:我们知道哪些分区需要刷新的方法是使用 HDFS 事件,就像 Spark 结构化流一样,我们不知道文件何时写入。

注意#2:此外,HDFS 中写入的文件有时很小,因此如果能够同时合并这些文件那就太好了。


由于似乎没有人能解决我的问题,我想分享我们为提高处理效率而采取的方法,非常欢迎提出意见。

我们发现(文档对此不是很清楚)HDFS 中的 Spark“检查点”中存储的一些信息是许多元数据文件,描述每个 Parquet 文件的写入时间及其大小:

$hdfs dfs -ls -h hdfs://...../my_spark_job/_spark_metadata

w-r--r--   3 hdfs 68K   2020-02-26 20:49 hdfs://...../my_spark_job/_spark_metadata/3248
rw-r--r--  3 hdfs 33.3M 2020-02-26 20:53 hdfs://...../my_spark_job/_spark_metadata/3249.compact
w-r--r--   3 hdfs 68K   2020-02-26 20:54 hdfs://...../my_spark_job/_spark_metadata/3250
...

$hdfs dfs -cat hdfs://...../my_spark_job/_spark_metadata/3250
v1
{"path":"hdfs://.../my_spark_job/../part-00004.c000.snappy.parquet","size":9866555,"isDir":false,"modificationTime":1582750862638,"blockReplication":3,"blockSize":134217728,"action":"add"}
{"path":"hdfs://.../my_spark_job/../part-00004.c001.snappy.parquet","size":526513,"isDir":false,"modificationTime":1582750862834,"blockReplication":3,"blockSize":134217728,"action":"add"}
...

所以,我们所做的是:

  • Build a Spark Streaming Job polling that _spark_metadata folder.
    • 我们使用一个fileStream因为它允许我们定义要使用的文件过滤器。
    • 该流中的每个条目都是这些 JSON 行之一,解析该行以提取文件路径和大小。
  • 按文件所属的父文件夹(映射到每个 Impala 分区)对文件进行分组。
  • For each folder:
    • 读取数据帧加载only目标 Parquet 文件(以避免与写入文件的其他作业发生竞争情况)
    • 计算要写入的块数(使用 JSON 中的大小字段和目标块大小)
    • 将数据帧合并到所需数量的分区并将其写回 HDFS
    • 执行DDLREFRESH TABLE myTable PARTITION ([partition keys derived from the new folder]
  • 最后删除源文件

我们取得的成果是:

  • 通过对每个分区和批次进行一次刷新来限制 DDL。

  • 通过可配置批处理时间和块大小,我们能够使我们的产品适应具有更大或更小数据集的不同部署场景。

  • 该解决方案非常灵活,因为我们可以为 Spark Streaming 作业分配更多或更少的资源(执行程序、核心、内存等),并且我们还可以启动/停止它(使用其自己的检查点系统)。

  • 我们还在研究在执行此过程时应用一些数据重新分区的可能性,以使分区尽可能接近最佳大小。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何高效更新文件修改频繁的Impala表 的相关文章

随机推荐

  • Spring Boot JSF 集成

    环境 雄猫8 春季启动 1 5 JSF 2 2 阿帕奇 MyFaces 春季MVC Code 我正在 Servlet 3 0 环境中集成 Spring Boot 和 JSF 2 2 配置类 JSFConfig java JSF 的配置 Co
  • 无法通用导入Python包

    假设我有以下目录结构 workspace init py ys manage init py manage py ys utils init py project dicts py 现在 假设我需要访问project dicts py in
  • 如何合并多个数组而不减慢编译器速度?

    添加这行代码会使我的编译时间从 10 秒缩短到 3 分钟 var resultsArray hashTagParticipantCodes prefixParticipantCodes asterixParticipantCodes att
  • C# 中引用类型和值类型有什么区别?

    几个月前有人问我这个问题 我无法详细解释 C 中引用类型和值类型有什么区别 我知道值类型是int bool float等 参考类型是delegate interface等等 或者这也是错误的吗 你能用专业的方式给我解释一下吗 你的例子有点奇
  • Windows 8 ARM(A.K.A.)吗? “Windows RT”有可供第三方开发人员使用的 Winapi (win32) 吗?

    Windows 8 for ARM 也称为 Windows RT 它是否具有与 Win32 API 等效的功能 I don t意味着它是否可以运行 Win32 x86 代码 但如果它具有 Win32 API可用的给第三方开发商 是的 ARM
  • Rake 任务只调用一次就执行两次

    我编写了一个非常简单的 rake 任务来尝试找到这个问题的根源 namespace foo do task bar environment do puts RUNNING end end 在控制台执行时rake foo bar输出是 RUN
  • SSIS 格式化货币输出

    在我的输出 CSV 文件中 有 4 列数据类型为货币 我的输出是平面文件 CSV 文件 我希望输出中的每一列都采用以下格式 这并没有发生 输出 CSV 文件中出现实际包含美分的列50 79 no 以以下结尾的列0正在压制0 40 8 零列看
  • Rest API 与客户端分离的多租户数据库

    我有一个带有组合键的多租户数据库 clientId docId 路由看起来像这样 api controller clientId docId 对于身份验证 我使用 全局 用户名 例如电子邮件 密码 通过 https 在每个请求的 http
  • Spring Data动态查询

    我正在尝试使用 spring 数据设置一个动态查询 基本上我有一个具有一堆特征的数组 我需要根据这些特征组装查询 几乎类似于 WHEREcharacteristic A ANDcharacteristic B AND特征 C 但特征的数量可
  • 单击 FusionTablesLayer 多边形时的事件

    每次客户端点击多边形时 我都需要在我的 JavaScript 中知道 我还需要知道它对应于我的融合表中的哪一行 有这样做的活动吗 有这样做的活动吗 FusionTables鼠标事件 https developers google com m
  • 将 functools 与 IronPython 结合使用

    我正在使用带有 IronPython 的 functools 库 它在开发机器上运行良好 但在生产中无法导入库 抛出异常 IronPython Runtime Exceptions ImportException 没有命名的模块 funct
  • 如何重命名 MongoDB 数据库?

    我的 MongoDB 数据库名称有一个拼写错误 我正在尝试重命名该数据库 I can copy http www mongodb org display DOCS Copy Database Commands然后像这样删除 db copyD
  • OpenGL 3.0 中 glDrawPixels 的替代品?

    所以我知道 glDrawPixels 已被弃用 有没有做同样事情的函数 我想过使用纹理 但它们是由当前矩阵修改的 与 glDrawPixels 绘制的像素不同 我想过使用纹理 但它们被当前矩阵修改了 当前矩阵 在 3 0 中已弃用 并在 3
  • 在 HTML 5 画布上创建 Reuleaux 多边形的函数

    我正在开展一个使用 HTML 5 画布设计系统艺术作品的项目 为了给我的作品带来更有机和多样化的感觉 我想要一个创建 reuleaux 多边形的函数 我认为可能有一种方法可以改变我的draw sharp polygon center pos
  • UITableViewCell 中的 UILabel 大小不正确(分配文本后的动画)

    文本在一个UILabel显示后闪烁 首先以省略号出现在单行上 然后占据其适合的 2 行 请注意 单元格高度没有改变 问题是这样的 标签 朋友们 快来参与并填满盒子吧 首先出现被截断的并在视图转换期间错位为 这种情况仅发生在装有 iOS 8
  • 为什么我应该更喜欢使用成员初始值设定项列表?

    我偏向于在构造函数中使用成员初始值设定项列表 但我早已忘记了其背后的原因 您是否在构造函数中使用成员初始值设定项列表 如果是这样 为什么 如果没有 为什么不呢 For POD https stackoverflow com a 146454
  • SpyOn TypeORM 存储库可更改单元测试 NestJS 的返回值

    我想对我的 TypeORM 数据库调用进行单元测试 我已经用有效数据模拟了我所有的 TypeORM 存储库 但我想监视存储库并更改 TypeORM 的返回值形式 我怎么做 import INestApplication from nestj
  • PowerShell - 如何在运行空间中导入模块

    我正在尝试用 C 创建一个 cmdlet 代码看起来像这样 Cmdlet VerbsCommon Get HeapSummary public class Get HeapSummary Cmdlet protected override
  • 更改 WooCommerce 电子邮件通知中的订单项目元数据

    我需要更改 自定义 WooCommerce 电子邮件通知的特定订单项元数据 但我找不到解决方案 I found one https stackoverflow com a 52684694 1354580 但它用于从 Woocommerce
  • 如何高效更新文件修改频繁的Impala表

    我们有一个基于 Hadoop 的解决方案 CDH 5 15 我们可以在 HDFS 的某些目录中获取新文件 在这些目录的顶部 我们有 4 5 个 Impala 2 1 表 在 HDFS 中写入这些文件的过程是 Spark Structured