Flink版本:1.12.1
ES Maven版本:elasticsearch-rest-client:6.3.1
FLINK TableSource官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/sourceSinks.html
Flink自定义Table Source需要实现3个类,DynamicTableSourceFactory -> DynamicTableSource -> SourceFunction。
在工厂类中解析建表语句(WITH子句)中的选项字段,并将其作为参数传给DynamicTableSource;最后在SourceFunction中实现数据的读取,并通过SourceContext发出。注意:在Table Source中需要把读取到的数据反序列化(转换)为RowData,而普通DataStream的自定义Source则不需要这一步。
一、ESReader
这个类实现了ES服务器的连接和查询方法,查询方法可以在SourceFunction中直接调用以获取数据。ES的连接方式可以根据不同的版本和需求自行实现。最后的queryLog方法按照索引和时间范围进行查找。
/**
 * Helper holding the Elasticsearch connection and query routines used by the source function.
 *
 * <p>NOTE(review): this listing is abridged. Method bodies and parameter lists are elided
 * (e.g. {@code main} passes four arguments to {@code getClient} while the declaration
 * shown takes none), so the signatures below are not the complete ones.
 */
@Slf4j
public class ESReader {
// For testing only: builds a client, runs one query, prints the result and closes the client.
public static void main(String[] args) throws Exception {
ParameterTool params = getExecuteEnvParams(args);
// Connection settings; the values are left blank in this listing.
String esServerAddress = ;
String username = ;
String password = ;
//...
RestHighLevelClient client = getClient(esServerAddress, username, password, caPath);
// The two long literals are presumably epoch-millisecond bounds of the queried time range - confirm.
List<JSONObject> result = queryLog("", 1625537750000L, 1630894572761L, "", client);
System.out.println(result);
client.close();
}
/**
 * @return the RestHighLevelClient used to operate on ES indices
 */
public static RestHighLevelClient getClient() {
// omitted...
return restClient;
}
/**
 * @return the RestClientBuilder (construction omitted in this listing)
 */
public static RestClientBuilder getRestClientBuilder(){
// omitted...
return restClientBuilder;
}
/**
 * @return the queried documents as a list of JSON objects (query logic omitted in this listing)
 */
public static List<JSONObject> queryLog() {
// omitted...
return eventInfo;
}
}
二、ESSqlFactory
工厂类主要负责解析建表语句中的数据源选项,并设置解码器。因为从ES中取出的数据为json,所以这里不新建formatFactory,而是直接复用Flink已有的format解码器(与Kafka连接器使用的解码方式相同)。
/**
 * Table factory for the custom Elasticsearch source, selected with
 * {@code 'connector' = 'elasticsearch'} in a CREATE TABLE statement.
 *
 * <p>Declares the connector options, discovers the decoding format named by the
 * {@code 'format'} option and builds an {@link ESDynamicTableSource} from them.
 */
public class ESSqlFactory implements DynamicTableSourceFactory {
    public static final ConfigOption<String> HOSTNAME = ConfigOptions.key("hostname").stringType().noDefaultValue();
    public static final ConfigOption<String> USERNAME = ConfigOptions.key("username").stringType().noDefaultValue();
    public static final ConfigOption<String> PASSWORD = ConfigOptions.key("password").stringType().noDefaultValue();
    //...

    /**
     * Identifier matched against {@code 'connector' = '...'}.
     *
     * @return elasticsearch
     */
    @Override
    public String factoryIdentifier() {
        return "elasticsearch";
    }

    /**
     * Options that must be present in the WITH clause.
     *
     * @return the required options, including the pre-defined {@code format} option
     */
    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        final Set<ConfigOption<?>> options = new HashSet<>();
        options.add(HOSTNAME);
        options.add(USERNAME);
        options.add(PASSWORD);
        //...
        options.add(FactoryUtil.FORMAT); // use pre-defined option for format
        return options;
    }

    /**
     * Options that may be present in the WITH clause.
     *
     * @return an empty set; this connector declares no optional options
     */
    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return new HashSet<>();
    }

    /**
     * Validates the table options and creates the table source.
     *
     * @param context factory context giving access to the catalog table and its options
     * @return the configured {@link ESDynamicTableSource}
     */
    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);

        // 'format' is a required option, so one typed lookup is sufficient. The former
        // optional lookup with a fallback to the Kafka connector's 'value.format' could
        // never yield a valid table and needed a raw-type cast plus a Kafka dependency.
        final DecodingFormat<DeserializationSchema<RowData>> decodingFormat =
                helper.discoverDecodingFormat(DeserializationFormatFactory.class, FactoryUtil.FORMAT);

        // Must run after format discovery so that the format's own options are accepted.
        helper.validate();

        final ReadableConfig options = helper.getOptions();
        final String hostname = options.get(HOSTNAME);
        final String username = options.get(USERNAME);
        final String password = options.get(PASSWORD);
        //...

        // A source produces the physical columns of the table; the persisted row type
        // describes the query-to-sink schema and is meant for sinks.
        final DataType producedDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();

        return new ESDynamicTableSource(hostname, username, password,..., decodingFormat, producedDataType);
    }
}
三、ESDynamicTableSource
工厂类会创建动态表源类的实例。该类实现了ScanTableSource,用于对数据做全量扫描查询,其核心方法为getScanRuntimeProvider。
/**
 * Scan table source for Elasticsearch.
 *
 * <p>Carries the connection settings and the decoding format chosen by the factory and,
 * at planning time, turns them into an {@code ESSourceFunction}.
 */
public class ESDynamicTableSource implements ScanTableSource {
    private final String hostname;
    private final String username;
    private final String password;
    //...
    private final DecodingFormat<DeserializationSchema<RowData>> decodingFormat;
    private final DataType producedDataType;

    /**
     * @param hostname         Elasticsearch host option
     * @param username         user name option
     * @param password         password option
     * @param decodingFormat   format used to create the runtime decoder
     * @param producedDataType row type handed to the decoder
     */
    public ESDynamicTableSource(
            String hostname,
            String username,
            String password,
            //...
            DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
            DataType producedDataType) {
        this.hostname = hostname;
        this.username = username;
        this.password = password;
        //...
        this.decodingFormat = decodingFormat;
        this.producedDataType = producedDataType;
    }

    /** The changelog mode is whatever the decoding format reports. */
    @Override
    public ChangelogMode getChangelogMode() {
        return decodingFormat.getChangelogMode();
    }

    /**
     * Creates the runtime decoder from the format and wraps it, together with the
     * connection settings, in an {@code ESSourceFunction}.
     *
     * @param runtimeProviderContext planner-provided context used to create the decoder
     * @return a provider for the source function, created with {@code isBounded = false}
     */
    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
        final DeserializationSchema<RowData> rowDecoder =
                decodingFormat.createRuntimeDecoder(runtimeProviderContext, producedDataType);
        return SourceFunctionProvider.of(
                new ESSourceFunction(
                        hostname, username, password,
                        ...,rowDecoder),
                false);
    }

    /** Returns a new source built from the same field values. */
    @Override
    public DynamicTableSource copy() {
        return new ESDynamicTableSource(
                hostname, username, password,
                ..., decodingFormat, producedDataType);
    }

    /** Short description of this source. */
    @Override
    public String asSummaryString() {
        return "elastic Table Source";
    }
}
四、ESSourceFunction
该类实现了对ES数据的读取:在run()方法中调用ES查询方法,读取后使用传入的deserializer将结果转换为RowData。
/**
 * Source function that queries Elasticsearch once in {@link #run} and emits the
 * result as {@link RowData} using the deserializer supplied by the table source.
 *
 * <p>NOTE(review): {@code deserializer.open(...)} is not called anywhere in the code
 * shown; confirm whether the chosen format requires it before use.
 */
public class ESSourceFunction extends RichSourceFunction<RowData> implements ResultTypeQueryable<RowData> {
    private final String hostname;
    private final String username;
    private final String password;
    //...
    private final DeserializationSchema<RowData> deserializer;
    private volatile boolean isRunning = true;
    // Not assigned in the code shown; presumably created in the elided part of run() - confirm.
    RestHighLevelClient client;

    /**
     * @param hostname     Elasticsearch host
     * @param username     user name
     * @param password     password
     * @param deserializer converts the JSON bytes of a query result into RowData
     */
    public ESSourceFunction(String hostname, String username, String password, ..., DeserializationSchema<RowData> deserializer) {
        this.hostname = hostname;
        this.username = username;
        this.password = password;
        //...
        this.deserializer = deserializer;
    }

    /** The produced type is the one reported by the deserializer. */
    @Override
    public TypeInformation<RowData> getProducedType() {
        return deserializer.getProducedType();
    }

    /**
     * Runs the query, emits the decoded row and then stops the source.
     *
     * @param ctx context the decoded row is emitted to
     * @throws Exception if the query or the deserialization fails
     */
    @Override
    public void run(SourceContext<RowData> ctx) throws Exception {
        // Fetch the data from the source
        // omitted...
        JSONObject result = queryLog(...);
        // Encode explicitly as UTF-8: the no-arg getBytes() uses the platform default
        // charset, which corrupts non-ASCII content on hosts that are not UTF-8.
        final byte[] payload = result.toJSONString().getBytes(java.nio.charset.StandardCharsets.UTF_8);
        // deserialize() may return null when a record cannot be decoded
        // (e.g. with 'json.ignore-parse-errors' = 'true'); such records are skipped.
        final RowData row = deserializer.deserialize(payload);
        if (row != null) {
            ctx.collect(row);
        }
        cancel();
    }

    /** Marks the source as stopped and closes the client if one was created. */
    @Override
    public void cancel() {
        isRunning = false;
        if (client == null) {
            return;
        }
        try {
            client.close();
        } catch (Exception ignored) {
            // Best-effort close during shutdown: a failure here must not fail the job.
        }
    }
}
五、使用
在flink Table建表语句中调用即可:
-- Example DDL using the custom connector; table name, columns and values are left blank.
CREATE TABLE ...
WITH (
    'connector' = 'elasticsearch',
    'hostname' = '',
    -- 'username' is declared as a required option by the factory, so it must be set
    'username' = '',
    'password' = '',
    ... ,
    'format' = 'json',
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true')