Flink自定义实现ElasticSearch Table Source

2023-11-15

Flink版本:1.12.1

ES Maven版本:elasticsearch-rest-client:6.3.1

FLINK TableSource官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/sourceSinks.html

Flink自定义Table Source需要实现3个类,DynamicTableSourceFactory -> DynamicTableSource -> SourceFunction。

在工厂类中解析建表时的选项字段,并将其作为参数。最后在SourceFunction中实现数据来源,并写入上下文。且在Table Source中需要将数据序列化为RowData,而普通DataStream自定义Source则不需要序列化。

一、ESReader

在这个类中实现了ES服务器的连接和查询方法,查询的方法可以在SourceFunction中直接调用获取数据。这里ES的连接可以根据不同的版本和需求自己写。最后的query方法是按照索引和时间范围查找。

@Slf4j
public class ESReader {

    // 测试使用
    public static void main(String[] args) throws Exception {
        ParameterTool params = getExecuteEnvParams(args);
        String esServerAddress = ;
        String username = ;
        String password = ;
        //...
        RestHighLevelClient client = getClient(esServerAddress, username, password, caPath);
        List<JSONObject> result = queryLog("", 1625537750000L, 1630894572761L, "", client);
        System.out.println(result);
        client.close();
    }

    /**
     * @return RestHighLevelClient 获取操作es索引的对象
     */
    public static RestHighLevelClient getClient() {
        //省略...
        return restClient;
    }


    public static  RestClientBuilder getRestClientBuilder(){
        //省略...
        return restClientBuilder;
    }

    public static List<JSONObject> queryLog() {
        //省略...
        return eventInfo;
    }
}

二、ESSqlFactory

工厂类中主要对建表数据源字段解析,并设置解码器。这里因为从ES中取出的数据为json,不重新新建formatFactory而直接使用Kafka的decoding。

public class ESSqlFactory implements DynamicTableSourceFactory {

    public static final ConfigOption<String> HOSTNAME = ConfigOptions.key("hostname").stringType().noDefaultValue();
    public static final ConfigOption<String> USERNAME = ConfigOptions.key("username").stringType().noDefaultValue();
    public static final ConfigOption<String> PASSWORD = ConfigOptions.key("password").stringType().noDefaultValue();
    //...


    /**
     *  用于 'connector' = '...'
     * @return elasticsearch
     */
    @Override
    public String factoryIdentifier() {
        return "elasticsearch";
    }

    /**
     * 必选字段
     */
    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        final Set<ConfigOption<?>> options = new HashSet<>();
        options.add(HOSTNAME);
        options.add(USERNAME);
        options.add(PASSWORD);
        //...

        options.add(FactoryUtil.FORMAT);    // use pre-defined option for format
        return options;
    }

    /**
     * 可选字段
     */
    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        final Set<ConfigOption<?>> options = new HashSet<>();
        return options;
    }

    public DynamicTableSource createDynamicTableSource(Context context) {
        final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);

        // 获取解码器
        final DecodingFormat<DeserializationSchema<RowData>> valueFormat =
                (DecodingFormat)helper.discoverOptionalDecodingFormat(
                        DeserializationFormatFactory.class, FactoryUtil.FORMAT).orElseGet(() -> {
            return helper.discoverDecodingFormat(DeserializationFormatFactory.class, KafkaOptions.VALUE_FORMAT);
        });

        final DecodingFormat<DeserializationSchema<RowData>> decodingFormat = helper.discoverDecodingFormat(
                DeserializationFormatFactory.class, FactoryUtil.FORMAT);

        helper.validate();

        final ReadableConfig options = helper.getOptions();
        final String hostname = options.get(HOSTNAME);
        final String username = options.get(USERNAME);
        final String password = options.get(PASSWORD);
        //...

        final DataType producedDataType = context.getCatalogTable().getSchema().toPersistedRowDataType();

        return new ESDynamicTableSource(hostname, username, password,..., valueFormat, producedDataType);
    }
}

三、ESDynamicTableSource

从工厂类中调用动态表源类,该类实现了ScanTableSource,做全部查询,其中的核心方法为getScanRuntimeProvider

public class ESDynamicTableSource implements ScanTableSource {

    private final String hostname;
    private final String username;
    private final String password;
    //...
    private final DecodingFormat<DeserializationSchema<RowData>> decodingFormat;
    private final DataType producedDataType;

    public ESDynamicTableSource(String hostname,
                                String username,
                                String password,
                                //...
                                DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
                                DataType producedDataType) {
        this.hostname = hostname;
        this.username = username;
        this.password = password;
        //...
        this.decodingFormat = decodingFormat;
        this.producedDataType = producedDataType;
    }

    @Override
    public ChangelogMode getChangelogMode() {
        return decodingFormat.getChangelogMode();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
        final DeserializationSchema<RowData> deserializer = decodingFormat.createRuntimeDecoder(
                runtimeProviderContext,
                producedDataType);

        final SourceFunction<RowData> sourceFunction = new ESSourceFunction(
                hostname, username, password,
                ...,deserializer);

        return SourceFunctionProvider.of(sourceFunction, false);
    }

    @Override
    public DynamicTableSource copy() {
        return new ESDynamicTableSource(hostname, username, password,
                ..., decodingFormat, producedDataType);
    }

    @Override
    public String asSummaryString() {
        return "elastic Table Source";
    }
}

四、ESSourceFunction

该类中实现了对读取ES数据的读取,通过在run()方法中调用ES读取方法。读取后需要使用传进来的deserializer转换为RowData

public class ESSourceFunction extends RichSourceFunction<RowData> implements ResultTypeQueryable<RowData> {

    private final String hostname;
    private final String username;
    private final String password;
    //...
    private final DeserializationSchema<RowData> deserializer;

    private volatile boolean isRunning = true;
    RestHighLevelClient client;


    public ESSourceFunction(String hostname, String username, String password, ..., DeserializationSchema<RowData> deserializer) {
        this.hostname = hostname;
        this.username = username;
        this.password = password;
        //...
        this.deserializer = deserializer;
    }

    @Override
    public TypeInformation<RowData> getProducedType() {
        return deserializer.getProducedType();
    }

    @Override
    public void run(SourceContext<RowData> ctx) throws Exception {
        // 数据源获取
        // 省略...
        JSONObject result = queryLog(...);
 		ctx.collect(deserializer.deserialize(result.toJSONString().getBytes()));
        cancel();
    }

    @Override
    public void cancel() {
        isRunning = false;
        try {
            client.close();
        } catch (Throwable t) {
            // ignore
        }
    }
}

五、使用

在flink Table建表语句中调用即可:

CREATE TABLE ...
WITH (
'connector' = 'elasticsearch',
'hostname' = '',
'password' = ''... ,
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true')
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Flink自定义实现ElasticSearch Table Source 的相关文章

随机推荐

  • [CTSC2008]网络管理Network【树状数组+主席树】

    题目链接 题意 有一棵N个点的树 每个点有对应的权值 现在有这样的操作 0 a b 将a点的权值改成为b k a b 询问a到b的链上第k大的权值是几 我们可以用dfs序的树上差分的方式来解决这个问题 可以发现 求u到v的信息 其实就是求u
  • 【c++】角度与弧度转换

    角度转换为弧度 double angle to radian double degree double min double second double flag degree lt 0 1 0 1 0 判断正负 if degree lt
  • matlab与机器学习(二)程序基本操作(含代码解析)

    备注 之间的为注释内容 注释后面的内容 matlab形成自己的编程风格更有利于简洁的编程界面 图像的初步处理可应用于论文撰写上 文章更有说服力 程序调试有利于发现逻辑问题 都是必备的基础知识 I 清空环境变量及命令 clear all cl
  • Python selenium基础用法详解

    活动地址 CSDN21天学习挑战赛 学习的最大理由是想摆脱平庸 早一天就多一份人生的精彩 迟一天就多一天平庸的困扰 学习日记 目录 学习日记 一 Selenium库介绍 1 Selenium简介 2 Selenium的安装 3 安装浏览器驱
  • RabbitMQ系列(十五)RabbitMQ进阶-SprintBoot集成RabbitMQ使用

    RabbitMQ进阶 SprintBoot集成RabbitMQ使用 文章目录 RabbitMQ进阶 SprintBoot集成RabbitMQ使用 1 构建项目 1 1 Spring Init创建项目 1 2 新建项目包 2 初始化Rabbi
  • 【Chrome】分享几个常用的插件,持续集成

    文章目录 一 准备 打开扩展程序 1 1 方式一 1 2 方式二 1 3 打开开发者模式 二 Chrome应用商店在线安装 需要科学上网 三 离线安装 3 1 离线crx下载地址 3 2 crx方式安装 3 3 加载已解压的扩展程序 方式安
  • 计算机组成原理实验——五、单周期CPU设计

    一 实验目的 掌握指令执行过程的5个阶段 掌握每条指令的数据通路选择 掌握译码器和控制器的功能和实现 掌握数据输入输出处理的方法 实现risc v中RV32I指令的单周期CPU 利用实现的risc v CPU实现平方数 二 实验内容 实现r
  • DHorse系列文章之操作手册

    在介绍DHorse的操作之前 我们先来看一下发布一个系统的流程是什么样的 发布系统的流程 我们以一个Springboot系统为例 来说明一下发布流程 1 首先从代码仓库下载代码 比如Gitlab 2 接着是进行打包 比如使用Maven 3
  • 求和2.14

    n int input 输入数字的数量 a list map int input split 输入一个列表 print a S 0 s1 sum a for i in range 0 n s1 a i S a i s1 print S
  • 使用PHPExcel实现数据批量导入到数据库

    此例子只使用execel2003的 xls文档 若使用的是其他版本 可以保存格式为 Execel 97 2003 工作簿 xls 即 xls文件类型即可 功能说明 只能上传Excel2003类型的xls文件 大小不超过5M 可下载例子模板添
  • 配置服务器实现无缝连接

    在进行网络爬虫时 经常会面临目标网站的IP封锁 反爬虫策略等问题 为了解决这些问题 配置代理服务器是一种常见的方法 本文将向您介绍如何配置代理服务器与爬虫实现无缝连接 助您顺利进行数据采集 一 了解代理服务器的作用 代理服务器充当中间人的角
  • vue3.0 vue.config.js 配置实战

    项目常用配置 const path require path const UglifyJsPlugin require uglifyjs webpack plugin function resolve dir return path joi
  • MIUI11系统详细卡刷开发版获取Root超级权限的步骤

    小米的机器不同手机型号一般情况官方论坛都提供两个不同的系统 它们是稳定版和开发版 稳定版没有提供root权限管理 开发版中就提供了root权限 很多情况下我们需要使用的一些功能强大的工具 都需要在root权限下工作 就比如我们公司在使用的营
  • 2、应用入口类 SpringbootApplication&核心注解

    官网 https docs spring io spring boot docs 2 5 8 SNAPSHOT reference htmlsingle getting started installing 核心注解分析 许多 Spring
  • 【Postgresql】触发器某个字段更新时执行,行插入或更新执行

    Postgresql 触发器某个字段更新时执行 行插入或更新执行 1 postgresql触发器 2 触发器的创建及示例 1 字段更新时 触发 2 行插入或更新时 触发 3 触发器的删除 4 触发器的坑 参考 1 postgresql触发器
  • 服务器好玩的项目_推荐!github上四个与100有关的优质项目

    编辑 zero 关注 搜罗最好玩的计算机视觉论文和应用 AI算法与图像处理 微信公众号 获得第一手计算机视觉相关信息 今天要分享四个非常优质的开源项目 一定能够有效的提升你的coding能力 1 Python 100天从新手到大师 2 10
  • 机器学习基础学习-多元线性回归问题(梯度下降法实现)

    1 基本概念 在之前的博客当中描述了怎样模拟出了梯度下降的过程 如果是多维情况 theta其实是一个向量 那么对其求导的损失函数也是向量 梯度就是损失函数对每个方向的theta求偏导 和之前的一维线性回归相比 我们对只是对w这个数字进行求导
  • VC++ CComboBox自绘(颜色下拉列表框)

    使用前 请将控件的Style属性设置为DropdownList 下拉列表 Owner Draw设置为Fixed Has Strings设置为TRUE 效果图如下 头文件声明 CSWColorComboBox h pragma once in
  • 关于C++ 对象私有成员不可访问的理解误区

    C 中对象的私有成员是否可以被别的对象访问 答案是肯定的 但是分场合 同一个类 友元类 什么情况下可以访问C 对象的私有成员呢 首先 C 私有成员不可以直接访问是个错误理解 C 对象的私有成员在类内是可以访问的 请注意是类内而不仅仅是对象内
  • Flink自定义实现ElasticSearch Table Source

    Flink版本 1 12 1 ES Maven版本 elasticsearch rest client 6 3 1 FLINK TableSource官方文档 https ci apache org projects flink flink