Flink 核心编程(二) Source

2023-05-16

在这里插入图片描述
Flink框架可以从不同的来源获取数据,将数据提交给框架进行处理, 我们将获取数据的来源称之为数据源(Source)。

从java的集合中读取数据

一般情况下,可以将数据临时存储到内存中,形成特殊的数据结构后,作为数据源使用。这里的数据结构采用集合类型是比较普遍的。

//1.创建流环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2.创建集合
List<WaterSensor> waterSensors = Arrays.asList(new WaterSensor("ws_001", 1577844001L, 45),
	                                           new WaterSensor("ws_002", 1577844015L, 43),
	                                           new WaterSensor("ws_003", 1577844020L, 42));
//3.读取集合中数据并打印
env.fromCollection(waterSensors).print();
	        
env.execute();

从文件读取数据

1.参数可以是目录也可以是文件
2.路径可以是相对路径也可以是绝对路径
3.相对路径是从系统属性user.dir获取路径: idea下是project的根目录, standalone模式下是集群节点根目录
4.也可以从hdfs目录下读取, 使用路径:hdfs://…, 由于Flink没有提供hadoop相关依赖, 需要pom中添加相关依赖:

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.1.3</version>
</dependency>
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.readTextFile("data\\log4j.properties").print();
env.execute();

从网络Socket流中读取数据

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.socketTextStream("linux01",9999);
env.execute();

从Kafka读取数据

添加相应的依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.11.2</version>
</dependency>
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "linux01:9092,linux02:9092,linux03:9092");
properties.setProperty("group.id", "test");
properties.setProperty("auto.offset.reset", "latest");
properties.setProperty("isolation.level", "read_committed");
DataStreamSource<String> topicDS = env.addSource(new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties));
topicDS.print();
env.execute();

自定义Source

大多数情况下,前面的数据源已经能够满足需要,但是难免会存在特殊情况的场合,所以flink也提供了能自定义数据源的方式.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

public class Demo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(new MySource("linux01", 9999));
        env.execute();
    }

    //从Socket读取数据的source并封装成WaterSensor对象
    public static class MySource implements SourceFunction<WaterSensor> {
        private final String host;
        private final int port;
        private boolean cancel;
        private Socket socket;
        private BufferedReader bufferedReader;

        public MySource(String host, int port) {
            this.host = host;
            this.port = port;
        }

        //实现数据获取逻辑,并可以通过传入的参数ctx进行向下游节点的数据转发
        @Override
        public void run(SourceContext<WaterSensor> sourceContext) throws Exception {
            socket = new Socket(host, port);
            bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
            String line = bufferedReader.readLine();
            while (line != null) {
                String[] split = line.split(",");
                sourceContext.collect(new WaterSensor(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2])));
                line = bufferedReader.readLine();
            }
        }

        //用来取消数据源,一般在run方法中,会存在一个循环来持续产生数据,cancel方法则可以使该循环终止
        @Override
        public void cancel() {
            cancel = true;
            if (bufferedReader != null) {
                try {
                    bufferedReader.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if (socket != null) {
                try {
                    socket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Flink 核心编程(二) Source 的相关文章

  • 大数据架构图

    大数据管理数据处理过程图 大数据 big data 指无法在一定时间范围内用常规软件工具进行捕捉 管理和处理的数据集合 是需要新处理模式才能具有更强的决策力 洞察力 大数据处理的主要流程包括数据收集 数据存储 数据处理 数据应用等主要环节

随机推荐