是否可以将 Riak CS 与 Apache Flink 一起使用?

2024-01-03

我要配置filesystem状态后端和zookeeper恢复模式:

state.backend: filesystem
state.backend.fs.checkpointdir: ???

recovery.mode: zookeeper
recovery.zookeeper.storageDir: ???

正如你所看到的,我应该指定checkpointdir and storageDir参数,但我没有 Apache Flink 支持的任何文件系统(例如 HDFS 或 Amazon S3)。但我已经安装了Riak CS集群(好像是这样兼容S3 http://docs.basho.com/riakcs/latest/).

那么,我可以将 Riak CS 与 Apache Flink 一起使用吗?如果可能:如何配置 Apache Flink 与 Riak CS 配合使用?


答:如何加入 Apache Flink 和 Riak CS?

Riak CS 具有 S3(版本 2)兼容接口。因此,可以使用 Hadoop 的 S3 文件系统适配器与 Riak CS 配合使用。

我不知道为什么,但 Apache Flink 在 fat jar 中只有部分 Hadoop 文件系统适配器(lib/flink-dist_2.11-1.0.1.jar)即它有 FTP 文件系统(org.apache.hadoop.fs.ftp.FTPFileSystem)但没有 S3 文件系统(即org.apache.hadoop.fs.s3a.S3AFileSystem)。所以,你有两种方法来解决这个问题:

  • 使用 Hadoop 安装中的这些适配器。我没有尝试这个,但似乎你应该只配置 HADOOP_CLASSPATH 或 HADOOP_HOME evn 变量。
  • Monky 修补 Apache Flink 并下载所需的 JAR<flink home>/lib目录

因此,我选择第二种方式,因为不想在我的环境中配置 Hadoop。您可以从 Hadoop dist 或互联网复制 JAR:

curl http://central.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.7.2/hadoop-aws-2.7.2.jar -o /flink/lib/hadoop-aws-2.7.2.jar
curl http://central.maven.org/maven2/com/amazonaws/aws-java-sdk/1.7.4/aws-java-sdk-1.7.4.jar -o /flink/lib/aws-java-sdk-1.7.4.jar
curl http://central.maven.org/maven2/org/apache/httpcomponents/httpcore/4.2.5/httpcore-4.2.5.jar -o /flink/lib/httpcore-4.2.5.jar
curl http://central.maven.org/maven2/org/apache/httpcomponents/httpclient/4.2.5/httpclient-4.2.5.jar -o /flink/lib/httpclient-4.2.5.jar

正如你所看到的,我使用的是旧版本,因为这个版本在 Hadoop 2.7.2 中使用,并且我使用与这个版本的 Hadoop 兼容的 Flink。

仅供参考:如果您在自己的流程中使用这些 JAR 的最新版本,此类 hack 可能会导致问题。为了避免与不同版本相关的问题,您可以在使用流构建 fat jar 时重新定位包,使用类似的东西(我正在使用 Gradle):

// Relocate org.apache.http packages because Apache Flink include old version of this library (we place them for using S3 compatible FS)
shadowJar {
    dependencies {
        include(dependency('.*:.*:.*'))
    }

    relocate 'org.apache.http', 'relocated.org.apache.http'
    relocate 'org.apache.commons', 'relocated.org.apache.commons'
}

然后你应该指定路径core-site.xml in flink-conf.yaml因为 Hadoop 兼容文件系统使用此配置来加载设置:

...
fs.hdfs.hadoopconf: /flink/conf
...

正如你所看到的,我只是把它放在<fink home>/conf目录。它有以下设置:

<?xml version="1.0" encoding="UTF-8" ?>
<configuration>
    <property>
        <name>fs.s3a.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value> // because S3A better then other: https://wiki.apache.org/hadoop/AmazonS3
    </property>
    <property>
        <name>fs.s3a.endpoint</name>
        <value>my-riak-cs.stage.local</value>  // this is my Riak CS host
    </property>
    <property>
        <name>fs.s3a.connection.ssl.enabled</name> // my Riak CS in staging doesn't support SSL
        <value>false</value>
    </property>
    <property>
        <name>fs.s3a.access.key</name>
        <value>????</value> // this is my access key for Riak CS
    </property>
    <property>
        <name>fs.s3a.secret.key</name>
        <value>????</value> // this is my secret key for Riak CS
    </property>
</configuration>

然后你应该配置 Riak CS 存储桶flink-conf.yaml作为推荐人here https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/jobmanager_high_availability.html#standalone-cluster-high-availability:

...
state.backend.fs.checkpointdir: s3a://example-staging-flink/checkpoints
...
recovery.zookeeper.storageDir: s3a://example-staging-flink/recovery
...

并在 Riak CS 中创建存储桶。我在用s3cmd(安装在brew在我的 OS X 开发环境中):

s3cmd mb s3://example-staging-flink

仅供参考:使用前s3cmd你应该配置它使用s3cmd --configure然后修复一些设置~/.s3cmd file:

signature_v2 = True // because Riak CS using S3 V2 interface
use_https = False // if your don't use SSL
access_key = ???
secret_key = ???
host_base = my-riak-cs.stage.local // your Riak CS host
host_bucket = %(bucket).my-riak-cs.stage.local // format of bucket used by Riak CS

因此,这就是您在 Riak CS 中为独立 HA Apache Flink 集群的保存/恢复状态应该配置的所有内容。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

是否可以将 Riak CS 与 Apache Flink 一起使用? 的相关文章

随机推荐

  • 梅文。 “无家可归”的罐子该怎么办?

    我有一些 proprietary jar 需要包含在我的项目中 但我不想将其安装到本地存储库 我最初所做的是将 jar 放入我的项目的版本控制中lib 文件夹 然后将 Maven 依赖项指定为
  • HTML5 画布圆形文本

    如何使用画布创建圆形文本 圆形文本 字母现在应该正确定向 CanvasRenderingContext2D prototype fillTextCircle function text x y radius startRotation va
  • 使用正则表达式进行 Github 搜索

    有没有办法使用正则表达式在 github 存储库中搜索代码 目前 我克隆了存储库并进行搜索 但我想输入类似的内容 s foo gi 并查找代码中所有出现 foo 的地方 foo create foo extend fooBar barFoo
  • 从 SurfaceView 获取图像到 ImageView?

    我在从用作相机预览的 SurfaceView 获取图像 可绘制对象或位图时遇到了一些麻烦 final CameraSurfaceView cameraSurfaceView new CameraSurfaceView this Linear
  • 使用边框创建三角形

    我最近需要创建对话气泡 为了在对话气泡的末端创建小三角形尖端 我使用了CSS技术 http jsfiddle net 66jAA 5 其中元素被赋予0 width and 0 height并给定边界 使某些边框透明会产生对角线 这非常有效
  • 如何在 React 的子功能组件中触发一个动作?

    对于基本的表单 输入布局 很明显应该使用回调来处理从子组件到父组件的状态更改 由子组件发起 但是父组件如何要求子组件重新评估其状态并将其传达回父组件 这里的最终目标只是在提交表单按钮时触发子输入的验证 给定的 ts 代码如下所示 const
  • Go 声明中的“_,”(下划线逗号)是什么?

    我似乎无法理解这种变量声明 prs m example 究竟是什么 他们为什么声明这样的变量而不是 prs m example 我发现它是举例 地图 https gobyexample com maps 它避免了必须为返回值声明所有变量 它
  • 解释一下C++代码

    我可以获得有关以下代码解释的帮助吗 include
  • “复制本地”对于项目引用是否具有传递性?

    沃特 拟议的骗局 因为这里的问题表明了相反的情况链接问题 https stackoverflow com questions 12386523 visual studio not copying content files from ind
  • guice:命令行运行时注入/绑定

    我有以下问题 Inject MyClass Service service this service service public void doSomething service invokeSelf 我有一个模块 bind servic
  • 如何在没有 TCP/IP 堆栈的情况下用 Java 发送以太网帧

    我的 Java 应用程序应该控制直接连接到我的计算机 Ubuntu 和 Windows 网络接口的外部设备 EtherCAT 总线技术 没有连接其他网络设备 通信是在标准 IEEE 802 3 以太网帧上完成的 无需 IP 堆栈 发送数据示
  • 如何在 TensorFlow 中将张量转换为 ndarray?

    我的目标是将张量转换为 ndarray 而不需要 run 或 eval 我想执行与示例相同的操作 A tf constant 5 B tf constant A 1 0 0 但是 ndarray 可以位于 tf constant 内部 但张
  • 如何使用NuGetpackages.config文件?

    I see a 包配置解决方案中我的每个项目的文件 它包含有关各种程序集信息的信息 我希望 NuGet 能够自动扫描这些 packages config 并根据需要进行下载 但事实并非如此 我需要手动安装所有软件包吗 如果右键单击相关项目
  • python pip install 在 Windows 上不起作用

    我在 Windows 上安装了 python 2 7 10 我尝试使用以下命令在命令行上安装 Django C users user myproject gt python pip install django 这会显示以下错误 pytho
  • 更改 UITextView 中一个链接的属性

    我有一个UITextView具有多个 URL 我通过设置激活dataDetectorTypes财产给UIDataDetectorTypeLink 然后我使用linkTextAttributes属性来设置链接的颜色 现在 当用户点击其中一个链
  • 此编码器要求从 initWithCoder 返回替换的对象:

    我的应用程序在 iOS 11 2 上运行良好 但在 iOS 11 3 中会崩溃 我有例外 由于未捕获的异常 NSGenericException 而终止应用程序 原因 此编码器要求从 initWithCoder 返回替换的对象 我有一个带有
  • 通过 golang 中的多个 HTTP 处理程序包含上下文对象

    我刚刚读过这篇博文 http blog golang org error handling and go TOC 3 关于创建函数类型并实现 ServeHTTP 该函数上的方法能够处理错误 例如 type appError struct E
  • 在 SQL 中实现不相交集逼近(并集查找)

    使用 SQL 实现近似不相交集的最佳方法是什么 Details 我有一个边表 存储为两列表 vertex a vertex b 我需要一个不同集合的表 存储为 vertex set id 每个顶点一行 用不相交的 set id 标记每个顶点
  • 应用于行的几何平均值

    我有这个数据框作为例子 Col1 Col2 Col3 Col4 1 2 3 2 2 我想添加名为 Gmean 的第四列 用于计算每行前 3 列的几何平均值 怎样才能完成呢 Thanks 一种方法是Scipy s geometric mean
  • 是否可以将 Riak CS 与 Apache Flink 一起使用?

    我要配置filesystem状态后端和zookeeper恢复模式 state backend filesystem state backend fs checkpointdir recovery mode zookeeper recover