Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

This page is meant as a template for writing a KIP. To create a KIP choose Tools->Copy on this page and modify with your content and replace the heading with the next KIP number and a description of your issue. Replace anything in italics with your own description.

Status

Current state:  [One of " Under Discussion", "Accepted", "Rejected"]

Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread]

JIRA: here [Change the link from KAFKA-95921 to your own ticket]

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Describe the problems you are trying to solve.

Public Interfaces

Briefly list any new interfaces that will be introduced as part of this proposal or any existing interfaces that will be removed or changed. The purpose of this section is to concisely call out the public contract that will come along with this feature.

A public interface is any change to the following:

  • Binary log format

  • The network protocol and api behavior

  • Any class in the public packages under clientsConfiguration, especially client configuration

    • org/apache/kafka/common/serialization

    • org/apache/kafka/common

    • org/apache/kafka/common/errors

    • org/apache/kafka/clients/producer

    • org/apache/kafka/clients/consumer (eventually, once stable)

  • Monitoring

  • Command line tools and arguments

  • Anything else that will likely break existing users in some way when they upgrade

Proposed Changes

...

Currently if a transactional producer hits a fatal exception, the caller usually catches the exception and handle it by invoking abortTransaction to abort the transaction, and then closing the producer, which makes sense and sounds clean. The tricky scenario is that abortTransaction is not a safe call when the producer is already in an error state, which means user has to do another try-catch with the first layer catch block, making the error handling pretty annoying. It is more appropriate and user-friendly to ensure that the producer client does not throw the same exception again while aborting transaction.

Proposed Changes

Our proposal is quite straightforward and simple. To avoid throwing the same exception twice, we would remember whether a fatal exception has already been thrown to the application level, so that in abortTransaction we will not throw it again, thus making the function safe to be called in an error state.

Public Interfaces

We shall modify the comments on abortTransaction and remove all exceptions except TimeoutException and InterruptException.

Code Block
languagejava
titleKafkaProducer.java
/**
* Aborts the ongoing transaction. Any unflushed produce messages will be aborted when this call is made.
* This call will be a no-op if previous transactional call has thrown an exception and made the producer in an error state.
*
* Note that this method will raise {@link TimeoutException} if the transaction cannot be aborted before expiration
* of {@code max.block.ms}. Additionally, it will raise {@link InterruptException} if interrupted.
* It is safe to retry in either case, but it is not possible to attempt a different operation (such as commitTransaction)
* since the abort may already be in the progress of completing. If not retrying, the only option is to close the producer.
*
* @throws TimeoutException if the time taken for aborting the transaction has surpassed <code>max.block.ms</code>.
* @throws InterruptException if the thread is interrupted while blocked
*/
public void abortTransaction();

Suggested Coding Pattern

With this proposed change, users may use transactional APIs as follows.

Code Block
languagejava
titleExample.java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my-transactional-id");
Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());

producer.initTransactions();

try {
    producer.beginTransaction();
    for (int i = 0; i < 100; i++)
        producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
    producer.commitTransaction();
} catch (Exception e) {
    producer.abortTransaction();
    if(e instanceof IllegalStateException || 
        e instanceof ProducerFencedException ||
	    e instanceof UnsupportedVersionException || 
        e instanceof AuthorizationException) {
        producer.close();
    }
}


Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?
  • If we are changing behavior how will we phase out the older behavior?
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Rejected Alternatives

Our proposed change can be considered as minor. In users' perspective, they have less exceptions to worry and handle when calling abortTransaction. 

Rejected Alternatives

  1. Internally abort any ongoing transaction within `producer.close`, and comment on `abortTransaction` call to warn user not to do it manually. 
  2. Similar to 1, but get a new `close(boolean abortTransaction)` API call in case some users want to handle transaction state by themselves.
  3. Introduce a new abort transaction API with a boolean flag indicating whether the producer is in error state, instead of throwing exceptions
  4. Introduce a public API `isInError` on producer for user to validate before doing any transactional API calls

The alternatives are rejected because they all require bigger changesIf there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.