Flink can obtain data from many different origins and submit it to the framework for processing. We call the origin of the data a Source.
Reading data from a Java collection
In general, data can be staged temporarily in memory in a suitable data structure and then used as a data source. A collection type is the most common choice for that structure.
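The examples throughout this article use a WaterSensor type that is never shown. The following is a minimal sketch of what such a POJO might look like, assuming the three fields are a sensor id, a timestamp (ts), and a water level (vc); Flink treats a class as a POJO when it has a public no-arg constructor and public getters/setters.

```java
// Minimal WaterSensor POJO assumed by the examples: id, timestamp (ts), water level (vc).
// The no-arg constructor and getters/setters let Flink serialize it as a POJO type.
public class WaterSensor {
    private String id;
    private Long ts;
    private Integer vc;

    public WaterSensor() { }

    public WaterSensor(String id, Long ts, Integer vc) {
        this.id = id;
        this.ts = ts;
        this.vc = vc;
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public Long getTs() { return ts; }
    public void setTs(Long ts) { this.ts = ts; }
    public Integer getVc() { return vc; }
    public void setVc(Integer vc) { this.vc = vc; }

    @Override
    public String toString() {
        return "WaterSensor{id='" + id + "', ts=" + ts + ", vc=" + vc + "}";
    }
}
```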
// 1. Create the streaming environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. Build the collection
List<WaterSensor> waterSensors = Arrays.asList(
        new WaterSensor("ws_001", 1577844001L, 45),
        new WaterSensor("ws_002", 1577844015L, 43),
        new WaterSensor("ws_003", 1577844020L, 42));
// 3. Read the data from the collection and print it
env.fromCollection(waterSensors).print();
env.execute();
Reading data from a file
1. The argument may be a directory or a single file
2. The path may be relative or absolute
3. A relative path is resolved against the system property user.dir: in IDEA this is the project root; in standalone mode it is the root directory of the cluster node
4. Files can also be read from an HDFS directory using a path of the form hdfs://…; since Flink does not ship the Hadoop dependencies, add them to the pom:
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.1.3</version>
</dependency>
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.readTextFile("data\\log4j.properties").print();
env.execute();
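As a quick illustration of point 3 above, the base directory that relative paths resolve against can be printed with plain Java (no Flink involved):

```java
public class UserDirCheck {
    public static void main(String[] args) {
        // readTextFile resolves relative paths against this directory
        System.out.println(System.getProperty("user.dir"));
    }
}
```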
Reading data from a network socket stream
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.socketTextStream("linux01", 9999).print();
env.execute();
Reading data from Kafka
Add the corresponding dependency:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.11.2</version>
</dependency>
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "linux01:9092,linux02:9092,linux03:9092");
properties.setProperty("group.id", "test");
properties.setProperty("auto.offset.reset", "latest");
properties.setProperty("isolation.level", "read_committed");
DataStreamSource<String> topicDS = env.addSource(new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties));
topicDS.print();
env.execute();
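The settings above are ordinary Kafka consumer properties: auto.offset.reset only takes effect when the consumer group has no committed offset, and read_committed makes the consumer skip records from aborted transactions. As a sketch (the broker list and group id are just the placeholders from the example above), the configuration can be wrapped in a small helper so multiple jobs stay consistent:

```java
import java.util.Properties;

public class KafkaProps {
    // Builds the consumer properties used in the example; broker list and group id are placeholders
    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", bootstrapServers);
        p.setProperty("group.id", groupId);
        // Start from the latest offset when the group has no committed offset yet
        p.setProperty("auto.offset.reset", "latest");
        // Only read records from committed transactions; aborted records are skipped
        p.setProperty("isolation.level", "read_committed");
        return p;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps("linux01:9092,linux02:9092,linux03:9092", "test"));
    }
}
```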
Custom Source
In most cases the sources above are sufficient, but special situations inevitably come up, so Flink also provides a way to define a custom source.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
public class Demo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(new MySource("linux01", 9999)).print();
        env.execute();
    }

    // A source that reads lines from a socket and wraps them in WaterSensor objects
    public static class MySource implements SourceFunction<WaterSensor> {
        private final String host;
        private final int port;
        // volatile so the flag set by cancel() is visible to the thread running run()
        private volatile boolean cancel;
        private Socket socket;
        private BufferedReader bufferedReader;

        public MySource(String host, int port) {
            this.host = host;
            this.port = port;
        }

        // Implements the data-fetching logic; the passed-in context forwards records downstream
        @Override
        public void run(SourceContext<WaterSensor> sourceContext) throws Exception {
            socket = new Socket(host, port);
            bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
            String line;
            while (!cancel && (line = bufferedReader.readLine()) != null) {
                String[] split = line.split(",");
                sourceContext.collect(new WaterSensor(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2])));
            }
        }

        // Cancels the source: run() normally contains a loop that keeps producing data,
        // and cancel() makes that loop terminate
        @Override
        public void cancel() {
            cancel = true;
            if (bufferedReader != null) {
                try {
                    bufferedReader.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if (socket != null) {
                try {
                    socket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
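The parsing inside run() assumes each socket line has the form id,timestamp,value. That logic can be exercised on its own; the snippet below returns an Object[] stand-in instead of a WaterSensor so it has no dependency on the POJO class:

```java
public class LineParseDemo {
    // Mirrors the split logic in MySource.run(): "id,ts,vc" -> {String, Long, Integer}
    static Object[] parseLine(String line) {
        String[] split = line.split(",");
        return new Object[]{split[0], Long.valueOf(split[1]), Integer.valueOf(split[2])};
    }

    public static void main(String[] args) {
        Object[] fields = parseLine("ws_001,1577844001,45");
        System.out.println(fields[0] + " " + fields[1] + " " + fields[2]);
    }
}
```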