...

When exactly.once.source.enabled is set to true for a worker, all source tasks created by that worker will use a transactional producer to write to Kafka. All source records that they provide to the worker will be written to Kafka inside a transaction. When it comes time for offsets to be committed for a task, offset information for the current batch will be written to the offsets topic, inside the same transaction. Then, the transaction will be committed. This will ensure that source offsets will be committed to Kafka if and only if the source records for that batch were also written to Kafka.
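
As a rough illustration of that per-batch flow (the producer is the task's transactional producer; the helper methods below are hypothetical and not the actual worker implementation):

Code Block
languagejava
titleTransactionalSourceBatchSketch.java
// Hypothetical sketch of the per-batch transaction described above; not the actual worker code.
List<SourceRecord> batch = task.poll();
producer.beginTransaction();
for (SourceRecord record : batch) {
    // Every source record in the batch is written to Kafka inside the transaction
    producer.send(convertAndSerialize(record)); // hypothetical conversion helper
}
// The source offsets for this batch are written to the offsets topic inside the
// same transaction, so they are committed if and only if the records are committed
producer.send(serializedOffsetsFor(batch)); // hypothetical helper
producer.commitTransaction();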

...

If a source connector is configured to use a separate offsets topic, the worker will block task startup for as long as necessary (unless interrupted due to task shutdown) until the topic is created. Once the topic exists (after being created by the worker that owns the Connector, if necessary), the worker will read to the end of the offsets topic before starting the task. If there are no offsets for the connector present in the topic and there is no sentinel value in the offsets topic indicating that a migration attempt was made and found no data for the task, the worker will continue to read to the end of the topic until either offset information for the connector or a sentinel value is found. Once this step is completed, the worker will be free to start the task. During this window, the task will either not be present in the REST API's status endpoints (if no prior instances of it had been created), or its state will be UNASSIGNED  (if prior instances had been created and then shut down).

An example of a sentinel offsets record for a connector named “reddit-source” would look like:

...

If the latest task count record for a connector comes after its latest set of task configurations in the config topic, workers will be allowed to create and run tasks for that connector. For example, if the contents of the config topic (represented by the keys of each record) are as follows:

...

then workers will not bring up tasks for the connector.

Fencing during rebalance

...

Preparation for rebalance

When a rebalance is triggered, before rejoining the group, all workers will preemptively stop all tasks of all source connectors for which task configurations are present in the config topic after the latest task count record. This step is not necessary to preserve exactly-once delivery guarantees, but should provide a reasonable opportunity for tasks to shut down gracefully before being forcibly fenced out.

Stopping these tasks will be done in parallel (as en-masse connector/task stop/start is already done for distributed workers), and any tasks that the worker cannot stop within the graceful task shutdown timeout period (which defaults to five seconds but can be controlled via the task.shutdown.graceful.timeout.ms  worker property) will be abandoned and allowed to continue running. They will be disabled later on if/when the leader fences out their producers during a round of zombie fencing (described below).

Because this work will be done in parallel, the default timeout for task shutdown is fairly low (five seconds), and the current number of threads used to perform en-masse task stops is eight, it is highly unlikely that this will hinder a worker's ability to rejoin the group before the rebalance timeout (which defaults to sixty seconds) elapses.

Leader

...

Zombie fencing by the leader

...

A new internal endpoint will be added to the REST API:

POST /connectors/{connector}/fence

This endpoint will be secured by the session key mechanism introduced in KIP-507: Securing Internal Connect REST Endpoints and will only be used for inter-worker communication; users should not query it directly.

When a worker receives a request to this endpoint, it will:

  1. Check to see if a rebalance is pending and, if it is, serve an HTTP 409 CONFLICT response.
  2. Check to see if it is the leader. If it is not the leader, either forward the request to the leader or fail the request, using at most two hops. This two-hop strategy is already implemented in Connect for other REST endpoints that require forwarding of requests, so it is not described in great detail here.
  3. Verify that the connector in the request URL both exists and is a source connector; if not, fail the request.
  4. Read to the end of the config topic.
  5. If there is a task count record for the connector in the config topic after the latest set of task configs for the connector, serve an empty-bodied 200 response.
  6. If there is an existing task count record for the connector in the config topic, and either the task count in that record is above 1 or the new number of tasks for the connector is above 1*:
    1. Fence out all producers that may still be active for prior task instances of this connector by instantiating an admin client using the connector principal and invoking Admin::fenceProducers (a new API described below; see Admin API to Fence out Transactional Producers). The transactional IDs will be the IDs that each task of the connector would have used, assuming as many tasks were active as the most recent task count record for the connector. This will be done in parallel.
    2. If the leader receives a request to write new task configs for the connector during this period, the round of fencing will be immediately cancelled and an HTTP 409 CONFLICT response will be served.
  7. Write the new task count record for the connector to the config topic.
  8. Read to the end of the config topic to verify that the task count record has been written successfully.
  9. Serve an empty-bodied 200 response.

Step 6 will take place on a separate thread in order to avoid blocking the worker's herder tick thread (which is essential for detecting and handling rebalances, and performing the work for some REST requests). Once it is complete, steps 7 through 9 will be handled on the worker's herder tick thread, in order to ensure that the worker has an accurate view of whether new task configs for the connector have been submitted during step 6.

* - The check for task counts is done to avoid unnecessary fencing work for permanently single-task connectors, such as Debezium's CDC source connectors. If the most recent task count record for a connector shows one task, there is only one task that needs to be fenced out. And, if the new configuration for that connector contains one task, that new task will automatically fence out its single predecessor as they will use the same transactional ID. 

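For illustration, the fencing performed in step 6 might look roughly like the following sketch; the helper methods and variable names here are assumptions, not the actual implementation:

Code Block
languagejava
titleZombieFencingSketch.java
// Hypothetical sketch of step 6: fence out a producer for every transactional ID that the
// previous generation of tasks could have used, based on the connector's last task count record.
int previousTaskCount = latestTaskCountRecordFor(connectorName); // hypothetical helper
List<String> transactionalIds = new ArrayList<>();
for (int taskId = 0; taskId < previousTaskCount; taskId++) {
    // Same format that source tasks use for their transactional IDs: ${groupId}-${connector}-${taskId}
    transactionalIds.add(groupId + "-" + connectorName + "-" + taskId);
}
// The admin client is created with the connector principal's configuration
try (Admin admin = Admin.create(connectorAdminConfigs(connectorName))) { // hypothetical helper
    // Admin::fenceProducers is the new API proposed below
    admin.fenceProducers(transactionalIds).all().get();
}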

...

Requesting zombie fencing from the leader

On source connector startup

When a source Connector is started (as a result of creation by the user, reconfiguration by the user, manual restart by the user, reassignment during rebalance, or restart after an eager rebalance), the worker that owns that Connector will request a round of zombie fencing for that connector from the leader.

The sequence of connector startup events will now be:

  1. Instantiate the Connector object, its ConnectorContext , its OffsetStorageReader , etc.
  2. Invoke Connector::start .
  3. Request a set of task configs from the connector using Connector::taskConfigs 
  4. If those task configs differ from the latest set of task configs in the config topic:
    1. Write those task configs to the config topic (or, if not the leader, forward those to the leader, who will write them to the config topic)
  5. Otherwise (new step):
    1. Request a new round of zombie fencing for the connector from the leader

With this startup sequence, if a round of zombie fencing should fail after, e.g., connector reconfiguration, the user will be able to manually trigger a new round of zombie fencing by restarting the connector.

On new source task configs

When a new set of task configs for a source connector is read from the config topic, the worker that owns the Connector object for that connector will automatically request a round of zombie fencing for the connector after the ensuing rebalance is complete.

This will give workers that own task instances for the connector a chance to gracefully shut them down before rejoining the group during the rebalance, instead of being ungracefully fenced out as soon as the Connector generates a new set of task configs.

Source task startup

When a worker is assigned a source task, it will check to see if a task count record is present in the config topic after the most recent task configs for that connector.

If that is the case, the worker can assume that all producers for all prior generations of tasks for that connector have been fenced out, and it will bring up the source tasks it has been assigned. 

Otherwise, the worker will emit a warning log message and refuse to bring the task up until a task count record for the connector is read from the config topic.

Once a worker has instantiated a producer for a source task, it will read to the end of the config topic once more, and if a new set of task configurations for that connector has been generated, it will abort startup of the task.

To summarize, the startup process for a task will be:

  1. Worker is assigned a source task during rebalance
  2. If there is no task count record present in the config topic for that connector, or there is a set of task configurations present after the latest task count record, refuse to bring up the task until a task count record for the connector appears in the config topic
  3. If the connector is configured to use a custom offsets topic, wait for the topic to become available (as described in Migration)
  4. Otherwise:
    1. Instantiate a transactional producer for the task
    2. Read to the end of the config topic
    3. If a new set of task configurations has since been generated for the connector, abandon startup *
    4. Otherwise, begin polling the task for data

* - If this happens, a new task will be automatically brought up in place of this task, in response to the new set of task configurations in the config topic. No action (such as restarting the task) will be necessary on the part of the user.
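
A condensed sketch of those startup checks, with hypothetical helper methods standing in for the worker's internals:

Code Block
languagejava
titleSourceTaskStartupSketch.java
// Hypothetical sketch of the startup checks above; not the actual worker implementation.
if (!taskCountRecordAfterLatestTaskConfigs(connectorName)) { // hypothetical helper
    log.warn("No up-to-date task count record for {}; deferring startup of task {}", connectorName, taskId);
    return; // wait for a task count record to appear in the config topic
}
Producer<byte[], byte[]> producer = transactionalProducerFor(taskId); // hypothetical helper
producer.initTransactions(); // also fences out a predecessor that used the same transactional ID
readToEndOfConfigTopic(); // hypothetical helper: refresh the worker's view of the config topic
if (newTaskConfigsSinceAssignment(connectorName)) { // hypothetical helper
    producer.close(); // abandon startup; a replacement task will be brought up automatically
    return;
}
pollTaskAndWriteRecords(producer); // hypothetical helper: begin polling the task for data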

Leader access to config topic

After a rebalance, if a worker discovers that it has become the leader of the cluster, it will instantiate a transactional producer whose transactional ID is connect-cluster-${groupId}, where ${groupId} is the group ID of the cluster. If users try to override this by setting the transactional.id property in their worker config, the user-provided value will be ignored and a warning message will be logged notifying them of this fact. The worker will use this producer for all writes it performs on the config topic. It will begin and commit a transaction for every record it writes. This may seem unusual--why use a transactional producer if transactions aren’t really necessary?--but it ensures that only the most recent leader is capable of producing to the config topic and that zombie leaders (rare though they may be) will not be able to.

If, for example, a leader stalls while attempting to fence out a prior generation of task producers for a connector, it may fall out of the group and become a zombie. If another worker is elected leader at this point and the connector is then reconfigured, it’s possible that the zombie leader may become unblocked and then attempt to write a task count record to the config topic after a new set of task configurations are written to the topic. This would corrupt the state of the config topic and violate a key assumption: a task count record present after task configurations for a connector means that it is not necessary to fence out a prior generation of producers for that connector’s tasks before starting them.

By using a transactional producer on the leader, we can guarantee that a leader will only be able to write a task count record to the config topic if no other workers have become the leader and written new task configurations for the connector to the topic since then.
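
A minimal sketch of how the leader's config-topic producer might be configured and used; the surrounding variables are placeholders rather than actual Connect code:

Code Block
languagejava
titleLeaderConfigTopicProducerSketch.java
// Hypothetical sketch; the property names are standard producer configs.
Map<String, Object> props = new HashMap<>(baseWorkerProducerConfigs()); // placeholder
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "connect-cluster-" + groupId);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
Producer<String, byte[]> configProducer = new KafkaProducer<>(props);
configProducer.initTransactions(); // bumps the producer epoch; any zombie leader is fenced out
// Every record written to the config topic gets its own transaction
configProducer.beginTransaction();
configProducer.send(new ProducerRecord<>(configTopic, recordKey, recordValue)); // placeholders
configProducer.commitTransaction();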

Admin API to Fence out Transactional Producers

Java API additions

The Admin interface will be expanded to include new methods for fencing out producers by transactional ID:

Code Block
languagejava
titleAdmin.java
public interface Admin {

    /**
     * Fence out all active producers that use any of the provided transactional IDs.
     *
     * @param transactionalIds The IDs of the producers to fence.
     * @param options  		   The options to use when fencing the producers.
     * @return The FenceProducersResult.
     */
    FenceProducersResult fenceProducers(Collection<String> transactionalIds,
                                        FenceProducersOptions options);

    /**
     * Fence out all active producers that use any of the provided transactional IDs, with the default options.
     * <p>
     * This is a convenience method for {@link #fenceProducers(Collection, FenceProducersOptions)}
     * with default options. See the overload for more details.
     *
     * @param transactionalIds The IDs of the producers to fence.
     * @return The FenceProducersResult.
     */
    default FenceProducersResult fenceProducers(Collection<String> transactionalIds) {
        return fenceProducers(transactionalIds, new FenceProducersOptions());
    }

}

New classes for the API will be added as well.

The FenceProducersResult  class:

Code Block
languagejava
titleFenceProducersResult.java
package org.apache.kafka.clients.admin;

/**
 * The result of the {@link Admin#fenceProducers(Collection)} call.
 *
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class FenceProducersResult {

    /**
     * Return a map from transactional ID to futures which can be used to check the status of
     * individual fencings.
     */
    public Map<String, KafkaFuture<Void>> fencedProducers() {
		// Implementation goes here
    }

	/**
	 * Returns a future that provides the producer ID generated while initializing the given transaction when the request completes.
	 */
	public KafkaFuture<Long> producerId(String transactionalId) {
		// Implementation goes here
	}

	/**
 	 * Returns a future that provides the epoch ID generated while initializing the given transaction when the request completes.
	 */
	public KafkaFuture<Short> epochId(String transactionalId) {
		// Implementation goes here
	}

    /**
     * Return a future which succeeds only if all the producer fencings succeed.
     */
    public KafkaFuture<Void> all() {
		// Implementation goes here
    }

}

And the FenceProducersOptions  class:

Code Block
languagejava
titleFenceProducersOptions.java
package org.apache.kafka.clients.admin;

/**
 * Options for {@link AdminClient#fenceProducers(Collection, FenceProducersOptions)}
 *
 * The API of this class is evolving. See {@link AdminClient} for details.
 */
@InterfaceStability.Evolving
public class FenceProducersOptions extends AbstractOptions<FenceProducersOptions> {
    // No options (yet) besides those inherited from AbstractOptions
}
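
A usage sketch of the proposed API (the admin configs and transactional IDs shown here are placeholders):

Code Block
languagejava
titleFenceProducersUsageSketch.java
// Hypothetical usage of the proposed Admin::fenceProducers API.
try (Admin admin = Admin.create(adminConfigs)) { // adminConfigs is a placeholder
    Collection<String> transactionalIds =
          Arrays.asList("connect-cluster-reddit-source-0", "connect-cluster-reddit-source-1"); // example IDs
    FenceProducersResult result = admin.fenceProducers(transactionalIds);
    result.all().get(); // block until every fencing has completed (or failed)
    for (String id : transactionalIds) {
        // Producer ID and epoch that were bumped while fencing each transactional ID
        System.out.printf("Fenced %s: producerId=%d, epoch=%d%n",
              id, result.producerId(id).get(), result.epochId(id).get());
    }
}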



Implementation

Under the hood, this will be implemented by finding the transaction coordinator for each transactional ID via the FindCoordinator protocol, then initializing a new transaction with that ID by invoking the InitProducerId protocol with no producer ID and no producer epoch. In the words of KIP-98, this:

  1. Bumps up the epoch of the PID, so that any previous zombie instance of the producer is fenced off and cannot move forward with its transaction.
  2. Recovers (rolls forward or rolls back) any transaction left incomplete by the previous instance of the producer.

Limitations

  • Distributed mode is required; standalone mode will not be supported (yet) for exactly-once source connectors
  • Exactly-once source support can only be enabled for all source connectors or none; it cannot be toggled on a per-connector basis
  • In order to be viable for exactly-once delivery, connectors must assign source partitions to at most one task at a time (otherwise duplicate writes may occur)
  • In order to be viable for exactly-once delivery, connectors must use the Connect source offset API for tracking progress (otherwise duplicate writes or dropped records may occur)

Addressed failure/degradation scenarios

These scenarios will not compromise a cluster’s exactly-once delivery guarantees.

Leader cannot write task count record to config topic during zombie fencing

The leader will try indefinitely during a round of zombie fencing to write the task count to the config topic and verify that it has been written by reading it back from the topic after writing.

If this takes too long, the worker may fall out of the group and a new leader may be elected. If the (now-dethroned) leader becomes unblocked during this process and attempts to write to the config topic, it will have been fenced out by its replacement. If it has been blocked on reading from the config topic, it will wake up and finish serving a response. In the next iteration of the herder tick thread, it will discover that it has fallen out of the group and attempt to rejoin.

If the attempt to read from/write to the config topic does not block but instead fails for some reason, the leader will immediately leave the group and then attempt to rejoin, serving an HTTP 500 ERROR response containing a stack trace for the failure to interact with the config topic.

Cannot fence out producers during zombie fencing

The leader may be unable to fence out producers during a round of zombie fencing if, for example, it is unable to reach or locate a transaction coordinator, or the connector's admin principal lacks the ACLs necessary to fence out the producers for its tasks.

If this happens, the leader will serve an HTTP 500 ERROR response containing a stack trace for the failure to fence out the producer(s).

For some errors (such as those caused by insufficient ACLs), the stack trace will include a helpful error message containing instructions on how to remediate the failure (such as granting the connector's admin principal permission to fence out producers with a list or description of applicable transactional IDs).

The worker that receives this 500 response will mark the Connector object FAILED in the status topic, and use the stack trace contained in the response to populate the trace  field for the status message.

Manual restart of fenced-out tasks

If a task fails because its producer has been fenced out, the user may try to manually restart that task through the REST API. If the worker’s view of the config topic is up-to-date, this will simply result in the restarted task replacing any existing instances of the same task, and exactly-once delivery guarantees will be preserved. However, if the worker has an outdated view of the config topic, restarting that task may compromise exactly-once guarantees if that task ends up producing data from a source partition that has already been assigned to a different task in the latest set of task configurations for the connector.

In order to prevent manual user restarts from compromising exactly-once delivery guarantees, Connect workers will read to the end of the config topic before proceeding with any source task restarts. If the worker is unable to read to the end of the config topic, the restart request will be met with an HTTP 500 TIMEOUT response. If the worker discovers during its read to the end of the topic that the task no longer exists (because the connector has been deleted or its task count reduced), it will serve the usual HTTP 404 NOT FOUND response. If a rebalance becomes expected because of new information read from the config topic, an HTTP 409 CONFLICT response will be served.

Worker lag after being assigned source task but before instantiating producer

If a worker is assigned a source task and sees a task count record in the config topic after the latest set of task configurations for that connector, it does so under the assumption that all producers for all prior task generations of that connector have been fenced out. And, at the time of the fencing performed by the leader, this assumption likely holds. However, it's possible that a worker may be assigned a source task, observe a task count record in the config topic that indicates that it is safe to start the task, then block for some large period of time before it is able to construct a producer for that task. For example, consider a scenario with workers F (follower), O (other follower), and L (leader), operating on task T of source connector C:

  1. Connector C is reconfigured and new task configurations for it are written to the config topic
  2. A rebalance is triggered and worker L is able to fence out all possibly-active producers for connector C, write a task count record for C to the config topic, and then assign task T to worker F
  3. Worker F receives its assignment and is able to read the most-recent task count record for connector C from the config topic
  4. Worker F blocks, before it is able to instantiate a transactional producer for task T
  5. Connector C is reconfigured and new task configurations for it are written to the config topic
  6. A rebalance is triggered and worker L is able to fence out all possibly-active producers for connector C, write a task count record for C to the config topic, and then assign task T to worker O as F has fallen out of the group
  7. Worker O receives its assignment and is able to read the most-recent task count record for connector C from the config topic, instantiate a transactional producer for task T, and then begin processing data from that task
  8. Worker F becomes unblocked, instantiates a transactional producer for task T, and then begins processing data for that task

In this scenario, if a source partition initially assigned to task T is reassigned to a different task during the reconfiguration in step 5, the task instance created by worker F in step 8 will begin producing duplicate data and exactly-once delivery guarantees will be compromised.

This is possible because it is not guaranteed at the time of the fencing performed by the leader that all producers that could be active for a connector have been created yet. If a producer has not yet been created, it cannot be fenced out; if it is created after the fencing occurs, a different approach must be taken to ensure that no duplicate data is produced.

This case should be covered by the additional read to the end of the config topic by the worker after it has brought up a transactional producer for a source task. If the connector is reconfigured after the worker became blocked and a round of producer fencing has occurred, then the worker will bring up a transactional producer for its source task, but then discover the connector reconfiguration and abort startup. If the worker has brought up a transactional producer and then become blocked before completing its read to the end of the config topic, the round of producer fencing by the leader should fence out that producer and any subsequent restart attempts will block until/unless the worker is able to complete a read to the end of the config topic (and handle any rebalances necessitated by new information read from the topic in the process).

Accidental task commit of dropped messages

If a transactional producer is successfully instantiated, it will not attempt to contact the broker again until an attempt is made to actually send a record. This can lead to interesting behavior: the producer is brought up and able to make initial contact with the transaction coordinator, then the entire Kafka cluster dies, then the producer begins and commits a transaction without attempting to send any records, and no exception or other indication of failure is generated by the producer at any point in this sequence of events.

When might this occur? If a task is configured with an aggressive SMT that drops all the records in a given batch, its producer will never attempt to send any records before committing the current transaction. And, once this occurs, the worker will invoke SourceTask::commit, which may cause the task to drop the data from the upstream system (for example, by acknowledging a batch of messages from a JMS broker). Even if this occurs, it should not be a problem: it’s fine to drop messages from the upstream system that are meant to be dropped by the connector, regardless of whether source offsets for those records have been committed to Kafka, as the end result is the same either way.

Permitted failure scenarios

These scenarios can compromise a cluster’s exactly-once delivery guarantees.

Heterogeneous clusters

If a worker is active and does not have support for exactly-once delivery, the entire cluster’s ability to provide exactly-once guarantees will be compromised. There is no way to fence out non-compliant workers. Even if one is developed, the problem would only be slightly transformed: if a worker is active that cannot be fenced out by other workers in the cluster, we’d be in the exact same place as before.

This failure scenario will need to be called out in public documentation so that users understand that they are responsible for the operational burden of ensuring that no non-compliant workers are active.

Non-worker, non-leader producer to config topic

If anything besides a valid leader worker ends up writing to the config topic, all assumptions that the cluster relies upon to determine whether it is safe to start tasks and whether a fencing is necessary can be invalidated. This should never happen in a reasonably-secured Connect cluster and if it does, compromised delivery guarantees may pale in comparison to the other consequences of corrupting the config topic.

Compatibility, Deprecation, and Migration Plan

Interoperability with rebalancing protocols

Exactly-once support will be possible with both the eager and incremental cooperative rebalancing protocols. The additional design complexity to support both is minimal, and given that both protocols are still supported for the Connect framework, it’s likely that some users will still be using eager rebalancing. They should not have to upgrade to the newer rebalancing protocol in order to gain access to this feature if the work to support both protocols is not prohibitive.

Worker principal permissions

Producer

Before enabling exactly-once source support on a worker, its producer principal must be given the following permissions on the Kafka cluster it writes to:

  • Operation: Write; Resource Type: TransactionalId; Resource Name: connect-cluster-${groupId}, where ${groupId} is the group ID of the cluster
  • Operation: IdempotentWrite; Resource Type: Cluster; Resource Name: Kafka cluster targeted by the Connect cluster
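
As an illustration, these two ACLs could be granted programmatically along the following lines; the principal name and admin configs are placeholders, and the equivalent kafka-acls command-line invocations work just as well:

Code Block
languagejava
titleWorkerProducerAclSketch.java
// Hypothetical sketch of granting the worker producer principal the ACLs listed above.
String principal = "User:connect-worker"; // placeholder principal
try (Admin admin = Admin.create(adminConfigs)) { // adminConfigs is a placeholder
    AclBinding transactionalIdWrite = new AclBinding(
          new ResourcePattern(ResourceType.TRANSACTIONAL_ID, "connect-cluster-" + groupId, PatternType.LITERAL),
          new AccessControlEntry(principal, "*", AclOperation.WRITE, AclPermissionType.ALLOW));
    AclBinding idempotentWrite = new AclBinding(
          new ResourcePattern(ResourceType.CLUSTER, "kafka-cluster", PatternType.LITERAL),
          new AccessControlEntry(principal, "*", AclOperation.IDEMPOTENT_WRITE, AclPermissionType.ALLOW));
    admin.createAcls(Arrays.asList(transactionalIdWrite, idempotentWrite)).all().get();
}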

Connector principal permissions

Producer

Each source connector’s producer principal must be given the following permissions on the Kafka cluster it writes to:

  • Operation: Write; Resource Type: TransactionalId; Resource Name: ${groupId}-${connector}-${taskId}, for each task that the connector will create, where ${groupId} is the group ID of the Connect cluster, ${connector} is the name of the connector, and ${taskId} is the ID of the task (starting from zero). A wildcarded prefix of ${groupId}-${connector}* can be used for convenience if there is no risk of conflict with other transactional IDs or if conflicts are acceptable to the user.
  • Operation: Write; Resource Type: TransactionalId; Resource Name: ${groupId}-${connector}. Only necessary if the connector uses a separate offsets topic.
  • Operation: IdempotentWrite; Resource Type: Cluster; Resource Name: Kafka cluster targeted by the Connect cluster
  • Operation: Write; Resource Type: Topic; Resource Name: Offsets topic used by the connector, which is either the value of the offsets.storage.topic property in the connector’s configuration if provided, or the value of the offsets.storage.topic property in the worker’s configuration if not.
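
The wildcarded prefix mentioned above corresponds to a prefixed ACL; for example (principal and admin configs are placeholders):

Code Block
languagejava
titleConnectorProducerPrefixedAclSketch.java
// Hypothetical sketch: one prefixed TransactionalId ACL covering every task of the connector.
String principal = "User:reddit-source-producer"; // placeholder principal
try (Admin admin = Admin.create(adminConfigs)) { // adminConfigs is a placeholder
    AclBinding prefixedWrite = new AclBinding(
          new ResourcePattern(ResourceType.TRANSACTIONAL_ID, groupId + "-reddit-source", PatternType.PREFIXED),
          new AccessControlEntry(principal, "*", AclOperation.WRITE, AclPermissionType.ALLOW));
    admin.createAcls(Collections.singleton(prefixedWrite)).all().get();
}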

Consumer

Each source connector’s consumer principal must be given the following permissions on the Kafka cluster it reads offsets from:

  • Operation: Read; Resource Type: Topic; Resource Name: Offsets topic used by the connector, which is either the value of the offsets.storage.topic property in the connector’s configuration if provided, or the value of the offsets.storage.topic property in the worker’s configuration if not.

Note that there is currently no use case for Connect that requires a consumer principal to be configured for source connectors. As a result, this proposed change technically introduces "new" configuration properties for source connectors: consumer-level overrides prefixed with consumer.override. , as described in KIP-458: Connector Client Config Override Policy.

Admin

If the offsets topic for a connector does not exist yet, the connector’s admin principal must be given the following permissions on the Kafka cluster it reads offsets from:

  • Operation: Create; Resource Type: Topic; Resource Name: Offsets topic used by the connector, which is the value of the offsets.storage.topic property in the connector’s configuration. If not provided, this ACL is not needed, as the worker’s shared offsets topic should be created automatically before any connectors are started.

Additionally, the connector's admin principal must be given the following permissions on the Kafka cluster it reads offsets from no matter what:

  • Operation: Write; Resource Type: TransactionalId; Resource Name: ${groupId}-${connector}-${taskId}, for each task that the connector will create, where ${groupId} is the group ID of the Connect cluster, ${connector} is the name of the connector, and ${taskId} is the ID of the task (starting from zero). A wildcarded prefix of ${groupId}-${connector}* can be used for convenience if there is no risk of conflict with other transactional IDs or if conflicts are acceptable to the user.
  • Operation: Describe; Resource Type: Topic; Resource Name: Offsets topic used by the connector, which is either the value of the offsets.storage.topic property in the connector’s configuration if provided, or the value of the offsets.storage.topic property in the worker’s configuration if not.

This is necessary in order to ensure that the Connect worker has read up to the end of the offsets topic, even when there are open transactions on the topic at the start of the read to the end. This will be accomplished by listing the end offsets for the offsets topic using an admin client with the read_uncommitted  isolation level, and then consuming from that topic with the read_committed  isolation level until at least those offsets listed by the admin client.
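
A rough sketch of that procedure, with placeholder configs and with topic/partition discovery omitted for brevity:

Code Block
languagejava
titleOffsetsTopicReadToEndSketch.java
// Hypothetical sketch: list end offsets with read_uncommitted, then consume with
// read_committed until every partition of the offsets topic has reached those offsets.
Map<TopicPartition, OffsetSpec> latest = offsetsTopicPartitions.stream() // placeholder partition list
      .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));
Map<TopicPartition, Long> endOffsets;
try (Admin admin = Admin.create(adminConfigs)) { // adminConfigs is a placeholder
    endOffsets = admin.listOffsets(latest, new ListOffsetsOptions(IsolationLevel.READ_UNCOMMITTED))
          .all().get().entrySet().stream()
          .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset()));
}
Map<String, Object> consumerProps = new HashMap<>(baseConsumerConfigs()); // placeholder
consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
try (Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps)) {
    consumer.assign(endOffsets.keySet());
    consumer.seekToBeginning(endOffsets.keySet());
    // Keep polling until the consumer's position has reached the listed end offset for every partition
    while (endOffsets.entrySet().stream().anyMatch(e -> consumer.position(e.getKey()) < e.getValue())) {
        consumer.poll(Duration.ofSeconds(1)); // process offset records here
    }
}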


Rolling upgrade

Rolling upgrades that enable exactly-once support on a cluster will be possible. Users can then do the following for each worker, one by one: stop the worker, upgrade to a later version if necessary, set exactly.once.source.enabled to true  in the config, then restart.

This will have the effect that, until the leader is upgraded, no upgraded workers will start source tasks. This is necessary in order to guarantee that a task count record has been written to the config topic before starting a task with a transactional producer.

...

:

  1. Stop all Connect workers
  2. Modify the config file for each worker to set exactly.once.source.enabled  to true , and upgrade each worker to a later version if necessary
  3. Restart all workers for the cluster all at once (one-by-one will work as well, but runs the risk of a single worker being overloaded with all of the connectors/tasks for the entire cluster)

Downgrade

Two kinds of downgrade are possible. Exactly-once support for a cluster can be disabled by setting exactly.once.source.enabled to false for workers in the cluster (a “soft” downgrade), or workers can be reverted to use older versions of the Connect framework that do not support exactly-once sources at all (a “hard” downgrade).

...

Summary: instead of proactively fencing out task producers after Connector startup, allow each task to bring up its own producer and let it fence out older instances of itself in the process.

...