import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
/**
* @author liu a fu
* @version 1.0
* @date 2021/3/7 0007
* @DESC Window statistics demo: sliding time window (Sliding Time Window) computing real-time
* traffic statistics per traffic checkpoint.
*
* In windowed computations, especially time windows (TimeWindow), the plain aggregation functions
* do not expose the window boundaries (start time, end time); use the `apply` function with a
* `WindowFunction` to access them.
*/
public class StreamSlidingTimeWindowApply {

    /** Socket host used when no host is passed as the first program argument. */
    private static final String DEFAULT_HOST = "localhost";

    /** Socket port used when no port is passed as the second program argument. */
    private static final int DEFAULT_PORT = 9999;

    /**
     * Builds and runs the streaming job.
     *
     * <p>The job reads {@code key,count} lines from a socket and keys them by checkpoint id
     * (the first field). Every 5 seconds it emits, per key, the sum of the counts received in
     * the last 10 seconds of processing time, labelled with the window's start and end time.
     *
     * @param args optional {@code [host [port]]} of the socket source; defaults to
     *             {@code localhost 9999}
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        // 1. Execution environment-env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. Data source-source
        String host = args.length > 0 ? args[0] : DEFAULT_HOST;
        int port = args.length > 1 ? Integer.parseInt(args[1]) : DEFAULT_PORT;
        DataStreamSource<String> inputDataStream = env.socketTextStream(host, port);

        // 3. Data conversion-transformation
        /*
        Sample input (one "checkpointId,count" record per line):
        a,3
        a,2
        a,7
        d,9
        b,6
        a,5
        b,3
        e,7
        e,4
        */
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapDataStream = inputDataStream
            // Drop malformed lines up front; otherwise Integer.parseInt would throw and fail the job.
            .filter(StreamSlidingTimeWindowApply::isValidRecord)
            // An anonymous class (not a lambda) keeps the Tuple2<String, Integer> output type
            // visible to Flink's type extraction.
            .map(new MapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> map(String line) throws Exception {
                    return parseRecord(line);
                }
            });

        // Group by checkpoint id, apply a sliding window, then sum the counts inside each window.
        SingleOutputStreamOperator<Tuple2<String, Integer>> sumDataStream = mapDataStream
            .keyBy(record -> record.f0) // Key by checkpoint id (field f0)
            // Sliding window of 10 seconds, evaluated every 5 seconds. Processing time is chosen
            // explicitly: the deprecated timeWindow(...) derives the time domain from the
            // environment (event time by default since Flink 1.12). Socket records carry no
            // timestamps and no watermarks are assigned here, so event-time windows would fail.
            .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
            .apply(new WindowSumFunction());

        // 4. Data terminal-sink
        sumDataStream.printToErr();

        // 5. Trigger execution-execute
        env.execute(StreamSlidingTimeWindowApply.class.getSimpleName());
    }

    /**
     * Returns whether {@code line} is a well-formed record. A well-formed record has exactly two
     * comma-separated fields, a non-blank key, and a count that parses as an {@code int}.
     * Whitespace around either field is allowed.
     */
    private static boolean isValidRecord(String line) {
        if (line == null) {
            return false;
        }
        String[] fields = line.trim().split(",");
        if (fields.length != 2 || fields[0].trim().isEmpty()) {
            return false;
        }
        try {
            Integer.parseInt(fields[1].trim());
            return true;
        } catch (NumberFormatException ignored) {
            // Non-numeric or out-of-range count: treat the line as malformed and skip it.
            return false;
        }
    }

    /** Parses a line already accepted by {@link #isValidRecord} into (key, count), trimming both fields. */
    private static Tuple2<String, Integer> parseRecord(String line) {
        String[] fields = line.trim().split(",");
        return Tuple2.of(fields[0].trim(), Integer.parseInt(fields[1].trim()));
    }

    /**
     * Sums the counts of one key's records inside one time window. It emits
     * {@code ("[start~end] -> key", sum)}, with start/end formatted as
     * {@code yyyy-MM-dd HH:mm:ss} in the JVM's default time zone.
     */
    private static final class WindowSumFunction
            implements WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow> {

        // FastDateFormat is immutable and thread-safe, so a single shared instance is enough.
        private static final FastDateFormat FORMAT = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss");

        @Override
        public void apply(String key,                                  // Grouping key (checkpoint id)
                          TimeWindow window,                           // Window bounds: start and end time
                          Iterable<Tuple2<String, Integer>> input,     // Records that fell into this window
                          Collector<Tuple2<String, Integer>> out) {
            String label = "[" + FORMAT.format(window.getStart()) + "~"
                    + FORMAT.format(window.getEnd()) + "] -> " + key;
            int sum = 0;
            for (Tuple2<String, Integer> item : input) {
                sum += item.f1;
            }
            out.collect(Tuple2.of(label, sum));
        }
    }
}