...
Current state: Under discussion
Discussion thread: TODO here
JIRA:
...
And a single per-connector configuration property will be added:
Name | Type | Default | Importance | Doc |
---|---|---|---|---|
atomic.source.offset.commit | BOOLEAN | false | LOW | Whether to commit offsets for this connector's source tasks in a producer transaction with the records produced by those tasks. For some source connectors, this is all that is required for exactly-once delivery. Will only apply in distributed mode; if used in a connector configuration on a standalone worker, will be ignored. |
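As a rough illustration of the row above, the following Python sketch shows how the property might be interpreted: it defaults to false when absent and has no effect on a standalone worker. The helper name `atomic_commit_enabled` and the `distributed` flag are hypothetical and only serve the example; the actual parsing would be done by the Connect framework itself.

```python
PROPERTY = "atomic.source.offset.commit"


def atomic_commit_enabled(connector_config, distributed):
    """Hypothetical helper: would atomic source offset commit apply?"""
    # The property is described as ignored on standalone workers.
    if not distributed:
        return False
    # Default is false when the property is not set.
    raw = str(connector_config.get(PROPERTY, "false")).strip().lower()
    if raw not in ("true", "false"):
        raise ValueError(f"{PROPERTY} must be true or false, got {raw!r}")
    return raw == "true"
```

For example, a connector configuration that omits the property would leave the existing offset commit behavior unchanged, whether the worker is distributed or standalone.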
Proposed Changes
New source offset read behavior
...
Before enabling atomic source offset commit for a connector, its producer principal must be given the following permissions on the Kafka cluster it writes to:
Operation | Resource Type | Resource Name |
---|---|---|
Write | TransactionalId | ${connector}-${taskId}, for each task that the connector will create, where ${connector} is the name of the connector and ${taskId} is the ID of the task (starting from zero). A prefix of ${connector}- can be used for convenience if there is no risk of conflict with other transaction IDs or if conflicts are acceptable to the user. |
IdempotentWrite | Cluster | Kafka cluster targeted by the Connect cluster |
Write | Topic | Internal offsets topic used by the Connect cluster |
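The permissions in the table above can be enumerated programmatically. The following Python sketch lists the (operation, resource type, resource name) triples for a connector with a given number of tasks. The function names, the connector name `my-source`, and the topic name `connect-offsets` are hypothetical, and `kafka-cluster` is used here only as a placeholder for the cluster resource name; this is an illustration of the naming scheme, not a tool that creates ACLs.

```python
def transactional_ids(connector, num_tasks):
    """One transactional ID per task, with task IDs starting from zero."""
    return [f"{connector}-{task_id}" for task_id in range(num_tasks)]


def required_acls(connector, num_tasks, offsets_topic):
    """Hypothetical helper: list the ACLs described in the table above."""
    acls = [
        ("Write", "TransactionalId", tid)
        for tid in transactional_ids(connector, num_tasks)
    ]
    # Placeholder name for the cluster resource (assumption for this sketch).
    acls.append(("IdempotentWrite", "Cluster", "kafka-cluster"))
    acls.append(("Write", "Topic", offsets_topic))
    return acls


if __name__ == "__main__":
    for acl in required_acls("my-source", 2, "connect-offsets"):
        print(acl)
```

Granting Write on the prefix `my-source-` instead of on each individual ID would cover any number of tasks, subject to the caveat about conflicting transaction IDs noted in the table.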
Rolling upgrade
Once the necessary permissions are granted to the producers of all source connectors on the cluster that will use atomic source offset commit, it should be possible to enable atomic source offset commit via a single rolling upgrade of the cluster. If enabling atomic source offset commit for the entire worker, atomic.source.offset.commit=true should be configured on each worker before restart. If enabling it for a specific set of connectors, atomic.source.offset.commit=true can be configured for those connectors before, during, or after the rolling upgrade; the timing makes little difference.
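Enabling the property for an individual connector could be done through the Connect REST API's `PUT /connectors/{name}/config` endpoint. The Python sketch below only builds such a request and does not send it. The function name `build_enable_request`, the worker URL, and the connector name are hypothetical, and the sketch assumes the caller already holds the connector's current configuration.

```python
import json
import urllib.request


def build_enable_request(base_url, connector, current_config):
    """Hypothetical helper: build a PUT request that enables the property."""
    # Copy so the caller's configuration is not modified.
    new_config = dict(current_config)
    new_config["atomic.source.offset.commit"] = "true"
    return urllib.request.Request(
        url=f"{base_url}/connectors/{connector}/config",
        data=json.dumps(new_config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


if __name__ == "__main__":
    req = build_enable_request(
        "http://localhost:8083", "my-source", {"tasks.max": "2"}
    )
    print(req.get_method(), req.full_url)
```

Passing the resulting request to `urllib.request.urlopen` would submit it to a running worker; that step is left out here so the sketch can be run without a Connect cluster.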
...