...

Code Block
languagejava
titleKafkaProducer.java
    /**
     * Asynchronously send a record to a topic and invoke the provided callback when the send has been acknowledged.
     *
     * @param record The record to send
     * @param callback A user-supplied callback to execute when the record has been acknowledged by the server (null
     *        indicates no callback)
     *
     * @throws AuthenticationException if authentication fails. See the exception for more details
     * @throws AuthorizationException fatal error indicating that the producer is not allowed to write
     * @throws IllegalStateException if a transactional.id has been configured and no transaction has been started, or
     *                               when send is invoked after producer has been closed.
     * @throws InterruptException If the thread is interrupted while blocked
     * @throws SerializationException If the key or value are not valid objects given the configured serializers
     * @throws UnknownTopicOrPartitionException If the record topic partition is not included in the pre-registered partitions
     * @throws KafkaException If a Kafka related error occurs that does not belong to the public API exceptions.
     */
    @Override
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        // intercept the record, which can be potentially modified; this method does not throw exceptions
        ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
        return doSend(interceptedRecord, callback);
    }

    /**
     * Sends a list of specified offsets to the consumer group coordinator, and also marks
     * those offsets as part of the current transaction.
     *
     * @throws IllegalStateException if no transactional.id has been configured or no transaction has been started.
     * @throws ProducerFencedException fatal error indicating another producer with the same transactional.id is active
     * @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker
     *         does not support transactions (i.e. if its version is lower than 0.11.0.0) or
     *         the broker doesn't support latest version of transactional API with consumer group metadata (i.e. if its version is
     *         lower than 2.5.0).
     * @throws org.apache.kafka.common.errors.UnsupportedForMessageFormatException fatal error indicating the message
     *         format used for the offsets topic on the broker does not support transactions
     * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured
     *         transactional.id is not authorized, or the consumer group id is not authorized.
     * @throws org.apache.kafka.clients.consumer.CommitFailedException if the commit failed and cannot be retried
     *         (e.g. if the consumer has been kicked out of the group). Users should handle this by aborting the transaction.
     * @throws org.apache.kafka.common.errors.FencedInstanceIdException if this producer instance gets fenced by broker due to a
     *                                                                  mis-configured consumer instance id within group metadata.
     * @throws org.apache.kafka.common.errors.InvalidGroupIdException if the given consumer group id doesn't match the
     *         pre-registered value.
     * @throws KafkaException if the producer has encountered a previous fatal or abortable error, or for any
     *         other unexpected error
     */
    public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
                                         ConsumerGroupMetadata groupMetadata) throws ProducerFencedException
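
For context, the sketch below illustrates how a transactional application would observe the new exceptions documented above when pre-registration is in use. It is only an illustration: the beginTransaction overload that accepts the pre-registered partitions and the consumer group metadata is assumed here for readability (the exact signature proposed by this KIP may differ), and the topic, partition, and error handling are placeholders.

Code Block
languagejava
titlePreRegistrationUsageSketch
import java.util.Collections;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;

public class PreRegistrationUsageSketch {

    // One pre-registered transaction. beginTransaction(partitions, groupMetadata) is an
    // assumed illustration of the new overload proposed by this KIP, not a finalized signature.
    static void produceOnce(KafkaProducer<String, String> producer,
                            ConsumerGroupMetadata groupMetadata,
                            Map<TopicPartition, OffsetAndMetadata> offsets) {
        Set<TopicPartition> preRegistered = Collections.singleton(new TopicPartition("output", 0));
        try {
            producer.beginTransaction(preRegistered, groupMetadata);
            // Sending to a partition outside the pre-registered set would fail fast with
            // UnknownTopicOrPartitionException; partition 0 here is pre-registered.
            producer.send(new ProducerRecord<>("output", 0, "key", "value"), null);
            // The group id must match the pre-registered value, otherwise InvalidGroupIdException.
            producer.sendOffsetsToTransaction(offsets, groupMetadata);
            producer.commitTransaction();
        } catch (UnknownTopicOrPartitionException | InvalidGroupIdException e) {
            // The record partition or group id did not match the pre-registered values.
            producer.abortTransaction();
        }
    }
}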

Performance Testing

Since this KIP is about performance improvement, benchmarking the producer performance with and without pre-registration is necessary. An existing benchmark, org.apache.kafka.tools.ProducerPerformance, can be used for local benchmark tests. A new command-line parameter shall be added:

Code Block
languagejava
titleProducerPerformance
  parser.addArgument("--pre-registration")
                .action(store())
                .required(false)
                .type(Boolean.class)
                .metavar("PRE-REGISTRATION")
                .help("Whether to pre-register topic partitions");

Moreover, ProducerPerformance currently exercises only the producer and does not take a consumer into consideration. We would like to introduce a separate performance benchmark, EOSPerformance, to verify throughput under the poll-process-produce scenario (a sketch of that loop follows the configuration block below). The new benchmark would have a setup similar to ProducerPerformance, adding a couple of extra configurations and removing some unnecessary ones (the record-generation payload, for example):

Code Block
languagejava
titleEOSPerformance
 // ----- New configurations beyond ProducerPerformance ----- 
 parser.addArgument("--input-topic")
                .action(store())
                .required(true)
                .type(String.class)
                .metavar("INPUT-TOPIC")
                .dest("inputTopic")
                .help("consume messages from this topic");

 parser.addArgument("--output-topic")
                .action(store())
                .required(true)
                .type(String.class)
                .metavar("OUTPUT-TOPIC")
                .dest("outputTopic")
                .help("produce messages to this topic");

 parser.addArgument("--consumer-config")
                .action(store())
                .required(false)
                .type(String.class)
                .metavar("CONFIG-FILE")
                .dest("consumerConfigFile")
                .help("consumer config properties file.");

 parser.addArgument("--consumer-props")
                .nargs("+")
                .required(false)
                .metavar("PROP-NAME=PROP-VALUE")
                .type(String.class)
                .dest("consumerConfig")
                .help("kafka consumer related configuration properties like bootstrap.servers,client.id etc. " +
                      "These configs take precedence over those passed via --consumer-config.");
 
 parser.addArgument("--consumer-group-id")
                .action(store())
                .required(false)
                .type(String.class)
                .metavar("CONSUMER-GROUP-ID")
                .dest("consumerGroupId")
                .setDefault("performance-consumer-default-group-id")
                .help("The consumerGroupId to use if we use consumer as the data source.");
 
parser.addArgument("--poll-timeout")
                .action(store())
                .required(false)
                .type(Long.class)
                .metavar("POLL-TIMEOUT")
                .dest("pollTimeout")
                .setDefault(100L)
                .help("consumer poll timeout");

parser.addArgument("--pre-registration")
                .action(store())
                .required(false)
                .type(Boolean.class)
                .metavar("PRE-REGISTRATION")
                .help("Whether to pre-register topic partitions");

We built a prototype last year, but it is incomplete and could break external dependencies on ProducerPerformance. In this KIP we will build the benchmark while making the effort to maintain compatibility where necessary.

Compatibility, Deprecation, and Migration Plan

The blocking behavior for the first AddPartitionsToTxn request shall apply to both old and new producers; the request is expected to be retried when it eventually times out.

The new beginTransaction API and the blocking behavior shall only be available to new producers, so there should not be a compatibility issue. To enable the blocking behavior and pre-registration, the broker has to be on the latest version as well; otherwise an UnsupportedVersionException would be thrown.
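
As an illustration of the fallback story, an application could opportunistically attempt pre-registration and revert to the existing beginTransaction() when the broker does not support it. The overload below is again the assumed pre-registration variant rather than a finalized signature, and the point at which the error surfaces is an assumption for this sketch:

Code Block
languagejava
titleCompatibilityFallbackSketch
import java.util.Set;

import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnsupportedVersionException;

public class CompatibilityFallbackSketch {

    // Attempt pre-registration; if the broker is too old, fall back to the existing
    // beginTransaction() so the application keeps working against older clusters.
    // beginTransaction(partitions, groupMetadata) is an assumed illustration of the new
    // overload, and the exception is assumed to surface on this call for simplicity.
    static void beginWithFallback(KafkaProducer<byte[], byte[]> producer,
                                  Set<TopicPartition> preRegistered,
                                  ConsumerGroupMetadata groupMetadata) {
        try {
            producer.beginTransaction(preRegistered, groupMetadata);
        } catch (UnsupportedVersionException e) {
            producer.beginTransaction();
        }
    }
}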

...