...

Name: exactly.once.source.enabled
Type: BOOLEAN
Default: false
Importance: HIGH
Doc: Whether to enable exactly-once support for source connectors in the cluster by writing source records and their offsets in a Kafka transaction, and by proactively fencing out old task generations before bringing up new ones. Note that this must be enabled on every worker in a cluster in order for exactly-once delivery to be guaranteed, and that some source connectors may still not be able to provide exactly-once delivery guarantees even with this support enabled.

And a single per-connector configuration property will be added:

Name: offsets.storage.topic
Type: STRING
Default: null
Importance: LOW
Doc: The name of a separate offsets topic to use for this connector. If empty or not specified, the worker’s global offsets topic name will be used. If specified, the offsets topic will be created if it does not already exist on the Kafka cluster targeted by this connector (which may be different from the one used for the worker's global offsets topic if the bootstrap.servers property of the connector's producer has been overridden from the worker's).

Proposed Changes

Atomic offset writes

...

Only the name of the per-connector offsets topic will be configurable by users. Other properties, such as the number of partitions and the replication factor, will be derived from the worker config using the behavior described in KIP-605.

Migration

If a connector is configured to use a separate offsets topic but no data is present in that topic yet, the worker that is assigned the Connector instance for that connector can automatically migrate any offsets it finds for that connector in the global offsets topic into the connector’s separate offsets topic. The process for this will be:

  1. Worker is assigned Connector
  2. Worker checks connector configuration and observes that it should use a separate offsets topic
  3. Worker attempts to create it using the connector’s principal (no check for topic existence is made; if the topic already exists, the resulting error response from the broker is silently discarded in the same way that it is when a worker tries to create one of its internal topics today)
  4. Worker reads to the end of the custom offsets topic, failing the Connector if the offsets topic doesn’t exist by this point
  5. If no data is present in the custom offsets topic:
    1. Worker refreshes its view of the global offsets topic by reading to the end
    2. Worker begins a producer transaction on the custom offsets topic
    3. Worker copies all offset information for the connector in its latest view of the global offsets topic into the custom offsets topic
    4. If no offset information is available in the global offsets topic for the connector, worker writes a sentinel value into the offsets topic indicating that a migration has taken place
    5. Worker commits the producer transaction
    6. Worker reads to the end of the custom offsets topic (again)
  6. Worker starts the Connector

If a source connector is configured to use a separate offsets topic, the worker will block task startup for as long as necessary (unless interrupted due to task shutdown) until the topic is created. Once the topic exists (after being created by the worker that owns the Connector, if necessary), the worker will read to the end of the offsets topic before starting the task. If there are no offsets for the connector present in the topic and there is no sentinel value in the offsets topic indicating that a migration attempt was made and found no data for the task, the worker will continue to read to the end of the topic until either offset information for the connector or a sentinel value is found. Once this step is completed, the worker will be free to start the task. During this window, the task will either not be present in the REST API's status endpoints (if no prior instances of it had been created), or its state will be UNASSIGNED  (if prior instances had been created and then shut down).

An example of a sentinel offsets record for a connector named “reddit-source” would look like:

    key: ["reddit-source"]
    value: {"migrated": true}

Workers are already very permissive about the content of their offsets topics and will not break or even log a warning message if they encounter anything that doesn’t appear to be a source offset. For the sake of cleanliness, we choose a key format that closely mirrors the existing key format for source offsets, which consists of an array containing the name of the connector and the source partition. The value only needs to be non-null so that a record of a migration can be kept even after topic compaction has occurred; for readability’s sake (if the topic ever needs to be inspected for debugging, for example), we make it more explicit that this record refers to an offset migration.
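
As a rough illustration of the migration write (step 5 above) and the sentinel record, here is a minimal sketch using plain Java Kafka clients rather than Connect's actual internal classes; keyBelongsToConnector is a hypothetical helper:

    import java.nio.charset.StandardCharsets;
    import java.util.List;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class OffsetMigrationSketch {

        // Step 5 of the migration: copy the connector's offsets from the global offsets topic into
        // the custom offsets topic inside one transaction, or write a sentinel if none are found.
        public static void migrate(String connector,
                                   String customOffsetsTopic,
                                   List<ConsumerRecord<byte[], byte[]>> globalOffsetsView, // latest view of the global offsets topic
                                   KafkaProducer<byte[], byte[]> producer) {               // transactional; initTransactions() already called
            producer.beginTransaction();
            boolean copiedAny = false;
            for (ConsumerRecord<byte[], byte[]> record : globalOffsetsView) {
                if (keyBelongsToConnector(connector, record.key())) { // key is ["<connector>", <source partition>]
                    producer.send(new ProducerRecord<>(customOffsetsTopic, record.key(), record.value()));
                    copiedAny = true;
                }
            }
            if (!copiedAny) {
                // Sentinel record: the key mirrors the source offset key format, the value marks the migration
                producer.send(new ProducerRecord<>(customOffsetsTopic,
                        ("[\"" + connector + "\"]").getBytes(StandardCharsets.UTF_8),
                        "{\"migrated\": true}".getBytes(StandardCharsets.UTF_8)));
            }
            producer.commitTransaction();
            // The worker then re-reads to the end of the custom offsets topic before starting the Connector.
        }

        // Hypothetical helper: checks whether an offsets-topic key names this connector.
        private static boolean keyBelongsToConnector(String connector, byte[] key) {
            return new String(key, StandardCharsets.UTF_8).startsWith("[\"" + connector + "\"");
        }
    }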

Zombie Fencing

...

Hosting Kafka cluster

Regardless of whether exactly.once.source.enabled  is set to true  for the worker, if a connector configuration contains a value for the offsets.storage.topic  property, it will use an offsets topic with that name on the Kafka cluster that it produces data to (which may be different from the one that hosts the worker's global offsets topic).

Implicit usage

A per-connector offsets topic might be implicitly configured under certain circumstances. Specifically, this will occur for connectors whose configurations do not contain the offsets.storage.topic  property, but do contain an overridden bootstrap.servers  value that causes the connector to target a different Kafka cluster than the one that hosts the worker's global offsets topic.

Connectors like this will be said to be "implicitly configured" to use a separate offsets topic.
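
As a rough illustration only (not Connect code), the check for an explicitly- or implicitly-configured separate offsets topic might look like the following, where producer.override.bootstrap.servers comes from KIP-458 and a naive string comparison stands in for a real cluster-identity check:

    import java.util.Map;

    public class SeparateOffsetsTopicSketch {

        // Returns true if the connector is explicitly or implicitly configured to use a
        // separate offsets topic.
        public static boolean usesSeparateOffsetsTopic(Map<String, String> connectorConfig,
                                                       String workerBootstrapServers) {
            if (connectorConfig.get("offsets.storage.topic") != null)
                return true; // explicitly configured
            String overridden = connectorConfig.get("producer.override.bootstrap.servers");
            // Implicitly configured: the connector's producer targets a different Kafka cluster
            return overridden != null && !overridden.equals(workerBootstrapServers);
        }
    }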

Creation

If a connector is explicitly or implicitly configured to use a separate offsets topic but that topic does not exist yet, task and connector instances will automatically try to create the topic before startup.

Smooth migration

When a separate offsets topic is created for the first time, it will naturally be empty. In order to avoid losing offset information that may be stored in the global offsets topic, when a connector or task instance requests offsets from the worker, it will be given a combined view of the offset information present in both its separate offsets topic and the worker's global offsets topic. Precedence will be given to offset information present in the separate offsets topic.

For example, if the offsets stored in the global offsets topic for a connector are:

Offsets present in worker's global offsets topic:
{
  "partition": {
    "subreddit": "apachekafka"
  },
  "offset": {
    "timestamp": "4761"
  }
}
{
  "partition": {
    "subreddit": "CatsStandingUp"
  },
  "offset": {
    "timestamp": "2112"
  }
}

And the offsets in the connector's separate offsets topic are:

Offsets present in separate offsets topic:
{
  "partition": {
    "subreddit": "CatsStandingUp"
  },
  "offset": {
    "timestamp": "2169"
  }
}
{
  "partition": {
    "subreddit": "grilledcheese"
  },
  "offset": {
    "timestamp": "489"
  }
}

The offsets passed to the connector by the worker will be:

Offsets presented to the task:
{
  "partition": {
    "subreddit": "apachekafka"
  },
  "offset": {
    "timestamp": "4761" // No offset for this partition was present in the separate offsets topic, so the one in the global offsets topic is used instead
  }
}
{
  "partition": {
    "subreddit": "CatsStandingUp"
  },
  "offset": {
    "timestamp": "2169" // Preference is given to the offset for this partition that comes from the separate offsets topic
  }
}
{
  "partition": {
    "subreddit": "grilledcheese"
  },
  "offset": {
    "timestamp": "489"
  }
}
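
A minimal sketch of how such a combined view could be produced, using Connect's map-of-partition-to-offset representation; this is illustrative rather than the actual implementation:

    import java.util.HashMap;
    import java.util.Map;

    public class CombinedOffsetViewSketch {

        // Merges offsets from the global topic and the connector's separate topic, giving
        // precedence to the separate topic when both contain the same source partition.
        public static Map<Map<String, Object>, Map<String, Object>> combine(
                Map<Map<String, Object>, Map<String, Object>> globalTopicOffsets,
                Map<Map<String, Object>, Map<String, Object>> separateTopicOffsets) {
            Map<Map<String, Object>, Map<String, Object>> combined = new HashMap<>(globalTopicOffsets);
            combined.putAll(separateTopicOffsets); // separate topic wins on conflicts
            return combined;
        }
    }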

Zombie Fencing

Task count records

A new type of record, a “task count record”, will be used in the config topic. This record explicitly tracks the number of task producers that will have to be fenced out if a connector is reconfigured before bringing up any tasks with the new set of task configurations, and will implicitly track whether it is necessary to perform a round of fencing before starting tasks for a connector.

...

If the latest task count record for a connector comes after its latest set of task configurations in the config topic, it will be safe for workers to create and run tasks for that connector. For example, if the contents of the config topic (represented by the keys of each record) are as follows:

    offset 0: task-reddit-source-0
    offset 1: task-reddit-source-1
    offset 2: commit-reddit-source
    offset 3: tasks-count-reddit-source

then workers will be able to freely bring up tasks for the reddit-source connector.

However, if the contents of the config topic (again represented by the keys of each record) are as follows:

    offset 0: tasks-count-reddit-source
    offset 1: task-reddit-source-0
    offset 2: task-reddit-source-1
    offset 3: commit-reddit-source

then workers will not bring up tasks for the connector until a round of zombie fencing for it has been performed (described below).
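
In other words, the decision reduces to comparing where these records appear in the config topic; a hypothetical sketch:

    public class TaskStartupSafetySketch {

        // Tasks for a connector may only be brought up if its latest task count record appears in
        // the config topic after its latest set of task configurations; otherwise a round of
        // zombie fencing is required first.
        public static boolean safeToStartTasks(Long latestTaskCountRecordOffset, Long latestTaskConfigsOffset) {
            if (latestTaskCountRecordOffset == null)
                return false; // no fencing has ever been performed for this connector
            return latestTaskConfigsOffset == null || latestTaskCountRecordOffset > latestTaskConfigsOffset;
        }
    }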


Zombie fencing by the leader

...

This endpoint will be secured by the session key mechanism introduced in KIP-507: Securing Internal Connect REST Endpoints and will only be used for inter-worker communication; users should not query it directly. It will be available regardless of whether exactly.once.source.enabled  is set to true in the worker.

When a worker receives a request to this endpoint, it will:

...

* - The check for task counts is done to avoid unnecessary fencing work for permanently single-task connectors, such as Debezium's CDC source connectors. If the most recent task count record for a connector shows one task, there is only one task that needs to be fenced out. And, if the new configuration for that connector contains one task, that new task will automatically fence out its single predecessor as they will use the same transactional ID.
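
A hypothetical sketch of that optimization, assuming the ${groupId}-${connector}-${taskId} transactional ID pattern referenced elsewhere in this KIP:

    public class FencingRequiredSketch {

        // Returns true if the leader must perform a round of fencing before tasks are (re)started.
        public static boolean fencingRequired(int previousTaskCount, int newTaskCount) {
            // A permanently single-task connector needs no explicit fencing: the new task reuses
            // its predecessor's transactional ID and therefore fences it out automatically.
            return !(previousTaskCount == 1 && newTaskCount == 1);
        }
    }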

Requesting zombie fencing from the leader

On source connector startup

When a source Connector is started (as a result of creation by the user, reconfiguration by the user, manual restart by the user, reassignment during rebalance, or restart after an eager rebalance), the worker that owns that Connector will request a round of zombie fencing for that connector from the leader.

The sequence of connector startup events will now be:

  1. Instantiate the Connector object, its ConnectorContext , its OffsetStorageReader , etc.
  2. Invoke Connector::start .
  3. Request a set of task configs from the connector using Connector::taskConfigs 
  4. If those task configs differ from the latest set of task configs in the config topic:
    1. Write those task configs to the config topic (or, if not the leader, forward those to the leader, who will write them to the config topic)
  5. Otherwise (new step):
    1. Request a new round of zombie fencing for the connector from the leader

With this startup sequence, if a round of zombie fencing should fail after, e.g., connector reconfiguration, the user will be able to manually trigger a new round of zombie fencing by restarting the connector.
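
A rough sketch of the new branch in the startup sequence above; every helper method here is an illustrative stand-in for Connect internals, not a proposed API:

    import java.util.List;
    import java.util.Map;

    public abstract class ConnectorStartupSketch {

        // Steps 3-5 of the startup sequence: either publish new task configs, or (if they are
        // unchanged) ask the leader for a fresh round of zombie fencing.
        public void onConnectorStarted(String connector) {
            List<Map<String, String>> generated = generateTaskConfigs(connector);          // Connector::taskConfigs
            List<Map<String, String>> latest = latestTaskConfigsInConfigTopic(connector);
            if (!generated.equals(latest)) {
                writeTaskConfigsToConfigTopic(connector, generated);  // forwarded to the leader if this worker is not the leader
            } else {
                requestZombieFencingFromLeader(connector);            // new step introduced by this KIP
            }
        }

        // Hypothetical stand-ins for Connect internals:
        protected abstract List<Map<String, String>> generateTaskConfigs(String connector);
        protected abstract List<Map<String, String>> latestTaskConfigsInConfigTopic(String connector);
        protected abstract void writeTaskConfigsToConfigTopic(String connector, List<Map<String, String>> configs);
        protected abstract void requestZombieFencingFromLeader(String connector);
    }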

On new source task configs

When a new set of task configs for a source connector is read from the config topic, the worker that owns the Connector object for that connector will automatically request a round of zombie fencing for the connector after the ensuing rebalance is complete.

This will give workers that own task instances for the connector a chance to gracefully shut them down before rejoining the group during the rebalance, instead of being ungracefully fenced out as soon as the Connector generates a new set of task configs.


Preparation for rebalance

When exactly.once.source.enabled  is set to true , workers will preemptively stop source tasks when a new set of task configurations for their connector is detected. In greater detail:

When a rebalance is triggered, before rejoining the cluster group, workers will preemptively stop all tasks of all source connectors for which task configurations are present in the config topic after the latest task count record. This step is not necessary to preserve exactly-once delivery guarantees, but should provide a reasonable opportunity for tasks to shut down gracefully before being forcibly fenced out.

Stopping these tasks will be done in parallel (as en-masse connector/task stop/start is already done for distributed workers), and any tasks that the worker cannot stop within the graceful task shutdown timeout period (which defaults to five seconds but can be controlled via the task.shutdown.graceful.timeout.ms  worker property) will be abandoned and allowed to continue running. They will be disabled later on if/when the leader fences out their producers during a round of zombie fencing (described below).

Because this work will be done in parallel, the default timeout for task shutdown is fairly low (five seconds), and the current number of threads used to perform en-masse task stops is eight, it is highly unlikely that this will hinder a worker's ability to rejoin a group before the rebalance timeout (which defaults to sixty seconds) times out.
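
As a rough sketch only (the real worker logic differs in details such as per-task timeout accounting), the preemptive stop could be structured like this:

    import java.util.Collection;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    public class PreemptiveStopSketch {

        // Stops the given tasks in parallel; any task that does not stop within the graceful
        // timeout is abandoned and left for the leader to fence out later.
        public static void preemptivelyStop(Collection<Runnable> taskStopCallbacks, long gracefulTimeoutMs)
                throws InterruptedException {
            ExecutorService executor = Executors.newFixedThreadPool(8); // mirrors the 8 threads used for en-masse stops
            for (Runnable stop : taskStopCallbacks)
                executor.submit(stop);
            executor.shutdown();
            if (!executor.awaitTermination(gracefulTimeoutMs, TimeUnit.MILLISECONDS)) {
                // Give up waiting; abandoned tasks keep running until their producers are fenced out.
                executor.shutdownNow();
            }
        }
    }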

Source task startup

When exactly.once.source.enabled  is set to true  for a worker, extra steps will be taken to ensure that tasks are only run when it is safe to do so.

Before a worker starts a source task, it will first send a request to the leader's internal zombie fencing endpoint for the task's connector. If that request fails for any reason, the task will be marked as FAILED and startup will be aborted.

Once a worker has instantiated a producer for a source task, it will read to the end of the config topic once more, and if a new set of task configurations for that connector has been generated, it will abort startup of the task.

To summarize, the startup process for a task will be:

  1. Worker begins to start a source task (due to manual restart by the user, rebalance, reconfiguration, etc.)
  2. Worker issues a fencing request to the leader; if this request fails for any reason, the task is marked as FAILED and startup is aborted
  3. Worker reads to the end of the config topic and verifies that a task count record is now present for the connector after the latest set of task configurations for the connector; if this is not the case, task startup is abandoned *
  4. If the connector is implicitly or explicitly configured to use a separate offsets topic:
    1. Worker attempts to create the topic (silently swallowing the error if the topic already exists); if this fails for a non-acceptable reason (using the same logic as the worker already does when creating its internal topics to differentiate between acceptable and non-acceptable errors), the task is marked as FAILED and startup is aborted
  5. Worker instantiates a transactional producer for the task
  6. Worker reads to the end of the config topic
  7. If a new set of task configurations has since been generated for the connector, task startup is abandoned *
    1. Otherwise, the worker begins polling the task for data

* - If this happens, a new task will be automatically brought up in place of this task, in response to the new set of task configurations in the config topic. No action (such as restarting the task) will be necessary on the part of the user.
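
The summary above, reduced to a hypothetical sketch; every helper method is an illustrative stand-in for Connect internals:

    public abstract class SourceTaskStartupSketch {

        public void startTask(String connector, int taskId) {
            try {
                requestZombieFencingFromLeader(connector);                  // step 2
            } catch (Exception e) {
                markTaskFailed(connector, taskId, e);                       // fencing failed; abort startup
                return;
            }
            readToEndOfConfigTopic();
            if (!taskCountRecordAfterLatestTaskConfigs(connector))          // step 3
                return;                                                     // abandoned; a replacement task will be started later
            if (usesSeparateOffsetsTopic(connector)) {
                try {
                    createOffsetsTopicIfMissing(connector);                 // step 4; "topic already exists" errors are swallowed
                } catch (Exception e) {
                    markTaskFailed(connector, taskId, e);
                    return;
                }
            }
            createTransactionalProducer(connector, taskId);                 // step 5
            readToEndOfConfigTopic();                                       // step 6
            if (newTaskConfigsSince(connector))                             // step 7
                return;                                                     // abandoned; superseded by the newer task configs
            beginPollingTaskForData(connector, taskId);
        }

        // Hypothetical stand-ins for Connect internals:
        protected abstract void requestZombieFencingFromLeader(String connector) throws Exception;
        protected abstract void markTaskFailed(String connector, int taskId, Throwable cause);
        protected abstract void readToEndOfConfigTopic();
        protected abstract boolean taskCountRecordAfterLatestTaskConfigs(String connector);
        protected abstract boolean usesSeparateOffsetsTopic(String connector);
        protected abstract void createOffsetsTopicIfMissing(String connector) throws Exception;
        protected abstract void createTransactionalProducer(String connector, int taskId);
        protected abstract boolean newTaskConfigsSince(String connector);
        protected abstract void beginPollingTaskForData(String connector, int taskId);
    }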

...


Leader access to config topic

Regardless of whether exactly.once.source.enabled  is set to true  for a worker, if the worker is the leader of the cluster, it will now use a transactional producer to guarantee that at most one worker is capable of writing to the config topic at any time. In greater detail:

After a rebalance, if a worker discovers that it has become the leader of the cluster, it will instantiate a transactional producer whose transactional ID is connect-cluster-${groupId} , where ${groupId } is the group ID of the cluster. If users try to override this by setting the transactional.id property in their worker config, the user-provided value will be ignored and a warning message will be logged notifying them of this fact. The worker will use this producer for all writes it performs on the config topic. It will begin and commit a transaction for every record it writes. This may seem unusual--why use a transactional producer if transactions aren’t really necessary?--but it ensures that only the most recent leader is capable of producing to the config topic and that zombie leaders (rare though they may be) will not be able to.
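
A minimal sketch of this behavior using the standard Java producer API (the surrounding wiring is illustrative only):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.ByteArraySerializer;

    public class LeaderConfigTopicProducerSketch {

        // Builds the leader's transactional producer for the config topic.
        public static Producer<byte[], byte[]> forLeader(String groupId, Map<String, Object> baseProducerProps) {
            Map<String, Object> props = new HashMap<>(baseProducerProps);
            // Any user-supplied transactional.id is overwritten (the worker logs a warning instead).
            props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "connect-cluster-" + groupId);
            KafkaProducer<byte[], byte[]> producer =
                    new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer());
            producer.initTransactions(); // bumps the epoch, fencing out any zombie leader with the same transactional ID
            return producer;
        }

        // One transaction per record: the write only succeeds if this worker is still the current leader.
        public static void write(Producer<byte[], byte[]> producer, ProducerRecord<byte[], byte[]> record) {
            producer.beginTransaction();
            producer.send(record);
            producer.commitTransaction();
        }
    }

Because each new leader calls initTransactions() with the same transactional ID, a zombie leader's producer will fail with a ProducerFencedException the next time it attempts a transactional write to the config topic.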

...

By using a transactional producer on the leader, we can guarantee that a leader will only be able to write a task count record to the config topic if no other workers have become the leader and written new task configurations for the connector to the topic since then. We also address existing edge cases in Connect where a zombie leader may write incorrect information to the config topic.

Admin API to Fence out Transactional Producers

...

The worker that receives this 500 response will mark the corresponding Connector or Task object FAILED in the status topic, and use the stack trace contained in the response to populate the trace field for the status message.

...

If a worker is assigned a source task and sees a task count record in the config topic after the latest set of task configurations for that connector, it comes with the assumption that all producers for all prior task generations of that connector have been fenced out. And, at the time of the fencing performed by the leader, this assumption likely holds. However, it's possible that a worker may be assigned a source task, observe a task count record in the config topic that indicates that it is safe to start the task, then block for some large period of time before it is able to construct a producer for that task. For example, consider a scenario with workers F (follower), O (other follower), and L (leader), operating on task T of source connector C:

  1. Connector C is reconfigured and new task configurations for it are written to the config topic
  2. A rebalance is triggered and worker L assigns task T to worker F
  3. Worker F receives its assignment and, after requesting a successful round of fencing from worker L, is able to read the most-recent task count record for connector C from the config topic
  4. Worker F blocks, before it is able to instantiate a transactional producer for task T
  5. Connector C is reconfigured and new task configurations for it are written to the config topic
  6. A rebalance is triggered and worker L assigns task T to worker O, as F has fallen out of the group
  7. Worker O receives its assignment and, after requesting a successful round of fencing from worker L, is able to read the most-recent task count record for connector C from the config topic, instantiate a transactional producer for task T, and then begin processing data from that task
  8. Worker F becomes unblocked, instantiates a transactional producer for task T, and then begins processing data for that task

...

When might this occur? If a task is configured with an aggressive SMT that drops all the records in a given batch, its producer will never attempt to send any records before committing the current transaction. And, once this occurs, the worker will invoke SourceTask::commit, which may cause the task to drop the data from the upstream system (for example, by acknowledging a batch of messages from a JMS broker). Even if this occurs, it should not be a problem: it’s fine to drop messages from the upstream system that are meant to be dropped by the connector, regardless of whether source offsets for those records have been committed to Kafka, as the end result is the same either way.
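
Reduced to a hypothetical sketch (the surrounding batching and offset-record writes are omitted):

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.connect.source.SourceTask;

    public class DroppedBatchCommitSketch {

        // Even when every record in the batch was dropped by transformations, the current
        // transaction is still committed and SourceTask::commit is still invoked, at which point
        // the task may acknowledge (and effectively drop) the corresponding upstream data.
        public static void commitBatch(KafkaProducer<byte[], byte[]> producer, SourceTask task)
                throws InterruptedException {
            producer.commitTransaction(); // may contain only offset records, or nothing at all
            task.commit();
        }
    }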

Permitted failure scenarios

These scenarios can compromise a cluster’s exactly-once delivery guarantees.

Heterogeneous clusters

If a worker is active and does not have support for exactly-once delivery, the entire cluster’s ability to provide exactly-once guarantees will be compromised. There is no way to fence out non-compliant workers. Even if one is developed, the problem would only be slightly transformed: if a worker is active that cannot be fenced out by other workers in the cluster, we’d be in the exact same place as before.

This failure scenario will need to be called out in public documentation so that users understand that they are responsible for the operational burden of ensuring that no non-compliant workers are active.

Non-worker, non-leader producer to config topic

If anything besides a valid leader worker ends up writing to the config topic, all assumptions that the cluster relies upon to determine whether it is safe to start tasks and whether a fencing is necessary can be invalidated. This should never happen in a reasonably-secured Connect cluster and if it does, compromised delivery guarantees may pale in comparison to the other consequences of corrupting the config topic.

Compatibility, Deprecation, and Migration Plan


Breaking changes

Worker principal permissions

Before upgrading a worker to 3.0, its producer principal must be given the following permissions on the Kafka cluster it writes to:

Operation: Write
Resource Type: TransactionalId
Resource Name: connect-cluster-${groupId}, where ${groupId} is the group ID of the cluster

Operation: Describe
Resource Type: TransactionalId
Resource Name: connect-cluster-${groupId}, where ${groupId} is the group ID of the cluster

Operation: IdempotentWrite
Resource Type: Cluster
Resource Name: Kafka cluster targeted by the Connect cluster

These new ACLs will be required regardless of whether exactly once source support is enabled on the worker.

Interoperability with rebalancing protocols

Exactly-once support will be possible with both the eager and incremental cooperative rebalancing protocols. The additional design complexity to support both is minimal, and given that both protocols are still supported for the Connect framework, it’s likely that some users will still be using eager rebalancing. They should not have to upgrade to the newer rebalancing protocol in order to gain access to this feature if the work to support both protocols is not prohibitive.

...

Connector principal permissions

Producer

With exactly-once source support enabled, each source connector’s producer principal must be given the following permissions on the Kafka cluster it writes to:

...

Note that there is currently no use case for Connect that requires a consumer principal to be configured for source connectors. As a result, this proposed change technically introduces "new" configuration properties for source connectors: consumer-level overrides prefixed with consumer.override. , as described in KIP-458: Connector Client Config Override Policy.

Admin

With exactly-once source support enabled, the connector’s admin principal must be given the following permissions on the Kafka cluster it reads offsets from:

Operation: Write
Resource Type: TransactionalId
Resource Name: ${groupId}-${connector}-${taskId}, for each task that the connector will create, where ${groupId} is the group ID of the Connect cluster, ${connector} is the name of the connector, and ${taskId} is the ID of the task (starting from zero). A wildcarded prefix of ${groupId}-${connector}* can be used for convenience if there is no risk of conflict with other transactional IDs or if conflicts are acceptable to the user.

Operation: Describe
Resource Type: TransactionalId
Resource Name: ${groupId}-${connector}-${taskId}, for each task that the connector will create, where ${groupId} is the group ID of the Connect cluster, ${connector} is the name of the connector, and ${taskId} is the ID of the task (starting from zero). A wildcarded prefix of ${groupId}-${connector}* can be used for convenience if there is no risk of conflict with other transactional IDs or if conflicts are acceptable to the user.

If the (implicitly- or explicitly-configured) offsets topic for the connector does not exist yet, its admin principal must be given the following permissions on the Kafka cluster that will host the offsets topic:

Operation: Create
Resource Type: Topic
Resource Name: Offsets topic used by the connector, which is the value of the offsets.storage.topic property in the connector’s configuration. If not provided, this ACL is not needed, as the worker’s shared offsets topic should be created automatically before any connectors are started.

Additionally, the connector’s admin principal must be given the following permissions on the Kafka cluster it reads offsets from no matter what:

Operation: Describe
Resource Type: Topic
Resource Name: Offsets topic used by the connector, which is the value of the offsets.storage.topic property in the connector’s configuration. If not provided and the connector targets the same Kafka cluster that the worker uses for its internal topics, this ACL is not needed, as the worker’s shared offsets topic should be created automatically before any connectors are started.

This is necessary in order to ensure that the Connect worker has read up to the end of the offsets topic, even when there are open transactions on the topic at the start of the read to the end. This will be accomplished by listing the end offsets for the offsets topic using an admin client with the read_uncommitted  isolation level, and then consuming from that topic with the read_committed  isolation level until at least those offsets listed by the admin client.
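
A minimal sketch of that read-to-the-end logic using the standard admin and consumer APIs (illustrative only; the real worker logic is more involved):

    import java.time.Duration;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.ListOffsetsOptions;
    import org.apache.kafka.clients.admin.ListOffsetsResult;
    import org.apache.kafka.clients.admin.OffsetSpec;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.common.IsolationLevel;
    import org.apache.kafka.common.TopicPartition;

    public class ReadToEndSketch {

        // Lists end offsets with read_uncommitted isolation, then consumes with a read_committed
        // consumer until those offsets are reached.
        public static void readToEnd(Admin admin, Consumer<byte[], byte[]> readCommittedConsumer,
                                     Set<TopicPartition> partitions) throws Exception {
            Map<TopicPartition, OffsetSpec> request = new HashMap<>();
            for (TopicPartition partition : partitions)
                request.put(partition, OffsetSpec.latest());
            Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets =
                    admin.listOffsets(request, new ListOffsetsOptions(IsolationLevel.READ_UNCOMMITTED)).all().get();

            readCommittedConsumer.assign(partitions); // configured with isolation.level=read_committed
            readCommittedConsumer.seekToBeginning(partitions);
            boolean caughtUp = false;
            while (!caughtUp) {
                readCommittedConsumer.poll(Duration.ofSeconds(1)); // records are handed to the offset store here
                caughtUp = partitions.stream()
                        .allMatch(p -> readCommittedConsumer.position(p) >= endOffsets.get(p).offset());
            }
        }
    }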


...


Rolling upgrade(s)

At most two rolling upgrades will be required to enable exactly-once source support on a Connect cluster.

The first rolling upgrade will be to upgrade every worker to a version of Connect that can provide exactly-once source support.

The second rolling upgrade will be to actually enable exactly-once source support on each worker (by setting exactly.once.source.enabled  to true ).

Two upgrades will be required in order to ensure that the internal fencing endpoint is available on every worker before it is required by any of them.

...

Downgrade

Two kinds of downgrade are possible. Exactly-once support for a cluster can be disabled by setting exactly.once.source.enabled to false for workers in the cluster (a “soft” downgrade), or workers can be reverted to use older versions of the Connect framework that do not support exactly-once sources at all (a “hard” downgrade).

...

If a re-upgrade is desirable at this point, any separate per-connector offsets topics may need to be deleted beforehand. Otherwise, on startup, the worker will give precedence to the existing separate offsets topic, even if the data in that topic is stale and there is actually newer information present in the global offsets topic.

Additional configuration properties

...

Rejected because: in the worst case, the connector is never recreated and a worker (presumably the leader) will have expended unnecessary effort on fencing out producers that aren’t going to compromise delivery guarantees and won’t have any bigger consequences than such tasks would already have on a Connect cluster today. In the best case, the connector is recreated immediately, in which case, a fencing would be performed anyways if no task count tombstone record were written to the config topic. Ultimately, no work is saved by doing this fencing on connector deletion, and some work may be unnecessary.

Full migration of offsets topics

Summary: instead of falling back on the global offsets topic, when creating a per-connector offsets topic for the first time (or if there are no offsets for a connector present in a pre-existing per-connector offsets topic), copy all relevant offset information over from the global offsets topic into the new per-connector offsets topic, and rely solely on the content of the per-connector offsets topic.

Rejected because: this approach was significantly more complex, involved duplicating existing data, and provided little if any advantages over the current proposal.

Future Work

Finer control over offset commits

...