Versions Compared

...

Offset reads

When exactly.once.source.support is set to enabled for a worker, the consumers used by that worker to read source offsets from offsets topics will be configured with a new default property, isolation.level=read_committed. This will cause them to ignore any records that are part of a non-committed transaction, but still consume records that are not part of a transaction at all.
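
As a rough illustration of the behavior described above, the following sketch builds a consumer override and models read_committed filtering over an in-memory list. The function names and the (value, txn_id) record shape are assumptions made for this example and are not part of Connect or the Kafka clients. The model also ignores ordering details such as the last stable offset.

```python
def offsets_consumer_config(worker_config: dict) -> dict:
    """Return consumer overrides for reading offsets topics (illustrative only)."""
    config = {}
    if worker_config.get("exactly.once.source.support") == "enabled":
        # New default applied only when exactly-once source support is enabled.
        config["isolation.level"] = "read_committed"
    return config


def visible_records(records: list, committed_txns: set) -> list:
    """Simplified model of read_committed.

    Each record is a (value, txn_id) pair; txn_id is None for writes that
    are not part of any transaction. Records from transactions that have
    not committed are skipped; everything else is returned.
    """
    return [value for value, txn_id in records
            if txn_id is None or txn_id in committed_txns]
```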

...

Offset (and record) writes

When exactly.once.source.support is set to enabled for a worker, all source tasks created by that worker will use a transactional producer to write to Kafka. All source records that they provide to the worker will be written to Kafka inside a transaction. Once it comes time for that transaction to be committed, offset information for the current batch will be written to the offsets topic, inside the same transaction. Then, the transaction will be committed. This will ensure that source offsets will be committed to Kafka if and only if the source records for that batch were also written to Kafka.
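
The "if and only if" guarantee above can be pictured with a toy in-memory transaction. This is a sketch only: ToyTransaction and write_batch are hypothetical names, the log is a plain dictionary, and none of this is the real Kafka producer API.

```python
class ToyTransaction:
    """In-memory stand-in for a Kafka transaction (not the real producer API)."""

    def __init__(self, log: dict):
        self._log = log        # topic name -> list of committed values
        self._pending = []     # (topic, value) pairs buffered until commit

    def send(self, topic: str, value) -> None:
        self._pending.append((topic, value))

    def commit(self) -> None:
        # All buffered writes become visible together.
        for topic, value in self._pending:
            self._log.setdefault(topic, []).append(value)
        self._pending.clear()

    def abort(self) -> None:
        # Nothing buffered becomes visible.
        self._pending.clear()


def write_batch(log: dict, records: list, offsets: dict,
                offsets_topic: str, fail: bool = False) -> None:
    """Write source records and their offsets inside one toy transaction."""
    txn = ToyTransaction(log)
    try:
        for topic, value in records:
            txn.send(topic, value)
        if fail:
            raise RuntimeError("simulated failure before commit")
        # Offsets for the batch go into the same transaction as the records.
        txn.send(offsets_topic, offsets)
        txn.commit()
    except RuntimeError:
        txn.abort()
```

In this model a failed batch leaves neither its records nor its offsets in the log, so a restarted task would resume from the last committed offsets.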

...

Per-connector offsets topics

Regardless of the value for the exactly.once.source.support property on a worker, source connectors will be allowed to use custom offsets topics, configurable via the offsets.storage.topic property.

...

Hosting Kafka cluster

Regardless of the value for the exactly.once.source.support property on a worker, if a connector configuration contains a value for the offsets.storage.topic property, it will use an offsets topic with that name on the Kafka cluster that it produces data to (which may be different from the one that hosts the worker's global offsets topic).
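
A minimal sketch of the override case described above follows. The function name is hypothetical, and two things are assumptions rather than statements from this section: that the connector's target cluster comes from a producer.override.bootstrap.servers entry when present, and that a connector without offsets.storage.topic falls back to the worker's global offsets topic.

```python
def connector_offsets_topic(worker_config: dict, connector_config: dict) -> dict:
    """Pick the offsets topic and hosting cluster for a source connector."""
    # Cluster the connector produces data to. Assumption: a per-connector
    # producer override wins, otherwise the worker's own cluster is used.
    target_cluster = connector_config.get(
        "producer.override.bootstrap.servers",
        worker_config["bootstrap.servers"],
    )
    custom_topic = connector_config.get("offsets.storage.topic")
    if custom_topic is not None:
        # Custom topic lives on the cluster the connector writes to.
        return {"topic": custom_topic, "cluster": target_cluster}
    # Assumed fallback (not covered by the text above): the worker's
    # global offsets topic on the worker's cluster.
    return {
        "topic": worker_config["offset.storage.topic"],
        "cluster": worker_config["bootstrap.servers"],
    }
```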

...

This endpoint will be secured by the session key mechanism introduced in KIP-507: Securing Internal Connect REST Endpoints and will only be used for inter-worker communication; users should not query it directly. It will be available regardless of the value of the exactly.once.source.support property for the worker.

When a worker receives a request to this endpoint, it will:

...

Preparation for rebalance

When exactly.once.source.support is set to enabled and a new set of task configurations for a connector is detected, then workers will preemptively stop source tasks for that connector. In greater detail:

When a rebalance is triggered, before rejoining the cluster group, workers will preemptively stop all tasks of all source connectors for which task configurations are present in the config topic after the latest task count record. This step is not necessary to preserve exactly-once delivery guarantees, but should provide a reasonable opportunity for tasks to shut down gracefully before being forcibly fenced out.
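
The selection rule above can be sketched as a single pass over the config topic. The record shape here is an assumption for illustration: each record is a (kind, connector) pair, with the hypothetical kinds "task-config" and "task-count", and every connector listed is taken to be a source connector.

```python
def connectors_to_stop(config_topic_records: list) -> set:
    """Return connectors whose task configs appear after their latest task count record."""
    pending = set()
    for kind, connector in config_topic_records:
        if kind == "task-count":
            # A task count record covers all task configs written before it.
            pending.discard(connector)
        elif kind == "task-config":
            # Task configs newer than the latest task count record.
            pending.add(connector)
    return pending
```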

...

Source task startup

When exactly.once.source.support is set to enabled for a worker, extra steps will be taken to ensure that tasks are only run when it is safe to do so.

...

Leader access to config topic

When exactly.once.source.support is set to preparing or enabled for a worker, if the worker is the leader of the cluster, it will now use a transactional producer to guarantee that at most one worker is capable of writing to the config topic at any time. In greater detail:

...

If a worker is active and does not have support for exactly-once delivery (either because exactly.once.source.support is not set to enabled, or the worker is running an older version of Connect for which the feature is not available), the entire cluster’s ability to provide exactly-once guarantees will be compromised. There is no way to fence out non-compliant workers. Even if one is developed, the problem would only be slightly transformed: if a worker is active that cannot be fenced out by other workers in the cluster, we’d be in the exact same place as before.
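
Put as a check, the condition above amounts to requiring the setting on every active worker. The sketch below is illustrative only; the function name is hypothetical, and each worker is represented by its configuration dictionary, where an older Connect version simply lacks the property.

```python
def cluster_supports_exactly_once(active_workers: list) -> bool:
    """True only if every active worker has exactly-once source support enabled."""
    # A single worker without the setting (or on an older version, where the
    # key is absent) is enough to compromise the whole cluster.
    return bool(active_workers) and all(
        worker.get("exactly.once.source.support") == "enabled"
        for worker in active_workers
    )
```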

...

Compatibility, Deprecation, and Migration Plan

Interoperability with rebalancing protocols

Exactly-once support will be possible with both the eager and incremental cooperative rebalancing protocols. The additional design complexity to support both is minimal, and given that both protocols are still supported for the Connect framework, it’s likely that some users will still be using eager rebalancing. They should not have to upgrade to the newer rebalancing protocol in order to gain access to this feature if the work to support both protocols is not prohibitive.

Worker principal permissions

Before setting exactly.once.source.support to preparing or enabled, the producer principal for the worker must be given the following permissions on the Kafka cluster it writes to:

Operation       | Resource Type   | Resource Name
Write           | TransactionalId | connect-cluster-${groupId}, where ${groupId} is the group ID of the cluster
Describe        | TransactionalId | connect-cluster-${groupId}, where ${groupId} is the group ID of the cluster
IdempotentWrite | Cluster         | Kafka cluster targeted by the Connect cluster
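
For a concrete picture of the table above, this sketch derives the transactional ID from a group ID and lists the three permissions as tuples. The function names are hypothetical. The literal kafka-cluster is the resource name Kafka uses for cluster-level ACLs, standing in for "Kafka cluster targeted by the Connect cluster".

```python
def worker_transactional_id(group_id: str) -> str:
    """Transactional ID pattern from the table: connect-cluster-${groupId}."""
    return f"connect-cluster-{group_id}"


def required_worker_acls(group_id: str) -> list:
    """Return (operation, resource type, resource name) tuples for the worker's producer principal."""
    txn_id = worker_transactional_id(group_id)
    return [
        ("Write", "TransactionalId", txn_id),
        ("Describe", "TransactionalId", txn_id),
        # Cluster-level ACLs in Kafka use the fixed resource name "kafka-cluster".
        ("IdempotentWrite", "Cluster", "kafka-cluster"),
    ]
```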

...

Connector principal permissions

...

The first rolling upgrade will be to upgrade every worker to a version of Connect that can provide exactly-once source support, and to set exactly.once.source.support to preparing.

The second rolling upgrade will be to actually enable exactly-once source support on each worker (by setting exactly.once.source.support to enabled).

Two upgrades will be required in order to ensure that the internal fencing endpoint is available on every worker before it is required by any of them, and that no non-leader workers are able to write to the config topic during the upgrade.
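
The two-phase sequence can be summarized as a small decision helper. This is only a sketch of the ordering described above; the function name is hypothetical, and each worker is represented by its current exactly.once.source.support value, with None standing for a worker on an older Connect version.

```python
def next_rollout_target(worker_settings: list) -> str:
    """Return the value the next rolling upgrade should set on every worker."""
    # The second upgrade is safe only once every worker is at least "preparing",
    # which means the fencing endpoint is available cluster-wide.
    if all(setting in ("preparing", "enabled") for setting in worker_settings):
        return "enabled"
    return "preparing"
```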

Downgrade

Two kinds of downgrade are possible. Exactly-once support for a cluster can be disabled by setting exactly.once.source.support to either disabled or preparing for workers in the cluster (a “soft” downgrade), or workers can be reverted to use older versions of the Connect framework that do not support exactly-once sources at all (a “hard” downgrade).

...

The worker-level exactly.once.source.support and connector-level offsets.storage.topic, transaction.boundary, exactly.once.support, and transaction.boundary.interval.ms configuration properties introduced here should not violate backwards compatibility, as they all come with default values that preserve existing behavior. Technically, an existing connector may expose an offsets.storage.topic configuration property that will now conflict with this newly-introduced framework property, but the risk is low enough to be acceptable.

...