在以下代码中:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        SingleOutputStreamOperator<Tuple2<String, Integer>> dataStream = env.fromElements(
            Tuple2.of("01", 1),
            Tuple2.of("02", 2),
            Tuple2.of("03", 3),
            Tuple2.of("04", 4),
            Tuple2.of("05", 5)
            ).assignTimestampsAndWatermarks(
                                WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps()
                                        .withTimestampAssigner(
                                                (tuple,  ts) -> System.currentTimeMillis()));
  env.execute("dfjghf")

 env.execute("gghh");

Flink-1.17.1, Java 11


在这条有界的溪流中将注入多少水印?是五点吗?

2) 如何打印这些水印元素?只是为了调试.

推荐答案

  1. forMonotonousTimestamps是一个周期性的水印生成器,默认情况下每200毫秒发出一次水印.这项工作不会持续足够长的时间,不会发生这种情况.因此,我预计在工作结束时只会有一个水印.(有关作业(job)结束水印的详细信息,请参阅this answer.)

  2. 可以在Flink Web用户界面中判断水印--这是调试它们的最简单方法.否则,您可以通过传递到键控进程函数的processElement on onTimer方法中的上下文来访问当前水印.

Java相关问答推荐

从头开始使用Jgit初始化InMemoryRepository

Jooq隐式地将bigint转换为数字,并且索引不起作用

虚拟线程似乎在外部服务调用时阻止运营商线程

如何在Java中声明未使用的变量?

Android Studio—java—在onBindViewHolder中,将断点和空白添加到BackclerView中

需要一个找不到的jakarta.sistence.EntityManager类型的Bean

在bash中将数组作为Java程序的参数传递

Hibernate EmptyInterceptor可以工作,但不能拦截器

使用REST客户端和对象映射器从字符串反序列化Json

基于调车场算法的科学计算器

Spring Data JPA慢慢地创建了太多非活动会话

为什么我的回收视图会显示重复的列表?

如何在不删除Java中已有内容的情况下覆盖文件内容?

如果List是一个抽象接口,那么Collectors.toList()如何处理流呢?

如何使这两种方法合二为一?

如何在运行docker的应用程序中获取指定的配置文件

控制器建议异常处理

Intellij 2023 IDE:始终在顶部显示菜单栏

[jdk21][Foreign Function&;Memory API]MemoryLayout::varHandle通过可变数组进行 struct 化的问题

ControlsFX RangeSlider在方向垂直时滞后