Flink Window分析及Watermark解决乱序数据机制深入剖析-Flink牛刀小试

2023-05-16

版权声明:本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。版权声明:禁止转载,欢迎学习。QQ邮箱地址:1120746959@qq.com,如有任何问题,可随时联系。

本文决心讲清楚这个纠结的水印Watermark问题,Come on !

1 The Time

针对stream数据中的时间,可以分为以下三种:

  • Event Time:事件产生的时间,它通常由事件中的时间戳描述。
  • Ingestion time:事件进入Flink的时间
  • Processing Time:事件被处理时当前系统的时间

  • Flink中,默认Time类似是ProcessingTime ,可以在代码中设置:

1.1 tips(请认真思考下面的话,绝对震聋发溃!)

  • 在水印的处理中,我们一般取事件时间序列的最大值作为水印的生成时间参考。

  • 按照信号发生的顺序,时间是不断增加的,所以在时间序列上若出现事件时间小于时间序列最大值,一般都是延时的事件,时间序列最大值不会改变。

  • 每处理一条事件数据,watermark时间就生成一次,后面窗的触发就是依据水印时间。若设置乱序延时为10s,则生成规则就是:

     final Long maxOutOfOrderness = 10000L;// 最大允许的乱序时间是10s
     new Watermark(currentMaxTimestamp - maxOutOfOrderness)
    复制代码
  • 数据会按照时间进行依次Append,

  • 水印依赖的条件是窗,水印只是决定了窗的触发时间,若设置最大允许的乱序时间是maxOutOfOrderness=10s,则窗的触发时机就是:

      watermark 时间 >= window_end_time
    复制代码
  • 窗触发时,数据除了正常的时间序列,同时也包含延时到达的序列。在窗触发前(也就水印时间),计算除了把之前的正常窗数据给触发了,同时还包含了本来也属于该窗的延时数据。

2 窗与水印的世纪谜题

  • 事件时间的最大值,也就是当前的实际事件时间,因此需要以此为参考点。
  • 实际窗:意思就是数据就在那里Append,窗数据已经准备好,等待触发时机。
  • 水印时间不受影响:就是每次来的数据的事件时间最大值,不受延迟数据时间影响。
  • 下面例子中,等水印时间为10:11:33时,满足时间窗 10:11:30 <-> 10:11:33的触发时机,此时需要处理的数据不仅包含正常数据10:11:21 ,同时还包含乱序数据10:11:31。
  • 再次强调:窗时机到来时,会遍历乱序数据和原窗数据。
  • 实际窗在流动,只是暂不触发。
  • 水印也在标记流动
  • 窗时机触发也在流动。
  • watermark 时间 >= window_end_time时,触发历史窗执行。

3 EventTime和Watermarks 水位线理论碰撞

  • 流处理从事件产生,到流经source,再到operator,中间是有一个过程和时间的。虽然大部分情况下,流到operator的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络延迟等原因,导致乱序的产生,特别是使用kafka的话,多个分区的数据无法保证有序。所以在进行window计算的时候,我们又不能无限期的等下去,必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了。这个特别的机制,就是watermark,watermark是用于处理乱序事件的。

  • 通常,在接收到source的数据后,应该立刻生成watermark;但是,也可以在source后,应用简单的map或者filter操作后,再生成watermark。注意:如果指定多次watermark,后面指定的会覆盖前面的值。 生成方式

  • With Periodic Watermarks

      周期性的触发watermark的生成和发送,默认是100ms,每隔N秒自动向流里
      注入一个WATERMARK 时间间隔由ExecutionConfig.setAutoWatermarkInterval 
      决定. 每次调用getCurrentWatermark 方法, 如果得到的WATERMARK
      不为空并且比之前的大就注入流中 
      可以定义一个最大允许乱序的时间,这种比较常用
      实现AssignerWithPeriodicWatermarks接口
    复制代码
  • With Punctuated Watermarks

      基于某些事件触发watermark的生成和发送基于事件向流里注入一个WATERMARK,
      每一个元素都有机会判断是否生成一个WATERMARK. 如果得到的WATERMARK
      不为空并且比之前的大就注入流中实现AssignerWithPunctuatedWatermarks接口
    复制代码
  • 多并行度流的watermarks

    注意:多并行度的情况下,watermark对齐会取所有channel最小的watermark。

4 With Periodic Watermarks案例实战

4.1 最朴实的水印方案(基于事件序列最大值)

 public class StreamingWindowWatermark {

    public static void main(String[] args) throws Exception {
        //定义socket的端口号
        int port = 9010;
        //获取运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置使用eventtime,默认是使用processtime
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        //设置并行度为1,默认并行度是当前机器的cpu数量
        env.setParallelism(1);
        //连接socket获取输入的数据
        DataStream<String> text = env.socketTextStream("SparkMaster", port, "\n");

        //解析输入的数据
        DataStream<Tuple2<String, Long>> inputMap = text.map(new MapFunction<String, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(String value) throws Exception {
                String[] arr = value.split(",");
                return new Tuple2<>(arr[0], Long.parseLong(arr[1]));
            }
        });

        //抽取timestamp和生成watermark
        DataStream<Tuple2<String, Long>> waterMarkStream = inputMap.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Long>>() {

            Long currentMaxTimestamp = 0L;
            final Long maxOutOfOrderness = 10000L;// 最大允许的乱序时间是10s

            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
 
            @Nullable
            @Override
            public Watermark getCurrentWatermark() {
                return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
            }

            //定义如何提取timestamp
            @Override
            public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {
                long timestamp = element.f1;
                currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
                long id = Thread.currentThread().getId();
                System.out.println("作者:秦凯新 键值 :"+element.f0+",事件事件:[ "+sdf.format(element.f1)+" ],currentMaxTimestamp:[ "+
                        sdf.format(currentMaxTimestamp)+" ],水印时间:[ "+sdf.format(getCurrentWatermark().getTimestamp())+" ]");
                return timestamp;
            }
        });

        DataStream<String> window = waterMarkStream.keyBy(0)
                .window(TumblingEventTimeWindows.of(Time.seconds(3)))//按照消息的EventTime分配窗口,和调用TimeWindow效果一样
                .apply(new WindowFunction<Tuple2<String, Long>, String, Tuple, TimeWindow>() {
                    /**
                     * 对window内的数据进行排序,保证数据的顺序
                     * @param tuple
                     * @param window
                     * @param input
                     * @param out
                     * @throws Exception
                     */
                    @Override
                    public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<String> out) throws Exception {
                        String key = tuple.toString();
                        List<Long> arrarList = new ArrayList<Long>();
                        Iterator<Tuple2<String, Long>> it = input.iterator();
                        while (it.hasNext()) {
                            Tuple2<String, Long> next = it.next();
                            arrarList.add(next.f1);
                        }
                        Collections.sort(arrarList);
                        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                        String result = "\n 作者:秦凯新 键值 : "+ key + "\n              触发窗内数据个数 : " + arrarList.size() + "\n              触发窗起始数据: " + sdf.format(arrarList.get(0)) + "\n              触发窗最后(可能是延时)数据:" + sdf.format(arrarList.get(arrarList.size() - 1))
                                + "\n              实际窗起始和结束时间: " + sdf.format(window.getStart()) + "《----》" + sdf.format(window.getEnd()) + " \n \n ";

                        out.collect(result);
                    }
                });
        //测试-把结果打印到控制台即可
        window.print();

        //注意:因为flink是懒加载的,所以必须调用execute方法,上面的代码才会执行
        env.execute("eventtime-watermark");

    }
}
复制代码
  • 执行测试数据

     0001,1538359882000		2018-10-01 10:11:22
     0002,1538359886000		2018-10-01 10:11:26
     0003,1538359892000		2018-10-01 10:11:32
     0004,1538359893000		2018-10-01 10:11:33
     0005,1538359894000		2018-10-01 10:11:34
     0006,1538359896000		2018-10-01 10:11:36
     0007,1538359897000		2018-10-01 10:11:37
     
     0008,1538359899000		2018-10-01 10:11:39
     0009,1538359891000		2018-10-01 10:11:31
     0010,1538359903000		2018-10-01 10:11:43
     
     0011,1538359892000		2018-10-01 10:11:32
     0012,1538359891000		2018-10-01 10:11:31
     
     0010,1538359906000		2018-10-01 10:11:46
    复制代码

第一个窗触发:2018-10-01 10:11:21.000《----》2018-10-01 10:11:24.000

第二个窗触发:2018-10-01 10:11:24.000《----》2018-10-01 10:11:27.000

第三个窗触发:2018-10-01 10:11:30.000《----》2018-10-01 10:11:33.000

第四个窗触发:10:11:33.000《----》2018-10-01 10:11:36.000

4.2 最霸道的水印设计(allowedLateness与OutputLateData)

  • 在某些情况下, 我们希望对迟到的数据再提供一个宽容的时间。 Flink 提供了 allowedLateness 方法可以实现对迟到的数据设置一个延迟时间, 在指定延迟时间内到达的数据还是可以触发 window 执行的。

  • 第二次(或多次)触发的条件是 watermark < window_end_time + allowedLateness 时间内, 这个窗口有 late 数据到达时。

  • 举例:当 watermark 等于 10:11:34 的时候, 我们输入 eventtime 为 10:11:30、 10:11:31、10:11:32 的数据的时候, 是可以触发的, 因为这些数据的 window_end_time 都是 10:11:33, 也就是10:11:34<10:11:33+2 为 true。

  • 举例:但是当 watermark 等于 10:11:35 的时候,我们再输入 eventtime 为 10:11:30、10:11:31、10:11:32的数据的时候, 这些数据的 window_end_time 都是 10:11:33, 此时, 10:11:35< 10:11:33+2 为false 了。 所以最终这些数据迟到的时间太久了, 就不会再触发 window 执行了,预示着丢弃。

  • 同时注意,对于延迟的数据,我们完全可以把它揪出来作分析。通过设置sideOutputLateData。

      public class StreamingWindowWatermark2 {
      public static void main(String[] args) throws Exception {
          //定义socket的端口号
          int port = 9000;
          //获取运行环境
          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
          //设置使用eventtime,默认是使用processtime
          env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    
          //设置并行度为1,默认并行度是当前机器的cpu数量
          env.setParallelism(1);
    
          //连接socket获取输入的数据
          DataStream<String> text = env.socketTextStream("hadoop100", port, "\n");
    
          //解析输入的数据
          DataStream<Tuple2<String, Long>> inputMap = text.map(new MapFunction<String, Tuple2<String, Long>>() {
              @Override
              public Tuple2<String, Long> map(String value) throws Exception {
                  String[] arr = value.split(",");
                  return new Tuple2<>(arr[0], Long.parseLong(arr[1]));
              }
          });
    
          //抽取timestamp和生成watermark
          DataStream<Tuple2<String, Long>> waterMarkStream = inputMap.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Long>>() {
    
              Long currentMaxTimestamp = 0L;
              final Long maxOutOfOrderness = 10000L;// 最大允许的乱序时间是10s
    
              SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
              /**
               * 定义生成watermark的逻辑
               * 默认100ms被调用一次
               */
              @Nullable
              @Override
              public Watermark getCurrentWatermark() {
                  return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
              }
    
              //定义如何提取timestamp
              @Override
              public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {
                  long timestamp = element.f1;
                  currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
                  System.out.println("key:"+element.f0+",eventtime:["+element.f1+"|"+sdf.format(element.f1)+"],currentMaxTimestamp:["+currentMaxTimestamp+"|"+
                          sdf.format(currentMaxTimestamp)+"],watermark:["+getCurrentWatermark().getTimestamp()+"|"+sdf.format(getCurrentWatermark().getTimestamp())+"]");
                  return timestamp;
              }
          });
    
          //保存被丢弃的数据
          OutputTag<Tuple2<String, Long>> outputTag = new OutputTag<Tuple2<String, Long>>("late-data"){};
          //注意,由于getSideOutput方法是SingleOutputStreamOperator子类中的特有方法,所以这里的类型,不能使用它的父类dataStream。
          SingleOutputStreamOperator<String> window = waterMarkStream.keyBy(0)
                  .window(TumblingEventTimeWindows.of(Time.seconds(3)))//按照消息的EventTime分配窗口,和调用TimeWindow效果一样
                  //.allowedLateness(Time.seconds(2))//允许数据迟到2秒
                  .sideOutputLateData(outputTag)
                  .apply(new WindowFunction<Tuple2<String, Long>, String, Tuple, TimeWindow>() {
                      /**
                       * 对window内的数据进行排序,保证数据的顺序
                       * @param tuple
                       * @param window
                       * @param input
                       * @param out
                       * @throws Exception
                       */
                      @Override
                      public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<String> out) throws Exception {
                          String key = tuple.toString();
                          List<Long> arrarList = new ArrayList<Long>();
                          Iterator<Tuple2<String, Long>> it = input.iterator();
                          while (it.hasNext()) {
                              Tuple2<String, Long> next = it.next();
                              arrarList.add(next.f1);
                          }
                          Collections.sort(arrarList);
                          SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                          String result = key + "," + arrarList.size() + "," + sdf.format(arrarList.get(0)) + "," + sdf.format(arrarList.get(arrarList.size() - 1))
                                  + "," + sdf.format(window.getStart()) + "," + sdf.format(window.getEnd());
                          out.collect(result);
                      }
                  });
          //把迟到的数据暂时打印到控制台,实际中可以保存到其他存储介质中
          DataStream<Tuple2<String, Long>> sideOutput = window.getSideOutput(outputTag);
          sideOutput.print();
          //测试-把结果打印到控制台即可
          window.print();
    
          //注意:因为flink是懒加载的,所以必须调用execute方法,上面的代码才会执行
          env.execute("eventtime-watermark");
          }
      }
    复制代码

4.3 多并行度下的 watermark触发机制

4.3.1 先领会代码(感谢 github xuwei)

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
    import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
    import org.apache.flink.streaming.api.watermark.Watermark;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;
    
    import javax.annotation.Nullable;
    import java.text.SimpleDateFormat;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Iterator;
    import java.util.List;
    
    public class StreamingWindowWatermark2 {
    
        public static void main(String[] args) throws Exception {
            //定义socket的端口号
            int port = 9010;
            //获取运行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            //设置使用eventtime,默认是使用processtime
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    
            //设置并行度为1,默认并行度是当前机器的cpu数量
            env.setParallelism(8);
    
            //连接socket获取输入的数据
            DataStream<String> text = env.socketTextStream("SparkMaster", port, "\n");
    
            //解析输入的数据
            DataStream<Tuple2<String, Long>> inputMap = text.map(new MapFunction<String, Tuple2<String, Long>>() {
                @Override
                public Tuple2<String, Long> map(String value) throws Exception {
                    String[] arr = value.split(",");
                    return new Tuple2<>(arr[0], Long.parseLong(arr[1]));
                }
            });
    
            //抽取timestamp和生成watermark
            DataStream<Tuple2<String, Long>> waterMarkStream = inputMap.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Long>>() {
    
                Long currentMaxTimestamp = 0L;
                final Long maxOutOfOrderness = 10000L;// 最大允许的乱序时间是10s
    
                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                /**
                 * 定义生成watermark的逻辑
                 * 默认100ms被调用一次
                 */
                @Nullable
                @Override
                public Watermark getCurrentWatermark() {
                    return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
                }
    
                //定义如何提取timestamp
                @Override
                public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {
                    long timestamp = element.f1;
                    currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
                    long id = Thread.currentThread().getId();
                    System.out.println("作者:秦凯新 键值 :"+element.f0+"线程验证 :"+  id   +" , 事件事件:[ "+sdf.format(element.f1)+" ],currentMaxTimestamp:[ "+
                            sdf.format(currentMaxTimestamp)+" ],水印时间:[ "+sdf.format(getCurrentWatermark().getTimestamp())+" ]");                return timestamp;
                }
            });
    
            //保存被丢弃的数据
            OutputTag<Tuple2<String, Long>> outputTag = new OutputTag<Tuple2<String, Long>>("late-data"){};
            //注意,由于getSideOutput方法是SingleOutputStreamOperator子类中的特有方法,所以这里的类型,不能使用它的父类dataStream。
            SingleOutputStreamOperator<String> window = waterMarkStream.keyBy(0)
                    .window(TumblingEventTimeWindows.of(Time.seconds(3)))//按照消息的EventTime分配窗口,和调用TimeWindow效果一样
                    //.allowedLateness(Time.seconds(2))//允许数据迟到2秒
                    .sideOutputLateData(outputTag)
                    .apply(new WindowFunction<Tuple2<String, Long>, String, Tuple, TimeWindow>() {
                        /**
                         * 对window内的数据进行排序,保证数据的顺序
                         * @param tuple
                         * @param window
                         * @param input
                         * @param out
                         * @throws Exception
                         */
                        @Override
                        public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<String> out) throws Exception {
                            String key = tuple.toString();
                            List<Long> arrarList = new ArrayList<Long>();
                            Iterator<Tuple2<String, Long>> it = input.iterator();
                            while (it.hasNext()) {
                                Tuple2<String, Long> next = it.next();
                                arrarList.add(next.f1);
                            }
                            Collections.sort(arrarList);
                            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                            String result = "\n 作者:秦凯新 键值 : "+ key + "\n              触发窗内数据个数 : " + arrarList.size() + "\n              触发窗起始数据: " + sdf.format(arrarList.get(0)) + "\n              触发窗最后(可能是延时)数据:" + sdf.format(arrarList.get(arrarList.size() - 1))
                                    + "\n              实际窗起始和结束时间: " + sdf.format(window.getStart()) + "《----》" + sdf.format(window.getEnd()) + " \n \n ";
                            out.collect(result);
                        }
                    });
            //把迟到的数据暂时打印到控制台,实际中可以保存到其他存储介质中
            DataStream<Tuple2<String, Long>> sideOutput = window.getSideOutput(outputTag);
            sideOutput.print();
            //测试-把结果打印到控制台即可
            window.print();
    
            //注意:因为flink是懒加载的,所以必须调用execute方法,上面的代码才会执行
            env.execute("eventtime-watermark");
    
        }
    }
复制代码

4.3.2 前面代码中设置了并行度为 1:

    env.setParallelism(1);
复制代码

如果这里不设置的话, 代码在运行的时候会默认读取本机 CPU 数量设置并行度。

下面我们来验证一下, 把代码中的并行度调整为 2:

    env.setParallelism(2);
复制代码
  • 发现玄机如下:在第二条事件时,其实已经达到窗的触发时机,但是因为并行度为2,只有等到最小

  • watermark 到的时候才会触发窗计算。发现线程44处理的是001和003 ,线程42处理的是0002,所以只有等到线程42到达后,水印才会起作用执行2018-10-01 10:11:33.000所在的窗。

      0001,1538359890000		2018-10-01 10:11:30
      0002,1538359903000		2018-10-01 10:11:43
      0003,1538359908000		2018-10-01 10:11:48
    复制代码

4.3.3 现在代码中设置了并行度为 8:

  • 发现 这 7 条数据都是被不同的线程处理的。 每个线程都有一个 watermark。且每一个线程都是基于自己接收数据的事件时间最大值。

  • 因此,导致到最后现在还没获取到最小的 watermark, 所以 window 无法被触发执行。

  • 只有所有的线程的最小watermark都满足watermark 时间 >= window_end_time时,触发历史窗才会执行。

      0001,1538359882000		2018-10-01 10:11:22
      0002,1538359886000		2018-10-01 10:11:26
      0003,1538359892000		2018-10-01 10:11:32
      0004,1538359893000		2018-10-01 10:11:33
      0005,1538359894000		2018-10-01 10:11:34
      0006,1538359896000		2018-10-01 10:11:36
      0007,1538359897000		2018-10-01 10:11:37
    复制代码

  • 当持续发生事件数据时。一旦所有线程都达到最低的窗触发时机时,就会进行窗触发执行了。输入数据如下:

      0007,1538359897000		2018-10-01 10:11:37
      0008,1538359897000		2018-10-01 10:11:37
      0009,1538359897000		2018-10-01 10:11:37
      0010,1538359897000		2018-10-01 10:11:37
      0011,1538359897000		2018-10-01 10:11:37
      0012,1538359897000		2018-10-01 10:11:37
      0013,1538359897000		2018-10-01 10:11:37
      0014,1538359897000		2018-10-01 10:11:37
      0015,1538359897000		2018-10-01 10:11:37
    复制代码

5 再下一城

版权声明:本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。版权声明:禁止转载,欢迎学习。QQ邮箱地址:1120746959@qq.com,如有任何问题,可随时联系。

截止到201811250141时间点,我的个人技术博客已经涵盖:

-《Spark Core商业源码实战系列目录》
-《SparkStreaming商业源码实战系列目录》
-《SparkSQL商业源码实战系列目录》
-《Spark商业应用实战系列目录》
-《Spark商业调优实战系列目录》
-《Spark商业ML实战系列目录》
-《Flink牛刀小试实战系列目录》
-《Hadoop商业环境实战系列目录》
-《kafka商业环境实战系列目录》
-《OLAP商业环境实战系列目录》
-《DW商业环境实战系列目录》
复制代码

博客原创作品达到64篇高质量的精品,在这里给自己的坚持加油,送上一句豪情万丈。

秦凯新 于深圳

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Flink Window分析及Watermark解决乱序数据机制深入剖析-Flink牛刀小试 的相关文章

  • Eclipse插件开发首选项篇

    介绍 如果你的插件需要保存一些数据 xff08 参数 xff09 xff0c 比如要设置一些ip地址等等 这时候 xff0c 就要用到Eclipse提供的首选项这个扩展点 我们这里的首选项的数据类型只包括Java中的基本数据类型 扩展点 o
  • java 判断字符串是否是json格式

    2019独角兽企业重金招聘Python工程师标准 gt gt gt 两种实现方式 xff1a 1 通过fastjson解析来判断 xff0c 解析成功 xff0c 是json格式 xff1b 否则 xff0c 不是json格式 xff1a
  • Error response from daemon: Error processing tar file(exit status 1): remove /hosts: device or resou

    2019独角兽企业重金招聘Python工程师标准 gt gt gt You cannot copy over etc hosts Docker provides the container with a custom etc hosts f
  • Jquery动态添加标签元素,在指定标签前或者标签后(append/prepend用法)

    Jquery动态添加标签元素 在指定标签前或者标签后 xff08 append prepend用法 xff09 1 append 方法在被选元素的结尾插入指定内容 2 appendTo 方法在被选元素的结尾插入 HTML 元素 3 prep
  • ASP.NET中常用正则表达式

    34 d 43 34 非负整数 xff08 正整数 43 0 xff09 34 0 9 1 9 0 9 34 正整数 34 d 43 0 43 34 非正整数 xff08 负整数 43 0 xff09 34 0 9 1 9 0 9 34 负
  • 总结 IOS 7 内存管理

    iOS7的一些总结 5 iOS中的内存管理 我们知道 xff0c 为了更加方便地处理内存管理问题 xff0c 将开发人员从繁琐的内存的分配和释放工作中解放出来而专注于产品和逻辑 xff0c iOS提供了一种有效的方法 xff0c 即自动引用
  • 《STL源码剖析》---list容器insert操作的个人理解

    最近在看STL源码剖析 xff0c 感觉还是挺深奥的 xff0c 感觉看不太懂 今天在看list容器这块 xff0c 讲到了insert操作 xff0c 便记录一番自己的理解吧 摘抄书上的 xff1a iterator insert ite
  • PROCESS_YIELD()宏和C语言的switch语句< contiki学习笔记之七>

    写在前面 xff1a 按照main 函数的代码一行一行的分析 xff0c 该是看到了 etimer process 这个位置 但是etimer process实现里的一个宏 PROCESS YIELD 引出了很多故事 xff0c 于是单独把
  • 用c语言指针实现vector,C使用指针将对象添加到Vector中

    我有一个向量添加包含 SDL Surface 指针作为数据成员的对象 xff0c 这意味着我必须使用复制构造函数来实现指针的深层复制 该对象释放析构函数中的表面 指针 xff0c 这就是问题发生的地方 当对象被添加到向量中时 通过按下按钮
  • 【Http认证方式】——Basic认证

    访问请求 xff1a http 192 168 2 113 8080 geoserver rest workspaces时 xff0c 浏览器弹出窗口需要输入用户名和密码 xff0c 并且 xff0c 如果不输入或者输入错误 xff0c 浏
  • c++ http请求

    平常我们要访问某个URL一般都是通过浏览器进行 xff1a 提交一个URL请求后 xff0c 浏览器将请求发向目标服务器或者代理服务器 xff0c 目标服务器或者代理服务器返回我们所需要的数据 xff0c 浏览器接收到这些数据后保存成文件并
  • libcurl实现http登录功能

    用Fiddler Web Debugger捕捉http数据包 xff1a 观察看看 xff0c POST请求的地址为http passport cnblogs com login aspx ReturnUrl 61 http 3a 2f 2
  • 服务器机柜和网络机柜的区别

    原文转载自 http www fwqtg net 服务器机柜 xff0c 用来组合安装面板 插件 插箱 电子元件 器件和机械零件与部件 xff0c 使其构成一个整体的安装箱 服务器机柜由框架和盖板 xff08 门 xff09 组成 xff0
  • Eclipse+Maven创建webapp项目<一>

    Eclipse 43 Maven创建webapp项目 lt 一 gt 1 开启eclipse xff0c 右键new other xff0c 如下图找到maven project 2 选择maven project xff0c 显示创建ma
  • java日期格式(年月日时分秒毫秒)

    package test remote tools combine import java text SimpleDateFormat import java util Calendar import java util Date impo
  • 游戏中的帧同步要求的计算一致性——定点数(Fixed Point)

    最近做了一款帧同步游戏 xff0c 其寻路算法采用了RVO算法 但是由于是移动端的游戏 需要在不同的设备上运行 xff0c 其所有运算必须符合一致性 即所有客户端运算出来的结果必须一致 但是由于浮点数的特性 xff0c 具有误差 xff0c
  • 敏捷测试驱动模式-项目质量保障体系

    结合敏捷项目管理 xff0c 测试驱动模式 xff0c 让测试跑起来 我给这套体系的定义就是 保障质量的同时保证项目进度 xff0c 四个节点及时反馈及时沟通 xff0c 有效的让产品 研发和测试都动起来 xff0c 避免任意一方的停滞 质
  • angularjs自定义指令函数传参

    问题描述 在编写导入指令的时候 xff0c 需要将函数绑定到指令中 xff0c 并传入一个参数 初步实现 首先指令的js文件如下 xff0c 基本的绑定参数和绑定函数 xff0c 没有什么说的 xff1a angular module 39
  • 浅谈JSONObject解析JSON数据

    个人博客同步文章 https mr houzi com 2018 06 根据一段天气API来说一下JSONObject如何解析json数据 xff0c 尽管现在在开发中使用Gson等 xff0c 对于像我这样初次使用Java做开发的小白 x
  • 能ping通,但是不能wget或curl

    2019独角兽企业重金招聘Python工程师标准 gt gt gt 当出现http接口请求超时时 xff0c 可以从以下几个方面排查问题 xff1a 检查接口服务本身是否正常 xff1b 检查接口所在服务器的防火墙是否开启 xff0c 尝试

随机推荐

  • R语言选择特定的行,对某一列排序

    R语言的数据框跟MySQL 中的表很像 根据某一列的特定值选择相应的行 d是个数据框 xff0c 有一列的名字是name d d name 61 61 34 95 34 这样就选中了 name为 95 的所有行 m 是个数据框 xff0c
  • excel表格公式无效、不生效的解决方案及常见问题、常用函数

    1 表格公式无效 不生效 使用公式时碰到了一个问题 xff0c 那就是公式明明已经编辑好了 xff0c 但是在单元格里不生效 xff0c 直接把公式显示出来了 xff0c 网上资料说有4种原因 xff0c 但是我4种都不是 xff0c 是第
  • JVM_栈详解一

    1 Java虚拟机栈 2 栈的存储单位 栈中存储什么 xff1f 每个线程都有自己的栈 xff0c 栈中的数据都是以栈帧 xff08 Stack Frame xff09 的格式存在 在这个线程上正在执行的每个方法都各自对应一个栈帧 xff0
  • EntLib 3.1学习笔记(6) : Security Application Block

    http www microsoft com china MSDN library enterprisedevelopment softwaredev dnpag2entlib mspx mfr 61 true http msdn2 mic
  • Delphi文件操作所涉及的一些函数 附例子

    判断文件是否存在 FileExists 判断文件夹是否存在 DirectoryExists 删除文件 DeleteFile Windows DeleteFile 删除文件夹 RemoveDir RemoveDirectory 获取当前文件夹
  • 排序算法

    include lt iostream gt include lt cstdlib gt include lt cstdio gt include lt time h gt using namespace std 插入排序 void Ins
  • C++应用中调用YOLOv3(darknet)进行目标检测

    YOLOv3论文 xff1a https pjreddie com media files papers YOLOv3 pdf 官网和代码 xff1a https pjreddie com darknet yolo属于one stage x
  • DJI开发之航线重叠率的计算

    介绍 无人机在规划一块区域的时候 xff0c 我们需要手动的给予一些参数来影响无人机飞行 xff0c 对于一块地表 xff0c 无人机每隔N秒在空中间隔的拍照地表的一块区域 xff0c 在整个任务执行结束后 xff0c 拍到的所有区域照片能
  • MODBUS MASTER RTU在STM32上的实现

    MODBUS MASTER RTU在STM32上的实现 1 概述 最近需要将几个信号采集模块通过总线串联起来 xff0c 这样便于系统模块化 故将目光关注到了工业上经常使用的modbus协议 modbus协议是一种一主多从的拓扑结构 xff
  • 基于HttpClient的HttpUtils(后台访问URL)

    最近做在线支付时遇到需要以后台方式访问URL并获取其返回的数据的问题 xff0c 在网络上g了一把 xff0c 发现在常用的还是Apache的HttpClient 因为以经常要用到的原故 xff0c 因此我对其进行了一些简单的封装 xff0
  • micropython安装ros_ROS2与STM32入门教程-microROS的freertos版本

    ROS2与STM32入门教程 micro ros的freertos版本 说明 xff1a 介绍如何安装使用micro ros 测试开发板 xff1a olimex stm32 e407 步骤 xff1a 安装ros2版本foxy xff0c
  • C#中通过com组件操作excel不能关闭的问题

    问题 xff1a 当用如下代码操作完Excel xff0c 虽然调用了Application的Quit 方法 xff0c 但发现Excel进程并没退出 object missing 61 System Reflection Missing
  • 交叉编译的概念及交叉编译工具的安装

    目录 一 什么是交叉编译 二 为什么要交叉编译 xff1f 三 交叉编译链的安装 四 相关使用方法 五 软连接 一 什么是交叉编译 交叉编译是指将一种编程语言编写的程序编译成另一种编程语言的程序 xff0c 通常是在不同的操作系统或硬件环境
  • .cn根服务器被攻击之后

    如果是互联网行业的人员应该知道 xff0c 8月25日凌晨 xff0c 大批的 cn 域名的网站都无法访问 xff0c 当然包括weibo cn等大型网站 个人比较奇怪的一件事情是 xff0c 微博PC网页版是 www weibo com
  • [UML]UML系列——包图Package

    系列文章 UML UML系列 用例图Use Case UML UML系列 用例图中的各种关系 xff08 include extend xff09 UML UML系列 类图Class UML UML系列 类图class的关联关系 xff08
  • VBA编程中的 sheet1 与 sheets(1)的区别

    自己理解 sheet1是一个专有名词 xff0c 不是任何对象的属性 xff0c 只能单独使用 xff0c 特指代码所在工作簿的那个sheet1 和顺序无关 xff0c 是固定的一个表 xff0c sheets 1 则和顺序有关 参考资料
  • python练习笔记——计算1/1-1/3+1/5-1/7……的和

    1 1 1 3 43 1 5 1 7 43 求100000个这样的分式计算之为是多少 xff1f 将此值乘以4后打印出来 xff0c 看看是什么 xff1f num list 61 count 61 1 i 61 1 while True
  • Django Model获取指定列的数据

    model一般都是有多个属性的 xff0c 但是很多时候我们又只需要查询特定的某一个 xff0c 这个时候可以用到values和values list 利用values查询 from attendence models import Emp
  • HIVE自定义函数的扩展

    作者简介 淳敏 xff0c 物流架构师同时也是一位team leader xff0c 工作认真负责 xff0c 曾在休假期间 面向大海编程 xff0c 不明觉厉 在Hive中 xff0c 用户可以自定义一些函数 xff0c 用于扩展Hive
  • Flink Window分析及Watermark解决乱序数据机制深入剖析-Flink牛刀小试

    版权声明 xff1a 本套技术专栏是作者 xff08 秦凯新 xff09 平时工作的总结和升华 xff0c 通过从真实商业环境抽取案例进行总结和分享 xff0c 并给出商业应用的调优建议和集群环境容量规划等内容 xff0c 请持续关注本套博