The Producer client
Sample code
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("client.id", "DemoProducer");
props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<Integer, String>(props);
this.topic = topic;
this.isAsync = isAsync;
String messageStr = "Message_";
long startTime = System.currentTimeMillis();
if (isAsync) {
Asynchronous processing: this path requires defining a callback function that listens for the response of the sent message.
// Send asynchronously
producer.send(new ProducerRecord<Integer, String>(topic,
        messageNo /* key */,
        messageStr /* value */),
    /* async processing: the callback function */
    new DemoCallBack(startTime, messageNo, messageStr));
} else {
Synchronous processing: after the send completes, wait for the response result.
// Send synchronously
try {
producer.send(new ProducerRecord<Integer, String>(topic,
        messageNo /* key */,
        messageStr /* value */)).get();
System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
}
}
Definition of the callback function for asynchronous processing:
The callback implementation must implement the org.apache.kafka.clients.producer.Callback interface:
class DemoCallBack implements Callback
and implement the function declared in the interface:
public void onCompletion(RecordMetadata metadata, Exception exception) {
The startTime here is the time at which the message was sent, captured when the callback was created for this message.
Compute how long sending this message took:
long elapsedTime = System.currentTimeMillis() - startTime;
if (metadata != null) {
If the metadata is non-null, the message was appended successfully, and the offset at which it was appended can be read from the metadata.
System.out.println(
    "message(" + key + ", " + message + ") sent to partition(" +
    metadata.partition() + "), " +
    "offset(" + metadata.offset() + ") in " + elapsedTime + " ms");
} else {
Otherwise the exception has a value, meaning the append failed; the failure can be printed directly.
exception.printStackTrace();
}
}
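For completeness, the fields used in onCompletion come from the callback's constructor. A minimal version of the class (field names are inferred from the snippet above; the constructor signature matches the send call in the sample code):
class DemoCallBack implements Callback {
    private final long startTime;
    private final int key;
    private final String message;

    public DemoCallBack(long startTime, int key, String message) {
        this.startTime = startTime;
        this.key = key;
        this.message = message;
    }

    public void onCompletion(RecordMetadata metadata, Exception exception) {
        // body as shown above
    }
}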
Client-side generation and processing flow
Generating the KafkaProducer instance
1. First, look at how the KafkaProducer instance is generated:
Based on the properties passed in, a ProducerConfig instance is generated for the Producer:
this(new ProducerConfig(properties), null, null);
2. The essential configuration items:
2.1 client.id: identifies the client; the default value is producer-1. If no id is specified, instances within the same process increment the trailing number of the default value.
2.2 partitioner.class: configures the partitioner instance used to compute the target partition when the producer writes data. The instance must implement Partitioner. When it is initialized, its configure function is called with the configuration, for use in instance generation. The default partitioner is DefaultPartitioner, which takes a murmur2 hash of the key bytes modulo the topic's partition count; if the key is null, it uses an auto-incrementing integer modulo the partition count instead (a sketch follows this list).
2.3 retry.backoff.ms: the back-off interval before retrying after a failed send to a broker; the default value is 100 ms.
2.4 metadata.max.age.ms: the expiry time of the topic metadata cached on the producer side; the default value is 5 minutes. Items 2.3 and 2.4 are used to generate the Metadata instance.
2.5 max.request.size: the maximum number of bytes per producer request; the default value is 1 MB.
2.6 buffer.memory: the size of the buffer in which the producer holds data waiting to be sent to the server; the default value is 32 MB.
2.7 compression.type: the compression algorithm applied to the data; the default value is none (no compression). Configurable values are none, gzip, snappy, lz4.
2.8 max.block.ms: the maximum time send or partitionsFor will block to obtain the corresponding leader; the default value is 60 seconds.
2.9 request.timeout.ms: the maximum timeout for socket requests; the default value is 30 seconds.
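As a reference for 2.2, a minimal, self-contained sketch of the DefaultPartitioner behaviour (the hash here is a stand-in; the real implementation uses Utils.murmur2 and, for null keys, round-robins over the available partitions):
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

class DefaultPartitionerSketch {
    // counter driving the round-robin fallback for null keys
    private final AtomicInteger counter = new AtomicInteger(new Random().nextInt());

    int partition(byte[] serializedKey, int numPartitions) {
        if (serializedKey == null)
            // null key: auto-incrementing integer modulo the partition count
            return toPositive(counter.getAndIncrement()) % numPartitions;
        // non-null key: hash of the key bytes modulo the partition count
        return toPositive(hash(serializedKey)) % numPartitions;
    }

    // mask off the sign bit so the modulo result is never negative
    private static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    // stand-in for Utils.murmur2
    private static int hash(byte[] data) {
        return Arrays.hashCode(data);
    }
}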
3. Generate the record accumulator, the instance used to buffer the data the producer is about to send:
this.accumulator = new RecordAccumulator(
        config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
        this.totalMemorySize,
        this.compressionType,
        config.getLong(ProducerConfig.LINGER_MS_CONFIG),
        retryBackoffMs,
        metrics,
        time,
        metricTags);
3.1 Configuration required by the RecordAccumulator instance:
3.1.1 batch.size: the batch size in bytes used for batched commits; the default value is 16384.
3.1.2 linger.ms: used together with 3.1.1; configures the maximum delay for which data may sit in the cache; the default value is 0. (An illustrative configuration of these two knobs follows.)
3.1.3 It also depends on items 2.6, 2.7 and 2.3 above.
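For illustration, the two batching knobs from 3.1.1 and 3.1.2 are set like any other producer property (the values here are arbitrary examples, not recommendations):
Properties props = new Properties();
props.put("batch.size", "16384"); // 3.1.1: bytes per batch
props.put("linger.ms", "5");      // 3.1.2: wait up to 5 ms for a batch to fill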
4. The bootstrap.servers configuration item (multiple addresses separated by commas) is parsed into a collection of InetSocketAddress instances used for socket requests (see the parsing sketch after this list).
4.1 Based on the configured set of broker addresses, a Cluster instance is generated and written into the metadata instance.
5. Generate the NetworkClient instance, which handles socket communication with each broker; generate a Sender instance for data transmission; and generate and start a KafkaThread thread that performs the sending.
6. Based on the key.serializer / value.serializer configuration items, generate the serializer instances for key and value; both must be implementations of Serializer.
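A minimal sketch of how a comma-separated bootstrap.servers string is turned into InetSocketAddress instances (the real code is ClientUtils.parseAndValidateAddresses, which additionally validates each entry):
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

class AddressParsingSketch {
    static List<InetSocketAddress> parse(String bootstrapServers) {
        List<InetSocketAddress> addresses = new ArrayList<InetSocketAddress>();
        for (String url : bootstrapServers.split(",")) {
            int colon = url.lastIndexOf(':');
            String host = url.substring(0, colon).trim();
            int port = Integer.parseInt(url.substring(colon + 1).trim());
            addresses.add(new InetSocketAddress(host, port));
        }
        return addresses;
    }
}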
KafkaThread initialization
Configuration items required to generate the NetworkClient instance:
1. connections.max.idle.ms: the maximum idle time for a connection; the default value is 9 minutes.
2. max.in.flight.requests.per.connection: the maximum number of in-flight requests per connection; the default value is 5.
3. reconnect.backoff.ms: the wait time before retrying a connection; the default value is 50 ms.
4. send.buffer.bytes: the socket send buffer (SO_SNDBUF) size; the default value is 128 KB.
5. receive.buffer.bytes: the socket receive buffer (SO_RCVBUF) size for responses; the default value is 32 KB.
6. request.timeout.ms: the maximum timeout for socket requests; the default value is 30 seconds.
NetworkClient client = new NetworkClient(
        new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
                this.metrics, time, "producer", metricTags, channelBuilder),
        this.metadata,
        clientId,
        config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
        config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
        config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
        config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
        this.requestTimeoutMs, time);
Sender is the thread used to send the data.
Configuration items it requires:
1. max.request.size: the maximum number of bytes per producer request; the default value is 1 MB.
2. acks: the acknowledgement type for requests; the default value is 1, and the three configurable values are -1, 0 and 1.
3. retries: the number of retries after a failed send; the default value is 0.
this.sender = new Sender(client,
        this.metadata,
        this.accumulator,
        config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
        (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
        config.getInt(ProducerConfig.RETRIES_CONFIG),
        this.metrics,
        new SystemTime(),
        clientId,
        this.requestTimeoutMs);
String ioThreadName = "kafka-producer-network-thread"
+ (clientId.length() > 0 ? " | " + clientId : "");
Here a KafkaThread is created to run the Sender instance that sends the producer's data, and the thread is started:
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
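KafkaThread itself is only a thin wrapper around Thread; roughly (condensed, with an illustrative uncaught-exception handler where the real class uses its own logger):
public class KafkaThread extends Thread {
    public KafkaThread(String name, Runnable runnable, boolean daemon) {
        super(runnable, name);
        // daemon = true: the I/O thread will not keep the JVM alive on its own
        setDaemon(daemon);
        setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
            public void uncaughtException(Thread t, Throwable e) {
                System.err.println("Uncaught exception in " + t.getName() + ": " + e);
            }
        });
    }
}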
Sending data through the producer
The Producer send function
public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
return send(record, null);
}
If you need callback handling after data is sent successfully, implement Callback:
public Future<RecordMetadata> send(ProducerRecord<K, V> record,
Callback callback) {
try {
Here, the metadata for the topic named in the record is obtained through the Metadata instance. The function's return value is the time spent reading the topic's metadata.
1. Check whether this topic exists in the metadata instance's topics collection; if it does not, add it.
2. From the Cluster instance held by metadata (which stores the connection information of each broker), look up the topic's partition information in the partitionsByTopic collection. If partitions are recorded for the topic there, the metadata is already loaded and the function returns 0 directly.
3. If metadata holds no partition information for the topic, then within the maximum wait time configured by max.block.ms, a broker connection is randomly picked (and created if it does not yet exist) and a topic MetadataRequest is sent to that broker to fetch the metadata for this topic.
// first make sure the metadata for the topic is available
long waitedOnMetadataMs = waitOnMetadata(record.topic(),
        this.maxBlockTimeMs);
The remaining wait time is computed here: after subtracting the time spent fetching metadata from the total allowed wait, what is left can be spent waiting for the data to be appended to the send queue.
long remainingWaitMs = Math.max(0, this.maxBlockTimeMs -
waitedOnMetadataMs);
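Condensed, the waitOnMetadata flow of steps 1-3 above looks roughly like this (simplified from the 0.9-era source, not verbatim; metadata and sender are KafkaProducer fields):
private long waitOnMetadata(String topic, long maxWaitMs) throws InterruptedException {
    if (metadata.fetch().partitionsForTopic(topic) != null)
        return 0;                               // step 2: metadata already loaded
    metadata.add(topic);                        // step 1: start tracking the topic
    long begin = System.currentTimeMillis();
    long remainingWaitMs = maxWaitMs;
    while (metadata.fetch().partitionsForTopic(topic) == null) {
        int version = metadata.requestUpdate(); // step 3: force a MetadataRequest
        sender.wakeup();                        // let the I/O thread send it
        metadata.awaitUpdate(version, remainingWaitMs);
        long elapsed = System.currentTimeMillis() - begin;
        if (elapsed >= maxWaitMs)
            throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
        remainingWaitMs = maxWaitMs - elapsed;
    }
    return System.currentTimeMillis() - begin;
}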
Serialize the key and value that were passed in, obtaining the serialized byte arrays for both:
byte[] serializedKey;
try {
serializedKey = keySerializer.serialize(record.topic(), record.key());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert key of class "
+ ().getClass().getName() +
" to class " + producerConfig.getClass(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
" specified in ");
}
byte[] serializedValue;
try {
serializedValue = valueSerializer.serialize(record.topic(),
        record.value());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert value of class " +
().getClass().getName() +
" to class " + producerConfig.getClass(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
" specified in ");
}
Get the partition for this record and build a TopicPartition from it.
When determining the partition: if the record carries a partition id, check that the id lies within the valid range; if no partition was passed in, generate a partition id from the record's key/value through the configured partitioner instance.
int partition = partition(record, serializedKey, serializedValue,
metadata.fetch());
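The partition(...) helper called here roughly does the following (close to the 0.9-era source, lightly condensed):
private int partition(ProducerRecord<K, V> record, byte[] serializedKey,
                      byte[] serializedValue, Cluster cluster) {
    Integer partition = record.partition();
    if (partition != null) {
        // an explicit partition id was passed in: validate its range
        List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic());
        int numPartitions = partitions.size();
        if (partition < 0 || partition >= numPartitions)
            throw new IllegalArgumentException("Invalid partition given with record: " +
                    partition + " is not in the range [0..." + numPartitions + "].");
        return partition;
    }
    // otherwise delegate to the configured partitioner instance (see 2.2)
    return this.partitioner.partition(record.topic(), record.key(), serializedKey,
            record.value(), serializedValue, cluster);
}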
Compute the record's length. A record occupies: offset(8), size(4), crc(4), magic(1), attr(1), keysize(4), key, valuesize(4), value. For example, a 10-byte key and a 100-byte value come to 12 (log overhead) + 14 (record header) + 10 + 100 = 136 bytes.
int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey,
serializedValue);
ensureValidRecordSize(serializedSize);
TopicPartition tp = new TopicPartition(record.topic(), partition);
log.trace("Sending record {} with callback {} to topic {} partition {}",
record, callback, (), partition);
Write this message into the client's message buffer:
RecordAccumulator.RecordAppendResult result = accumulator.append(tp,
        serializedKey, serializedValue, callback, remainingWaitMs);
if (result.batchIsFull || result.newBatchCreated) {
If the current buffer's batch is full, or a new batch was just created in the buffer, wake up the sender thread so that its run function continues executing and completes the send operation.
log.trace("Waking up the sender since topic {} partition {} is either full " +
        "or getting a new batch", record.topic(), partition);
this.sender.wakeup();
}
return result.future;
// handling exceptions and record the errors;
// for API exceptions return them in the future,
// for other exceptions throw directly
} catch (ApiException e) {
log.debug("Exception occurred during message send:", e);
if (callback != null)
callback.onCompletion(null, e);
this.errors.record();
return new FutureFailure(e);
} catch (InterruptedException e) {
this.errors.record();
throw new InterruptException(e);
} catch (BufferExhaustedException e) {
this.errors.record();
this.metrics.sensor("buffer-exhausted-records").record();
throw e;
} catch (KafkaException e) {
this.errors.record();
throw e;
}
}
The Producer buffer's append function
When the producer's send function executes, it does not immediately issue a network request on the socket; it first stores the data in the send buffer. This buffer is implemented by a RecordAccumulator instance.
The configuration items required when this instance is generated:
batch.size: the batch size in bytes used for batched commits; the default value is 16384.
linger.ms: used together with batch.size; configures the maximum delay for which data may sit in the cache; the default value is 0.
buffer.memory: the size of the buffer in which the producer holds data waiting to be sent to the server; the default value is 32 MB.
compression.type: the compression algorithm applied to the data; the default value is none (no compression); configurable values are none, gzip, snappy, lz4.
retry.backoff.ms: the back-off interval after a failed send to a broker; the default value is 100 ms.
Next, look at the append function used to add data to the buffer:
public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, Callback callback, long maxTimeToBlock) throws InterruptedException {
appendsInProgress.incrementAndGet();
try {
if (closed)
throw new IllegalStateException(
"Cannot send after the producer is closed.");
First, get the double-ended queue of RecordBatch instances corresponding to this partition from the batches collection.
If no deque exists for the partition in the batches collection, create an ArrayDeque instance and put it into the collection. The dequeFor function returns the deque corresponding to this partition (a sketch of dequeFor follows the full function).
// check if we have an in-progress batch
Deque<RecordBatch> dq = dequeFor(tp);
synchronized (dq) {
Get the last RecordBatch in this queue. An ArrayDeque allocates an array of 16 elements (a power of two) by default when initialized; addLast appends forward from index 0, while addFirst inserts backward from the end of the array.
RecordBatch last = dq.peekLast();
if (last != null) {
If a RecordBatch is obtained from this deque, a batch awaiting submission already exists in the queue; add this key/value to that RecordBatch.
This path is not taken the first time a message is appended for the partition.
Here the key/value is appended to the last RecordBatch in the queue; reaching this point implies the RecordBatch already holds at least one record.
The steps of appending a message to the RecordBatch (its tryAppend function):
1. Check that the batch is writable: this is true while the Sender thread has not yet submitted the batch and the batch's buffer still has room for the current key/value.
2. Check whether the current write position in the batch plus the size of the incoming key/value exceeds batchSize.
3. If execution gets past those checks, the key/value fits: append the message to the RecordBatch and return the RecordBatch's FutureRecordMetadata.
4. If the RecordBatch does not have enough space for the key/value, the future returned here is null.
FutureRecordMetadata future = last.tryAppend(key, value, callback,
        time.milliseconds());
If future is non-null, the message was successfully added to the buffer. Check whether the queue holds more than one RecordBatch, or whether the current RecordBatch is already full; if either holds, the second parameter of the generated RecordAppendResult is true, otherwise false. The third parameter is false because the batch appended to was taken from the queue rather than newly created.
if (future != null)
    return new RecordAppendResult(future, dq.size() > 1 ||
            last.records.isFull(), false);
}
}
If this is the first message appended into the partition's deque, or the last RecordBatch in the deque has already reached the batch size, a new RecordBatch must be created; first compute the size of storage the batch needs:
// we don't have an in-progress record batch try to allocate a new batch
int size = Math.max(this.batchSize, Records.LOG_OVERHEAD +
Record.recordSize(key, value));
log.trace("Allocating a new {} byte message buffer for topic {} partition {}",
size, (), ());
Allocate a buffer of the given size from the memory pool. The allocation process:
1. If the requested size equals the batchSize (which can be thought of as a memory page) and the pool's deque has cached buffers, directly take (remove) the first buffer from the deque.
2. If the pool's available memory plus the total size of the cached pages (each page is one batchSize) is at least the requested size: while the immediately available memory is smaller than the request, release capacity from the buffers cached in the pool's deque (iterating, removing the last buffer each time, until enough has been freed); then subtract the allocated size from the available memory, create a ByteBuffer of that size, and return it.
3. Otherwise the pool currently cannot satisfy the allocation. Iterate in a while loop until enough memory becomes available; in each iteration the calling send thread waits for the Sender thread to submit a buffer, release it, and wake the thread.
3.1 After the thread wakes: if this is the first loop iteration, the requested size is exactly one memory page, and the pool still has free cached pages, take the first buffer from the pool's deque, stop iterating, and return it.
3.2 Otherwise the send thread has been woken but the requested size exceeds the batchSize (one page): if the pool still holds cached pages, release them until enough is freed (if no more space can be released, the while loop repeats and the thread waits again for the next post-submission wakeup). In this case a new ByteBuffer of the requested size is allocated and returned. After the allocation, if the pool's available memory is greater than 0 or the pool still holds a cached page buffer, wake the next waiting thread.
ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
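As a side reference, a condensed, self-contained sketch of this allocation strategy (names are illustrative; the real logic, including the per-thread wait queue and maxTimeToBlock handling, lives in org.apache.kafka.clients.producer.internals.BufferPool):
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

class BufferPoolSketch {
    private final int pageSize;        // one "page" == batch.size bytes
    private final Deque<ByteBuffer> free = new ArrayDeque<ByteBuffer>(); // cached pages
    private long availableMemory;      // unallocated, non-cached memory

    BufferPoolSketch(long totalMemory, int pageSize) {
        this.pageSize = pageSize;
        this.availableMemory = totalMemory;
    }

    synchronized ByteBuffer allocate(int size) throws InterruptedException {
        // case 1: exactly one page requested and a cached page is available
        if (size == pageSize && !free.isEmpty())
            return free.pollFirst();
        // cases 2/3: free cached pages until enough memory is available,
        // waiting for deallocations when the pool is exhausted
        while (availableMemory < size) {
            if (!free.isEmpty())
                availableMemory += free.pollLast().capacity();
            else
                wait(); // woken by deallocate() when the Sender returns a buffer
        }
        availableMemory -= size;
        return ByteBuffer.allocate(size);
    }

    synchronized void deallocate(ByteBuffer buffer) {
        if (buffer.capacity() == pageSize) {
            buffer.clear();
            free.addLast(buffer); // cache whole pages for reuse
        } else {
            availableMemory += buffer.capacity();
        }
        notify();                 // wake one waiting allocator
    }
}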
synchronized (dq) {
if (closed)
throw new IllegalStateException(
"Cannot send after the producer is closed.");
With the lock on the deque re-acquired, check again whether a last RecordBatch exists (while this thread was allocating memory, another thread may have created one) and, if so, try to append this key/value to it. The tryAppend steps are the same as described above; if the RecordBatch has no room, the returned future is null.
RecordBatch last = dq.peekLast();
if (last != null) {
    FutureRecordMetadata future = last.tryAppend(key, value, callback,
            time.milliseconds());
A non-null future here normally should not happen; when it does, the buffer just allocated is released back into the memory pool's cache queue for the next use, so memory need not be requested again.
As before, the second parameter of the RecordAppendResult is true if the queue holds more than one RecordBatch or the current RecordBatch is full, and the third parameter is false since no new RecordBatch was created.
if (future != null) {
    free.deallocate(buffer);
    return new RecordAppendResult(future, dq.size() > 1 ||
            last.records.isFull(), false);
}
}
Reaching this point means the last RecordBatch in the partition's deque cannot hold this key/value, or the deque holds no RecordBatch at all. A new MemoryRecords instance is created to hold the message set to be sent, a new RecordBatch is created to wrap it, and the key/value to be sent is appended to this batch,
and a FutureRecordMetadata instance is generated and returned.
This instance references the RecordBatch's ProduceRequestResult instance, which the Sender thread uses to signal whether the batch was submitted successfully; its second parameter is the number of messages already stored in the current RecordBatch. This tryAppend call is guaranteed to return a FutureRecordMetadata instance, because the buffer was just sized to hold at least this key/value.
MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression,
        this.batchSize);
RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value,
        callback, time.milliseconds()));
Finally, append this newly created batch to the queue.
dq.addLast(batch);
incomplete.add(batch);
Generate the result to return. It carries the future used to track whether the submission succeeds. Its second parameter is true if at least one of the following holds: (1) the queue contains more than one batch, meaning the first batch is full or could not hold the incoming key/value so a new batch was created; (2) the records of the newly created batch already hold at least batchSize of data.
The third parameter is true since this is a newly created RecordBatch.
return new RecordAppendResult(future, dq.size() > 1 ||
        batch.records.isFull(), true);
}
} finally {
appendsInProgress.decrementAndGet();
}
}
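For reference, the two helpers used throughout append: dequeFor follows the standard putIfAbsent pattern, and RecordBatch.tryAppend implements the four steps described above (both condensed from the 0.9-era source, not verbatim):
// dequeFor: return the partition's deque, creating it on first use
private Deque<RecordBatch> dequeFor(TopicPartition tp) {
    Deque<RecordBatch> d = this.batches.get(tp);
    if (d != null)
        return d;
    d = new ArrayDeque<RecordBatch>();
    Deque<RecordBatch> previous = this.batches.putIfAbsent(tp, d);
    return previous == null ? d : previous;
}

// RecordBatch.tryAppend: returns null when the batch has no room for this key/value
public FutureRecordMetadata tryAppend(byte[] key, byte[] value,
                                      Callback callback, long now) {
    if (!this.records.hasRoomFor(key, value))
        return null;                             // steps 1/2/4: not writable or no room
    this.records.append(0L, key, value);         // step 3: append the record
    FutureRecordMetadata future =
            new FutureRecordMetadata(this.produceRequestResult, this.recordCount);
    if (callback != null)
        thunks.add(new Thunk(callback, future)); // remember the user callback
    this.recordCount++;
    return future;
}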
The Sender thread's data transmission
The Sender thread's run function:
/**
* The main run loop for the sender thread
*/
public void run() {
log.debug("Starting Kafka producer I/O thread.");
Once the KafkaProducer instance has been generated and the KafkaThread thread started, the run function of the Sender instance executes.
// main loop, runs until close is called
while (running) {
As long as the producer has not been shut down, run(now) executes here repeatedly, each pass being given the current system time.
try {
run(time.milliseconds());
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
If execution reaches this point, the producer has performed the shutdown operation and is ready to stop.
log.debug("Beginning shutdown of Kafka producer I/O thread, " +
        "sending remaining records.");
// okay we stopped accepting requests but there may still be
// requests in the accumulator or waiting for acknowledgment,
// wait until these are completed.
while (!forceClose && (this.accumulator.hasUnsent() ||
this.client.inFlightRequestCount() > 0)) {
If the accumulator buffer still holds unsent data, or the NetworkClient still has in-flight requests, keep executing run until all data has been sent.
try {
run(time.milliseconds());
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
if (forceClose) {
this.accumulator.abortIncompleteBatches();
}
Close the network connections:
try {
this.client.close();
} catch (Exception e) {
log.error("Failed to close network client", e);
}
log.debug("Shutdown of Kafka producer I/O thread has completed.");
}
The function that performs the send operation on the buffered data:
/**
* Run a single iteration of sending
*
* @param now
* The current POSIX time in milliseconds
*/
public void run(long now) {
Get the connection information of all broker nodes in the current cluster:
Cluster cluster = metadata.fetch();
Here, from all partition batches in the buffer, compute the set of brokers that have data ready to be sent. The specific process:
1. Iterate over the batches collection in the buffer and take out each partition's deque (the cache of batched data).
2. If the partition exists in the cluster's partitionsByTopicPartition collection, the topic's metadata has been loaded; get this partition's leader.
3. If the partition's leader does not exist, set unknownLeadersExist to true in the ReadyCheckResult this function returns.
4. If the iterated partition's leader exists, take out the first batch in the partition's queue. If this batch exists, there is cached data.
4.1 If this batch has been submitted before, its retry count is greater than 0, and the time since the last retry exceeds the retry.backoff.ms wait time (default 100 ms), add the partition's leader to the readyNodes collection in the returned ReadyCheckResult (readyNodes is a Set).
4.2 If the partition's queue has more than one cached batch, or the batch's cache has reached the configured batchSize, add the leader to readyNodes.
4.3 If a batch in the partition's queue has waited at least the linger.ms wait time (default 0), add the leader to readyNodes.
5. In the returned ReadyCheckResult instance, the nextReadyCheckDelayMs property is the delay until the next check, i.e. how long the next pass should wait: if none of the current batches has exceeded its wait time (linger.ms / retry.backoff.ms), it is the difference between that wait time and the time already waited. (A condensed sketch of the per-batch readiness test follows the ready call below.)
// get the list of partitions with data ready to send
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster,
        now);
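The per-batch readiness test inside ready(...) boils down to a predicate like this (parameter names are illustrative; condensed from the checks in 4.1-4.3 above):
class ReadyCheckSketch {
    static boolean batchReady(int dequeSize, boolean batchFull, long waitedTimeMs,
                              long lingerMs, long retryBackoffMs, int attempts,
                              boolean memoryExhausted, boolean closed) {
        // 4.1: a retried batch must first wait out retry.backoff.ms
        boolean backingOff = attempts > 0 && waitedTimeMs < retryBackoffMs;
        long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
        boolean full = dequeSize > 1 || batchFull;       // 4.2
        boolean expired = waitedTimeMs >= timeToWaitMs;  // 4.3
        return !backingOff && (full || expired || memoryExhausted || closed);
    }
}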
If the returned result's unknownLeadersExist property is true, the topic's metadata has not been loaded yet (this generally does not happen) and the metadata is marked for update:
if (result.unknownLeadersExist)
this.metadata.requestUpdate();
The prepared nodes in the returned readyNodes collection are then iterated. This while loop mainly does the following:
1. A node is kept if the NetworkClient is already connected to it, the metadata is not yet due for an update, and the number of in-flight requests on the connection is below the configured max.in.flight.requests.per.connection.
2. If the iterated node's connection has timed out, the metadata needs updating, or no connection to the node's broker has been created yet, the node is removed.
// remove any nodes we aren't ready to send to
Iterator<Node> iter = result.readyNodes.iterator();
long notReadyTimeout = Long.MAX_VALUE;
while (()) {
Node node = ();
if (!this.client.ready(node, now)) {
();
If the connection is one that has been closed, the timeout returned by this.client.connectionDelay(node, now) is the reconnect.backoff.ms value (50 ms).
notReadyTimeout = Math.min(notReadyTimeout,
this.client.connectionDelay(node, now));
}
}
When execution reaches this point, the readyNodes collection in result contains only nodes with established broker connections.
Here, for the collection of broker nodes that can be sent to, the queues of all partitions on each node are iterated.
The first RecordBatch of each queue is taken out (if the batch has failed a send and its retry back-off interval has not yet elapsed, it is skipped), closed (so it can no longer be written) and added to the map to be returned. The iteration runs until the first element of each partition's queue on every node has been visited,
or ends once the configured max.request.size (the maximum number of request bytes) is reached.
// create produce requests
Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
result.readyNodes,
this.maxRequestSize,
now);
Here, using the request timeout configured by request.timeout.ms (default 30 seconds), all timed-out batches in the buffer are collected (whether a batch has timed out is judged from the time its last message was written); each such batch is removed from the buffer and the buffer backing it is recycled:
List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(
this.requestTimeout, cluster, now);
// update sensors
for (RecordBatch expiredBatch : expiredBatches)
this.sensors.recordErrors(expiredBatch.topicPartition.topic(),
expiredBatch.recordCount);
sensors.updateProduceRequestMetrics(batches);
Based on each broker's per-partition batch message sets, the corresponding ProduceRequest is built;
one request is generated per broker, containing the buffers of all of that broker's partitions.
List<ClientRequest> requests = createProduceRequests(batches, now);
Here the wait interval before the next pass is computed, based on the linger.ms / retry.backoff.ms timings. If the readyNodes set to be submitted to brokers this time is non-empty, the interval is set to 0:
long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
if (result.readyNodes.size() > 0) {
log.trace("Nodes with data ready to send: {}", result.readyNodes);
log.trace("Created {} produce requests: {}", (), requests);
pollTimeout = 0;
}
Iterate over each broker's Produce request and send it to that broker through the NetworkClient:
for (ClientRequest request : requests)
client.send(request, now);
Check whether the metadata needs updating; if so, send a metadata request to a broker and update the metadata.
Receive the response information for requests and invoke the corresponding callback functions.
Check whether any broker connection has been inactive for the configured connections.max.idle.ms (default 9 minutes); if so, close that connection:
this.client.poll(pollTimeout, now);
}