
Status

Current state: Accepted

Discussion thread: here

Voting thread: here

JIRA: KAFKA-10000, KAFKA-6080

...

Note: This KIP was earlier published with the title "Atomic commit of source connector records and offsets", which corresponded to KAFKA-10000. It has now been expanded to also include general EoS support described in KAFKA-6080.

Background and References

...

  • Require as few per-connector changes as possible. The Connect ecosystem is fairly mature at this point and even something as lightweight as a new SourceTask method would have to be applied to potentially dozens if not hundreds of connectors. The current proposal requires no such changes for existing connectors, as long as they use the source offsets API correctly.
  • Minimize configuration changes. The implementation of this feature may grow fairly complex, but it should be easy for users to enable and understand. KIP-98, for example, added exactly-once support for Kafka and its Java clients, and only exposed three producer properties and one consumer property to users. The current proposal only adds five user-facing configuration properties.
  • Minimize operational burden on users. It's likely that this feature will be used heavily and may even be enabled by default with a later release of Connect. A wide range of Connect deployment styles and environments should be supported with as little pain as possible, including but not limited to security-conscious and resource-conscious environments. The current proposal's requirements in security-conscious environments are well-defined and easy-to-anticipate, and it does not significantly alter the resource allocation story of Connect.
  • Minimize gotchas and potential footguns. Nobody likes fine print. If we can address a common use case for Connect users or Connect developers, we should; leaving things out for the sake of a simpler design or smaller changes should only be done as a last resort. Even if well-documented, known gaps in use cases (especially those that vary from connector to connector) may not be propagated to or understood by end users, and may be seen as a fault in the quality of this feature.
  • Overall, make this a feature that gives people joy to use, not pain. Jury's out on this one until the vote thread passes!

Public Interfaces

New properties

A single worker-level configuration property will be added for distributed mode:

exactly.once.source.support
  Type: STRING
  Default: "disabled"
  Importance: HIGH
  Doc: Whether to enable exactly-once support for source connectors in the cluster by writing source records and their offsets in a Kafka transaction, and by proactively fencing out old task generations before bringing up new ones. Note that this must be enabled on every worker in a cluster in order for exactly-once delivery to be guaranteed, and that some source connectors may still not be able to provide exactly-once delivery guarantees even with this support enabled.

  Permitted values are "disabled", "preparing", and "enabled". In order to safely enable exactly-once support for source connectors, all workers in the cluster must first be updated to use the "preparing" value for this property. Once this has been done, a second update of all of the workers in the cluster should be performed to change the value of this property to "enabled".

And four new per-connector configuration properties will be added:

exactly.once.support
  Type: STRING
  Default: "requested"
  Importance: MEDIUM
  Doc: Permitted values are "requested" and "required". If set to "required", forces a preflight check for the connector to ensure that it can provide exactly-once delivery with the given configuration. Some connectors may be capable of providing exactly-once delivery but not signal to Connect that they support this; in that case, documentation for the connector should be consulted carefully before creating it, and the value for this property should be set to "requested". Additionally, if the value is set to "required" but the worker that performs preflight validation does not have exactly-once support enabled for source connectors, requests to create or validate the connector will fail.

transaction.boundary
  Type: STRING
  Default: "poll"
  Importance: MEDIUM
  Doc: Permitted values are "poll", "connector", and "interval". If set to "poll", a new producer transaction will be started and committed for every batch of records that each task from this connector provides to Connect. If set to "connector", relies on connector-defined transaction boundaries; note that not all connectors are capable of defining their own transaction boundaries, and in that case, attempts to create them with this property set to "connector" will fail. Finally, if set to "interval", commits transactions only after a user-defined time interval has passed.

offsets.storage.topic
  Type: STRING
  Default: null
  Importance: LOW
  Doc: The name of a separate offsets topic to use for this connector. If empty or not specified, the worker’s global offsets topic name will be used. If specified, the offsets topic will be created if it does not already exist on the Kafka cluster targeted by this connector (which may be different from the one used for the worker's global offsets topic if the bootstrap.servers property of the connector's producer has been overridden from the worker's).

transaction.boundary.interval.ms
  Type: LONG
  Default: null
  Importance: LOW
  Doc: If "transaction.boundary" is set to "interval", determines the interval for producer transaction commits by connector tasks. If unset, defaults to the value of the worker-level "offset.flush.interval.ms" property.
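
For illustration, a connector configuration that exercises all four properties might look like the following sketch; the connector name and class are invented for the example:

Code Block (Java): ExampleConfigs
import java.util.HashMap;
import java.util.Map;

public class ExampleConfigs {
    // A hypothetical connector configuration using the four new properties;
    // the connector name and class are invented for illustration.
    public static Map<String, String> exampleSourceConfig() {
        Map<String, String> config = new HashMap<>();
        config.put("name", "example-source");
        config.put("connector.class", "com.example.ExampleSourceConnector");
        config.put("exactly.once.support", "required");          // fail fast if EoS can't be guaranteed
        config.put("transaction.boundary", "interval");          // commit a transaction on a fixed interval
        config.put("transaction.boundary.interval.ms", "60000"); // one-minute transactions
        config.put("offsets.storage.topic", "example-source-offsets"); // per-connector offsets topic
        return config;
    }
}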

Connector API expansions

Any newly-introduced interfaces, classes, etc. here will be added to the connect-api  artifact, since they will be part of the public API for Connect.

A new ExactlyOnceSupport  enum is introduced:

Code Block (Java): ExactlyOnceSupport
package org.apache.kafka.connect.source;

/**
 * An enum to represent the level of support for exactly-once delivery from a source connector.
 */
public enum ExactlyOnceSupport {
    /**
     * Signals that a connector supports exactly-once delivery.
     */
    SUPPORTED,
    /**
     * Signals that a connector does not support exactly-once delivery.
     */
    UNSUPPORTED
}

The SourceConnector  API is expanded to allow developers to specify whether their connector supports exactly-once delivery:

Code Block (Java): SourceConnector
package org.apache.kafka.connect.source;

public abstract class SourceConnector extends Connector {
    // Existing fields and methods omitted

    /**
     * Signals whether the connector supports exactly-once delivery guarantees with a proposed configuration.
     * Developers can assume that worker-level exactly-once support is enabled when this method is invoked.
     * The default implementation will return {@code null}.
     * @param connectorConfig the configuration that will be used for the connector.
     * @return {@link ExactlyOnceSupport#SUPPORTED} if the connector can provide exactly-once support,
     * and {@link ExactlyOnceSupport#UNSUPPORTED} if it cannot. If {@code null}, it is assumed that the
     * connector cannot.
     */
    public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> connectorConfig) {
        return null;
    }
}
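
For illustration, a hypothetical connector that always tracks progress through the Connect source offsets API might override this method as follows; the class name is invented for the example and the other required lifecycle methods are omitted:

Code Block (Java): ExampleSourceConnector
package com.example;

import java.util.Map;
import org.apache.kafka.connect.source.ExactlyOnceSupport;
import org.apache.kafka.connect.source.SourceConnector;

public class ExampleSourceConnector extends SourceConnector {
    // Required lifecycle methods (taskClass, taskConfigs, start, stop, config, version) omitted

    @Override
    public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> connectorConfig) {
        // This hypothetical connector tracks progress exclusively through the Connect
        // source offsets API, so it can promise exactly-once delivery for any config
        return ExactlyOnceSupport.SUPPORTED;
    }
}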

A new TransactionContext  interface is introduced:

Code Block (Java): TransactionContext
package org.apache.kafka.connect.source;

/**
 * Provided to source tasks to allow them to define their own producer transaction boundaries when
 * exactly-once support is enabled.
 */
public interface TransactionContext {
    /**
     * Request a transaction commit after the next batch of records from {@link SourceTask#poll()}
     * is processed.
     */
    void commitTransaction();

    /**
     * Request a transaction commit after a source record is processed. The source record will be the
     * last record in the committed transaction.
     * @param record the record to commit the transaction after.
     */
    void commitTransaction(SourceRecord record);

    /**
     * Requests a transaction abort after the next batch of records from {@link SourceTask#poll()}. All of
     * the records in that transaction will be discarded and will not appear in a committed transaction.
     */
    void abortTransaction();

    /**
     * Requests a transaction abort after a source record is processed. The source record will be the
     * last record in the aborted transaction. All of the records in that transaction will be discarded
     * and will not appear in a committed transaction.
     * @param record the record to abort the transaction after.
     */
    void abortTransaction(SourceRecord record);
}

The SourceTaskContext  interface is expanded to provide developers access to a TransactionContext  instance (Javadocs largely copied from existing docs on the SinkTaskContext::errantRecordReporter ):

Code Block (Java): SourceTaskContext
package org.apache.kafka.connect.source;

public interface SourceTaskContext {
    // Existing fields and methods omitted

    /**
     * Get a {@link TransactionContext} that can be used to define producer transaction boundaries
     * when exactly-once support is enabled for the connector.
     *
     * <p>This method was added in Apache Kafka 3.0. Source tasks that use this method but want to
     * maintain backward compatibility so they can also be deployed to older Connect runtimes
     * should guard the call to this method with a try-catch block, since calling this method will result in a
     * {@link NoSuchMethodError} or {@link NoClassDefFoundError} when the source connector is deployed to
     * Connect runtimes older than Kafka 3.0. For example:
     * <pre>
     *     TransactionContext transactionContext;
     *     try {
     *         transactionContext = context.transactionContext();
     *     } catch (NoSuchMethodError | NoClassDefFoundError e) {
     *         transactionContext = null;
     *     }
     * </pre>
     *
     * @return the transaction context, or null if the user does not want the connector to define
     * its own transaction boundaries
     * @since 3.0
     */
    default TransactionContext transactionContext() {
        return null;
    }
}
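
As a sketch of how a task might combine the compatibility guard above with connector-defined transaction boundaries, assuming hypothetical readNextBatch and endOfUpstreamTransaction helpers standing in for real connector logic:

Code Block (Java): ExampleSourceTask
package com.example;

import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.TransactionContext;

public class ExampleSourceTask extends SourceTask {
    private TransactionContext transactionContext;

    @Override
    public void start(Map<String, String> props) {
        try {
            transactionContext = context.transactionContext();
        } catch (NoSuchMethodError | NoClassDefFoundError e) {
            // Running on a pre-3.0 Connect runtime; connector-defined
            // transaction boundaries are unavailable
            transactionContext = null;
        }
    }

    @Override
    public List<SourceRecord> poll() {
        List<SourceRecord> batch = readNextBatch();
        if (transactionContext != null && endOfUpstreamTransaction()) {
            // Ask the worker to commit the open producer transaction
            // once this batch has been processed
            transactionContext.commitTransaction();
        }
        return batch;
    }

    // Hypothetical helpers standing in for real connector logic
    private List<SourceRecord> readNextBatch() { return List.of(); }
    private boolean endOfUpstreamTransaction() { return true; }

    @Override
    public void stop() { }

    @Override
    public String version() { return "1.0"; }
}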

A new ConnectorTransactionBoundaries  enum is introduced:

Code Block (Java): ConnectorTransactionBoundaries
package org.apache.kafka.connect.source;

/**
 * An enum to represent the level of support for connector-defined transaction boundaries.
 */
public enum ConnectorTransactionBoundaries {
    /**
     * Signals that a connector can define its own transaction boundaries.
     */
    SUPPORTED,
    /**
     * Signals that a connector cannot define its own transaction boundaries.
     */
    UNSUPPORTED
}

And the SourceConnector  API is expanded with a second new method to allow developers to specify whether their connector can define its own transaction boundaries:

Code Block (Java): SourceConnector
package org.apache.kafka.connect.source;

public abstract class SourceConnector extends Connector {
    // Existing fields and methods omitted

    /**
     * Signals whether the connector can define its own transaction boundaries with the proposed
     * configuration. Developers must override this method if they wish to add connector-defined
     * transaction boundary support; if they do not, users will be unable to create instances of
     * this connector that use connector-defined transaction boundaries. The default implementation
     * will return {@code UNSUPPORTED}.
     * @param connectorConfig the configuration that will be used for the connector
     * @return whether the connector can define its own transaction boundaries with the given
     * config.
     */
    public ConnectorTransactionBoundaries canDefineTransactionBoundaries(Map<String, String> connectorConfig) {
        return ConnectorTransactionBoundaries.UNSUPPORTED;
    }
}
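
As a sketch, a hypothetical connector that can demarcate transactions around upstream units of work might opt in like this (class name invented for the example):

Code Block (Java): ExampleSourceConnector
package com.example;

import java.util.Map;
import org.apache.kafka.connect.source.ConnectorTransactionBoundaries;
import org.apache.kafka.connect.source.SourceConnector;

public class ExampleSourceConnector extends SourceConnector {
    // Required lifecycle methods omitted

    @Override
    public ConnectorTransactionBoundaries canDefineTransactionBoundaries(Map<String, String> connectorConfig) {
        // Hypothetical: this connector can demarcate one producer transaction
        // per upstream unit of work, so it opts in for any configuration
        return ConnectorTransactionBoundaries.SUPPORTED;
    }
}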

REST API pre-flight validation

When a user submits a new connector configuration today, Connect takes steps to ensure that the connector configuration is valid before writing it to the config topic. These steps are performed synchronously and, if any errors are detected or unexpected failures occur, reported to the user in the body and status of the HTTP response for their request.

This validation takes place on the following endpoints:

  • POST /connectors/
  • PUT /connectors/{connector}/config
  • PUT /connector-plugins/{connectorType}/config/validate

Some new pre-flight validation logic for exactly-once support will be added; it will apply to these same endpoints (and any other endpoints in the future that perform similar pre-flight connector config validation).

When exactly-once support is required

If the exactly.once.support connector configuration is set to required , the connector's SourceConnector::exactlyOnceSupport  method will be queried.

If the result is UNSUPPORTED , then an error will be reported with the exactly.once.support  property stating that the connector does not provide exactly-once support with the given configuration.

If the result is null , then an error will be reported with the exactly.once.support  property stating that Connect cannot determine whether the connector provides exactly-once guarantees, and that the user should carefully consult the connector's documentation before proceeding, possibly by setting the property to "requested" instead.

If the result is SUPPORTED , no error will be reported.

When connector-defined transaction boundaries are required

If the transaction.boundary  connector configuration is set to connector , the connector's SourceConnector::canDefineTransactionBoundaries  method will be queried.

If the result is ConnectorTransactionBoundaries.UNSUPPORTED or null , then an error will be reported with the transaction.boundary  property stating that the connector does not support defining its own transaction boundaries, and the user will be encouraged to configure the connector with a different transaction boundary type.

If the result is ConnectorTransactionBoundaries.SUPPORTED , no error will be reported.
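
To make the two checks concrete, here is a rough sketch of how a worker might perform this validation; the addError helper is a stand-in for however the framework records a config error, not actual Connect internals:

Code Block (Java): PreflightChecks
import java.util.Map;
import org.apache.kafka.connect.source.ConnectorTransactionBoundaries;
import org.apache.kafka.connect.source.ExactlyOnceSupport;
import org.apache.kafka.connect.source.SourceConnector;

public class PreflightChecks {
    static void validate(SourceConnector connector, Map<String, String> config) {
        if ("required".equals(config.get("exactly.once.support"))) {
            ExactlyOnceSupport support = connector.exactlyOnceSupport(config);
            if (support == ExactlyOnceSupport.UNSUPPORTED) {
                addError("exactly.once.support",
                    "This connector does not provide exactly-once support with the given configuration");
            } else if (support == null) {
                addError("exactly.once.support",
                    "Connect cannot determine whether this connector provides exactly-once guarantees; "
                        + "consult its documentation and consider using \"requested\" instead");
            }
        }
        if ("connector".equals(config.get("transaction.boundary"))
                && connector.canDefineTransactionBoundaries(config) != ConnectorTransactionBoundaries.SUPPORTED) {
            addError("transaction.boundary",
                "This connector cannot define its own transaction boundaries; "
                    + "use a different transaction boundary type");
        }
    }

    // Stand-in for the framework's config error reporting; illustrative only
    private static void addError(String property, String message) {
        throw new IllegalArgumentException(property + ": " + message);
    }
}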

New metrics

Three new task-level JMX properties will be added:

MBean name: kafka.connect:type=source-task-metrics,connector=([-.\w]+),task=([\d]+)

  • transaction-size-min: The number of records in the smallest transaction the task has committed so far.
  • transaction-size-max: The number of records in the largest transaction the task has committed so far.
  • transaction-size-avg: The average number of records in the transactions the task has committed so far.
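
For example, these metrics could be read programmatically over JMX as follows; the connector name "example-source" and task number are hypothetical:

Code Block (Java): TransactionSizeMetrics
import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class TransactionSizeMetrics {
    public static void main(String[] args) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        // Hypothetical connector "example-source", task 0
        ObjectName name = new ObjectName(
            "kafka.connect:type=source-task-metrics,connector=example-source,task=0");
        System.out.println("min transaction size: " + server.getAttribute(name, "transaction-size-min"));
        System.out.println("max transaction size: " + server.getAttribute(name, "transaction-size-max"));
        System.out.println("avg transaction size: " + server.getAttribute(name, "transaction-size-avg"));
    }
}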

Proposed Changes

Atomic offset writes

Offset reads

When exactly.once.source.support is set to enabled for a worker, the consumers used by that worker to read source offsets from offsets topics will be configured with a new default property, isolation.level=read_committed. This will cause them to ignore any records that are part of a non-committed transaction, but still consume records that are not part of a transaction at all.

Users will not be able to explicitly override this in the worker configuration with the property consumer.isolation.level or in the connector configuration with the property consumer.override.isolation.level. If they attempt to do so, the user-provided value will be ignored and a warning message will be logged notifying them of this fact.

Offset (and record) writes

When exactly.once.source.support is set to enabled for a worker, all source tasks created by that worker will use a transactional producer to write to Kafka. All source records that they provide to the worker will be written to Kafka inside a transaction. Once it comes time for that transaction to be committed, offset information for the current batch will be written to the offsets topic, inside the same transaction. Then, the transaction will be committed. This will ensure that source offsets will be committed to Kafka if and only if the source records for that batch were also written to Kafka.
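
Conceptually, each offset commit cycle will resemble the following sketch; the toProducerRecord conversion and pre-serialized offset records are hypothetical stand-ins for the framework's converters and offset serialization:

Code Block (Java): TransactionalBatchSketch
import java.util.List;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.connect.source.SourceRecord;

public class TransactionalBatchSketch {
    static void commitBatch(Producer<byte[], byte[]> producer,
                            List<SourceRecord> batch,
                            List<ProducerRecord<byte[], byte[]>> serializedOffsets) {
        producer.beginTransaction();
        for (SourceRecord record : batch) {
            producer.send(toProducerRecord(record));
        }
        for (ProducerRecord<byte[], byte[]> offsetRecord : serializedOffsets) {
            // Offsets land in the offsets topic inside the same transaction as the data
            producer.send(offsetRecord);
        }
        // Data and offsets become visible to read_committed consumers atomically
        producer.commitTransaction();
    }

    // Hypothetical stand-in for the framework's record conversion
    private static ProducerRecord<byte[], byte[]> toProducerRecord(SourceRecord record) {
        return new ProducerRecord<>(record.topic(), null, null); // illustrative only
    }
}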

Users will be able to configure the transaction boundaries for connectors with the newly-introduced transaction.boundary connector configuration property (described in greater detail in the "Public Interfaces" section).

Once an offset commit is complete, if the connector is (implicitly or explicitly) configured with a separate offsets topic, the committed offsets will also be written to the worker’s global offsets topic using a non-transactional producer and the worker’s principal. This will be handled on a separate thread from the task’s work and offset commit threads, and should not block or interfere with the task at all. If the worker fails to write these offsets for some reason, it will retry indefinitely, but not fail the task. This will be done in order to facilitate "hard" downgrades and cases where users switch from per-connector offsets topics back to the global offsets topic.

...

It may take longer than the transaction timeout for a task to flush all of its records to Kafka. In this case, there are some remedial actions that users can take to nurse their connector back to health: tune their producer configuration for higher throughput, increase the transaction timeout for the producers used by the connector, decrease the offset commit interval (if using interval-based transaction boundaries), or switch to the poll  value for the transaction.boundary  property. We will include these steps in the error message for a task that fails due to producer transaction timeout.

...

Because of this, it cannot be guaranteed that these methods will be invoked after every successful offset commit. However, as long as a task is able to accurately recover based solely off of the offset information provided to the Connect framework with each record it has produced, this should not compromise exactly-once delivery guarantees. These methods can and should still be used to clean up and relinquish resources after record delivery is guaranteed, but using them to track offsets will prevent a connector from being viable for exactly-once support.

Per-connector offsets topics

Regardless of the value for the exactly.once.source.support property on a worker, source connectors will be allowed to use custom offsets topics, configurable via the offsets.storage.topic property.

Motivation

First, with the advent of KIP-458, it is now possible to bring up a single Connect cluster whose connectors each target a different Kafka cluster (via the producer.override.bootstrap.servers, consumer.override.bootstrap.servers, and/or admin.override.bootstrap.servers connector properties). At the moment, source task offsets are still tracked in a single global offsets topic for the entire Connect cluster; however, if the producer for a task is used to write source offsets, and that producer targets a different Kafka cluster than the one the Connect worker uses for its internal topics, that will no longer be possible. In order to support this case well, per-connector offsets topics must be reasoned about, to allow for smooth upgrades and downgrades where large gaps in offsets data are not created unnecessarily (see “Migration” below). Although users do not necessarily need to be given control over the name of the offsets topic for each connector in this case (the worker could simply create an offsets topic with the same name on the connector’s overridden Kafka cluster, for example), exposing this level of control adds very little complexity to this design once the necessity for per-connector offsets topics is realized. As a benefit, it should make it easier for users to configure connectors in secured environments where the connector principal may have limited access on the Kafka cluster it targets. For users and developers familiar with Kafka Streams, this is the major differentiating factor that makes a global offsets topic impossible.

...

Finally, it allows users to limit the effect that hanging transactions on an offsets topic will have. If tasks A and B use the same offsets topic, and task A initiates a transaction on that offsets topic right before task B starts up, then task A dies suddenly without committing its transaction, task B will have to wait for that transaction to time out before it can read to the end of the offsets topic. If the transaction timeout is set very high for task A (to accommodate bursts of high throughput, for example), this will block task B from processing any data for a long time. Although this scenario may be unavoidable in some cases, using a dedicated offsets topic for each connector should allow cluster administrators to isolate the blast radius of a hanging transaction on an offsets topic. This way, although tasks of the same connector may still interfere with each other, they will at least not interfere with tasks of other connectors. This should be sufficient for most multitenant environments.

Configuration

Only the name of the per-connector offsets topic will be configurable by users. Other properties, such as the number of partitions and the replication factor, will be derived from the worker config using the behavior described in KIP-605.

Hosting Kafka cluster

Regardless of the value for the exactly.once.source.support property on a worker, if a connector configuration contains a value for the offsets.storage.topic  property, it will use an offsets topic with that name on the Kafka cluster that it produces data to (which may be different from the one that hosts the worker's global offsets topic).

Implicit usage

A per-connector offsets topic might be implicitly configured under certain circumstances. Specifically, this will occur (when exactly-once source support is enabled) for connectors whose configurations do not contain the offsets.storage.topic  property, but do contain an overridden bootstrap.servers  value that causes the connector to target a different Kafka cluster than the one that hosts the worker's global offsets topic.

...

The name of this topic will be the same as the name of the global offsets topic, which is controlled by the offsets.storage.topic  property in the worker config.

Creation

If a connector is explicitly or implicitly configured to use a separate offsets topic but that topic does not exist yet, the worker will automatically try to create the topic before startup for any task or Connector  instances belonging to that connector. This will be done using an admin client constructed from the connector's configuration (using the various admin.override.*  connector properties, admin.* worker properties, and top-level worker properties in descending order of precedence), using the same logic that the worker already uses for its internal topics.

Smooth migration

When a separate offsets topic is created for the first time, it will naturally be empty. In order to avoid losing offset information that may be stored in the global offsets topic, when a connector or task instance requests offsets from the worker, it will be given a combined view of the offset information present in both its separate offsets topic and the worker's global offsets topic. Precedence will be given to offset information present in the separate offsets topic.

...

A new internal endpoint will be added to the REST API:

PUT /connectors/{connector}/fence

This endpoint will be secured by the session key mechanism introduced in KIP-507: Securing Internal Connect REST Endpoints and will only be used for inter-worker communication; users should not query it directly. It will be available regardless of the exactly.once.source.support  value for the worker.

When a worker receives a request to this endpoint, it will:

  1. Check to see if a rebalance is pending and, if it is, serve an HTTP 409 CONFLICT response. (This step is actually unnecessary and has been removed from the implementation; see KAFKA-15059.)
  2. Check to see if it is the leader. If it is not the leader, either forward the request to the leader or fail the request, using at most two hops. This two-hop strategy is already implemented in Connect for other REST endpoints that require forwarding of requests, so it is not described in great detail here.
  3. Verify that the connector in the request URL both exists and is a source connector; if not, fail the request.
  4. Read to the end of the config topic.
  5. If there is a task count record for the connector in the config topic after the latest set of task configs for the connector, serve an empty-bodied 200 response.
  6. If there is an existing task count record for the connector in the config topic, and either the task count in that record is above 1 or the new number of tasks for the connector is above 1*:
    1. Fence out all producers that may still be active for prior task instances of this connector by instantiating an admin client using the connector principal and invoking Admin::fenceProducers (a new API described below; see Admin API to Fence out Transactional Producers). The transactional IDs will be the IDs that each task of the connector would have used, assuming as many tasks were active as the most recent task count record for the connector. This will be done in parallel.
    2. If the leader receives a request to write new task configs for the connector during this period, the round of fencing will be immediately cancelled and an HTTP 409 CONFLICT response will be served.
  7. Write the new task count record for the connector to the config topic.
  8. Read to the end of the config topic to verify that the task count record has been written successfully.
  9. Serve an empty-bodied 200 response.

...

* - The check for task counts is done to avoid unnecessary fencing work for permanently single-task connectors, such as Debezium's CDC source connectors. If the most recent task count record for a connector shows one task, there is only one task that needs to be fenced out. And, if the new configuration for that connector contains one task, that new task will automatically fence out its single predecessor as they will use the same transactional ID. This same logic does not apply for multi-task connectors, even when the number of tasks is unchanged after a reconfiguration; for details on why, see the rejected alternative Non-generational task fencing.

Preparation for rebalance

When exactly.once.source.support  is set to enabled  and a new set of task configurations for a connector is detected, workers will preemptively stop source tasks for that connector. In greater detail:

When a rebalance is triggered, before rejoining the cluster group, workers will preemptively stop all tasks of all source connectors for which task configurations are present in the config topic after the latest task count record. This step is not necessary to preserve exactly-once delivery guarantees, but should provide a reasonable opportunity for tasks to shut down gracefully before being forcibly fenced out.

Stopping these tasks will be done in parallel (as en-masse connector/task stop/start is already done for distributed workers), and any tasks that the worker cannot stop within the graceful task shutdown timeout period (which defaults to five seconds but can be controlled via the task.shutdown.graceful.timeout.ms  worker property) will be abandoned and allowed to continue running. They will be disabled later on if/when the leader fences out their producers during a round of zombie fencing (described below).

Because this work will be done in parallel, the default timeout for task shutdown is fairly low (five seconds), and the current number of threads used to perform en-masse task stops is eight, it is highly unlikely that this will hinder a worker's ability to rejoin a group before the rebalance timeout (which defaults to sixty seconds) times out.

This section was the result of a misunderstanding of the herder's rebalance mechanics, and is actually unnecessary. The worker already performs this preemptive stop for all reconfigured tasks (if using incremental rebalancing) or all tasks (if using eager rebalancing) before (re)joining the group during a rebalance.

Source task startup

When exactly.once.source.support  is set to enabled  for a worker, extra steps will be taken to ensure that tasks are only run when it is safe to do so.

Before a worker starts a source task, it will first send a request to the leader's internal zombie fencing endpoint for the task's connector. If that request fails for any reason, the task will be marked as FAILED  and startup will be aborted.

...

Leader access to config topic

When exactly.once.source.support  is set to preparing  or enabled  for a worker, if the worker is the leader of the cluster, it will use a transactional producer to guarantee that at most one worker is capable of writing to the config topic at any time. In greater detail:

...

By using a transactional producer on the leader, we can guarantee that a leader will only be able to write a task count record to the config topic if no other workers have become the leader and written new task configurations for the connector to the topic since then. We also address existing edge cases in Connect where a zombie leader may write incorrect information to the config topic.
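
A minimal sketch of how such a leader producer could be constructed, assuming the transactional ID format described under "Worker principal permissions" below; the actual internal wiring of the worker is more involved:

Code Block (Java): LeaderProducerSketch
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class LeaderProducerSketch {
    static Producer<byte[], byte[]> configTopicProducer(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // Transactional ID format from the "Worker principal permissions" section
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "connect-cluster-" + groupId);
        Producer<byte[], byte[]> producer =
            new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer());
        // initTransactions() fences out any other producer (e.g., a zombie
        // leader's) that previously used the same transactional ID
        producer.initTransactions();
        return producer;
    }
}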

Admin API to Fence out Transactional Producers

Java API additions

The Admin interface will be expanded to include new methods for fencing out producers by transactional ID. This same functionality is already possible by employing one or more transactional producers and invoking their initTransactions  methods, but is replicated on the admin client to make it easier to use for other cases (for more details, see the Fencing-only producers rejected alternative):

Code Block (Java): Admin.java
public interface Admin {

    /**
     * Fence out all active producers that use any of the provided transactional IDs.
     *
     * @param transactionalIds The IDs of the producers to fence.
     * @param options          The options to use when fencing the producers.
     * @return The FenceProducersResult.
     */
    FenceProducersResult fenceProducers(Collection<String> transactionalIds,
                                        FenceProducersOptions options);

    /**
     * Fence out all active producers that use any of the provided transactional IDs, with the default options.
     * <p>
     * This is a convenience method for {@link #fenceProducers(Collection, FenceProducersOptions)}
     * with default options. See the overload for more details.
     *
     * @param transactionalIds The IDs of the producers to fence.
     * @return The FenceProducersResult.
     */
    default FenceProducersResult fenceProducers(Collection<String> transactionalIds) {
        return fenceProducers(transactionalIds, new FenceProducersOptions());
    }

}
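
For example, a round of zombie fencing for a hypothetical connector "example-source" with two tasks, in a cluster with group ID "connect-cluster", might look like this (the transactional IDs follow the ${groupId}-${connector}-${taskId} format described in the connector principal permissions section):

Code Block (Java): FencingSketch
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;

public class FencingSketch {
    public static void main(String[] args) throws Exception {
        Properties adminProps = new Properties();
        adminProps.put("bootstrap.servers", "localhost:9092"); // illustrative
        try (Admin admin = Admin.create(adminProps)) {
            // Fence out any active producers for the connector's prior task generation
            admin.fenceProducers(Arrays.asList(
                    "connect-cluster-example-source-0",
                    "connect-cluster-example-source-1"))
                .all()
                .get();
        }
    }
}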

...

Code Block (Java): FenceProducersOptions.java
package org.apache.kafka.clients.admin;

/**
 * Options for {@link Admin#fenceProducers(Collection, FenceProducersOptions)}
 *
 * The API of this class is evolving. See {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class FenceProducersOptions extends AbstractOptions<FenceProducersOptions> {
    // No options (yet) besides those inherited from AbstractOptions
}

...

The ACLs required for this new API will be the same as the ones required to use a transactional producer for each of the specified transactional IDs. Specifically, this amounts to grants for the Write  and Describe  operations on the TransactionalId  resource, and a grant for the IdempotentWrite operation on the Cluster  resource.

Note that the IdempotentWrite  ACL has been deprecated as of 2.8 (see KIP-679) and will only be necessary for Connect clusters running on pre-2.8 Kafka clusters.

Limitations

  • Distributed mode is required; standalone mode will not be supported (yet) for exactly-once source connectors
  • Exactly-once source support can only be enabled for all source connectors or none; it cannot be toggled on a per-connector basis
  • In order to be viable for exactly-once delivery, connectors must assign source partitions to at most one task at a time (otherwise duplicate writes may occur)
  • In order to be viable for exactly-once delivery, connectors must use the Connect source offset API for tracking progress (otherwise duplicate writes or dropped records may occur)

...

These scenarios can compromise a cluster’s exactly-once delivery guarantees.

Heterogeneous clusters

If a worker is active and does not have support for exactly-once delivery (either because exactly.once.source.support  is not set to enabled , or the worker is running an older version of Connect for which the feature is not available), the entire cluster’s ability to provide exactly-once guarantees will be compromised. There is no way to fence out non-compliant workers. Even if one were developed, the problem would only be slightly transformed: if a worker is active that cannot be fenced out by other workers in the cluster, we’d be in the exact same place as before.

...

If anything besides a valid leader worker ends up writing to the config topic, all assumptions that the cluster relies upon to determine whether it is safe to start tasks and whether a fencing is necessary can be invalidated. This should never happen in a reasonably-secured Connect cluster and if it does, compromised delivery guarantees may pale in comparison to the other consequences of corrupting the config topic.

Compatibility, Deprecation, and Migration Plan

Interoperability with rebalancing protocols

Exactly-once support will be possible with both the eager and incremental cooperative rebalancing protocols. The additional design complexity to support both is minimal, and given that both protocols are still supported for the Connect framework, it’s likely that some users will still be using eager rebalancing. They should not have to upgrade to the newer rebalancing protocol in order to gain access to this feature if the work to support both protocols is not prohibitive.

Worker principal permissions

Before setting exactly.once.source.support  to preparing  or enabled  for a worker, the producer principal for the worker must be given the following permissions on the Kafka cluster it writes to:

  • Write (TransactionalId): connect-cluster-${groupId}, where ${groupId} is the group ID of the cluster
  • Describe (TransactionalId): connect-cluster-${groupId}, where ${groupId} is the group ID of the cluster
  • IdempotentWrite * (Cluster): the Kafka cluster targeted by the Connect cluster


* - Note that the IdempotentWrite  ACL has been deprecated as of 2.8 (see KIP-679) and will only be necessary for Connect clusters running on pre-2.8 Kafka clusters.

Connector principal permissions

...

  • Write (TransactionalId): ${groupId}-${connector}-${taskId}, for each task that the connector will create, where ${groupId} is the group ID of the Connect cluster, ${connector} is the name of the connector, and ${taskId} is the ID of the task (starting from zero). A wildcarded prefix of ${groupId}-${connector}* can be used for convenience if there is no risk of conflict with other transactional IDs or if conflicts are acceptable to the user.
  • Describe (TransactionalId): ${groupId}-${connector}-${taskId}, for each task that the connector will create, where ${groupId} is the group ID of the Connect cluster, ${connector} is the name of the connector, and ${taskId} is the ID of the task (starting from zero). A wildcarded prefix of ${groupId}-${connector}* can be used for convenience if there is no risk of conflict with other transactional IDs or if conflicts are acceptable to the user.
  • Write (TransactionalId): ${groupId}-${connector}. Only necessary if the connector uses a separate offsets topic.
  • Describe (TransactionalId): ${groupId}-${connector}. Only necessary if the connector uses a separate offsets topic.
  • Write (Topic): the offsets topic used by the connector, which is either the value of the offsets.storage.topic property in the connector’s configuration if provided, or the value of the offsets.storage.topic property in the worker’s configuration if not.
  • IdempotentWrite * (Cluster): the Kafka cluster targeted by the connector.

* - Note that the IdempotentWrite  ACL has been deprecated as of 2.8 (see KIP-679) and will only be necessary for Connect clusters running on pre-2.8 Kafka clusters.

Consumer

Each source connector’s consumer principal must be given the following permissions on the Kafka cluster it reads offsets from:

...

If the (implicitly- or explicitly-configured) offsets topic for a connector does not exist yet, its admin principal must be given the following permissions on the Kafka cluster that will host the offsets topic:

  • Create (Topic): the offsets topic used by the connector, which is the value of the offsets.storage.topic property in the connector’s configuration. If not provided and the connector targets the same Kafka cluster that the worker uses for its internal topics, this ACL is not needed, as the worker’s shared offsets topic should be created automatically before any connectors are started.

Additionally, the connector's admin principal must be given the following permissions on the Kafka cluster it reads offsets from no matter what:

...

This is necessary in order to ensure that the Connect worker has read up to the end of the offsets topic, even when there are open transactions on the topic at the start of the read to the end. This will be accomplished by listing the end offsets for the offsets topic using an admin client with the read_uncommitted  isolation level, and then consuming from that topic with the read_committed  isolation level until at least those offsets listed by the admin client.
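
A simplified sketch of this read-to-end logic, reduced to a single partition with error handling omitted; the consumer passed in is assumed to be configured with isolation.level=read_committed:

Code Block (Java): ReadToEndSketch
import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListOffsetsOptions;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;

public class ReadToEndSketch {
    static void readToEnd(Admin admin, Consumer<byte[], byte[]> consumer, String offsetsTopic) throws Exception {
        TopicPartition tp = new TopicPartition(offsetsTopic, 0);

        // List the true end offset with read_uncommitted so that open
        // transactions cannot hide it from the worker
        long endOffset = admin.listOffsets(
                Collections.singletonMap(tp, OffsetSpec.latest()),
                new ListOffsetsOptions(IsolationLevel.READ_UNCOMMITTED))
            .partitionResult(tp).get().offset();

        // The read_committed consumer will block on any open transaction
        // until it is committed or aborted
        consumer.assign(Collections.singletonList(tp));
        consumer.seekToBeginning(Collections.singletonList(tp));
        while (consumer.position(tp) < endOffset) {
            consumer.poll(Duration.ofSeconds(1)); // process committed offset records here
        }
    }
}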


Rolling upgrade(s)

At most two rolling upgrades will be required to enable exactly-once source support on a Connect cluster.

The first rolling upgrade will be to upgrade every worker to a version of Connect that can provide exactly-once source support, and to set exactly.once.source.support  to preparing .

The second rolling upgrade will be to actually enable exactly-once source support on each worker (by setting exactly.once.source.support  to enabled).

Two upgrades will be required in order to ensure that the internal fencing endpoint is available on every worker before it is required by any of them, and that no non-leader workers are able to write to the config topic during the upgrade.

Downgrade

Two kinds of downgrade are possible. Exactly-once support for a cluster can be disabled by setting exactly.once.source.support  to either disabled  or preparing  for workers in the cluster (a “soft” downgrade), or workers can be reverted to use older versions of the Connect framework that do not support exactly-once sources at all (a “hard” downgrade).

Soft downgrade

Soft downgrades should be possible via a cluster roll, where each worker is shut down, its configuration is altered to disable exactly-once source support, and then it is restarted. During the process, all connectors and their tasks should continue to run and process data, thanks to the same logic that makes a rolling upgrade possible.

Hard downgrade

Offsets accuracy

Because source task offsets on upgraded workers are still written to the worker’s global offsets topic, even if a downgraded worker does not support per-connector offsets topics, it can still pick up on relatively-recent source offsets for its connectors. Some of these offsets may be out-of-date or older than the ones in the connector’s separate offsets topic, but the only consequence of this would be duplicate writes by the connector, which will be possible on any cluster without exactly-once support enabled. Without any writes to the global offsets topic, all records processed by a connector since a switch to a dedicated offsets topic would be re-processed after the downgrade and would likely result in a flood of duplicates. While technically permissible given that the user in this case will have knowingly switched to a version of the Connect framework that doesn't support exactly-once source connectors (and is therefore susceptible to duplicate delivery of records), the user experience in this case could be quite bad, so a little extra effort on the part of Connect to significantly reduce the fallout of downgrades in this case is warranted.

If a re-upgrade is desirable at this point, any separate per-connector offsets topics may need to be deleted beforehand. Otherwise, the worker will give precedence to the existing separate offsets topic, even if the data in that topic is stale and there is actually newer information present in the global offsets topic.

Two-step downgrade

The safest way to perform a hard downgrade is to follow the same steps for a rolling upgrade, but in reverse. Specifically:

Perform an initial rolling downgrade wherein exactly.once.source.support  is set to either disabled  or preparing  for every worker.

Perform a second rolling downgrade where each worker is modified to use an earlier version of Connect.

All connectors and tasks should continue to run and process data, thanks to the same logic that makes a rolling upgrade possible.

Single-step downgrade

If a hard downgrade is performed in a single-step rolling downgrade (i.e., after each worker is shut down, it is immediately downgraded to a lower version of Connect), some tasks may begin to fail as their workers will be unable to reach the internal fencing endpoint on workers that have already been downgraded. Once the downgrade is complete, it should be sufficient to restart these tasks in order to get them running again.

Additional configuration properties

The worker-level exactly.once.source.support  and connector-level offsets.storage.topic , transaction.boundary , exactly.once.support , and transaction.boundary.interval.ms  configuration properties introduced here should not violate backwards compatibility, as they all come with default values that preserve existing behavior. Technically, an existing connector may expose an offsets.storage.topic  configuration property that will now conflict with this newly-introduced framework property, but the risk is low enough to be acceptable.

...

Rejected because: in order to maintain the integrity of the config topic, it's imperative that only a single worker be able to access it at a time for a given connector. This could be accomplished by allowing each worker to write to the config topic with a transactional producer whose transactional ID is mapped in a 1:1 fashion from the name of the connector. However, if a rebalancing bug occurs and two non-zombie workers believe they both own the same Connector object, it's unclear how the cluster could gracefully recover from this, and it's likely that manual intervention by the user would be required.


Per-connector exactly-once property

...

Summary: instead of expanding the Admin  API to add producer-fencing functionality, just instantiate a throwaway producer for the sole purpose of calling Producer::initTransactions .

...

Rejected because: this approach was significantly more complex, involved duplicating existing data, and provided little if any advantages over the current proposal.


Non-configurable transaction boundaries

Summary: instead of allowing users to configure the transaction boundaries for their connector, define all transaction boundaries the same way (by interval, poll batch, connector definition, connector with a fallback of poll, etc.).

Rejected because: there is no one-size-fits-all strategy for transaction boundaries that can be expected to accommodate every reasonable combination of connector and use case. Defining transactions on the batches returned from SourceTask::poll  would heavily limit throughput for connectors that frequently produce small record batches. Defining transactions on an interval would add a latency penalty to the records at the beginning of these transactions and, in the case of very large transactions, would inflate the memory requirements of downstream consumers, which would have to buffer the entire transaction locally within each topic-partition before beginning to process any of the records in it (this is actually incorrect; consumers do not have to buffer transactions locally). And some connectors may have no reasonable way to define their own transaction boundaries at all.

Future Work

Per-connector granularity

We may want to allow exactly-once to be enabled on a per-connector basis. This could be implemented by instantiating each source task with a "fencible producer" that uses a transactional ID to write to Kafka, but does not actually produce records inside transactions. This way, if exactly-once becomes enabled for that connector, the first round of zombie fencing for it will be able to fence out all prior producer instances, even if they aren't using the traditional transactional producer.

Standalone mode support

Since the design for atomic writes of source records and their offsets relies on source offsets being stored in a Kafka topic, standalone mode is not eligible. If there is sufficient demand, we may add this capability to standalone mode in the future.

...