...
The other issue is the unbounded retry loop for AddPartitionsToTxn when it serves as the starting request of a new transaction. Currently, the EndTxn call returns as soon as the commit decision is materialized in the transaction log. However, the broker still needs to propagate this decision to all affected parties to complete the transaction, so there is a blackout period during which no new transaction can be started. During this period, every new AddPartitionsToTxn call is rejected and asked to retry, which again creates many unnecessary network round trips.
A minor issue is that the current AddOffsetsToTxn API only registers a consumer group id with the transaction coordinator. Except for certain advanced use cases where multiple consumer groups sit behind one transactional producer, the group id is also deterministic, so this registration could be folded into AddPartitionsToTxn to save another network round trip.
Proposed Changes
Block Begin Transaction
We propose to make AddPartitionsToTxn a blocking call while another transaction is still pending completion. This can be done by holding the response callback in the purgatory as part of the transaction marker completion, so that the response is only sent once the previous transaction's markers have been written. The goal is to eliminate unnecessary retry bounces during transaction marker completion.
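To make the difference concrete, here is a toy, single-threaded Java model of one transactional.id on the coordinator. Everything in it is hypothetical: ToyTxnCoordinator, its methods, and the list of parked callbacks standing in for the purgatory. The state names echo the coordinator's PrepareCommit/CompleteCommit phases, and the string CONCURRENT_TRANSACTIONS is modeled on the retriable error a client receives during the blackout period. With blocking=false the request bounces; with blocking=true its callback is held and completed once the markers are written.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy model of the coordinator for a single transactional.id (hypothetical, not broker code).
class ToyTxnCoordinator {
    enum State { EMPTY, ONGOING, PREPARE_COMMIT, COMPLETE_COMMIT }

    private State state = State.EMPTY;
    // Response callbacks of AddPartitionsToTxn requests held until markers complete
    // (plays the role of the purgatory in this sketch).
    private final List<Consumer<String>> parked = new ArrayList<>();

    // blocking=false: old behavior, bounce the client with a retriable error.
    // blocking=true: proposed behavior, hold the callback instead of responding.
    void addPartitions(boolean blocking, Consumer<String> responseCallback) {
        if (state == State.PREPARE_COMMIT) {
            if (blocking) {
                parked.add(responseCallback);
            } else {
                responseCallback.accept("CONCURRENT_TRANSACTIONS");
            }
            return;
        }
        state = State.ONGOING;
        responseCallback.accept("NONE");
    }

    // EndTxn returns once the commit decision is logged; markers are still pending.
    void endTxn() {
        state = State.PREPARE_COMMIT;
    }

    // Invoked once every transaction marker has been written.
    void onMarkersComplete() {
        state = State.COMPLETE_COMMIT;
        List<Consumer<String>> toComplete = new ArrayList<>(parked);
        parked.clear();
        for (Consumer<String> cb : toComplete) {
            // Each held request now starts the next transaction and gets its response.
            addPartitions(true, cb);
        }
    }

    State state() {
        return state;
    }
}
```

The sketch leaves out request timeouts, which a real delayed operation in the purgatory would also need so that a stuck marker write cannot hold a client indefinitely.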
Pre-register Output Partitions And Consumer Group
Consider the latest EOS example below:
...
As the example shows, once the output records are built, the developer can easily infer the output partitions from them. Advanced users may even be able to infer the output partitions before processing, directly from the consumed records.
The same is true for the consumer group id.
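For records whose destination is fixed by an explicit partition or by a key, the inference can be a single pass over the output. The Java sketch below uses hypothetical stand-in types (OutRecord, TP) instead of ProducerRecord and TopicPartition, and a placeholder hash (String.hashCode modulo the partition count) instead of the murmur2-based hashing of Kafka's default partitioner; a real application must reproduce whatever partitioner its producer is actually configured with.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-ins for ProducerRecord and TopicPartition.
record OutRecord(String topic, Integer partition, String key) {}
record TP(String topic, int partition) {}

final class OutputPartitions {
    // Placeholder partitioner: NOT Kafka's default (which hashes serialized key bytes with murmur2).
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Infers the set of partitions the given records will be sent to.
    static Set<TP> infer(List<OutRecord> records, Map<String, Integer> partitionCounts) {
        Set<TP> result = new LinkedHashSet<>();
        for (OutRecord r : records) {
            int p;
            if (r.partition() != null) {
                p = r.partition(); // explicitly targeted partition wins over the key
            } else if (r.key() != null) {
                Integer count = partitionCounts.get(r.topic());
                if (count == null) {
                    throw new IllegalArgumentException("unknown partition count for " + r.topic());
                }
                p = partitionFor(r.key(), count);
            } else {
                // Keyless records without a partition are placed by the producer at send time,
                // so their destination cannot be known up front.
                throw new IllegalArgumentException("cannot infer partition for keyless record on " + r.topic());
            }
            result.add(new TP(r.topic(), p));
        }
        return result;
    }
}
```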
Public Interfaces
We would bump the AddPartitionsToTxn API version by one to enable the blocking path on the broker, and add a nullable consumer group id field:
```json
{
  "apiKey": 24,
  "type": "request",
  "name": "AddPartitionsToTxnRequest",
  // Version 1 is the same as version 0.
  //
  // Version 2 adds the blocking behavior and consumer group id.
  "validVersions": "0-2",
  "flexibleVersions": "none",
  "fields": [
    { "name": "TransactionalId", "type": "string", "versions": "0+", "entityType": "transactionalId",
      "about": "The transactional id corresponding to the transaction."},
    { "name": "ProducerId", "type": "int64", "versions": "0+", "entityType": "producerId",
      "about": "Current producer id in use by the transactional id." },
    { "name": "ProducerEpoch", "type": "int16", "versions": "0+",
      "about": "Current epoch associated with the producer id." },
    { "name": "GroupId", "type": "string", "versions": "2+", "entityType": "groupId",
      "nullableVersions": "2+", "default": "null",
      "about": "The unique group identifier." },
    { "name": "Topics", "type": "[]AddPartitionsToTxnTopic", "versions": "0+",
      "about": "The partitions to add to the transaction.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName",
        "about": "The name of the topic." },
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partition indexes to add to the transaction" }
    ]}
  ]
}
```
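As a sketch of how a client might fill in the version 2 fields, the code below groups the pre-registered partitions by topic to match the nested Topics/Partitions layout and carries the nullable GroupId. Kafka generates its request classes from schema files like the one above; the records here are hand-written, hypothetical stand-ins, and rejecting a non-null group id for versions below 2 is an assumed policy for talking to older brokers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical stand-ins mirroring the schema; not Kafka's generated message classes.
record TopicPartition(String topic, int partition) {}
record AddPartitionsToTxnTopic(String name, List<Integer> partitions) {}
record AddPartitionsToTxnRequestData(String transactionalId, long producerId, short producerEpoch,
                                     String groupId, List<AddPartitionsToTxnTopic> topics) {}

final class AddPartitionsRequestBuilder {
    static AddPartitionsToTxnRequestData build(String transactionalId, long producerId, short producerEpoch,
                                               String groupId, Set<TopicPartition> partitions, short version) {
        // GroupId only exists from version 2, so older versions cannot carry it.
        if (version < 2 && groupId != null) {
            throw new IllegalArgumentException("GroupId requires AddPartitionsToTxn version 2+");
        }
        // Group partition indexes by topic name (sorted for a deterministic layout).
        Map<String, List<Integer>> byTopic = new TreeMap<>();
        for (TopicPartition tp : partitions) {
            byTopic.computeIfAbsent(tp.topic(), t -> new ArrayList<>()).add(tp.partition());
        }
        List<AddPartitionsToTxnTopic> topics = new ArrayList<>();
        byTopic.forEach((name, parts) -> {
            parts.sort(null); // natural ordering of partition indexes
            topics.add(new AddPartitionsToTxnTopic(name, parts));
        });
        return new AddPartitionsToTxnRequestData(
            Objects.requireNonNull(transactionalId), producerId, producerEpoch, groupId, topics);
    }
}
```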
We would change the beginTransaction API on KafkaProducer to let users pass in the output partitions and the consumer group id to pre-register.
```java
/**
 * Should be called before the start of each new transaction. Note that prior to the first invocation
 * of this method, you must invoke {@link #initTransactions()} exactly one time.
 *
 * @param outputPartitions the set of output partitions to produce to.
 * @param consumerGroupId the consumer group id to send offsets to, or null if no offsets will be committed.
 *
 * @throws IllegalStateException if no transactional.id has been configured or if {@link #initTransactions()}
 *         has not yet been invoked
 * @throws ProducerFencedException if another producer with the same transactional.id is active
 * @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker
 *         does not support transactions (i.e. if its version is lower than 0.11.0.0)
 * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured
 *         transactional.id is not authorized. See the exception for more details
 * @throws KafkaException if the producer has encountered a previous fatal error or for any other unexpected error
 */
public void beginTransaction(Set<TopicPartition> outputPartitions, String consumerGroupId) throws ProducerFencedException;
```
Note that consumerGroupId may be null, so applications that do not use a consumer are not required to supply one.
This API gives users the flexibility to call it before record processing, so that partition registration is pipelined with the processing work:
```java
// offsets(), outputPartitions() and process() are application-defined helpers.
// Read some records from the consumer and collect the offsets to commit
ConsumerRecords consumed = consumer.poll(Duration.ofMillis(5000)); // This will be the fencing point if there are pending offsets for the first time.
Map<TopicPartition, OffsetAndMetadata> consumedOffsets = offsets(consumed);
// --------- Call before process ---------- //
// The application could infer the output partitions beforehand
Set<TopicPartition> outputPartitions = outputPartitions(consumed);
producer.beginTransaction(outputPartitions, consumer.groupMetadata().groupId());
// Do processing while sending out the built records asynchronously
for (ProducerRecord record : process(consumed))
    producer.send(record);
// Pass the entire consumer group metadata
producer.sendOffsetsToTransaction(consumedOffsets, consumer.groupMetadata());
producer.commitTransaction();
```
Alternatively, users can call it after processing, which allows larger produce batches:
```java
// Read some records from the consumer and collect the offsets to commit
ConsumerRecords consumed = consumer.poll(Duration.ofMillis(5000)); // This will be the fencing point if there are pending offsets for the first time.
Map<TopicPartition, OffsetAndMetadata> consumedOffsets = offsets(consumed);
// Do some processing and build the records we want to produce
List<ProducerRecord> processed = process(consumed);
// --------- Call after process ---------- //
Set<TopicPartition> outputPartitions = outputPartitions(processed);
producer.beginTransaction(outputPartitions, consumer.groupMetadata().groupId());
// Write the records and commit offsets under a single transaction
for (ProducerRecord record : processed)
    producer.send(record);
// Pass the entire consumer group metadata
producer.sendOffsetsToTransaction(consumedOffsets, consumer.groupMetadata());
producer.commitTransaction();
```
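The benefit of calling beginTransaction before processing is that the registration round trip can overlap with the processing work. The toy Java class below illustrates this under an explicit assumption: that the new beginTransaction only starts the AddPartitionsToTxn request in the background and returns immediately. PipelinedToyProducer and its methods are hypothetical, and for simplicity its send() waits for the registration to finish, whereas a real producer would keep send() asynchronous and hold back only the transmission of buffered records.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical toy producer: registration runs on a background thread so processing can overlap it.
final class PipelinedToyProducer implements AutoCloseable {
    private final ExecutorService io = Executors.newSingleThreadExecutor();
    private final List<String> log = new CopyOnWriteArrayList<>();
    private CompletableFuture<Void> registration = CompletableFuture.completedFuture(null);

    // Starts the (simulated) AddPartitionsToTxn round trip and returns immediately.
    void beginTransaction(Set<String> partitions, String groupId) {
        registration = CompletableFuture.runAsync(
            () -> log.add("registered " + partitions.size() + " partitions, group=" + groupId), io);
    }

    // Simplification: block until registration completes, then "send" the record.
    void send(String record) {
        registration.join();
        log.add("sent " + record);
    }

    List<String> log() {
        return log;
    }

    @Override
    public void close() {
        io.shutdown();
    }
}
```

Any processing the application performs between beginTransaction and the first send runs while the registration is in flight; calling beginTransaction after processing forgoes that overlap in exchange for sending the whole batch at once.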
To guard against human mistakes, when an application uses the new `beginTransaction` API, no produced record should go to a partition that was not pre-registered, and no offsets should be committed to a group that was not registered. We shall therefore let producer.send() throw UNKNOWN_TOPIC_OR_PARTITION and producer.sendOffsetsToTransaction() throw INVALID_GROUP_ID, while augmenting their corresponding definitions:
```java
UNKNOWN_TOPIC_OR_PARTITION(3, "This server does not host this topic-partition, or this error is thrown from a transactional producer who doesn't expect this topic partition to send to.", UnknownTopicOrPartitionException::new),
INVALID_GROUP_ID(24, "The configured groupId is invalid, or this error is thrown from a transactional producer who doesn't expect to commit offset to this group.", InvalidGroupIdException::new),
```
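The client-side checks described above reduce to set-membership and equality tests against what beginTransaction registered. The Java sketch below uses hypothetical stand-ins for TopicPartition and the two exception classes, and a hypothetical PreRegisteredTxn holder; treating a null registered group id as "no group may receive offsets" is an assumption, since the text does not spell out that case.

```java
import java.util.Set;

// Hypothetical stand-ins for Kafka's exception classes of the same names.
class UnknownTopicOrPartitionException extends RuntimeException {
    UnknownTopicOrPartitionException(String message) { super(message); }
}

class InvalidGroupIdException extends RuntimeException {
    InvalidGroupIdException(String message) { super(message); }
}

record TopicPartition(String topic, int partition) {}

// Producer-side record of what beginTransaction(outputPartitions, consumerGroupId) pre-registered.
final class PreRegisteredTxn {
    private final Set<TopicPartition> partitions;
    private final String groupId; // may be null for producer-only applications

    PreRegisteredTxn(Set<TopicPartition> partitions, String groupId) {
        this.partitions = Set.copyOf(partitions);
        this.groupId = groupId;
    }

    // Check applied in send(): the record's partition must have been pre-registered.
    void checkSend(TopicPartition tp) {
        if (!partitions.contains(tp)) {
            throw new UnknownTopicOrPartitionException(tp + " was not pre-registered for this transaction");
        }
    }

    // Check applied in sendOffsetsToTransaction(): the group must match the registered one.
    void checkSendOffsets(String consumerGroupId) {
        if (groupId == null || !groupId.equals(consumerGroupId)) {
            throw new InvalidGroupIdException("group " + consumerGroupId + " was not pre-registered for this transaction");
        }
    }
}
```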
Updated JavaDoc for send:
```java
/**
 * Asynchronously send a record to a topic and invoke the provided callback when the send has been acknowledged.
 *
 * @param record The record to send
 * @param callback A user-supplied callback to execute when the record has been acknowledged by the server (null
 *        indicates no callback)
 *
 * @throws AuthenticationException if authentication fails. See the exception for more details
 * @throws AuthorizationException fatal error indicating that the producer is not allowed to write
 * @throws IllegalStateException if a transactional.id has been configured and no transaction has been started, or
 *                               when send is invoked after producer has been closed.
 * @throws InterruptException If the thread is interrupted while blocked
 * @throws SerializationException If the key or value are not valid objects given the configured serializers
 * @throws UnknownTopicOrPartitionException If the record topic partition is not included in the pre-registered partitions
 * @throws KafkaException If a Kafka related error occurs that does not belong to the public API exceptions.
 */
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    // intercept the record, which can be potentially modified; this method does not throw exceptions
    ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
    return doSend(interceptedRecord, callback);
}
```
Updated JavaDoc for sendOffsetsToTransaction:
```java
/**
 * Sends a list of specified offsets to the consumer group coordinator, and also marks
 * those offsets as part of the current transaction.
 *
 * @throws IllegalStateException if no transactional.id has been configured or no transaction has been started.
 * @throws ProducerFencedException fatal error indicating another producer with the same transactional.id is active
 * @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker
 *         does not support transactions (i.e. if its version is lower than 0.11.0.0) or
 *         the broker doesn't support latest version of transactional API with consumer group metadata (i.e. if its version is
 *         lower than 2.5.0).
 * @throws org.apache.kafka.common.errors.UnsupportedForMessageFormatException fatal error indicating the message
 *         format used for the offsets topic on the broker does not support transactions
 * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured
 *         transactional.id is not authorized, or the consumer group id is not authorized.
 * @throws org.apache.kafka.clients.consumer.CommitFailedException if the commit failed and cannot be retried
 *         (e.g. if the consumer has been kicked out of the group). Users should handle this by aborting the transaction.
 * @throws org.apache.kafka.common.errors.FencedInstanceIdException if this producer instance gets fenced by broker due to a
 *         mis-configured consumer instance id within group metadata.
 * @throws org.apache.kafka.common.errors.InvalidGroupIdException if the given consumer group id doesn't match the
 *         pre-registered value.
 * @throws KafkaException if the producer has encountered a previous fatal or abortable error, or for any other unexpected error
 */
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
                                     ConsumerGroupMetadata groupMetadata) throws ProducerFencedException;
```
Compatibility, Deprecation, and Migration Plan
...