Code Block
    /**
     * Asynchronously send a record to a topic and invoke the provided callback when the send has been acknowledged.
     * @param record The record to send
     * @param callback A user-supplied callback to execute when the record has been acknowledged by the server (null
     *        indicates no callback)
     * @throws AuthenticationException if authentication fails. See the exception for more details
     * @throws AuthorizationException fatal error indicating that the producer is not allowed to write
     * @throws IllegalStateException if a has been configured and no transaction has been started, or
     *                               when send is invoked after producer has been closed.
     * @throws InterruptException If the thread is interrupted while blocked
     * @throws SerializationException If the key or value are not valid objects given the configured serializers
     * @throws UnknownTopicOrPartitionException If the record's topic partition is not included in the pre-registered partitions
     * @throws KafkaException If a Kafka related error occurs that does not belong to the public API exceptions.
     */
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        // intercept the record, which can be potentially modified; this method does not throw exceptions
        ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
        return doSend(interceptedRecord, callback);
    }

    /**
     * Sends a list of specified offsets to the consumer group coordinator, and also marks
     * those offsets as part of the current transaction.
     * @throws IllegalStateException if no has been configured or no transaction has been started.
     * @throws ProducerFencedException fatal error indicating another producer with the same is active
     * @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker
     *         does not support transactions (i.e. if its version is lower than or
     *         the broker doesn't support latest version of transactional API with consumer group metadata (i.e. if its version is
     *         lower than 2.5.0).
     * @throws org.apache.kafka.common.errors.UnsupportedForMessageFormatException fatal error indicating the message
     *         format used for the offsets topic on the broker does not support transactions
     * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured
     *         is not authorized, or the consumer group id is not authorized.
     * @throws org.apache.kafka.clients.consumer.CommitFailedException if the commit failed and cannot be retried
     *         (e.g. if the consumer has been kicked out of the group). Users should handle this by aborting the transaction.
     * @throws org.apache.kafka.common.errors.FencedInstanceIdException if this producer instance gets fenced by broker due to a
     *                                                                  mis-configured consumer instance id within group metadata.
     * @throws org.apache.kafka.common.errors.InvalidGroupIdException if the given consumer group id doesn't match the
     *                                                                pre-registered value.
     * @throws KafkaException if the producer has encountered a previous fatal or abortable error, or for any
     *         other unexpected error
     */
    public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
                                         ConsumerGroupMetadata groupMetadata) throws ProducerFencedException {
        // ...
    }
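The two new exceptions documented above (UnknownTopicOrPartitionException on send, InvalidGroupIdException on sendOffsetsToTransaction) both amount to comparing a request against the pre-registered state. A minimal sketch, assuming that state is a simple in-memory set of partitions plus a group id; `PreRegisteredTransaction`, `checkSend`, and `checkOffsetsGroup` are hypothetical names, and the local exception classes only stand in for Kafka's. This is an illustration of the rule, not the KafkaProducer implementation.

```java
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch, not the KafkaProducer implementation: local checks that
// mirror the pre-registration rules in the javadoc above.
class PreRegisteredTransaction {

    // Stand-in for org.apache.kafka.common.errors.UnknownTopicOrPartitionException.
    static class UnknownPartitionException extends RuntimeException {
        UnknownPartitionException(String message) {
            super(message);
        }
    }

    // Stand-in for org.apache.kafka.common.errors.InvalidGroupIdException.
    static class InvalidGroupException extends RuntimeException {
        InvalidGroupException(String message) {
            super(message);
        }
    }

    private final Set<String> partitions; // "topic-partition" keys registered up front
    private final String groupId;         // consumer group id registered up front

    PreRegisteredTransaction(Set<String> partitions, String groupId) {
        this.partitions = new TreeSet<>(partitions);
        this.groupId = groupId;
    }

    // send(): a record aimed at a partition outside the registered set is rejected.
    void checkSend(String topic, int partition) {
        String tp = topic + "-" + partition;
        if (!partitions.contains(tp)) {
            throw new UnknownPartitionException("not pre-registered: " + tp);
        }
    }

    // sendOffsetsToTransaction(): the group id must match the registered value.
    void checkOffsetsGroup(String requestedGroupId) {
        if (!Objects.equals(groupId, requestedGroupId)) {
            throw new InvalidGroupException(
                    "group id " + requestedGroupId + " does not match pre-registered " + groupId);
        }
    }

    public static void main(String[] args) {
        PreRegisteredTransaction txn =
                new PreRegisteredTransaction(Set.of("orders-0", "orders-1"), "billing");
        txn.checkSend("orders", 1);       // accepted
        txn.checkOffsetsGroup("billing"); // accepted
        try {
            txn.checkSend("orders", 7);
        } catch (UnknownPartitionException e) {
            System.out.println(e.getMessage()); // not pre-registered: orders-7
        }
    }
}
```

Keeping the registered set immutable after construction reflects the idea that registration happens once, up front, so every later send is a cheap membership test rather than a round trip.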

There is also an edge case where a user registers many topic partitions but does not write to all of them. In that case, writing transaction markers to every registered partition wastes resources. To minimize this unexpected cost, we shall also bump the EndTxn request version to include the partitions actually written.

Code Block
{
  "apiKey": 26,
  "type": "request",
  "name": "EndTxnRequest",
  // Version 1 is the same as version 0.
  // Version 2 adds the support for new PRODUCER_FENCED error code.
  // Version 3 adds the partitions actually written during the current transaction, or null to indicate that
  // all the partitions registered so far were written to.
  "validVersions": "0-3",
  "flexibleVersions": "none",
  "fields": [
    { "name": "TransactionalId", "type": "string", "versions": "0+", "entityType": "transactionalId",
      "about": "The ID of the transaction to end." },
    { "name": "ProducerId", "type": "int64", "versions": "0+", "entityType": "producerId",
      "about": "The producer ID." },
    { "name": "ProducerEpoch", "type": "int16", "versions": "0+",
      "about": "The current epoch associated with the producer." },
    { "name": "Committed", "type": "bool", "versions": "0+",
      "about": "True if the transaction was committed, false if it was aborted." },
    // -------------- START NEW FIELD --------------
    { "name": "Topics", "type": "[]EndTxnTopic", "versions": "3+", "nullableVersions": "3+",
      "about": "The partitions written to in the current transaction, or null if all registered partitions were written to.", "fields": [
      { "name": "Name", "type": "string", "versions": "3+", "mapKey": true, "entityType": "topicName",
        "about": "The name of the topic." },
      { "name": "Partitions", "type": "[]int32", "versions": "3+",
        "about": "The partition indexes written to in the current transaction." }
    ]}
    // -------------- END NEW FIELD ----------------
  ]
}
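The effect of the version 3 Topics field on marker writing can be summarized as: a null field means "all registered partitions", otherwise only the listed partitions get markers. A minimal sketch of that selection, assuming partitions are modelled as `topic-partition` strings; `EndTxnMarkerTargets` and `markerTargets` are hypothetical names, and treating an unregistered partition in the request as an error is an assumption, not something this KIP specifies. This is not Kafka's transaction coordinator code.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical sketch, not Kafka's transaction coordinator code: chooses which
// partitions receive transaction markers when EndTxn (v3) may carry the
// partitions that were actually written.
class EndTxnMarkerTargets {

    // Partitions are modelled as "topic-partition" strings to keep the sketch short.
    static String key(String topic, int partition) {
        return topic + "-" + partition;
    }

    // registered: partitions registered for the transaction.
    // written: the EndTxn "Topics" field (topic name -> partition indexes), or null.
    static Set<String> markerTargets(Set<String> registered, Map<String, List<Integer>> written) {
        if (written == null) {
            // A null Topics field means all partitions registered so far were written to.
            return new TreeSet<>(registered);
        }
        Set<String> targets = new TreeSet<>();
        for (Map.Entry<String, List<Integer>> topic : written.entrySet()) {
            for (int partition : topic.getValue()) {
                String tp = key(topic.getKey(), partition);
                // Assumption: reporting a partition that was never registered is a client error.
                if (!registered.contains(tp)) {
                    throw new IllegalArgumentException("partition was not registered: " + tp);
                }
                targets.add(tp);
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        Set<String> registered = new TreeSet<>(List.of("orders-0", "orders-1", "orders-2", "audit-0"));
        Map<String, List<Integer>> written = new TreeMap<>();
        written.put("orders", List.of(1));
        System.out.println(markerTargets(registered, written)); // [orders-1]
        System.out.println(markerTargets(registered, null));    // [audit-0, orders-0, orders-1, orders-2]
    }
}
```

In this example only one of four registered partitions is written, so sending the written set saves three marker writes; the null form keeps the request small when every registered partition was used.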

Performance Testing

Since this KIP is about performance improvement, it is necessary to benchmark producer performance with and without pre-registration. There already exists a performance benchmark which could be utilized to perform local benchmark tests. A new config parameter shall be added:

Code Block
                .help("Whether to pre-register topic partitions");

Moreover, the ProducerPerformance module currently exercises only the producer and does not take the consumer into consideration. We would like to introduce a separate performance benchmark called EOSPerformance to verify throughput under the poll-process-produce scenario. The new benchmark would have a setup similar to ProducerPerformance, but would add a few extra configurations and remove some unnecessary ones (for example, the record generation payload):

Code Block
 // ----- New configurations beyond ProducerPerformance -----
 parser.addArgument("--input-topic")
                ...
                .dest("inputTopic")
                .help("consume messages from this topic");
 ...
                .required(true)
                ...
                .metavar("OUTPUT-TOPIC")
                ...
                .help("produce messages to this topic");
 ...
                .help("consumer config properties file.");
 ...
                .help("kafka consumer related configuration properties like bootstrap.servers, etc. " +
                      "These configs take precedence over those passed via --consumer-config.");
 ...
                .help("The consumerGroupId to use if we use consumer as the data source.");
 ...
                .help("consumer poll timeout");
 ...
                .help("Whether to pre-register topic partitions");

We built a prototype last year, but it is incomplete and could break some external dependencies on ProducerPerformance. In this KIP, we will try to build the benchmark while maintaining compatibility where necessary.
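The EOSPerformance help text states that consumer properties given on the command line take precedence over those passed via --consumer-config. A minimal sketch of that precedence rule, assuming the properties file has already been read into a string and the command-line properties arrive as `key=value` strings; `ConsumerConfigMerge` and `merge` are hypothetical names, not part of the benchmark.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Properties;

// Hypothetical sketch, not the EOSPerformance code: merges a consumer config file
// with command-line overrides so that the command line wins.
class ConsumerConfigMerge {

    // fileContents: text of the file passed via --consumer-config, or null if none was given.
    // overrides: "key=value" pairs supplied on the command line.
    static Properties merge(String fileContents, List<String> overrides) {
        Properties props = new Properties();
        if (fileContents != null) {
            try {
                props.load(new StringReader(fileContents));
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        for (String kv : overrides) {
            int eq = kv.indexOf('=');
            if (eq <= 0) {
                throw new IllegalArgumentException("expected key=value but got: " + kv);
            }
            // Applied after the file, so command-line values take precedence.
            props.setProperty(kv.substring(0, eq), kv.substring(eq + 1));
        }
        return props;
    }

    public static void main(String[] args) {
        Properties p = merge("bootstrap.servers=file:9092\ngroup.id=eos\n",
                List.of("bootstrap.servers=cli:9092"));
        System.out.println(p.getProperty("bootstrap.servers")); // cli:9092
        System.out.println(p.getProperty("group.id"));          // eos
    }
}
```

Loading the file first and then applying overrides is the simplest way to get "later wins" semantics without tracking where each key came from.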

Compatibility, Deprecation, and Migration Plan

The blocking behavior for the first AddPartitionsToTxn shall apply to both old and new producers, which are expected to retry the request when it eventually times out.

The new beginTransaction API is only available to new producers, so there should be no compatibility issue. To enable pre-registration, the broker has to be on the latest version; otherwise an UnsupportedVersionException is thrown.
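The version requirement above can be pictured as ordinary API version negotiation: the client uses the highest version both sides support, and fails fast if pre-registration needs a newer one. A minimal sketch, assuming a single minimum version gates the feature; `PreRegistrationVersionCheck`, `negotiate`, and the `minPreRegisterVersion` parameter are hypothetical, the local exception only stands in for Kafka's UnsupportedVersionException, and Kafka's real negotiation is more involved.

```java
// Hypothetical sketch of version gating, not Kafka's ApiVersions handling.
class PreRegistrationVersionCheck {

    // Stand-in for org.apache.kafka.common.errors.UnsupportedVersionException.
    static class UnsupportedVersion extends RuntimeException {
        UnsupportedVersion(String message) {
            super(message);
        }
    }

    // Versions are plain ints here; Kafka models request versions as shorts.
    static int negotiate(boolean preRegister, int clientMax, int brokerMax, int minPreRegisterVersion) {
        // The highest version both the client and the broker understand.
        int usable = Math.min(clientMax, brokerMax);
        if (preRegister && usable < minPreRegisterVersion) {
            throw new UnsupportedVersion("pre-registration needs version " + minPreRegisterVersion
                    + " but the highest usable version is " + usable);
        }
        return usable;
    }

    public static void main(String[] args) {
        // Example gate at version 3, like the EndTxn Topics field ("versions": "3+").
        System.out.println(negotiate(false, 3, 2, 3)); // 2: old broker, feature not requested
        System.out.println(negotiate(true, 3, 3, 3));  // 3: both sides are new
        try {
            negotiate(true, 3, 2, 3);
        } catch (UnsupportedVersion e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Old producers never request the feature, so they keep working against old brokers; only a producer that opts into pre-registration can hit the error.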

Rejected Alternatives

We considered using a context session on the broker side to remember the partitions written to in the last transaction, and bumping EndTxn to include the partitions actually written so that the session gets updated.



This solution is less flexible and in particular makes reinitialization harder. For example, a re-initialized producer does not know what the previous session looks like, so special logic would be needed to make sure it does not rely on any predefined state that could be incorrect. It is also harder to understand.