创建具有通用返回类型的 FlinkSQL UDF

2024-04-24

我想定义函数MAX_BY接受类型值T和类型的订购参数Number并根据排序从窗口返回最大元素(类型为T)。我试过了

public class MaxBy<T> extends AggregateFunction<T, Tuple2<T, Number>> {

    @Override
    public T getValue(Tuple2<T, Number> tuple) {
        return tuple.f0;
    }

    @Override
    public Tuple2<T, Number> createAccumulator() {
        return Tuple2.of(null, 0L);
    }

    public void accumulate(Tuple2<T, Number> acc, T value, Number order) {
        if (order.doubleValue() > acc.f1.doubleValue()) {
            acc.f0 = value;
            acc.f1 = order;
        }
    }
}

但我无法使用注册此类功能TableEnvironment.registerFunction。 Flink 底层使用TypeInformation为了匹配 SQL 查询中的类型,并且使用这样的定义,它无法确定类型(至少我是这么认为的)。我看到可以提供几个accumulate函数,但仍然 - 我认为每个重载方法的返回类型必须相同。

内置聚合函数的工作原理与我想要实现的类似 -MAX可以采用任意列类型并返回相同的类型。这就是为什么我想我也应该能够做到。


不幸的是,Flink 不支持具有灵活返回类型的聚合函数。为了MAX函数,内部实现定义独立于类型的核心逻辑,然后为每个支持的类型创建一个实现(see code https://github.com/apache/flink/blob/0ab1549f52f1f544e8492757c6b0d562bf50a061/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala).

在内部,MAX然后根据类型映射到正确的实现。

我认为如果您将函数定义并注册为用户定义的聚合函数,这是不可能的。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

创建具有通用返回类型的 FlinkSQL UDF 的相关文章

随机推荐