
Flink 1.14.x KafkaSource source code

The annotated KafkaFetcher and AbstractFetcher classes below show the fetch path of the Kafka connector: a dedicated consumer thread pulls batches from the Kafka broker and passes them through a Handover to the fetch loop, which deserializes each record and emits it downstream with a timestamp under the checkpoint lock.

@Internal
public class KafkaFetcher<T> extends AbstractFetcher<T, TopicPartition> {

    // Deserializer supplied by the user
    private final KafkaDeserializationSchema<T> deserializer;
    // Exchange point between the consumer thread and the fetch loop
    private final Handover handover;
    // Thread that consumes data from the Kafka broker
    final KafkaConsumerThread consumerThread;
    // Collector that buffers the records produced by deserializing one ConsumerRecord
    private final KafkaCollector kafkaCollector;

    // Constructor
    public KafkaFetcher(
            SourceContext<T> sourceContext,
            Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
            SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
            ProcessingTimeService processingTimeProvider,
            long autoWatermarkInterval,
            ClassLoader userCodeClassLoader,
            String taskNameWithSubtasks,
            KafkaDeserializationSchema<T> deserializer,
            Properties kafkaProperties,
            long pollTimeout,
            MetricGroup subtaskMetricGroup,
            MetricGroup consumerMetricGroup,
            boolean useMetrics) throws Exception {
        super(
                sourceContext,
                assignedPartitionsWithInitialOffsets,
                watermarkStrategy,
                processingTimeProvider,
                autoWatermarkInterval,
                userCodeClassLoader,
                consumerMetricGroup,
                useMetrics);
        this.deserializer = deserializer;
        this.handover = new Handover();
        this.consumerThread = new KafkaConsumerThread(
                LOG,
                this.handover,
                kafkaProperties,
                this.unassignedPartitionsQueue,
                this.getFetcherName() + " for " + taskNameWithSubtasks,
                pollTimeout,
                useMetrics,
                consumerMetricGroup,
                subtaskMetricGroup);
        this.kafkaCollector = new KafkaFetcher.KafkaCollector();
    }

    public void runFetchLoop() throws Exception {
        try {
            // Start the separate consumer thread that pulls data from the Kafka
            // broker and publishes it into the handover
            this.consumerThread.start();

            // Take batches from the handover, deserialize them, and emit them downstream
            while (this.running) {
                ConsumerRecords<byte[], byte[]> records = this.handover.pollNext();

                for (KafkaTopicPartitionState<T, TopicPartition> partition :
                        this.subscribedPartitionStates()) {
                    List<ConsumerRecord<byte[], byte[]>> partitionRecords =
                            records.records(partition.getKafkaPartitionHandle());
                    this.partitionConsumerRecordsHandler(partitionRecords, partition);
                }
            }
        } finally {
            this.consumerThread.shutdown();
        }

        try {
            this.consumerThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    protected void partitionConsumerRecordsHandler(
            List<ConsumerRecord<byte[], byte[]>> partitionRecords,
            KafkaTopicPartitionState<T, TopicPartition> partition) throws Exception {
        for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
            // Deserialization step; the deserializer is supplied by the user, i.e. the
            // DeserializationSchema mentioned at the beginning of the article
            this.deserializer.deserialize(record, this.kafkaCollector);
            this.emitRecordsWithTimestamps(
                    this.kafkaCollector.getRecords(), partition, record.offset(), record.timestamp());
            if (this.kafkaCollector.isEndOfStreamSignalled()) {
                this.running = false;
                break;
            }
        }
    }
}

@Internal
public abstract class AbstractFetcher<T, KPH> {

    protected void emitRecordsWithTimestamps(
            Queue<T> records,
            KafkaTopicPartitionState<T, KPH> partitionState,
            long offset,
            long kafkaEventTimestamp) {
        synchronized (this.checkpointLock) {
            T record;
            while ((record = records.poll()) != null) {
                // Extract the event timestamp (from the watermark strategy if one is
                // configured, otherwise falling back to the Kafka record timestamp)
                long timestamp = partitionState.extractTimestamp(record, kafkaEventTimestamp);
                // Emit the record downstream, i.e. out of the source
                this.sourceContext.collectWithTimestamp(record, timestamp);
                // Let the per-partition watermark generator observe the event
                partitionState.onEvent(record, timestamp);
            }
            // Record the offset of the last handled record in the partition state;
            // this is the value snapshotted on checkpoints
            partitionState.setOffset(offset);
        }
    }
}
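The Handover used above is the rendezvous point between the two threads: the KafkaConsumerThread publishes each polled batch into it, and runFetchLoop takes batches out with pollNext(). As a rough mental model only (Flink's real Handover additionally propagates errors from the consumer thread and supports wakeup and close), it behaves like a single-slot blocking exchange:

    // Simplified, hypothetical sketch of the Handover idea; not Flink's actual class.
    public final class SimpleHandover<E> {
        private final Object lock = new Object();
        private E next; // single-slot buffer shared by the two threads

        // Called by the consumer thread: blocks until the previous batch has been taken.
        public void produce(E element) throws InterruptedException {
            synchronized (lock) {
                while (next != null) {
                    lock.wait();
                }
                next = element;
                lock.notifyAll();
            }
        }

        // Called by the fetch loop: blocks until a batch is available.
        public E pollNext() throws InterruptedException {
            synchronized (lock) {
                while (next == null) {
                    lock.wait();
                }
                E result = next;
                next = null;
                lock.notifyAll();
                return result;
            }
        }
    }

The single slot gives natural backpressure: the consumer thread cannot poll Kafka faster than the fetch loop can deserialize and emit.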
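partitionConsumerRecordsHandler delegates the actual decoding to the user-supplied deserializer. Here is a minimal sketch of what such a deserializer can look like; the class name StringWithOffsetSchema and the output format are made up for illustration, while KafkaDeserializationSchema itself is the real Flink 1.14 connector interface:

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import java.nio.charset.StandardCharsets;

    public class StringWithOffsetSchema implements KafkaDeserializationSchema<String> {

        @Override
        public boolean isEndOfStream(String nextElement) {
            // Returning true here is what ultimately flips
            // kafkaCollector.isEndOfStreamSignalled() and stops the fetch loop.
            return false;
        }

        @Override
        public String deserialize(ConsumerRecord<byte[], byte[]> record) {
            String value = record.value() == null
                    ? ""
                    : new String(record.value(), StandardCharsets.UTF_8);
            // Prefix each value with its partition and offset for illustration
            return record.partition() + "@" + record.offset() + ": " + value;
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return TypeInformation.of(String.class);
        }
    }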
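Finally, a usage sketch showing where such a schema is plugged in. The topic name, broker address, and group id are placeholders; FlinkKafkaConsumer is the legacy 1.14 connector whose fetch path is the KafkaFetcher shown above:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import java.util.Properties;

    public class KafkaSourceJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
            props.setProperty("group.id", "demo-group");              // placeholder group

            // "events" is a placeholder topic; StringWithOffsetSchema is the custom
            // KafkaDeserializationSchema sketched above.
            FlinkKafkaConsumer<String> consumer =
                    new FlinkKafkaConsumer<>("events", new StringWithOffsetSchema(), props);

            env.addSource(consumer).print();
            env.execute("KafkaFetcher demo");
        }
    }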