// web123456

// Detailed explanation of the window and apply methods in Flink windows

import org.apache.commons.lang3.time.FastDateFormat; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; /** * @author liu a fu * @version 1.0 * @date 2021/3/7 0007 * @DESC TODO: Window statistics case demonstration: Sliding time window (Sliding Time Window), real-time traffic statistics at traffic junctions * * In the Window window case, especially in the TimeWindow, * If you do not see the `Window` size (start time, end time), use the `apply` function to get the window size. */ public class StreamSlidingTimeWindowApply { public static void main(String[] args) throws Exception { // 1. Execution environment-env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 2. Data source-source DataStreamSource<String> inputDataStream = env.socketTextStream("", 9999); // 3. 
Data conversion-transformation /* data: a,3 a,2 a,7 d,9 b,6 a,5 b,3 e,7 e,4 */ SingleOutputStreamOperator<Tuple2<String, Integer>> mapDataStream = inputDataStream .filter(line -> null != line && line.trim().split(",").length == 2) .map(new MapFunction<String, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map(String line) throws Exception { String[] split = line.trim().split(","); return new Tuple2<String, Integer>(split[0], Integer.parseInt(split[1])); } }); // TODO: First group according to the bayonet, then perform window operation, and finally aggregate and accumulate SingleOutputStreamOperator<Tuple2<String, Integer>> sumDataStream = mapDataStream .keyBy(0) // Subscript index, bayonet number // public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size, Time slide) .timeWindow(Time.seconds(10),Time.seconds(5)) // TODO: Sliding time window, counting the data for the last 10 seconds every 5 seconds .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() { private FastDateFormat format = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss") ; @Override public void apply(Tuple tuple, // Represents a grouping key, encapsulated into a tuple TimeWindow window, // Time window, get startTime and endTime Iterable<Tuple2<String, Integer>> input, // Data in the window, perform aggregation operation Collector<Tuple2<String, Integer>> out) throws Exception { // Get grouping key String key = (String)((Tuple1)tuple).f0 ; // Get the window start time and end time long start = window.getStart(); long end = window.getEnd(); // Output content String output = "[" + format.format(start) + "~" + format.format(end) + "] -> " + key ; // Aggregate the data in the window int count = 0 ; for (Tuple2<String, Integer> item: input){ count += item.f1 ; } // The final output out.collect(Tuple2.of(output, count)); } }); // 4. Data terminal-sink sumDataStream.printToErr(); // 5. 
Trigger execution -execute env.execute(StreamSlidingTimeWindowApply.class.getSimpleName()) ; } }