...

Otherwise, the worker will emit a warning log message and refuse to bring the task up until a rebalance occurs and it has a chance to check again for a task count record that follows the most recent set of task configurations for the connector.

Once a worker has instantiated a producer for a source task, it will read to the end of the config topic once more, and if a new set of task configurations for that connector has been generated, it will abort startup of the task.

To summarize, the startup process for a task will be:

  1. Worker is assigned a source task during rebalance
  2. If there is no task count record present in the config topic for that connector, or there is a set of task configurations present after the latest task count record, refuse to bring up the task and await a subsequent rebalance
  3. Otherwise:
    1. Instantiate a transactional producer for the task
    2. Read to the end of the config topic
    3. If a new set of task configurations has since been generated for the connector, abort startup
    4. Otherwise, begin polling the task for data
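
The checks above could be sketched roughly as follows. This is a minimal illustration only; the ConfigState interface and the abstract helper methods are hypothetical stand-ins for the worker's view of the config topic, not the actual Connect runtime API.

  import org.apache.kafka.clients.producer.Producer;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;

  // Hypothetical sketch of the startup checks above, not Connect worker internals.
  public abstract class SourceTaskStartupSketch {

      private static final Logger log = LoggerFactory.getLogger(SourceTaskStartupSketch.class);

      /** Hypothetical snapshot of the config topic as seen by this worker. */
      interface ConfigState {
          boolean hasTaskCountRecord(String connector);
          boolean taskConfigsNewerThanTaskCountRecord(String connector);
      }

      abstract Producer<byte[], byte[]> createTransactionalProducerFor(String connector);

      abstract ConfigState readToEndOfConfigTopic();

      boolean maybeStartSourceTask(String connector, ConfigState configState) {
          // Step 2: a task count record must exist and must be newer than the latest
          // set of task configurations for the connector.
          if (!configState.hasTaskCountRecord(connector)
                  || configState.taskConfigsNewerThanTaskCountRecord(connector)) {
              log.warn("Refusing to start task for connector {} until a rebalance occurs", connector);
              return false;
          }

          // Steps 3.1-3.2: bring up a transactional producer, then re-read the config topic.
          Producer<byte[], byte[]> producer = createTransactionalProducerFor(connector);
          ConfigState latest = readToEndOfConfigTopic();

          // Step 3.3: a newer set of task configurations means another fencing round is
          // coming; abort startup rather than risk producing alongside the new generation.
          if (latest.taskConfigsNewerThanTaskCountRecord(connector)) {
              producer.close();
              return false;
          }

          // Step 3.4: safe to begin polling the task for data.
          return true;
      }
  }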

Leader access to config topic

After a rebalance, a worker may discover that it has become the leader of the cluster. When this happens, the worker will instantiate a transactional producer whose transactional ID is, by default, the group ID of the cluster (but may be overridden by users via the transactional.id worker property). The worker will use this producer for all writes it performs on the config topic, beginning and committing a transaction for every record it writes. This may seem unusual (why use a transactional producer if transactions aren't really necessary?), but it ensures that only the most recent leader is capable of producing to the config topic and that zombie leaders (rare though they may be) will not be able to.

If, for example, a leader stalls while attempting to fence out a prior generation of task producers for a given connector during a rebalance, it may fall out of the group and become a zombie. If another worker is elected leader at this point and the connector is then reconfigured, it's possible that the zombie leader may become unblocked and then attempt to write a task count record to the config topic after a new set of task configurations are written to the topic. This would corrupt the state of the config topic and violate a key assumption: a task count record present after task configurations for a connector means that it is not necessary to fence out a prior generation of producers for that connector's tasks before starting them.

By using a transactional producer on the leader, we can guarantee that a leader will only be able to write a task count record to the config topic if no other workers have become the leader and written new task configurations for the connector to the topic since then.
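
A minimal, standalone sketch of how such a leader producer might be configured and used is shown below. The bootstrap servers, group ID, topic name, and the key/value format of the task count record are illustrative assumptions rather than the formats actually used by Connect; only the transactional-producer mechanics (initTransactions, then one transaction per record written) follow the description above.

  import java.util.Properties;
  import org.apache.kafka.clients.producer.KafkaProducer;
  import org.apache.kafka.clients.producer.Producer;
  import org.apache.kafka.clients.producer.ProducerConfig;
  import org.apache.kafka.clients.producer.ProducerRecord;
  import org.apache.kafka.common.serialization.StringSerializer;

  public class LeaderConfigTopicWriterSketch {

      public static void main(String[] args) {
          Properties props = new Properties();
          props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");     // illustrative
          // The transactional ID defaults to the group ID of the cluster, so a producer
          // created by a newer leader with the same ID fences this one out.
          props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "connect-cluster");      // illustrative group ID
          props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
          props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

          try (Producer<String, String> producer = new KafkaProducer<>(props)) {
              // Registers the transactional ID and fences out any older producer using it.
              producer.initTransactions();

              // Every write to the config topic is wrapped in its own transaction; a zombie
              // leader's stale producer would fail here rather than corrupt the topic.
              producer.beginTransaction();
              producer.send(new ProducerRecord<>("connect-configs",                  // illustrative topic
                      "task-count-my-connector",                                     // illustrative key
                      "{\"tasks\": 3}"));                                            // illustrative value
              producer.commitTransaction();
          }
      }
  }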

Addressed failure/degradation scenarios

These scenarios will not compromise a cluster’s exactly-once delivery guarantees.

Leader cannot write task count record to config topic during rebalance

The leader will try indefinitely to write the task count to the config topic during rebalance and verify that it has been written by reading it back from the topic after writing.
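
A rough sketch of that retry loop is shown below, with hypothetical writeTaskCountRecord() and taskCountRecordVisible() helpers standing in for the transactional write and the read-back verification; this is not the actual Connect leader implementation.

  import org.apache.kafka.common.errors.RetriableException;

  // Hypothetical sketch only; the helper methods are illustrative, not Connect internals.
  public abstract class TaskCountWriteRetrySketch {

      abstract void writeTaskCountRecord(String connector, int taskCount);  // transactional write
      abstract boolean taskCountRecordVisible(String connector);            // read config topic to end

      void writeAndVerify(String connector, int taskCount) throws InterruptedException {
          while (true) {
              try {
                  writeTaskCountRecord(connector, taskCount);
                  // Verify the write by reading the config topic back to the end.
                  if (taskCountRecordVisible(connector)) {
                      return;
                  }
              } catch (RetriableException e) {
                  // Keep trying indefinitely; the rebalance cannot safely proceed for this
                  // connector's tasks until the task count record is in place.
              }
              Thread.sleep(1_000L);  // back off before retrying
          }
      }
  }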

...

In order to prevent manual user restarts from compromising exactly-once delivery guarantees, Connect workers will read to the end of the config topic before proceeding with any source task restarts. If the worker is unable to read to the end of the config topic, the restart request will be met with a 500 timeout response. If the worker discovers during its read to the end of the topic that the task no longer exists (because the connector has been deleted or its task count reduced), it will return the usual 404 not found response. If a rebalance becomes expected because of new information read from the config topic, a 409 conflict response will be given.
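
That pre-restart check could look roughly like the following JAX-RS-style sketch. The ConfigState interface, its methods, and the response bodies are hypothetical; only the status codes (500, 404, 409) come from the description above.

  import javax.ws.rs.core.Response;

  // Hypothetical sketch of the pre-restart check; not the actual Connect REST resource.
  public abstract class TaskRestartPrecheckSketch {

      /** Hypothetical view of the config topic after reading it to the end. */
      interface ConfigState {
          boolean taskExists(String connector, int taskId);
          boolean rebalanceExpected();
      }

      /** Returns the latest config topic state, or null if the read timed out. */
      abstract ConfigState readToEndOfConfigTopic();

      abstract void restart(String connector, int taskId);

      Response handleTaskRestart(String connector, int taskId) {
          ConfigState state = readToEndOfConfigTopic();
          if (state == null) {
              // Could not read to the end of the config topic in time.
              return Response.status(500).entity("Timed out reading config topic").build();
          }
          if (!state.taskExists(connector, taskId)) {
              // The connector was deleted or its task count was reduced.
              return Response.status(404).entity("Task not found").build();
          }
          if (state.rebalanceExpected()) {
              // New information in the config topic means a rebalance is expected.
              return Response.status(409).entity("Cannot restart during rebalance").build();
          }
          restart(connector, taskId);
          return Response.noContent().build();
      }
  }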

Worker lag after being assigned source task but before instantiating producer

If a worker is assigned a source task and sees a task count record in the config topic after the latest set of task configurations for that connector, it assumes that all producers for all prior task generations of that connector have been fenced out. At the time of the fencing performed by the leader, this assumption likely holds. However, it is possible that a worker may be assigned a source task, observe a task count record in the config topic that indicates that it is safe to start the task, and then block for a long period of time before it is able to construct a producer for that task. For example, consider a scenario with workers F (follower), O (other follower), and L (leader), operating on task T of source connector C:

  1. Connector C is reconfigured and new task configurations for it are written to the config topic
  2. A rebalance is triggered and worker L is able to fence out all possibly-active producers for connector C, write a task count record for C to the config topic, and then assign task T to worker F
  3. Worker F receives its assignment and is able to read the most-recent task count record for connector C from the config topic
  4. Worker F blocks, before it is able to instantiate a transactional producer for task T
  5. Connector C is reconfigured and new task configurations for it are written to the config topic
  6. A rebalance is triggered and worker L is able to fence out all possibly-active producers for connector C, write a task count record for C to the config topic, and then assign task T to worker O as F has fallen out of the group
  7. Worker O receives its assignment and is able to read the most-recent task count record for connector C from the config topic, instantiate a transactional producer for task T, and then begin processing data from that task
  8. Worker F becomes unblocked, instantiates a transactional producer for task T, and then begins processing data for that task

In this scenario, if a source partition initially assigned to task T is reassigned to a different task during the reconfiguration in step 5, the task instance created by worker F in step 8 will begin producing duplicate data and exactly-once delivery guarantees will be compromised.

This is possible because it is not guaranteed at the time of the fencing performed by the leader that all producers that could be active for a connector have been created yet. If a producer has not yet been created, it cannot be fenced out; if it is created after the fencing occurs, a different approach must be taken to ensure that no duplicate data is produced.

This case should be covered by the additional read to the end of the config topic by the worker after it has brought up a transactional producer for a source task. If the connector is reconfigured after the worker became blocked and a round of producer fencing has occurred, then the worker will bring up a transactional producer for its source task, but then discover the connector reconfiguration and abort startup. If the worker has brought up a transactional producer and then become blocked before completing its read to the end of the config topic, the round of producer fencing by the leader should fence out that producer and any subsequent restart attempts will block until/unless the worker is able to complete a read to the end of the config topic (and handle any rebalances necessitated by new information read from the config topic in the process).
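
The Kafka primitive that makes this safe is transactional-ID fencing: once a newer producer registers the same transactional ID (or an equivalent administrative fencing round takes place), the stale producer's next transactional operation fails with ProducerFencedException. A minimal standalone sketch of that behavior follows; the broker address, topic, and transactional ID format are illustrative assumptions, and this is not the actual fencing code used by the Connect framework.

  import java.util.Properties;
  import org.apache.kafka.clients.producer.KafkaProducer;
  import org.apache.kafka.clients.producer.Producer;
  import org.apache.kafka.clients.producer.ProducerConfig;
  import org.apache.kafka.clients.producer.ProducerRecord;
  import org.apache.kafka.common.errors.ProducerFencedException;
  import org.apache.kafka.common.serialization.StringSerializer;

  public class ProducerFencingSketch {

      static Producer<String, String> producerWith(String transactionalId) {
          Properties props = new Properties();
          props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");      // illustrative
          props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
          props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
          props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
          return new KafkaProducer<>(props);
      }

      public static void main(String[] args) {
          // Both producers use the same transactional ID, as a stale and a current
          // instance of the same source task would.
          String transactionalId = "connect-cluster-my-connector-0";                 // illustrative format
          Producer<String, String> stale = producerWith(transactionalId);
          stale.initTransactions();

          // A newer producer registering the same transactional ID bumps the epoch...
          Producer<String, String> current = producerWith(transactionalId);
          current.initTransactions();

          // ...so the stale producer's next transactional operation is rejected.
          try {
              stale.beginTransaction();
              stale.send(new ProducerRecord<>("some-topic", "key", "value"));
              stale.commitTransaction();
          } catch (ProducerFencedException e) {
              System.out.println("Stale producer fenced out: " + e);
          } finally {
              stale.close();
              current.close();
          }
      }
  }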

Accidental task commit of dropped messages

...