SingleOutputStreamOperator#returns(TypeHint typeHint) 方法的 javadoc

2024-01-22

我正在阅读源代码SingleOutputStreamOperator#returns,它的javadoc是:

/**
 * Adds a type information hint about the return type of this operator. This method
 * can be used in cases where Flink cannot determine automatically what the produced
 * type of a function is. That can be the case if the function uses generic type variables
 * in the return type that cannot be inferred from the input type.
 *
 * <p>Use this method the following way:
 * <pre>{@code
 *     DataStream<Tuple2<String, Double>> result =
 *         stream.flatMap(new FunctionWithNonInferrableReturnType())
 *               .returns(new TypeHint<Tuple2<String, Double>>(){});
 * }</pre>
 *
 * @param typeHint The type hint for the returned data type.
 * @return This operator with the type information corresponding to the given type hint.
 */

它提到FunctionWithNonInferrableReturnType展示 return 方法的必要性,但我无法编写这样一个类NonInferrableReturnType。怎样写一个简单的呢?


当文档说NonInferrableReturnType这意味着我们可以使用类型变量<T>,或您喜欢的任何其他字母。所以你可以创建一个MapFunction返回一个T。但接下来你必须使用.returns(TypeInformation.of(String.class)例如,如果您的目标是返回String.

public class MyMapFunctionNonInferrableReturnType<T> implements MapFunction<AbstractDataModel, T> {
    @Override
    public T map(AbstractDataModel value) throws Exception {
        return (T) value.getValue();
    }
}

在这里我使用你最后一个问题的类使用超类型创建 MapFunction 时编译失败 https://stackoverflow.com/q/68174020/2096986。相同的代码没有.returns(TypeInformation.of(String.class))编译但抛出运行时异常:

由于类型擦除,无法自动确定。你可以 通过使用 returns(...) 方法给出类型信息提示 转换调用的结果,或者让你的函数 实现“ResultTypeQueryable”接口。

public class NonInferrableReturnTypeStreamJob {

    private final List<AbstractDataModel> abstractDataModelList;
    private final ValenciaSinkFunction sink;

    public NonInferrableReturnTypeStreamJob() {
        this.abstractDataModelList = new ArrayList<AbstractDataModel>();
        this.abstractDataModelList.add(new ConcreteModel("a", "1"));
        this.abstractDataModelList.add(new ConcreteModel("a", "2"));
        this.sink = new ValenciaSinkFunction();
    }

    public NonInferrableReturnTypeStreamJob(List<AbstractDataModel> abstractDataModelList, ValenciaSinkFunction sink) {
        this.abstractDataModelList = abstractDataModelList;
        this.sink = sink;
    }

    public static void main(String[] args) throws Exception {
        NonInferrableReturnTypeStreamJob concreteModelTest = new NonInferrableReturnTypeStreamJob();
        concreteModelTest.execute();
    }

    public void execute() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromCollection(this.abstractDataModelList)
                .map(new MyMapFunctionNonInferrableReturnType())
                .returns(TypeInformation.of(String.class))
                .addSink(sink);

        env.execute();
    }
}

如果您愿意,这里是此示例的集成测试:

import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.junit.ClassRule;
import org.junit.Test;
import org.sense.flink.examples.stream.valencia.ValenciaSinkFunction;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static junit.framework.TestCase.assertEquals;
import static org.junit.Assert.assertTrue;

public class NonInferrableReturnTypeStreamJobTest {

    @ClassRule
    public static MiniClusterWithClientResource flinkCluster;
    private final int minAvailableProcessors = 4;
    private final boolean runInParallel;

    public NonInferrableReturnTypeStreamJobTest() {
        int availableProcessors = Runtime.getRuntime().availableProcessors();
        this.runInParallel = availableProcessors >= minAvailableProcessors;
        if (this.runInParallel) {
            flinkCluster = new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberSlotsPerTaskManager(minAvailableProcessors)
                            .setNumberTaskManagers(1)
                            .build());
        }
    }

    @Test
    public void execute() throws Exception {
        List<AbstractDataModel> abstractDataModelList = new ArrayList<AbstractDataModel>();
        abstractDataModelList.add(new ConcreteModel("a", "1"));
        abstractDataModelList.add(new ConcreteModel("a", "2"));
        ValenciaSinkFunction.values.clear();

        NonInferrableReturnTypeStreamJob streamJob = new NonInferrableReturnTypeStreamJob(abstractDataModelList, new ValenciaSinkFunction());
        streamJob.execute();

        List<String> results = ValenciaSinkFunction.values;
        assertEquals(2, results.size());
        assertTrue(results.containsAll(Arrays.asList("1", "2")));
    }
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

SingleOutputStreamOperator#returns(TypeHint typeHint) 方法的 javadoc 的相关文章

  • Java 8 可选

    我想检查特定对象大小是否大于 0 如果它大于 0 那么我想创建一个可选对象 如果不是 那么我想返回一个可选的空对象 这是java代码的长版本 if fooA size gt 0 return Optional of new Foo else
  • 将 java 方法参数设置为最终参数

    那有什么区别final在下面的代码之间进行 将参数声明为有什么好处final public String changeTimezone Timestamp stamp Timezone fTz Timezone toTz return pu
  • 在 JList 中写一些东西

    嘿 我还有另一个问题 我创建JList在我的主窗口中 现在我想向其中添加一些内容 我这样做 private void jButton2ActionPerformed java awt event ActionEvent evt Dodaj
  • Jboss EAP 7 - 如何从部署中排除隐式模块(javax.jms)?

    我没想到我会来到这里 但经过大量 Google 和 StackOverflow 搜索后 我来到了这里 这就是我的确切问题 https www linkedin com pulse tale two jars marco antonio al
  • 如何在 Android 中签署 AAR Artifacts?

    我目前正在开发一个 AAR android 库 我想用我自己的密钥对已发布的工件进行签名 以便我可以确定我是否发布了具有相同名称和功能的假 aar 注意事项1 我希望能够以编程方式检查我的库的真实性 即使是一个伪造的库 只是伪造了我的 aa
  • 如何从 Java 生产代码中删除调试语句

    编译器是否可以从生产代码中删除用于调试目的 例如日志记录 的语句 调试语句需要以某种方式进行标记 可能使用注释 设置属性 debug true 并在每个调试语句中检查它很容易 但这会降低性能 如果编译器能够简单地使调试语句消失 那就太好了
  • 在休眠搜索中使用现有分析器AnalyzerDiscriminator

    Entity Indexed AnalyzerDefs AnalyzerDef name en tokenizer TokenizerDef factory StandardTokenizerFactory class filters To
  • java内存不足然后退出

    我有一个必须分析大文件的软件 限制输入或提供无限内存都不是一个选择 所以我必须忍受飞行的 OOME 因为 OOME 只杀死线程 所以我的软件运行在一些糟糕的状态 从外面看一切都很好 因为进程正在运行 但在内部却是脑死亡 我想拔掉它的插头 但
  • org.apache.poi 中的异常

    我试图编写一个可以读取和写入的程序 xlsx文件中 下面提供的代码旨在能够编写其第一个 Excel 程序 package excel reader import java io FileOutputStream import java io
  • 更改JavaFX TableView字体大小[重复]

    这个问题在这里已经有答案了 您好 我想在表视图列内的文本上设置字体 我如何用 Java 做到这一点 这是我的代码 感谢帮助 private final TableView
  • 用二进制数、常规数字和格雷编码填充矩阵

    我有一个包含 1 s 或 0 s 的矩阵 用于创建二进制数 其宽度为n 对于 n 2 和 n 3 它看起来像 00 000 01 001 10 010 11 011 100 101 110 111 等等 现在我正在使用以下代码来生成它 in
  • Java中如何将Object[]转换为String[]?

    我有一个关于 Java 的问题 我有一个Object Java默认的 不是用户定义的 我想将它转换为String 谁能帮我 谢谢 这是转换 for int i 0 i lt objectArr length i try strArr i o
  • Java 日期和 MySQL 时间戳时区

    我正在编辑一段代码 其基本功能是 timestamp new Date 然后坚持下去timestamp中的变量TIMESTAMPMySQL 表列 然而 通过调试我看到Date显示在正确时区的对象 GMT 1 当持久化在数据库上时 它是GMT
  • 为什么我无法使用 HttpUrlConnection 上传第一个文件块?

    在我的项目中 我应该从一台服务器逐块下载文件 并将每个块立即上传到另一台服务器 我有一个应该下载的文件的 URL 我们就这样称呼它吧downloadUrl 因此 这就是我逐块下载文件的方式 val chunkSize 1024 1024 B
  • mysql 准备好的语句错误:MySQLSyntaxErrorException

    我使用准备好的语句编写了选择语句 每次尝试运行都会出现此错误 我如何克服这个错误 我的jdbc连接器是mysql connector java 5 1 13 bin jar 我的代码 public Main add ad to getAdD
  • 如何在列表视图中选择时启用视频序列自动播放?

    大家好 有人可以与我分享一下我如何编写我的 viewvideo java 类 以便它允许自动播放视频功能 自动排序在列表视图中播放所选视频的任务 从当前位置到最新录制的视频 按顺序直到最新的视频播放完毕 这类似于 YouTube 自动播放功
  • 如何让 Camel FTP 按需只获取一次

    我对骆驼还很陌生 我一直在尝试让 Camel 根据需要仅通过 FTP 获取单个文件一次 我无法让它发挥作用 这是我尝试过的 让我知道什么是最好的方法以及我的代码有什么问题 1 读取文件后发送一条空消息当收到空消息时 停止路由 from di
  • java.lang.IllegalStateException:FragmentManager 已被销毁

    活动中onResume我称之为 volley request 的方法 它获取项目列表 然后将它们加载到此活动内的 ListFragment 中 当我第一次进入活动时 一切正常 但当我重新进入活动时 ListFragment 为空 并且控制台
  • 使用 ProcessBuilder 启动 CMD

    我尝试使用以下代码在 Windows 中启动 CMD 应用程序 但它无法按预期工作 来自不同网站的几个示例表明 cmd 作为 ProcessBuilder 构造中的参数应该有效 我需要做什么才能让我的 Java 应用程序在 Windows
  • 当框架被拖动时,如何设置 JWindow 的位置位于文本字段下方?

    我正在制作一个自动完成项目 就像谷歌一样 我的框架中有一个 jtextfield 每当我在该字段中输入内容时 该文本字段下方就会出现一个 JWindow 并且该窗口来自另一个类 现在的问题是 每当我拖动框架时 如何使窗口始终出现在文本字段下

随机推荐

  • 如何使用 python mysqldb 一次插入多行

    我有一个列表列表 例如 a b c d 我有一张桌子叫T和两个字段F1 F2 字段列表中的第一项映射到F1 其次是F2 如何在单个命令或调用中为每个内部列表插入行 而不是使用这样的 for 循环 for i in a b c d c exe
  • jqgrid 在网格加载后将单元格输入更改为只读

    加载所有网格数据后 如何将单元格输入更改为 只读 我只希望加载的行将此单元格设置为只读 当我添加新行时 我不希望此单元格设置为只读 提前致谢 UPDATE 这是我的代码 Grid1 jqGrid editurl clientArray da
  • Python 错误:未指定主机

    我刚刚写了一个简单的python demo 同时遇到了一个令人困惑的问题 import requests print requests get http www sina com cn 我知道正确的结果是返回 Response 200 但是
  • PHP的use语句在加载类时会导致额外的工作吗?

    代码示例1 use Outline Drawing var new Drawing 代码示例2 var new Outline Drawing 问题 如果我使用示例 1 中的代码 PHP 是否会使硬件工作更加困难 查找更多文件或执行更多处理
  • 在 Flask 中同时渲染多个模板

    我正在制作一个 Flask 应用程序 我有一个登录区域 一个博客区域 如果我想获取用户的登录信息 我将渲染登录模板 但这不会呈现必须显示在登录区域下方的博客模板 我会尽力让它更清楚 block login endblock blah bla
  • SSIS 脚本组件删除 CHAR(和)字段中的“\0”字符

    我目前正在开发一个字段中有 0 字符的数据库 例如领域 Category CHAR 4 有时值为 0 0 0 0 4 个零字符 有时为 4 个空白字符 我想使用脚本组件来区分存在此问题的所有字段 我编写了以下脚本 但它不起作用 因为 C 将
  • 设置 AnchoredOffsetbox 的线宽和面颜色?

    是否可以更改 AnchoredOffsetbox 的线宽 面颜色等 我用它来列出我的图旁边的一些变量 例如 A 1 B 2 以这种方式 垂直对齐 所以它有点像一个额外的图例 但句柄也是文本 但我不知道如何像图例那样设置框架的属性 非常感谢任
  • 动态更改 Quill 占位符

    我知道在实例化 Quill 编辑器时 有一个占位符选项 有没有一种方法可以在实例化编辑器后动态更改此占位符 占位符是通过 CSS 规则实现的 ql editor before content attr data placeholder 所以
  • 如何在别名列上使用聚合函数 SUM?

    Invoice ID PO Number and dueDate are shown in duplicates TotalPrice 是一个别名 应该是单价 总价是一个错误 所以假设是单价而不是总价 TotalShippingPrice
  • JSON.net 与 XPATH:如何保留 SelectToken 中的节点顺序?

    XPath 2 规定应按照文档中的顺序返回选择的节点顺序 当您在 JSON Net 中 SelectTokens JSONPath 时 情况似乎并非如此 当我处理以下文档时 string json Files dir1 Files file
  • 文件版本信息和程序集信息

    给出 Blah dll 的 AssemblyInfo cs 中的这段代码 assembly AssemblyVersion 3 3 3 3 assembly AssemblyFileVersion 2 2 2 2 然后在一个单独的 exe中
  • 如何将字符串转换为HashMap? [复制]

    这个问题在这里已经有答案了 我有一个 Java 属性文件 并且有一个KEY as ORDER 所以我检索VALUE那个KEY使用getProperty 加载属性文件后的方法如下 String s prop getProperty ORDER
  • Java 8 Javascript 引擎向后兼容性

    我正在我的项目中尝试 Java 8 但遇到了与构建过程相关的错误 我正在使用 ANT 脚本 有时我会使用一些 javascript 嵌入到 ANT 中 来执行一些构建特定的操作 导致错误的脚本部分如下所示 该项目使用 Java 7 或 Ja
  • Cygwin 上的 C 编译器生成什么?

    根据下面的 recognize compilers sh 脚本的输出 Cygwin 上似乎可以使用以下 C 编译器 那些标记为 Cygwin 的要求 cygwin1 dll 文件可用 pc 和 w64 编译器有什么区别 为什么没有 x86
  • C++20 std::ranges:范围适配器跳过每个第 n 个元素

    我正在尝试更熟悉 C 20std ranges我遇到了一个看似简单的问题 如果不滚动我自己的实现 我无法找到标准解决方案 问题很简单 我只想访问和处理使用 C 20 范围适配器的范围中的每个第 N 个元素 例如 我正在寻找一个实用程序 其中
  • 从一个文件访问另一个文件中的类实例?

    我有两个文件 都在同一个项目中 网络抓取框架的一部分 File1 处理 File2 生成的项目 在 File2 中 我有一个函数可以打印出有关进程的一些基本统计信息 已生成的项目数等 我在 File1 中有计数 我想使用 File1 中的统
  • Qt 库中的 GUI 线程检测

    我需要知道我的函数在哪个线程的上下文中运行 是主 GUI 线程还是某个工作线程 我无法使用简单的解决方案将 QThread 指针存储在主函数中并将其与 QThread currentThread 进行比较 因为我正在编写一个库并且无权访问主
  • php通过字符串名称调用类函数

    如何通过名称调用普通 非静态 类函数 下面给出了一个错误 指出 param 1 需要是一个有效的回调 我不希望该函数是静态的 我希望它是一个普通的函数 并且到目前为止我看到的所有示例都将它们设为静态 class Player public
  • Dagger 2 组件中的 getter 方法的用途是什么?

    我正在尝试了解 Dagger 2 中的组件 这是一个示例 Component modules MyModule class public interface MyComponent void inject InjectionSite inj
  • SingleOutputStreamOperator#returns(TypeHint typeHint) 方法的 javadoc

    我正在阅读源代码SingleOutputStreamOperator returns 它的javadoc是 Adds a type information hint about the return type of this operato