You do need the Hadoop libraries, but you do not have to install Hadoop to run locally and write to S3. I just happened to be writing Parquet output based on an Avro schema, producing SpecificRecord instances, to S3. I am running a version of the code below locally through SBT and IntelliJ IDEA. The pieces needed:
1) Specify the required Hadoop properties with the following file (note: defining the AWS access key/secret key is not recommended - it is better to run on an EC2 instance that has an IAM role with read/write access to the S3 bucket. The keys are needed here for local testing, though):
<configuration>
<property>
<name>fs.s3.impl</name>
<value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
</property>
<!-- Comma separated list of local directories used to buffer
large results prior to transmitting them to S3. -->
<property>
<name>fs.s3a.buffer.dir</name>
<value>/tmp</value>
</property>
<!-- set your AWS ID using key defined in org.apache.hadoop.fs.s3a.Constants -->
<property>
<name>fs.s3a.access.key</name>
<value>YOUR_ACCESS_KEY</value>
</property>
<!-- set your AWS secret key -->
<property>
<name>fs.s3a.secret.key</name>
<value>YOUR_SECRET_KEY</value>
</property>
</configuration>
2) Imports:
import com.uebercomputing.eventrecord.EventOnlyRecord
import org.apache.flink.api.scala.hadoop.mapreduce.HadoopOutputFormat
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.conf.{Configuration => HadoopConfiguration}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.Job
import org.apache.parquet.avro.AvroParquetOutputFormat
3) Flink code that uses a HadoopOutputFormat with the configuration above:
val events: DataSet[(Void, EventOnlyRecord)] = ...
val hadoopConfig = getHadoopConfiguration(hadoopConfigFile)
val outputFormat = new AvroParquetOutputFormat[EventOnlyRecord]
val outputJob = Job.getInstance
//Note: AvroParquetOutputFormat extends FileOutputFormat[Void,T]
//so key is Void, value of type T - EventOnlyRecord in this case
val hadoopOutputFormat = new HadoopOutputFormat[Void, EventOnlyRecord](
outputFormat,
outputJob
)
val outputConfig = outputJob.getConfiguration
outputConfig.addResource(hadoopConfig)
val outputPath = new Path("s3://<bucket>/<dir-prefix>")
FileOutputFormat.setOutputPath(outputJob, outputPath)
AvroParquetOutputFormat.setSchema(outputJob, EventOnlyRecord.getClassSchema)
events.output(hadoopOutputFormat)
env.execute
...
def getHadoopConfiguration(hadoopConfigPath: String): HadoopConfiguration = {
val hadoopConfig = new HadoopConfiguration()
hadoopConfig.addResource(new Path(hadoopConfigPath))
hadoopConfig
}
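The key type above is Void because AvroParquetOutputFormat extends FileOutputFormat[Void, T] - the key carries no information and is always null. A pure-Scala sketch of that pairing (no Flink required; EventStub is a hypothetical stand-in for EventOnlyRecord):

```scala
// Sketch of the (Void, value) pairing that AvroParquetOutputFormat
// expects. Void cannot be instantiated, so the key is always null.
case class EventStub(id: Int) // hypothetical stand-in for EventOnlyRecord

object VoidKeySketch extends App {
  val records = Seq(EventStub(1), EventStub(2))
  val keyed: Seq[(Void, EventStub)] =
    records.map(r => (null.asInstanceOf[Void], r))
  println(keyed.map(_._2.id).mkString(",")) // 1,2
}
```

In the real job the same `map` shape applies to a `DataSet[EventOnlyRecord]` to produce the `DataSet[(Void, EventOnlyRecord)]` that `events` holds above.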
4) Build dependencies and versions used:
val awsSdkVersion = "1.7.4"
val hadoopVersion = "2.7.3"
val flinkVersion = "1.1.4"
val flinkDependencies = Seq(
("org.apache.flink" %% "flink-scala" % flinkVersion),
("org.apache.flink" %% "flink-hadoop-compatibility" % flinkVersion)
)
val providedFlinkDependencies = flinkDependencies.map(_ % "provided")
val serializationDependencies = Seq(
("org.apache.avro" % "avro" % "1.7.7"),
("org.apache.avro" % "avro-mapred" % "1.7.7").classifier("hadoop2"),
("org.apache.parquet" % "parquet-avro" % "1.8.1")
)
val s3Dependencies = Seq(
("com.amazonaws" % "aws-java-sdk" % awsSdkVersion),
("org.apache.hadoop" % "hadoop-aws" % hadoopVersion)
)
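These sequences still have to be wired into the build definition; a minimal build.sbt fragment might look like this (a sketch - it assumes the vals above are defined in the same build file):

```scala
// build.sbt sketch (hypothetical). If you run from IntelliJ rather than
// a Flink cluster, use flinkDependencies instead of
// providedFlinkDependencies, since "provided" dependencies are not on
// the runtime classpath there.
libraryDependencies ++= providedFlinkDependencies
libraryDependencies ++= serializationDependencies
libraryDependencies ++= s3Dependencies
```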
Edit to use writeAsText to S3:
1) Create a Hadoop configuration directory (referred to as hadoop-conf-dir) containing the file core-site.xml.
For example:
mkdir /home/<user>/hadoop-config
cd /home/<user>/hadoop-config
vi core-site.xml
#content of core-site.xml
<configuration>
<property>
<name>fs.s3.impl</name>
<value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
</property>
<!-- Comma separated list of local directories used to buffer
large results prior to transmitting them to S3. -->
<property>
<name>fs.s3a.buffer.dir</name>
<value>/tmp</value>
</property>
<!-- set your AWS ID using key defined in org.apache.hadoop.fs.s3a.Constants -->
<property>
<name>fs.s3a.access.key</name>
<value>YOUR_ACCESS_KEY</value>
</property>
<!-- set your AWS secret key -->
<property>
<name>fs.s3a.secret.key</name>
<value>YOUR_SECRET_KEY</value>
</property>
</configuration>
2) Create a directory (referred to as flink-conf-dir) containing the file flink-conf.yaml.
For example:
mkdir /home/<user>/flink-config
cd /home/<user>/flink-config
vi flink-conf.yaml
//content of flink-conf.yaml - continuing earlier example
fs.hdfs.hadoopconf: /home/<user>/hadoop-config
3) Edit the IntelliJ Run configuration used to run your S3 Flink job - Run - Edit Configurations - and add the following environment variable:
FLINK_CONF_DIR and set it to your flink-conf-dir
Continuing the example above:
FLINK_CONF_DIR=/home/<user>/flink-config
4) Run the code with that environment variable set:
events.writeAsText("s3://<bucket>/<prefix-dir>")
env.execute