...

The main addition of this KIP is a new variant of the current sendOffsetsToTransaction API, which gives the producer access to consumer group state such as generation.id, member.id, and group.instance.id.

Code Block
interface Producer {
   /**
     * Sends a list of specified offsets to the consumer group coordinator, and also marks
     * those offsets as part of the current transaction. These offsets will be considered
     * committed only if the transaction is committed successfully. The committed offset should
     * be the next message your application will consume, i.e. lastProcessedMessageOffset + 1.
     * <p>
     * This method should be used when you need to batch consumed and produced messages
     * together, typically in a consume-transform-produce pattern. Thus, the specified
     * {@code consumerGroupId} should be the same as config parameter {@code group.id} of the used
     * {@link KafkaConsumer consumer}. Note that the consumer should have {@code enable.auto.commit=false}
     * and should also not commit offsets manually (via {@link KafkaConsumer#commitSync(Map) sync} or
     * {@link KafkaConsumer#commitAsync(Map, OffsetCommitCallback) async} commits).
     * <p>
     * This API does not deprecate the existing {@link KafkaProducer#sendOffsetsToTransaction(Map, String) sendOffsets} API,
     * because standalone-mode EOS applications still rely on it. If the broker does not support the new underlying
     * transactional API, the call is automatically downgraded to ignore the consumer metadata, and a warning is logged.
     *
     * @throws IllegalStateException if no transactional.id has been configured or no transaction has been started
     * @throws ProducerFencedException fatal error indicating another producer with the same transactional.id is active
     * @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker
     *         does not support transactions (i.e. if its version is lower than 0.11.0.0)
     * @throws org.apache.kafka.common.errors.UnsupportedForMessageFormatException fatal error indicating the message
     *         format used for the offsets topic on the broker does not support transactions
     * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured
     *         transactional.id is not authorized. See the exception for more details
     * @throws org.apache.kafka.common.errors.IllegalGenerationException if the passed-in consumer metadata has an illegal generation
     * @throws org.apache.kafka.common.errors.FencedInstanceIdException if the passed-in consumer metadata has a fenced group.instance.id
     * @throws KafkaException if the producer has encountered a previous fatal or abortable error, or for any
     *         other unexpected error
     */
   // Callers should pass in the entire consumer group state to the new API.
   void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, ConsumerGroupMetadata consumerGroupMetadata) throws ProducerFencedException, IllegalGenerationException, FencedInstanceIdException; // NEW
}
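
The Javadoc above says the committed offset should be lastProcessedMessageOffset + 1. As a minimal sketch of that rule, the following plain-Java example derives the per-partition offsets that a caller would pass as the offsets argument. The types Partition and ProcessedRecord and the method nextOffsetsToCommit are hypothetical stand-ins introduced only for illustration, so that the sketch compiles without the Kafka client library. They are not part of this KIP.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for the client's TopicPartition and ConsumerRecord types;
// they exist only so this sketch compiles without the Kafka client library.
final class OffsetSketch {
    record Partition(String topic, int partition) {}
    record ProcessedRecord(Partition partition, long offset) {}

    // For each partition, compute the offset to commit:
    // the highest processed offset plus one (the next message to consume).
    static Map<Partition, Long> nextOffsetsToCommit(List<ProcessedRecord> processed) {
        Map<Partition, Long> next = new HashMap<>();
        for (ProcessedRecord r : processed) {
            // Keep the largest "offset + 1" seen so far for this partition.
            next.merge(r.partition(), r.offset() + 1, Long::max);
        }
        return next;
    }

    public static void main(String[] args) {
        Partition p0 = new Partition("input", 0);
        Partition p1 = new Partition("input", 1);
        List<ProcessedRecord> batch = List.of(
            new ProcessedRecord(p0, 41),
            new ProcessedRecord(p0, 42),
            new ProcessedRecord(p1, 7));
        Map<Partition, Long> toCommit = nextOffsetsToCommit(batch);
        System.out.println(toCommit.get(p0)); // prints 43
        System.out.println(toCommit.get(p1)); // prints 8
    }
}
```

With the real client, the resulting values would presumably be wrapped as OffsetAndMetadata and passed to sendOffsetsToTransaction within an open transaction, together with the consumer's group metadata. How that metadata is obtained from the consumer is not shown in this section.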

Shrink transactional.timeout

...

Code Block
OffsetFetchRequest => Partitions GroupId WaitTransaction
  Partitions          => List<TopicPartition>
  GroupId             => String
  WaitTransaction     => Boolean // NEW

...