...

A minor issue is that the current AddOffsetsToTxn API only registers a consumer group id with the transaction coordinator. Except in certain advanced use cases where multiple consumer groups sit behind one transactional producer, the group id is also deterministic, so this call could be combined with AddPartitionsToTxn to save a network round trip as well.

More details on the transaction session workflow can be found here.

Proposed Changes

Block Begin Transaction

...

Pre-register Output Partitions And Consumer Group

Look at the latest EOS example below:

...

As one can see, given the set of output records, a developer can easily infer the output partitions, and advanced users may be able to infer them even earlier. The consumer group id is likewise known before the transaction begins.
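The inference described above can be sketched as follows. This is a minimal illustration, not the KIP's implementation: `OutputRecord` is a hypothetical stand-in for a produced record whose partition has already been assigned, and the result is grouped by topic to mirror the `Topics` layout (name plus partition indexes) of the request.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

public class OutputPartitionInference {

    // Hypothetical stand-in for an output record with a known partition;
    // this is NOT the Kafka client's ProducerRecord class.
    public static final class OutputRecord {
        public final String topic;
        public final int partition;
        public final String value;

        public OutputRecord(String topic, int partition, String value) {
            this.topic = topic;
            this.partition = partition;
            this.value = value;
        }
    }

    // Collects the distinct partitions touched by a batch, grouped by topic.
    public static Map<String, SortedSet<Integer>> inferPartitions(List<OutputRecord> records) {
        Map<String, SortedSet<Integer>> partitions = new TreeMap<>();
        for (OutputRecord record : records) {
            partitions.computeIfAbsent(record.topic, t -> new TreeSet<>()).add(record.partition);
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<OutputRecord> batch = Arrays.asList(
            new OutputRecord("output", 0, "a"),
            new OutputRecord("output", 1, "b"),
            new OutputRecord("output", 0, "c"));
        // The resulting map is what an application could pre-register
        // before the transaction begins.
        System.out.println(inferPartitions(batch));
    }
}
```

Sorted collections are used only to keep the output deterministic; any set-valued map would serve the same purpose.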

Public Interfaces

We would bump the AddPartitionsToTxn API version by one to enable the blocking path on the broker, and add a nullable consumer group id field:

Code Block
titleAddPartitionsToTxn.json
{
  "apiKey": 24,
  "type": "request",
  "name": "AddPartitionsToTxnRequest",
  // Version 1 is the same as version 0.
  // 
  // Version 2 adds the blocking behavior and consumer group id.
  "validVersions": "0-2",
  "flexibleVersions": "none",
  "fields": [
    { "name": "TransactionalId", "type": "string", "versions": "0+", "entityType": "transactionalId",
      "about": "The transactional id corresponding to the transaction."},
    { "name": "ProducerId", "type": "int64", "versions": "0+", "entityType": "producerId",
      "about": "Current producer id in use by the transactional id." },
    { "name": "ProducerEpoch", "type": "int16", "versions": "0+",
      "about": "Current epoch associated with the producer id." },
    // -------------- START NEW FIELD --------------
    { "name": "GroupId", "type": "string", "versions": "2+", "entityType": "groupId", "nullableVersions": "2+", "default": "null",
      "about": "The unique group identifier." },
    // -------------- END NEW FIELD ----------------
    { "name": "Topics", "type": "[]AddPartitionsToTxnTopic", "versions": "0+",
      "about": "The partitions to add to the transaction.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName",
        "about": "The name of the topic." },
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partition indexes to add to the transaction" }
    ]}
  ]
}

...

Note that the consumerGroupId could be null, so it is not a strong requirement to include it in user code when a consumer is not used.


This API shall be flexible, so that the user can decide whether to call it before record processing to pipeline the effort, as:

...

To guard against human error, when the application uses the new `beginTransaction` API, no produced record should go to an unregistered partition, and no offsets should be committed to a non-registered group. We shall let producer.send() throw UNKNOWN_TOPIC_OR_PARTITION and producer.sendOffsets() throw INVALID_GROUP_ID, while augmenting their corresponding definitions:

...
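The client-side check described above, where sends to unregistered partitions and offset commits to an unregistered group are rejected, might look roughly like the sketch below. The class `PreRegistrationGuard` and its method names are hypothetical, and `IllegalStateException` stands in for whatever exception types would carry the UNKNOWN_TOPIC_OR_PARTITION and INVALID_GROUP_ID error codes in the real client.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;

public class PreRegistrationGuard {

    private final Map<String, Set<Integer>> registeredPartitions;
    private final String registeredGroupId; // null when no consumer group was registered

    public PreRegistrationGuard(Map<String, Set<Integer>> registeredPartitions, String registeredGroupId) {
        this.registeredPartitions = registeredPartitions;
        this.registeredGroupId = registeredGroupId;
    }

    // Rejects a send whose target partition was not pre-registered.
    public void checkSend(String topic, int partition) {
        Set<Integer> partitions = registeredPartitions.get(topic);
        if (partitions == null || !partitions.contains(partition)) {
            // Stand-in for an exception carrying UNKNOWN_TOPIC_OR_PARTITION.
            throw new IllegalStateException("UNKNOWN_TOPIC_OR_PARTITION: " + topic + "-" + partition);
        }
    }

    // Rejects an offset commit for a group that was not pre-registered.
    public void checkSendOffsets(String groupId) {
        if (registeredGroupId == null || !registeredGroupId.equals(groupId)) {
            // Stand-in for an exception carrying INVALID_GROUP_ID.
            throw new IllegalStateException("INVALID_GROUP_ID: " + groupId);
        }
    }

    public static void main(String[] args) {
        PreRegistrationGuard guard = new PreRegistrationGuard(
            Collections.singletonMap("output", Collections.singleton(0)), "group-a");
        guard.checkSend("output", 0);      // registered partition, passes
        guard.checkSendOffsets("group-a"); // registered group, passes
        try {
            guard.checkSend("output", 1);  // not registered, rejected
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```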

Code Block
titleEndTxnRequest.json
{
  "apiKey": 26,
  "type": "request",
  "name": "EndTxnRequest",
  // Version 1 is the same as version 0.
  // 
  // Version 2 adds the support for new PRODUCER_FENCED error code.
  //
  // Version 3 adds the partitions actually written to during the current transaction, or null to indicate that
  // all the partitions registered so far were written to.
  "validVersions": "0-3",
  "flexibleVersions": "none",
  "fields": [
    { "name": "TransactionalId", "type": "string", "versions": "0+", "entityType": "transactionalId",
      "about": "The ID of the transaction to end." },
    { "name": "ProducerId", "type": "int64", "versions": "0+", "entityType": "producerId",
      "about": "The producer ID." },
    { "name": "ProducerEpoch", "type": "int16", "versions": "0+",
      "about": "The current epoch associated with the producer." },
    { "name": "Committed", "type": "bool", "versions": "0+",
      "about": "True if the transaction was committed, false if it was aborted." },
    // -------------- START NEW FIELD --------------
    { "name": "Topics", "type": "[]EndTxnTopic", "versions": "3+", "nullableVersions": "3+", "default": "null",
      "about": "The partitions actually written to in the current transaction, or null if all registered partitions were written to.", "fields": [
      { "name": "Name", "type": "string", "versions": "3+", "mapKey": true, "entityType": "topicName",
        "about": "The name of the topic." },
      { "name": "Partitions", "type": "[]int32", "versions": "3+",
        "about": "The partition indexes written to in the transaction." }
    ]}
    // -------------- END NEW FIELD ----------------
  ]
}

The producer handles this logic internally by bookkeeping the partitions of sent records; no action is needed from the user.
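As a rough illustration of that bookkeeping, the sketch below tracks which registered partitions were actually written and derives the value for the new `Topics` field, returning null when every registered partition was written to. The class `WrittenPartitionTracker` and its methods are hypothetical names, not part of the producer's actual internals.

```java
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

public class WrittenPartitionTracker {

    private final Map<String, SortedSet<Integer>> registered = new TreeMap<>();
    private final Map<String, SortedSet<Integer>> written = new TreeMap<>();

    // Called when a partition is pre-registered for the transaction.
    public void register(String topic, int partition) {
        registered.computeIfAbsent(topic, t -> new TreeSet<>()).add(partition);
    }

    // Called for every record sent within the transaction.
    public void recordSend(String topic, int partition) {
        written.computeIfAbsent(topic, t -> new TreeSet<>()).add(partition);
    }

    // Value for the EndTxn Topics field: null means "all registered
    // partitions were written to"; otherwise the written subset is listed.
    public Map<String, SortedSet<Integer>> endTxnTopics() {
        if (written.equals(registered)) {
            return null;
        }
        Map<String, SortedSet<Integer>> copy = new TreeMap<>();
        for (Map.Entry<String, SortedSet<Integer>> entry : written.entrySet()) {
            copy.put(entry.getKey(), new TreeSet<>(entry.getValue()));
        }
        return copy;
    }

    public static void main(String[] args) {
        WrittenPartitionTracker tracker = new WrittenPartitionTracker();
        tracker.register("output", 0);
        tracker.register("output", 1);
        tracker.recordSend("output", 0);
        System.out.println(tracker.endTxnTopics()); // only partition 0 was written
        tracker.recordSend("output", 1);
        System.out.println(tracker.endTxnTopics()); // everything registered was written
    }
}
```

Sending null in the common case keeps the request small when the application writes to every partition it registered.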

Performance Testing

Since this KIP is about performance improvement, benchmarking producer performance with and without pre-registration is necessary. An existing benchmark, org.apache.kafka.tools.ProducerPerformance, can be used to run local benchmark tests. A new config parameter shall be added:

...

Code Block
languagejava
titleEOSPerformance
 // ----- New configurations beyond ProducerPerformance ----- 
 parser.addArgument("--input-topic")
                .action(store())
                .required(true)
                .type(String.class)
                .metavar("INPUT-TOPIC")
                .dest("inputTopic")
                .help("consume messages from this topic");

 parser.addArgument("--output-topic")
                .action(store())
                .required(true)
                .type(String.class)
                .metavar("OUTPUT-TOPIC")
                .dest("outputTopic")
                .help("produce messages to this topic");

 parser.addArgument("--consumer-config")
                .action(store())
                .required(false)
                .type(String.class)
                .metavar("CONFIG-FILE")
                .dest("consumerConfigFile")
                .help("consumer config properties file.");

 parser.addArgument("--consumer-props")
                .nargs("+")
                .required(false)
                .metavar("PROP-NAME=PROP-VALUE")
                .type(String.class)
                .dest("consumerConfig")
                .help("kafka consumer related configuration properties like bootstrap.servers,client.id etc. " +
                      "These configs take precedence over those passed via --consumer-config.");
 
 parser.addArgument("--consumer-group-id")
                .action(store())
                .required(false)
                .type(String.class)
                .metavar("CONSUMER-GROUP-ID")
                .dest("consumerGroupId")
                .setDefault("performance-consumer-default-group-id")
                .help("The consumerGroupId to use if we use consumer as the data source.");
 
 parser.addArgument("--poll-timeout")
                .action(store())
                .required(false)
                .type(Long.class)
                .metavar("POLL-TIMEOUT")
                .dest("pollTimeout")
                .setDefault(100L)
                .help("consumer poll timeout");

 parser.addArgument("--pre-registration")
                .action(store())
                .required(false)
                .type(Boolean.class)
                .metavar("PRE-REGISTRATION")
                .help("Whether to pre-register topic partitions");

...