You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 18 Next »

Status

Current state: Under discussion

Discussion threadhere

JIRA: Unable to render Jira issues macro, execution error. Unable to render Jira issues macro, execution error.

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


Note: This KIP was earlier published with the title "Atomic commit of source connector records and offsets", which corresponded to KAFKA-10000. It has now been expanded to also include general EoS support described in KAFKA-6080.

Motivation

It's been possible for sink connectors to implement exactly-once delivery for a while now, and in systems with features like unique key constraints, even relatively easy. However, given Kafka's lack of such features that allow for easy remediation and/or prevention of duplicates, the same cannot be said for source connectors, and without framework-level adaptations, exactly-once delivery for source connectors remains impossible.

There are two key reasons for this:

Source offset accuracy: The Connect framework periodically writes source task offsets to an internal Kafka topic at a configurable interval, once the source records that they correspond to have been successfully sent to Kafka. This provides at-least-once delivery guarantees for most source connectors, but allows for the possibility of duplicate writes in cases where source records are written to Kafka but the worker is interrupted before it can write the offsets for those records to Kafka.

Zombies: Some scenarios can cause multiple tasks to run and produce data for the same source partition (such as a database table or a Kafka topic partition) at the same time. This can also cause the framework to produce duplicate records.

In order to support exactly-once delivery guarantees for source connectors, the framework should be expanded to atomically write source records and their source offsets to Kafka, and to prevent zombie tasks from producing data to Kafka.

With these changes, any source connector for which each source partition is assigned to at most one task at a time, and which is capable of resuming consumption from the upstream source on startup based on the source offsets that prior tasks have provided to the framework, should be capable of exactly-once delivery.

Public Interfaces

A single worker-level configuration property will be added for distributed mode:

Name

Type

Default

Importance

Doc

exactly.once.source.enabled 

BOOLEAN 

false 

HIGH 

Whether to enable exactly-once support for source connectors in the cluster by writing source records and their offsets in a Kafka transaction, and by proactively fencing out old task generations before bringing up new ones. Note that this must be enabled on every worker in a cluster in order for exactly-once delivery to be guaranteed.

And a single per-connector configuration property will be added:

Name

Type

Default

Importance

Doc

offsets.storage.topic 

STRING 

null 

LOW 

The name of a separate offsets topic to use for this connector. If empty or not specified, the worker’s global offsets topic name will be used.

Proposed Changes

Atomic offset writes

Offset reads

When exactly.once.source.enabled is set to true for a worker, the consumers used by that worker to read source offsets from offsets topics will be configured with a new default property, isolation.level=read_committed. This will cause them to ignore any records that are part of a non-committed transaction, but still consume records that are not part of a transaction at all.

Users will be not able to explicitly override this in the worker configuration with the property consumer.isolation.level  or in the connector configuration with the property consumer.override.isolation.level. If they attempt to do so, the user-provided value will be ignored and a message will be logged notifying them of this fact.

Offset (and record) writes

When exactly.once.source.enabled is set to true for a worker, all source tasks created by that worker will use a transactional producer to write to Kafka. All source records that they provide to the worker will be written to Kafka inside a transaction. Once it comes time for offsets to be committed for a task, offset information for the current batch will be written to the offsets topic, inside the same transaction. Then, the transaction will be committed. This will ensure that source offsets will be committed to Kafka if and only if the source records for that batch were also written to Kafka.

The timing for offset commits will remain the same as it is today; they will be triggered periodically on a fixed interval that users can adjust via the offset.flush.interval.ms property. Transaction boundaries and record batches will be defined by these offset commits; expanded support for transaction boundary definition can be added later and is noted in the "future work" section.

Once an offset commit is complete, if the connector is configured with a custom offsets topic, the committed offsets will also be written to the worker’s global offsets topic using a non-transactional producer and the worker’s principal. This will be handled on a separate thread from the task’s work and offset commit threads, and should not block or interfere with the task at all. If the worker fails to write these offsets for some reason, it will retry indefinitely, but not fail the task. This will be done in order to facilitate "hard" downgrades and cases where users switch from per-connector offsets topics back to the global offsets topic.

Task producers will be given a transactional ID of ${groupId}-${connector}-${taskId}, where ${groupId} is the group ID of the Connect cluster, ${connector} is the name of the connector, and ${taskId} is the ID of the task (starting from zero). Users will not be able to override this with the worker-level producer.transactional.id  or connector-level producer.override.transactional.id  property. If they attempt to do so, the user-provided value will be ignored and a message will be logged notifying them of this fact.

SourceTask record commit API

The SourceTask::commit and SourceTask::commitRecord methods will be invoked immediately after each successful offset commit. First, commitRecord will be invoked for every record in the committed batch, followed by a call to commit. It is possible that the worker or task may suddenly die in between a successful offset commit and the completion of these calls.

Because of this, it cannot be guaranteed that these methods will be invoked after every successful offset commit. However, as long as a task is able to accurately recover based solely off of the offset information provided to the Connect framework with each record it has produced, this should not compromise exactly-once delivery guarantees. These methods can and should still be used to clean up and relinquish resources after record delivery is guaranteed, but using them to track offsets will prevent a connector from being viable for exactly-once support.

Per-connector offsets topics

Whether or not exactly.once.source.enabled is set to true for a worker, source connectors will be allowed to use custom offsets topics, configurable via the offsets.storage.topic property.

Motivation

First, with the advent of KIP-458, it is now possible to bring up a single Connect cluster whose connectors each target a different Kafka cluster (via the producer.override.bootstrap.servers, consumer.override.bootstrap.servers, and/or admin.override.bootstrap.servers connector properties). At the moment, source task offsets are still tracked in a single global offsets topic for the entire Connect cluster; however, if the producer for a task is used to write source offsets, and that producer targets a different Kafka cluster than the one the Connect worker uses for its internal topics, that will no longer be possible. In order to support this case well, per-connector offsets topics must be reasoned about, to allow for smooth upgrades and downgrades where large gaps in offsets data are not created unnecessarily (see “Migration” below). Although users do not necessarily need to be given control over the name of the offsets topic for each connector in this case (the worker could simply create an offsets topic with the same name on the connector’s overridden Kafka cluster, for example), exposing this level of control adds very little complexity to this design once the necessity for per-connector offsets topics is realized. As a benefit, it should make it easier for users to configure connectors in secured environments where the connector principal may have limited access on the Kafka cluster it targets. For users and developers familiar with Kafka Streams, this is the major differentiating factor that makes a global offsets topic impossible.

Second, it closes a potential security loophole where malicious connectors can corrupt the offsets information available to other connectors on the cluster. This is preventable at the moment by configuring connectors with separate principals from that of the worker, and only granting write access to the offsets topic to the worker principal. However, since the same producer will now be used to both write offset data and write source task records, that approach will no longer work. Instead, allowing connectors to use their own offsets topics should allow administrators to maintain the security of their cluster, especially in multitenant environments.

Finally, it allows users to limit the effect that hanging transactions on an offsets topic will have. If tasks A and B use the same offsets topic, and task A initiates a transaction on that offsets topic right before task B starts up, then task A dies suddenly without committing its transaction, task B will have to wait for that transaction to time out before it can read to the end of the offsets topic. If the transaction timeout is set very high for task A (to accommodate bursts of high throughput, for example), this will block task B from processing any data for a long time. Although this scenario may be unavoidable in some cases, using a dedicated offsets topic for each connector should allow cluster administrators to isolate the blast radius of a hanging transaction on an offsets topic. This way, although tasks of the same connector may still interfere with each other, they will at least not interfere with tasks of other connectors. This should be sufficient for most multitenant environments.

Finally

Migration

If a connector is configured to use a separate offsets topic but no data is present in that topic yet, the worker that is assigned the Connector instance for that connector can automatically migrate any offsets it finds for that connector in the global offsets topic into the connector’s separate offsets topic. The process for this will be:

  1. Worker is assigned Connector
  2. Worker checks connector configuration and observes that it should use a separate offsets topic
  3. Worker attempts to create it using the connector’s principal (no check for topic existence is made; if the topic already exists, the resulting error response from the broker is silently discarded in the same way that it is when a worker tries to create one of its internal topics today)
  4. Worker reads to the end of the custom offsets topic, failing the Connector if the offsets topic doesn’t exist by this point
  5. If no data is present in the custom offsets topic:
    1. Worker refreshes its view of the global offsets topic by reading to the end
    2. Worker begins a producer transaction on the custom offsets topic
    3. Worker copies all offset information for the connector in its latest view of the global offsets topic into the custom offsets topic
    4. If no offset information is available in the global offsets topic for the connector, worker writes a sentinel value into the offsets topic indicating that a migration has taken place
    5. Worker commits the producer transaction
    6. Worker reads to the end of the custom offsets topic (again)
  6. Worker starts the Connector

If a source connector is configured to use a separate offsets topic, the worker will block task startup for as long as necessary (unless interrupted due to task shutdown) until the topic is created. Once the topic exists (after being created by the worker that owns the Connector, if necessary), the worker will read to the end of the offsets topic before starting the task. If there are no offsets for the connector present in the topic and there is no sentinel value in the offsets topic indicating that a migration attempt was made and found no data for the task, the worker will continue to read to the end of the topic until either offset information for the connector or a sentinel value is found. Once this step is completed, the worker will be free to start the task.

An example of a sentinel offsets record for a connector named “reddit-source” would look like:

    key: [“reddit-source”]
value: {“migrated”: true}

Workers are already very permissive about the content of their offsets topics and will not break or even log a warning message if they encounter anything that doesn’t appear to be a source offset. For the sake of cleanliness, we choose a key format that closely mirrors the existing key format for source offsets, which consists of an array containing the name of the connector and the source partition. The value only needs to be non-null so that record of a migration can be kept even after topic compaction has occurred; for readability’s sake (if the topic ever needs to be inspected for debugging, for example), we make it more explicit that this record refers to offset migration.

Admin API to Fence out Transactional Producers

Java API additions

The Admin interface will be expanded to include new methods for fencing out producers by transactional ID:

Admin.java
public interface Admin {

    /**
     * Fence out all active producers that use any of the provided transactional IDs.
     *
     * @param transactionalIds The IDs of the producers to fence.
     * @param options  		   The options to use when fencing the producers.
     * @return The FenceProducersResult.
     */
    FenceProducersResult fenceProducers(Collection<String> transactionalIds,
                                        FenceProducersOptions options);

    /**
     * Fence out all active producers that use any of the provided transactional IDs, with the default options.
     * <p>
     * This is a convenience method for {@link #fenceProducers(Collection, FenceProducersOptions)}
     * with default options. See the overload for more details.
     *
     * @param transactionalIds The IDs of the producers to fence.
     * @return The FenceProducersResult.
     */
    default FenceProducersResult fenceProducers(Collection<String> transactionalIds) {
        return fenceProducers(groupIds, new FenceProducersOptions());
    }

}

New classes for the API will be added as well.

The FenceProducersResult  class:

FenceProducersResult.java
package org.apache.kafka.clients.admin;

/**
 * The result of the {@link Admin#fenceProducers(Collection)} call.
 *
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class FenceProducersResult {

    /**
     * Return a map from transactional ID to futures which can be used to check the status of
     * individual fencings.
     */
    public Map<String, KafkaFuture<Void>> fencedProducers() {
		// Implementation goes here
    }

	/**
	 * Returns a future that provides the producer ID generated while initializing the given transaction when the request completes.
	 */
	public KafkaFuture<Long> producerId(String transactionalId) {
		// Implementation goes here
	}

	/**
 	 * Returns a future that provides the epoch ID generated while initializing the given transaction when the request completes.
	 */
	public KafkaFuture<Short> epochId(String transactionalId) {
		// Implementation goes here
	}

    /**
     * Return a future which succeeds only if all the producer fencings succeed.
     */
    public KafkaFuture<Void> all() {
		// Implementation goes here
    }

}

And the FenceProducersOptions  class:

FenceProducersOptions.java
package org.apache.kafka.clients.admin;

/**
 * Options for {@link AdminClient#fenceProducers(Collection, FenceProducersOptions)}
 *
 * The API of this class is evolving. See {@link AdminClient} for details.
 */
@InterfaceStability.Evolving
public class FenceProducersOptions extends AbstractOptions<FenceProducersOptions> {
    // No options (yet) besides those inherited from AbstractOptions
}

Implementation

Under the hood, this will be implemented by finding the transaction coordinator for each transactional ID via the FindCoordinator protocol, then initializing a new transaction with that ID by invoking the InitProducerId protocol with no producer ID and no producer epoch. In the words of KIP-98, this:

  1. Bumps up the epoch of the PID, so that the any previous zombie instance of the producer is fenced off and cannot move forward with its transaction.
  2. Recovers (rolls forward or rolls back) any transaction left incomplete by the previous instance of the producer.

Zombie Fencing

With exactly.once.source.enabled set to true for a worker, the worker will begin taking steps to ensure that zombie source tasks in the cluster do not compromise exactly-once delivery support.

Task count records

A new type of record, a “task count record”, will be used in the config topic. This record explicitly tracks the number of task producers that will have to be fenced out if a connector is reconfigured before bringing up any tasks with the new set of task configurations, and will implicitly track whether it is necessary to perform a round of fencing before starting tasks for a connector.

An example of this record for a connector named “reddit-source” with 11 tasks would look like:

    key: “tasks-count-reddit-source”
value: {“tasks”: 11}

If the latest task count record for a connector comes after its latest set of task configurations in the config topic, workers will be allowed to create and run tasks for that connector. For example, if the contents of the config topic (represented by the keys of each record) are as follows:

    offset 0: task-reddit-source-0
offset 1: task-reddit-source-1
offset 2: commit-reddit-source
offset 3: tasks-count-reddit-source

then workers will be able to freely bring up tasks for the reddit-source connector.

However, if the contents of the config topic are this:

    offset 0: tasks-count-reddit-source
offset 1: task-reddit-source-0
offset 2: task-reddit-source-1
offset 3: commit-reddit-source

then workers will not bring up tasks for the connector.

Fencing during rebalance

All workers

When a rebalance is triggered, before (re-)joining the cluster group, all workers will preemptively stop all tasks of all source connectors for which task configurations are present in the config topic after the latest task count record. This step is not necessary to preserve exactly-once delivery guarantees, but should provide a reasonable opportunity for tasks to shut down gracefully before being forcibly fenced out.

Leader

During rebalance, the leader will take on a few extra responsibilities. Once all group members have joined and it is time to perform assignment the leader will:

  1. For each to-be-assigned source connector, if there is a set of task configurations present in the config topic after the most recent task count record for that connector:
    1. If there is an existing task count record for the connector in the config topic, and either the task count in that record is above 1 or the new number of tasks for the connector is above 1*:
      1. Fence out all producers that may still be active for prior task instances of this connector by instantiating an admin client using the connector principal and invoking Admin::fenceProducers. The transactional IDs will be the IDs that each task of the connector would have used, assuming as many tasks were active as the most recent task count record for the connector.** 
    2. Write the new task count record for all of the fenced-out connectors to the config topic
  2. Read to the end of the config topic to verify that all task count records have been written successfully, and use the new latest offset of the config topic as the offset in the assignment given out to members of the cluster.

* - The check for task counts is done to avoid unnecessary fencing work for permanently single-task connectors, such as Debezium's CDC source connectors. If the most recent task count record for a connector shows one task, there is only one task that needs to be fenced out. And, if the new configuration for that connector contains one task, that new task will automatically fence out its single predecessor as they will use the same transactional ID. 

**  - This step will be done in parallel for all connectors

Source task startup

When a worker is assigned a source task, it will check to see if a task count record is present in the config topic after the most recent task configs for that connector.

If that is the case, the worker can assume that all producers for all prior generations of tasks for that connector have been fenced out, and it will bring up the source tasks it has been assigned. 

Otherwise, the worker will emit a warning log message and refuse to bring the task up until a rebalance occurs and it has a chance to check again for a task count record after the most recent set of task configurations for the connector.

Once a worker has instantiated a producer for a source task, it will read to the end of the config topic once more, and if a new set of task configurations for that connector has been generated, it will abort startup of the task.

To summarize, the startup process for a task will be:

  1. Worker is assigned a source task during rebalance
  2. If there is no task count record present in the config topic for that connector, or there is a set of task configurations present after the latest task count record, refuse to bring up the task and await a subsequent rebalance
  3. Otherwise:
    1. Instantiate a transactional producer for the task
    2. Read to the end of the config topic
    3. If a new set of task configurations has since been generated for the connector, abort startup
    4. Otherwise, begin polling the task for data

Leader access to config topic

After a rebalance, a worker may discover that it has become the leader of the cluster. When this happens, the worker will instantiate a transactional producer whose transactional ID is connect-cluster-${groupId} , where ${groupId } is the group ID of the cluster. If users try to override this by setting the transactional.id worker property, the user-provided value will be ignored and a message will be logged notifying them of this fact. The worker will use this producer for all writes it performs on the config topic. It will begin and commit a transaction for every record it writes. This may seem unusual--why use a transactional producer if transactions aren’t really necessary?--but it ensures that only the most recent leader is capable of producing to the config topic and that zombie leaders (rare though they may be) will not be able to.

If, for example, a leader stalls while attempting to fence out a prior generation of task producers for a given connector during a rebalance, it may fall out of the group and become a zombie. If another worker is elected leader at this point and the connector is then reconfigured, it’s possible that the zombie leader may become unblocked and then attempt to write a task count record to the config topic after a new set of task configurations are written to the topic. This would corrupt the state of the config topic and violate a key assumption: a task count record present after task configurations for a connector means that it is not necessary to fence out a prior generation of producers for that connector’s tasks before starting them.

By using a transactional producer on the leader, we can guarantee that a leader will only be able to write a task count record to the config topic if no other workers have become the leader and written new task configurations for the connector to the topic since then.

Addressed failure/degradation scenarios

These scenarios will not compromise a cluster’s exactly-once delivery guarantees.

Leader cannot write task count record to config topic during rebalance

The leader will try indefinitely to write the task count to the config topic during rebalance and verify that it has been written by reading it back from the topic after writing.

If this takes too long, the rebalance timeout will elapse and a new leader will be elected. If the (now-dethroned) leader becomes unblocked during this process and attempts to write to the config topic, it will have been fenced out by its replacement. If it has been blocked on reading from the config topic, it will wake up and attempt to complete the rebalance, only to be informed by the group coordinator that it is no longer a member of the group. It will then attempt to rejoin the group.

If this does not block but instead fails for some reason, the leader will immediately leave the group and then attempt to rejoin.

Cannot fence out producers during rebalance

The leader may be unable to fence out producers during a rebalance if, for example, it is unable to reach or locate a transaction coordinator.

Automatically failing tasks for that connector may seem appealing at first, but the user’s first instincts in this situation are likely to be to restart those tasks, which could cause them to come up without zombies from a prior generation being fenced out first.

Marking the entire rebalance as failed and forcing a follow-up round would be simpler to design and less likely to lead to compromised delivery guarantees, but it would also impact any other connectors and tasks that were due to be reassigned in that round.

To compromise, the leader will limit the time it blocks for producer fencing to complete to be at most half of the configured session timeout for the cluster. If that time elapses and the leader has not yet fenced out the producers for some tasks, it will continue with the rebalance and happily assign all connectors and tasks as normal. No worker will start up new task instances for not-yet-fenced connectors as there will be no task count record present in the config topic after the most recent set of task configurations. Once the leader is able to successfully complete fencing out all producers for a given connector, it will write the task count record to the config topic, read it back, and force a rebalance by rejoining the group. At this point, all workers will be able to (re-)start tasks for the newly-fenced-out connector.

Manual restart of fenced-out tasks

If a task fails because its producer has been fenced out, the user may try to manually restart that task through the REST API. If the worker’s view of the config topic is up-to-date, this will simply result in the restarted task replacing any existing instances of the same task, and exactly-once delivery guarantees will be preserved. However, if the worker has an outdated view of the config topic, restarting that task may compromise exactly-once guarantees if that task ends up producing data from a source partition that has already been assigned to a different task in the latest set of task configurations for the connector.

In order to prevent manual user restarts from compromising exactly-once delivery guarantees, Connect workers will read to the end of the config topic before proceeding with any source task restarts. If the worker is unable to read to the end of the config topic, the restart request will be met with a 500 timeout response. If the worker discovers during its read to the end of the topic that the task no longer exists (because the connector has been deleted or its task count reduced), it will return the usual 404 not found response. If a rebalance becomes expected because of new information read from the config topic, a 409 conflict response will be given.

Worker lag after being assigned source task but before instantiating producer

If a worker is assigned a source task and sees a task count record in the config topic after the latest set of task configurations for that connector, it comes with the assumption that all producers for all prior task generations of that connector have been fenced out. And, at the time of the fencing performed by the leader, this assumption likely holds. However, it's possible that a worker may be assigned a source task, observe a task count record in the config topic that indicates that it is safe to start the task, then block for some large period of time before it is able to construct a producer for that task. For example, consider a scenario with workers F (follower), O (other follower), and L (leader), operating on task T of source connector C:

  1. Connector C is reconfigured and new task configurations for it are written to the config topic
  2. A rebalance is triggered and worker L is able to fence out all possibly-active producers for connector C, write a task count record for C to the config topic, and then assign task T to worker F
  3. Worker F receives its assignment and is able to read the most-recent task count record for connector C from the config topic
  4. Worker F blocks, before it is able to instantiate a transactional producer for task T
  5. Connector C is reconfigured and new task configurations for it are written to the config topic
  6. A rebalance is triggered and worker L is able to fence out all possibly-active producers for connector C, write a task count record for C to the config topic, and then assign task T to worker O as F has fallen out of the group
  7. Worker O receives its assignment and is able to read the most-recent task count record for connector C from the config topic, instantiate a transactional producer for task T, and then begin processing data from that task
  8. Worker F becomes unblocked, instantiates a transactional producer for task T, and then begins processing data for that task

In this scenario, if a source partition initially assigned to task T is reassigned to a different task during the reconfiguration in step 5, the task instance created by worker F in step 8 will begin producing duplicate data and exactly-once delivery guarantees will be compromised.

This is possible because it is not guaranteed at the time of the fencing performed by the leader that all producers that could be active for a connector have been created yet. If a producer has not yet been created, it cannot be fenced out; if it is created after the fencing occurs, a different approach must be taken to ensure that no duplicate data is produced.

This case should be covered by the additional read to the end of the config topic by the worker after it has brought up a transactional producer for a source task. If the connector is reconfigured after the worker became blocked and a round of producer fencing has occurred, then the worker will bring up a transactional producer for its source task, but then discover the connector reconfiguration and abort startup. If the worker has brought up a transactional producer and then become blocked before completing its read to the end of the config topic, the round of producer fencing by the leader should fence out that producer and any subsequent restart attempts will block until/unless the worker is able to complete a read to the end of the config topic (and handle any rebalances necessitated by new information read from the topic in the process).

Accidental task commit of dropped messages

If a transactional producer is successfully instantiated, it will not attempt to contact the broker again until an attempt is made to actually send a record. This can lead to interesting behavior: the producer is brought up and able to make initial contact with the transaction coordinator, then the entire Kafka cluster dies, then the producer begins and commits a transaction without attempting to send any records, and no exception or other indication of failure is generated by the producer at any point in this sequence of events.

When might this occur? If a task is configured with an aggressive SMT that drops all the records in a given batch, its producer will never attempt to send any records before committing the current transaction. And, once this occurs, the worker will invoke SourceTask::commit, which may cause the task to drop the data from the upstream system (for example, by acknowledging a batch of messages from a JMS broker). Even if this occurs, it should not be a problem: it’s fine to drop messages from the upstream system that are meant to be dropped by the connector, regardless of whether source offsets for those records have been committed to Kafka, as the end result is the same either way.

Permitted failure scenarios

These scenarios can compromise a cluster’s exactly-once delivery guarantees.

Heterogeneous clusters

If a worker is active and does not have support for exactly-once delivery, the entire cluster’s ability to provide exactly-once guarantees will be compromised. There is no way to fence out non-compliant workers. Even if one is developed, the problem would only be slightly transformed: if a worker is active that cannot be fenced out by other workers in the cluster, we’d be in the exact same place as before.

This failure scenario will need to be called out in public documentation so that users understand that they are responsible for the operational burden of ensuring that no non-compliant workers are active.

Non-worker producer to config topic

If anything besides a worker ends up writing to the config topic, all assumptions that the cluster relies upon to determine whether it is safe to start tasks and whether a fencing is necessary during rebalance can be invalidated. This should never happen in a reasonably-secured Connect cluster and if it does, compromised delivery guarantees may pale in comparison to the other consequences of corrupting the config topic.

Compatibility, Deprecation, and Migration Plan

Interoperability with rebalancing protocols

Exactly-once support will be possible with both the eager and incremental cooperative rebalancing protocols. The additional design complexity to support both is minimal, and given that both protocols are still supported for the Connect framework, it’s likely that some users will still be using eager rebalancing. They should not have to upgrade to the newer rebalancing protocol in order to gain access to this feature if the work to support both protocols is not prohibitive.

Worker principal permissions

Producer

Before enabling exactly-once source support on a worker, its producer principal must be given the following permissions on the Kafka cluster it writes to:

Operation

Resource Type

Resource Name

Write

TransactionalId

connect-cluster-${groupId} , where ${groupId } is the group ID of the cluster

IdempotentWrite

Cluster

Kafka cluster targeted by the Connect cluster

Connector principal permissions

Producer

Each source connector’s producer principal must be given the following permissions on the Kafka cluster it writes to:

Operation

Resource Type

Resource Name

Write

TransactionalId

${groupId}-${connector}-${taskId}, for each task that the connector will create, where ${groupId} is the group ID of the Connect cluster, ${connector} is the name of the connector, and ${taskId} is the ID of the task (starting from zero). A wildcarded prefix of ${groupId}-${connector}* can be used for convenience if there is no risk of conflict with other transactional IDs or if conflicts are acceptable to the user.

Write

TransactionalId

${groupId}-${connector}. Only necessary if the connector uses a separate offsets topic.

IdempotentWrite

Cluster

Kafka cluster targeted by the Connect cluster

Write

Topic

Offsets topic used by the connector, which is either the value of the offsets.storage.topic property in the connector’s configuration if provided, or the value of the offsets.storage.topic property in the worker’s configuration if not.

Consumer

Each source connector’s consumer principal must be given the following permissions on the Kafka cluster it reads offsets from:

Operation

Resource Type

Resource Name

Read

Topic

Offsets topic used by the connector, which is either the value of the offsets.storage.topic property in the connector’s configuration if provided, or the value of the offsets.storage.topic property in the worker’s configuration if not.

Admin

If the offsets topic for a connector does not exist yet, the connector’s admin principal must be given the following permissions on the Kafka cluster it reads offsets from:

Operation

Resource Type

Resource Name

WriteTransactionalId${groupId}-${connector}-${taskId}, for each task that the connector will create, where ${groupId} is the group ID of the Connect cluster, ${connector} is the name of the connector, and ${taskId} is the ID of the task (starting from zero). A wildcarded prefix of ${groupId}-${connector}* can be used for convenience if there is no risk of conflict with other transactional IDs or if conflicts are acceptable to the user.

Create

Topic

Offsets topic used by the connector, which is the value of the offsets.storage.topic property in the connector’s configuration. If not provided, this ACL is not needed, as the worker’s shared offsets topic should be created automatically before any connectors are started.

Additionally, the connector's admin principal must be given the following permissions on the Kafka cluster it reads offsets from no matter what:

OperationResource TypeResource Name
DescribeTopicOffsets topic used by the connector, which is the value of the offsets.storage.topic property in the connector’s configuration. If not provided, this ACL is not needed, as the worker’s shared offsets topic should be created automatically before any connectors are started.

This is necessary in order to ensure that the Connect worker has read up to the end of the offsets topic, even when there are open transactions on the topic at the start of the read to the end. This will be accomplished by listing the end offsets for the offsets topic using an admin client with the read_uncommitted  isolation level, and then consuming from that topic with the read_committed  isolation level until at least those offsets listed by the admin client.

Rolling upgrade

Rolling upgrades that enable exactly-once support on a cluster will be possible. Users can stop each worker, upgrade to a later version if necessary, set exactly.once.source.enabled to true in the config, then restart, one by one.

This will have the effect that, until the leader is upgraded, no upgraded workers will start source tasks. This is necessary in order to guarantee that a task count record has been written to the config topic before starting a task with a transactional producer.

Additionally, no tasks for connectors configured to use separate offsets topics will start until that topic exists and the tasks can confirm that a migration has taken place (either implicitly, via the presence of offsets for that connector, or explicitly, via a sentinel migration record for the connector).

Downgrade

Two kinds of downgrade are possible. Exactly-once support for a cluster can be disabled by setting exactly.once.source.enabled to false for workers in the cluster (a “soft” downgrade), or workers can be reverted to use older versions of the Connect framework that does not support exactly-once sources at all (a “hard” downgrade).

Soft downgrade

Soft downgrades should be possible via a cluster roll, where each worker is shut down, its configuration is altered to disable exactly-once source support, and then it is restarted. During the process, some workers that are still configured with exactly-once source support enabled may refuse to start tasks for connectors if a task count record is not found for those connectors after the most recent set of task configurations. This isn’t especially likely given that the most common cause of this would be connector reconfiguration during downgrade, but even if it occurs, it will only affect workers that have yet to be downgraded. There should be no long-lasting impact on the cluster once all workers have been downgraded.

Hard downgrade

Hard downgrades can be performed in the same way as soft downgrades. Because source task offsets on upgraded workers are still written to the worker’s global offsets topic, even if a downgraded worker does not support per-connector offsets topics, it will still pick up on relatively-recent source offsets for its connectors. Some of these offsets may be out-of-date or not match the ones in the connector’s separate offsets topic, but the only consequence of this would be duplicate writes by the connector, which will be possible on any cluster without exactly-once support enabled. Without any writes to the global offsets topic, all records processed by a connector since a switch to a dedicated offsets topic would be re-processed after the downgrade and would likely result in a flood of duplicates. While technically permissible given that the user in this case will have knowingly switched to a version of the Connect framework that doesn't support exactly-once source connectors (and is therefore susceptible to duplicate delivery of records), the user experience in this case could be quite bad, so a little extra effort on the part of Connect to significantly reduce the fallout of downgrades in this case is warranted.

If a re-upgrade is desirable at this point, any separate per-connector offsets topics may need to be deleted beforehand. Otherwise, on startup, the worker will read from the existing separate offsets topic, see that there is offset data for the connector present in it, and not perform a migration, causing all data that was produced by the connector after the downgrade to be duplicated.

Additional configuration properties

The two user-facing configuration properties introduced here should not violate backwards compatibility, as they both come with default values that preserve existing behavior.

New Admin API

The new Admin  methods for fencing producers will be implemented using existing protocols (specifically, FindCoordinator and InitProducerId), so no changes to the Kafka binary protocol will be required. As a result, these new methods should work on any broker that supports those protocols.

Rejected Alternatives

Connector-defined transaction boundaries

Summary: allow connectors to dictate when a transaction should be started, committed, rolled back, etc.

Rejected because: out of scope; can be pursued as an additional feature later on.

Per-connector exactly-once property

Summary: either instead of or in addition to allowing exactly-once support to be configured on a worker-wide basis, allow it to be enabled on a per-connector basis.

Rejected because: even if well-behaved workers close the producers for tasks that lag on shutdown, zombie workers may still run tasks that the cluster has since reassigned to other workers. In this case, while the worker remains a zombie, manual intervention is required from the cluster administrator to shut down the tasks hosted on the zombie worker, either by shutting down the worker (possible today) or by explicitly revoking tasks from the worker (not possible today, but another option that would work). Requiring timely manual intervention on the part of the user in order to preserve delivery guarantees should be avoided if possible.

If a non-exactly-once source task were to become a zombie because it, its converter, one of its transforms, etc. became blocked, or because its worker became a zombie, then its connector were reconfigured to enable exactly-once support, the zombie task might eventually cause duplicate writes to occur. To make this scenario less likely, exactly-once support is provided in an all-or-nothing fashion that makes it impossible for non-transactional producers to stay lying around in a cluster after all of the workers have been upgraded and configured to provide exactly-once source support.

Deduplication of connector-provided records

Summary: track messages produced by source connectors and perform deduplication automatically.

Rejected because: this requires every message to be uniquely and deterministically identifiable, which is not possible in all cases (although this is likely possible in all cases where connectors properly utilize the offsets API provided by the Connect framework). When it is possible, deduplication can already be implemented by a downstream application such as Kafka Streams. Finally, it’s unclear how all prior IDs for records produced by a connector might be tracked and used efficiently by a Connect cluster without incurring a noticeable performance penalty.

Per-source partition producers

Summary: instead of using one producer for each task, use a producer for each source partition of the task.

Rejected because: although this would likely guarantee exactly-once delivery for the same connectors that the proposed design would (specifically, connectors that allocate each source partition to at most one task at a time), it would come with a moderate drawback. The number of producers created for a connector could grow quite large, at which point, allocating enough buffer memory, broker connections, and other resources for each of these may have adverse impacts on the performance of the worker. This would also go against the pattern established by the Connect framework that, to scale a connector, its task count should be increased, as it would make the number of producers set up for a connector the same no matter if it’s running a single task or hundreds. Finally, it would increase the operational burden on users in security-conscious environments, as the transactional IDs required to run a connector would be difficult to predict and could dynamically expand at runtime depending on the implementation of the connectors being run.

Cleanup of task count records on connector deletion

Summary: if a connector is deleted, perform a final round of producer fencing for its tasks before writing a tombstone task count record for the connector to the config topic.

Rejected because: in the worst case, the connector is never recreated and a worker (presumably the leader) will have expended unnecessary effort on fencing out producers that aren’t going to compromise delivery guarantees and won’t have any bigger consequences than such tasks would already have on a Connect cluster today. In the best case, the connector is recreated immediately, in which case, a fencing would be performed anyways if no task count tombstone record were written to the config topic. Ultimately, no work is saved by doing this fencing on connector deletion, and some work may be unnecessary.

Future Work

Finer control over offset commits

The framework periodically commits source offsets at a fixed interval. Since source offset commits are now tied to the commits of the transactions in which source records are sent, Connect users and connector developers may want to trigger offset commits differently. Options for this include allowing source tasks to define their own transaction boundaries, allowing per-connector source offset intervals, or allowing offsets to be committed after a certain number of records.

Standalone mode support

Since the design for atomic writes of source records and their offsets relies on source offsets being stored in a Kafka topic, standalone mode is not eligible. If there is sufficient demand, we may add this capability to standalone mode in the future.

  • No labels