web123456

flink The last window has no new data, and the window is not closed

public static class WatermarkDemoFunction implements WatermarkStrategy<JSONObject>{ private Tuple2<Long,Boolean> state = Tuple2.of(0L,true); @Override public WatermarkGenerator<JSONObject> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) { return new WatermarkGenerator<JSONObject>() { private long maxWatermark; @Override public void onEvent(JSONObject waterSensor, long l, WatermarkOutput watermarkOutput) { maxWatermark = Math.max(maxWatermark,waterSensor.getLong("ts")); state.f0 = System.currentTimeMillis(); System.out.println("maxWatermark is " + maxWatermark); state.f1 = false; } @Override public void onPeriodicEmit(WatermarkOutput watermarkOutput) { //Outline time long outOfTime = 3000L; if (maxWatermark - outOfTime <=0){ } else { // If there is no data within 10s, close the current window System.out.println("() - state.f0:" + (System.currentTimeMillis() - state.f0)); System.out.println("state.f1:" + state.f1); if (System.currentTimeMillis() - state.f0 >= 9000L && !state.f1){ watermarkOutput.emitWatermark(new Watermark(maxWatermark + 6000L)); state.f1 = true; System.out.println("Trigger window, maxWatermark + 6000L:" + (maxWatermark + 6000L)); } else { System.out.println("Send watermark normally"); watermarkOutput.emitWatermark(new Watermark(maxWatermark - outOfTime)); } } } }; } }