...

It may take longer than the transaction timeout for a task to flush all of its records to Kafka. In this case, there are some remedial actions that users can take to nurse their connector back to health: tune their producer configuration for higher throughput, increase the transaction timeout for the producers used by the connector, decrease the offset commit interval, or reconfigure the connector to produce records with a lower throughput. We will include these steps in the error message for a task that fails due to producer transaction timeout.
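As an illustrative sketch of the first two remedies, the relevant knobs might look like this (the property names are the standard Connect and producer configurations; the values are placeholders, and per-connector producer overrides require a worker override policy that permits them):

```properties
# Connector configuration: raise the transaction timeout for this
# connector's producers (requires e.g.
# connector.client.config.override.policy=All on the worker).
producer.override.transaction.timeout.ms=60000

# Worker configuration: commit offsets (and therefore commit
# transactions) more frequently, so each transaction contains
# fewer records and completes sooner.
offset.flush.interval.ms=10000
```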

...

If a worker is active and does not have support for exactly-once delivery (either because exactly.once.source.enabled is set to false, or the worker is running an older version of Connect for which the feature is not available), the entire cluster’s ability to provide exactly-once guarantees will be compromised. There is no way to fence out non-compliant workers. Even if one were developed, the problem would only be slightly transformed: if a worker that cannot be fenced out by other workers in the cluster is active, we’d be in the exact same place as before.

...

Operation | Resource Type | Resource Name
----------|---------------|--------------
Write | TransactionalId | ${groupId}-${connector}-${taskId}, for each task that the connector will create, where ${groupId} is the group ID of the Connect cluster, ${connector} is the name of the connector, and ${taskId} is the ID of the task (starting from zero). A wildcarded prefix of ${groupId}-${connector}* can be used for convenience if there is no risk of conflict with other transactional IDs or if conflicts are acceptable to the user.
Describe | TransactionalId | Same resource names as the Write row above.
Write | TransactionalId | ${groupId}-${connector}. Only necessary if the connector uses a separate offsets topic.
Describe | TransactionalId | ${groupId}-${connector}. Only necessary if the connector uses a separate offsets topic.
IdempotentWrite | Cluster | The Kafka cluster targeted by the connector.
Write | Topic | The offsets topic used by the connector: the value of the offsets.storage.topic property in the connector’s configuration if provided, or the value of the offsets.storage.topic property in the worker’s configuration otherwise.
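The transactional ID scheme above can be sketched programmatically. The following Python helper (function and argument names are hypothetical, for illustration only) derives the transactional IDs that the ACLs in the table must cover:

```python
def transactional_ids(group_id, connector, num_tasks, separate_offsets_topic=False):
    """Derive the transactional IDs described in the ACL table above.

    Each task's producer uses ${groupId}-${connector}-${taskId}; if the
    connector uses a separate offsets topic, Write/Describe are also
    needed on ${groupId}-${connector} itself.
    """
    ids = [f"{group_id}-{connector}-{task_id}" for task_id in range(num_tasks)]
    if separate_offsets_topic:
        ids.append(f"{group_id}-{connector}")
    return ids


def wildcard_prefix(group_id, connector):
    """A single wildcarded prefix covering all of the IDs above."""
    return f"{group_id}-{connector}*"
```

Because every ID shares the ${groupId}-${connector} prefix, a single prefixed ACL covers all tasks regardless of how many the connector creates, at the cost of possibly matching unrelated transactional IDs.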

...

Soft downgrades should be possible via a cluster roll, where each worker is shut down, its configuration is altered to disable exactly-once source support, and then it is restarted. During the process, some workers that are still configured with exactly-once source support enabled may refuse to start tasks for connectors if a task count record is not found for those connectors after the most recent set of task configurations. This isn’t especially likely, since the most common cause would be connector reconfiguration during the downgrade, but even if it occurs, it will only affect workers that have yet to be downgraded. There should be no long-lasting impact on the cluster once all workers have been downgraded.


Hard downgrade

Offsets accuracy

Because source task offsets on upgraded workers are still written to the worker’s global offsets topic, even if a downgraded worker does not support per-connector offsets topics, it can still pick up relatively-recent source offsets for its connectors. Some of these offsets may be out-of-date or older than the ones in the connector’s separate offsets topic, but the only consequence would be duplicate writes by the connector, which are possible on any cluster without exactly-once support enabled. Without any writes to the global offsets topic, all records processed by a connector since its switch to a dedicated offsets topic would be re-processed after the downgrade and would likely result in a flood of duplicates. While technically permissible given that the user in this case will have knowingly switched to a version of the Connect framework that doesn't support exactly-once source connectors (and is therefore susceptible to duplicate delivery of records), the user experience in this case could be quite bad, so a little extra effort on the part of Connect to significantly reduce the fallout of downgrades in this case is warranted.

If a re-upgrade is desirable at this point, any separate per-connector offsets topics may need to be deleted beforehand. Otherwise, the worker will give precedence to the existing separate offsets topic, even if the data in that topic is stale and there is actually newer information present in the global offsets topic.
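The precedence described here can be illustrated with a small sketch (the function and variable names are hypothetical): entries from the connector's separate offsets topic win on conflict, and the global topic is only consulted for source partitions absent from the separate topic, even when the separate topic's data is stale.

```python
def resolve_offsets(global_offsets, separate_offsets):
    """Illustrative sketch of the offset precedence described above.

    Both arguments map a source partition (any hashable key) to its
    committed source offset. Separate-topic entries take precedence;
    the global topic only fills in partitions the separate topic lacks.
    """
    merged = dict(global_offsets)    # start with the global topic's view
    merged.update(separate_offsets)  # separate topic overrides on conflict
    return merged
```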

Two-step downgrade

The safest way to perform a hard downgrade is to follow the same steps for a rolling upgrade, but in reverse. Specifically:

1. Perform an initial rolling downgrade wherein exactly.once.source.support is set to false for every worker.

2. Perform a second rolling downgrade where each worker is modified to use an earlier version of Connect.

All connectors and tasks should continue to run and process data, thanks to the same logic that makes a rolling upgrade possible.
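For the first rolling step, only the worker configuration changes; as a sketch, the relevant line in each worker's properties file (all other settings left unchanged) would be:

```properties
# Step 1 of the two-step downgrade: disable exactly-once source support
# on every worker before downgrading the Connect version itself.
exactly.once.source.support=false
```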

Single-step downgrade

If a hard downgrade is performed in a single-step rolling downgrade (i.e., after each worker is shut down, it is immediately downgraded to a lower version of Connect), some tasks may begin to fail as their workers will be unable to reach the internal fencing endpoint on workers that have already been downgraded. Once the downgrade is complete, it should be sufficient to restart these tasks in order to get them running again.

Additional configuration properties

...