Flink CDC

package online.shuihua;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.Properties;

public class Flink_CDCWithCustomerSchema {

    public static void main(String[] args) throws Exception {

        //1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //2. Create the Flink-MySQL-CDC source
        Properties properties = new Properties();

        //Startup modes:
        //initial (default): perform an initial snapshot of the monitored tables on first startup, then continue reading from the latest binlog position.
        //latest-offset: never snapshot on first startup; read from the end of the binlog, i.e. only the changes made after the connector starts.
        //timestamp: never snapshot on first startup; traverse the binlog from the beginning and ignore change events whose timestamp is smaller than the specified timestamp.
        //specific-offset: never snapshot on first startup; read the binlog directly from the specified offset.
        DebeziumSourceFunction<String> mysqlSource = MySQLSource.<String>builder()
                .hostname("hadoop102")
                .port(3306)
                .username("root")
                .password("000000")
                .databaseList("gmall-flink")
                //Optional. If omitted, all tables of the databases listed above are read. Note: tables must be given in the "db.table" format.
                .tableList("gmall-flink.z_user_info")
                .startupOptions(StartupOptions.initial())
                //Custom data parser
                .deserializer(new DebeziumDeserializationSchema<String>() {
                    @Override
                    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
                        //Get the topic, which contains the database and table name, e.g. mysql_binlog_source.gmall-flink.z_user_info
                        String topic = sourceRecord.topic();
                        String[] arr = topic.split("\\.");
                        String db = arr[1];
                        String tableName = arr[2];

                        //Get the operation type: READ, DELETE, UPDATE or CREATE
                        Envelope.Operation operation = Envelope.operationFor(sourceRecord);

                        //Get the value and cast it to Struct
                        Struct value = (Struct) sourceRecord.value();

                        //Get the changed data (the "after" image is null for DELETE events)
                        Struct after = value.getStruct("after");

                        //Build a JSON object holding the row data
                        JSONObject data = new JSONObject();
                        if (after != null) {
                            for (Field field : after.schema().fields()) {
                                Object o = after.get(field);
                                data.put(field.name(), o);
                            }
                        }

                        //Build the JSON object that encapsulates the final return value
                        JSONObject result = new JSONObject();
                        result.put("operation", operation.toString().toLowerCase());
                        result.put("data", data);
                        result.put("database", db);
                        result.put("table", tableName);

                        //Send the data downstream
                        collector.collect(result.toJSONString());
                    }

                    @Override
                    public TypeInformation<String> getProducedType() {
                        return TypeInformation.of(String.class);
                    }
                })
                .build();

        //3. Read data from MySQL using the CDC source
        DataStreamSource<String> mysqlDS = env.addSource(mysqlSource);

        //4. Print the data
        mysqlDS.print();

        //5. Execute the job
        env.execute();
    }
}
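
The startup modes described in the comments map to factory methods on StartupOptions. Below is a minimal sketch of the same source started in latest-offset mode instead of initial; it assumes this connector version exposes StartupOptions.latest() and the built-in StringDebeziumDeserializationSchema (both shipped with the com.alibaba.ververica 1.x MySQL connector), and the class name Flink_CDCStartupOptions is made up for illustration.

package online.shuihua;

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Flink_CDCStartupOptions {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //latest-offset mode: skip the snapshot and read only the changes made after the job starts.
        //For timestamp mode, swap in StartupOptions.timestamp(epochMillis) if your connector version provides it.
        DebeziumSourceFunction<String> mysqlSource = MySQLSource.<String>builder()
                .hostname("hadoop102")
                .port(3306)
                .username("root")
                .password("000000")
                .databaseList("gmall-flink")
                .tableList("gmall-flink.z_user_info")
                .startupOptions(StartupOptions.latest())
                //Built-in deserializer: emits SourceRecord.toString(), handy for a quick check
                .deserializer(new StringDebeziumDeserializationSchema())
                .build();

        env.addSource(mysqlSource).print();
        env.execute();
    }
}

Whichever startup mode is chosen, the custom deserializer above turns each change event into a compact JSON string of the form {"database": ..., "table": ..., "operation": ..., "data": {...}}, which downstream operators can parse far more easily than the default SourceRecord.toString() output.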