public static class WatermarkDemoFunction implements WatermarkStrategy<JSONObject>{
private Tuple2<Long,Boolean> state = Tuple2.of(0L,true);
@Override
public WatermarkGenerator<JSONObject> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
return new WatermarkGenerator<JSONObject>() {
private long maxWatermark;
@Override
public void onEvent(JSONObject waterSensor, long l, WatermarkOutput watermarkOutput) {
maxWatermark = Math.max(maxWatermark,waterSensor.getLong("ts"));
state.f0 = System.currentTimeMillis();
System.out.println("maxWatermark is " + maxWatermark);
state.f1 = false;
}
@Override
public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
//Outline time
long outOfTime = 3000L;
if (maxWatermark - outOfTime <=0){
} else {
// If there is no data within 10s, close the current window
System.out.println("() - state.f0:" + (System.currentTimeMillis() - state.f0));
System.out.println("state.f1:" + state.f1);
if (System.currentTimeMillis() - state.f0 >= 9000L && !state.f1){
watermarkOutput.emitWatermark(new Watermark(maxWatermark + 6000L));
state.f1 = true;
System.out.println("Trigger window, maxWatermark + 6000L:" + (maxWatermark + 6000L));
} else {
System.out.println("Send watermark normally");
watermarkOutput.emitWatermark(new Watermark(maxWatermark - outOfTime));
}
}
}
};
}
}