
...

Name:       exactly.once.source.enabled
Type:       BOOLEAN
Default:    false
Importance: HIGH
Doc:        Whether to enable exactly-once support for source connectors in the cluster by writing source records and their offsets in a Kafka transaction, and by proactively fencing out old task generations before bringing up new ones. Note that this must be enabled on every worker in a cluster in order for exactly-once delivery to be guaranteed, and that some source connectors may still not be able to provide exactly-once delivery guarantees even with this support enabled.
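As a sketch, enabling this support might look like the following fragment of a worker configuration file (the file name is illustrative; the property name comes from the table above):

```properties
# connect-distributed.properties (illustrative file name)
# Must be set to true on every worker in the cluster for exactly-once
# delivery to be guaranteed.
exactly.once.source.enabled=true
```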

And four new per-connector configuration properties will be added:


Name:       exactly.once
Type:       STRING
Default:    "requested"
Importance: MEDIUM
Doc:        Permitted values are "requested" and "required". If set to "required", forces a preflight check for the connector to ensure that it can provide exactly-once delivery with the given configuration. Some connectors may be capable of providing exactly-once delivery but not signal to Connect that they support this; in that case, documentation for the connector should be consulted carefully before creating it, and the value for this property should be set to "requested". Additionally, if the value is set to "required" but the worker that performs preflight validation does not have exactly-once support enabled for source connectors, requests to create or validate the connector will fail.

Name:       transaction.boundary
Type:       STRING
Default:    "poll"
Importance: MEDIUM
Doc:        Permitted values are "poll", "connector", and "interval". If set to "poll", a new producer transaction will be started and committed for every batch of records that each task from this connector provides to Connect. If set to "connector", the connector's own transaction boundaries will be used; note that not all connectors are capable of defining their own transaction boundaries, and in that case, attempts to create them with this property set to "connector" will fail. Finally, if set to "interval", transactions will be committed only after a user-defined time interval has passed.

Name:       offsets.storage.topic
Type:       STRING
Default:    null
Importance: LOW
Doc:        The name of a separate offsets topic to use for this connector. If empty or not specified, the worker’s global offsets topic name will be used. If specified, the offsets topic will be created if it does not already exist on the Kafka cluster targeted by this connector (which may be different from the one used for the worker's global offsets topic if the bootstrap.servers property of the connector's producer has been overridden from the worker's).

Name:       transaction.boundary.interval.ms
Type:       LONG
Default:    null
Importance: LOW
Doc:        If "transaction.boundary" is set to "interval", determines the interval for producer transaction commits by connector tasks. If unset, defaults to the value of the worker-level "offset.flush.interval.ms" property.
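Taken together, a connector-creation request body submitted to the Connect REST API might use these properties as sketched below; the connector name and class are hypothetical, and only the four new properties are shown alongside them:

```json
{
  "name": "example-source",
  "config": {
    "connector.class": "org.example.ExampleSourceConnector",
    "exactly.once": "required",
    "transaction.boundary": "interval",
    "transaction.boundary.interval.ms": "60000",
    "offsets.storage.topic": "example-source-offsets"
  }
}
```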

Connector API expansions

Any newly-introduced interfaces, classes, etc. here will be added to the connect-api  artifact, since they will be part of the public API for Connect.

A new ExactlyOnceSupport  enum is introduced:

Code Block
languagejava
titleExactlyOnceSupport
package org.apache.kafka.connect.source;

/**
 * An enum to represent the level of support for exactly-once delivery from a source connector.
 */
public enum ExactlyOnceSupport {
    /**
     * Signals that a connector supports exactly-once delivery.
     */
    SUPPORTED,
    /**
     * Signals that a connector does not support exactly-once delivery.
     */
    UNSUPPORTED
}

The SourceConnector  API is expanded to allow developers to specify whether their connector supports exactly-once delivery:

Code Block
languagejava
titleSourceConnector
package org.apache.kafka.connect.source;

public abstract class SourceConnector extends Connector {
	// Existing fields and methods omitted

    /**
     * Signals whether the connector supports exactly-once delivery guarantees with a proposed configuration.
     * Developers can assume that worker-level exactly-once support is enabled when this method is invoked.
     * The default implementation will return {@code null}.
     * @param connectorConfig the configuration that will be used for the connector.
     * @return {@link ExactlyOnceSupport#SUPPORTED} if the connector can provide exactly-once support,
     * and {@link ExactlyOnceSupport#UNSUPPORTED} if it cannot. If {@code null}, it is assumed that the
     * connector cannot.
     */
    public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> connectorConfig) {
        return null;
    }
}
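As an illustration, a connector might report exactly-once support conditionally, based on its proposed configuration. The sketch below uses minimal stand-ins for the connect-api types so it compiles on its own; the connector class and the track.offsets key are hypothetical:

```java
import java.util.Map;

// Minimal stand-ins for the connect-api types shown above, so this sketch
// compiles on its own; a real connector imports these from
// org.apache.kafka.connect.source instead.
enum ExactlyOnceSupport { SUPPORTED, UNSUPPORTED }

abstract class SourceConnector {
    public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> connectorConfig) {
        return null;
    }
}

// Hypothetical connector that can only promise exactly-once delivery when it
// is configured to track offsets itself.
class ExampleSourceConnector extends SourceConnector {
    @Override
    public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> connectorConfig) {
        boolean tracksOffsets = Boolean.parseBoolean(
                connectorConfig.getOrDefault("track.offsets", "true"));
        return tracksOffsets ? ExactlyOnceSupport.SUPPORTED : ExactlyOnceSupport.UNSUPPORTED;
    }
}
```

With this override in place, the worker's preflight check can reject a configuration that disables offset tracking when the user has requested exactly-once delivery.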

A new TransactionContext  interface is introduced:

Code Block
languagejava
titleTransactionContext
package org.apache.kafka.connect.source;

/**
 * Provided to source tasks to allow them to define their own producer transaction boundaries when
 * exactly-once support is enabled.
 */
public interface TransactionContext {
    /**
     * Request a transaction commit after the next batch of records from {@link SourceTask#poll()}
     * is processed.
     */
    void commitTransaction();

    /**
     * Request a transaction commit after a source record is processed. The source record will be the
     * last record in the committed transaction.
     * @param record the record to commit the transaction after.
     */
    void commitTransaction(SourceRecord record);

    /**
     * Requests a transaction abort after the next batch of records from {@link SourceTask#poll()}. All of
     * the records in that batch will be discarded and will not appear in a committed transaction.
     */
    void abortTransaction();

    /**
     * Requests a transaction abort after a source record is processed. The source record will be the
     * last record in the aborted transaction. All of the records in that transaction will be discarded
     * and will not appear in a committed transaction.
     * @param record the record to abort the transaction after.
     */
    void abortTransaction(SourceRecord record);
}
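To illustrate how a task might drive these methods, the sketch below defines a small helper that requests a commit after every fixed number of records. The TransactionContext stand-in mirrors only the zero-argument commit method above so the sketch runs standalone, and the helper class is hypothetical; a real task would receive the org.apache.kafka.connect.source implementation from its SourceTaskContext:

```java
// Stand-in mirroring TransactionContext#commitTransaction() above, so this
// sketch compiles on its own.
interface TransactionContext {
    void commitTransaction();
}

// Hypothetical helper: request a transaction commit after every `batchSize`
// records that the task produces.
class FixedSizeBoundary {
    private final TransactionContext context;
    private final int batchSize;
    private int produced = 0;

    FixedSizeBoundary(TransactionContext context, int batchSize) {
        this.context = context;
        this.batchSize = batchSize;
    }

    // Called once per produced record; requests a commit of the open
    // transaction every batchSize records.
    void recordProduced() {
        if (++produced % batchSize == 0) {
            context.commitTransaction();
        }
    }
}
```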

The SourceTaskContext  interface is expanded to provide developers access to a TransactionContext  instance (Javadocs largely copied from existing docs on the SinkTaskContext::errantRecordReporter ):

Code Block
languagejava
titleSourceTaskContext
package org.apache.kafka.connect.source;

public interface SourceTaskContext {
	// Existing fields and methods omitted

    /**
     * Get a {@link TransactionContext} that can be used to define producer transaction boundaries
     * when exactly-once support is enabled for the connector.
     *
     * <p>This method was added in Apache Kafka 3.0. Source tasks that use this method but want to
     * maintain backward compatibility so they can also be deployed to older Connect runtimes
     * should guard the call to this method with a try-catch block, since calling this method will result in a
     * {@link NoSuchMethodError} or {@link NoClassDefFoundError} when the source connector is deployed to
     * Connect runtimes older than Kafka 3.0. For example:
     * <pre>
     *     TransactionContext transactionContext;
     *     try {
     *         transactionContext = context.transactionContext();
     *     } catch (NoSuchMethodError | NoClassDefFoundError e) {
     *         transactionContext = null;
     *     }
     * </pre>
     *
     * @return the transaction context, or null if the user does not want the connector to define
     * its own transaction boundaries
     * @since 3.0
     */
    default TransactionContext transactionContext() {
        return null;
    }
}

And the SourceConnector  API is expanded with a second new method to allow developers to specify whether their connector can define its own transaction boundaries:

Code Block
languagejava
titleSourceConnector
package org.apache.kafka.connect.source;

public abstract class SourceConnector extends Connector {
	// Existing fields and methods omitted

    /**
     * Signals whether the connector can define its own transaction boundaries with the proposed
     * configuration. Developers must override this method if they wish to add connector-defined
     * transaction boundary support; if they do not, users will be unable to create instances of
     * this connector that use connector-defined transaction boundaries. The default implementation
     * will return {@code false}.
     * @param connectorConfig the configuration that will be used for the connector
     * @return whether the connector can define its own transaction boundaries with the given
     * config.
     */
    public boolean canDefineTransactionBoundaries(Map<String, String> connectorConfig) {
		return false;
    }
}
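As with exactlyOnceSupport, a connector would typically override this method based on its configuration. In the sketch below, the base class is a minimal stand-in so the code compiles on its own, and the connector class and upstream.mode key are hypothetical:

```java
import java.util.Map;

// Stand-in for the connect-api base class shown above, so this sketch
// compiles on its own.
abstract class SourceConnector {
    public boolean canDefineTransactionBoundaries(Map<String, String> connectorConfig) {
        return false;
    }
}

// Hypothetical connector that can only define its own transaction boundaries
// when it reads from a transactional upstream source.
class ExampleTxnSourceConnector extends SourceConnector {
    @Override
    public boolean canDefineTransactionBoundaries(Map<String, String> connectorConfig) {
        return "transactional".equals(connectorConfig.get("upstream.mode"));
    }
}
```

Under this sketch, attempts to create the connector with transaction.boundary set to "connector" would fail unless upstream.mode were set to "transactional".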

New metrics

Three new task-level JMX properties will be added:

...