...

Code Block
    // Read some records from the consumer and collect the offsets to commit.
    // On the first call, this poll is the fencing point if there are pending offsets.
    ConsumerRecords consumed = consumer.poll(Duration.ofMillis(5000));
    Map<TopicPartition, OffsetAndMetadata> consumedOffsets = offsets(consumed);

    // Do some processing and build the records we want to produce
    List<ProducerRecord> processed = process(consumed);

    // --------- Call after process ---------- //
    // Register the output partitions when beginning the transaction
    Set<TopicPartition> outputPartitions = outputPartitions(processed);
    producer.beginTransaction(outputPartitions);

    // Write the records and commit offsets under a single transaction
    for (ProducerRecord record : processed) {
        producer.send(record);
    }

    // Pass the entire consumer group metadata
    producer.sendOffsetsToTransaction(consumedOffsets, consumer.groupMetadata());

    producer.commitTransaction();


To guard against human error, an application that uses the new `beginTransaction` API should never have a produced record go to a partition that was not registered for the transaction. We therefore let producer.send() throw UNKNOWN_TOPIC_OR_PARTITION in that case, and extend the error's definition accordingly:

Code Block
UNKNOWN_TOPIC_OR_PARTITION(3, "This server does not host this topic-partition, or a transactional producer attempted to send to a topic-partition that it did not expect.", UnknownTopicOrPartitionException::new),
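
The check described above can be illustrated with a minimal, dependency-free sketch. It is not the producer's actual implementation: `RegisteredPartitionGuard`, `Partition`, `UnregisteredPartitionException`, and `checkSend` are hypothetical names that stand in for the producer's transaction state, Kafka's `TopicPartition`, `UnknownTopicOrPartitionException`, and the validation assumed to run inside `send()`.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

/** Hypothetical sketch: rejects sends to partitions not registered at beginTransaction. */
public class RegisteredPartitionGuard {

    /** Stand-in for Kafka's TopicPartition so the sketch needs no Kafka dependency. */
    public static final class Partition {
        final String topic;
        final int partition;

        public Partition(String topic, int partition) {
            this.topic = Objects.requireNonNull(topic);
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof Partition)) return false;
            Partition other = (Partition) o;
            return partition == other.partition && topic.equals(other.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }

        @Override
        public String toString() {
            return topic + "-" + partition;
        }
    }

    /** Stand-in for UnknownTopicOrPartitionException. */
    public static final class UnregisteredPartitionException extends RuntimeException {
        public UnregisteredPartitionException(String message) {
            super(message);
        }
    }

    private final Set<Partition> registered = new HashSet<>();
    private boolean inTransaction = false;

    /** Mirrors the proposed beginTransaction(outputPartitions): records the expected partitions. */
    public void beginTransaction(Set<Partition> outputPartitions) {
        registered.clear();
        registered.addAll(outputPartitions);
        inTransaction = true;
    }

    /** The validation assumed to run in send(): only registered partitions are accepted. */
    public void checkSend(Partition target) {
        if (!inTransaction) {
            throw new IllegalStateException("No transaction has been started");
        }
        if (!registered.contains(target)) {
            throw new UnregisteredPartitionException(
                "Partition " + target + " was not registered for this transaction");
        }
    }

    /** Ends the transaction and forgets the registered partitions. */
    public void commitTransaction() {
        registered.clear();
        inTransaction = false;
    }
}
```

In this sketch, a send to a registered partition passes silently, while a send to any other partition fails fast on the client before the record is handed to the broker.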

Updated JavaDoc for send:

Code Block
KafkaProducer.java
    /**
     * Asynchronously send a record to a topic and invoke the provided callback when the send has been acknowledged.
     *
     * @param record The record to send
     * @param callback A user-supplied callback to execute when the record has been acknowledged by the server (null
     *        indicates no callback)
     *
     * @throws AuthenticationException if authentication fails. See the exception for more details
     * @throws AuthorizationException fatal error indicating that the producer is not allowed to write
     * @throws IllegalStateException if a transactional.id has been configured and no transaction has been started, or
     *                               when send is invoked after producer has been closed.
     * @throws InterruptException If the thread is interrupted while blocked
     * @throws SerializationException If the key or value are not valid objects given the configured serializers
     * @throws UnknownTopicOrPartitionException If the record's topic partition is not among the pre-registered partitions
     * @throws KafkaException If a Kafka related error occurs that does not belong to the public API exceptions.
     */
    @Override
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        // intercept the record, which can be potentially modified; this method does not throw exceptions
        ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
        return doSend(interceptedRecord, callback);
    }

Compatibility, Deprecation, and Migration Plan

...