...

  • Although this would provide exactly-once support for most of the same connectors that the proposed design would (specifically, connectors that allocate each source partition to at most one task at a time), it would miss some cases. This is because the assumption doesn't always hold that the source partitions a connector reports to the Connect framework correspond to the actual source partitions it consumes from (such as database tables). If, for example, a connector stores a version field in its source partitions, a change in that version field (either across connector versions or even at runtime) would allow zombie task instances for that connector to continue running and processing data, compromising exactly-once delivery.
  • Another issue with this approach is that the framework could only instantiate a transactional producer for a source partition after a task communicates its intent to use that partition, and no explicit mechanism for this communication exists today. One implicit mechanism might be to create a transactional producer for a source partition the first time a task produces data for that partition, but that wouldn't fence out zombie task instances early enough: they might still be able to produce data between the moment newer task instances read offset data (and choose where to begin consuming from the upstream system) and the moment those instances produce their first record, which would lead to duplicate record delivery. Another implicit mechanism might be to create a transactional producer for a source partition the first time a task tries to read the offset for that partition, but this would only provide exactly-once delivery guarantees for tasks that query the framework for the source offsets of every source partition they write to before performing that write. One common case where this may not happen: a task discovers a new object in the upstream system that it knows (by creation timestamp, for example) was created after the task was started; here it's possible, and even likely, that a connector developer would see no need to check for source offsets for an object that was just created.
  • The number of producers created for a connector could grow quite large, at which point allocating enough buffer memory, broker connections, and other resources for each of them could degrade the performance of the worker. It would also break the pattern established by the Connect framework that a connector is scaled by increasing its task count, since the number of producers set up for a connector would be the same whether it ran a single task or hundreds. This would be especially problematic if, for example, MirrorMaker 2 were set up to consume from 1,000 slowly-moving topics with a single task; at an average of 5 partitions per topic, 5,000 producers would be created. Kafka Streams initially used this approach, but due to the scalability issues it presented, the follow-up KIP-447 was introduced to eliminate the excessive construction of producers.
  • It's unclear exactly how a transactional ID could be derived from a source partition, since source partitions consist of structured data (specifically, a Java Map<String, ?>). We could serialize them as JSON and use the resulting string as the transactional ID (likely in conjunction with the name of the connector, to avoid collisions with other connectors), but that would lead to some extremely ugly transactional IDs. A hash of the JSON-serialized source partition could be used instead, but that would make the IDs even harder to predict, and could even introduce collisions which, though unlikely, would be preferable to avoid.
  • The operational burden on users in security-conscious environments would be significantly heavier, as the transactional IDs required to run a connector would be difficult to predict and could dynamically expand at runtime depending on the implementation of the connectors being run. At best, per-connector documentation would be required to expose a mostly-internal detail so that users could understand exactly which transactional IDs their connector's producer would need.
  • Because no guarantee is made that prior generations of tasks are fenced out, it becomes possible for a task running on a worker with an outdated configuration for the connector to fence out a task running with the correct configuration. This would not compromise delivery guarantees, but it could cause serious user ire if, for example, the connector were reconfigured with a different transformation chain, converter, or any other property that alters the format of the data it produces.
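To make the transactional-ID concerns above concrete, the following is a minimal sketch of how an ID might be derived from a connector name plus a source partition. Everything here is hypothetical and not part of the Connect framework: the class and method names are invented, and a simple key-sorted serialization stands in for real JSON serialization. It illustrates both the hashing trade-off (shorter but unpredictable IDs, with a small collision risk) and why such IDs would be hard for users to enumerate ahead of time for security configuration.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch only; none of these names come from the Connect framework.
public class TransactionalIdSketch {

    static String transactionalId(String connectorName, Map<String, ?> sourcePartition) {
        // Sort keys so logically equal partitions always serialize identically.
        Map<String, Object> sorted = new TreeMap<>(sourcePartition);
        StringBuilder serialized = new StringBuilder();
        for (Map.Entry<String, Object> entry : sorted.entrySet()) {
            serialized.append(entry.getKey()).append('=').append(entry.getValue()).append(';');
        }
        try {
            // Hash the serialized partition to keep the ID short and legal,
            // at the cost of predictability and a small collision risk.
            MessageDigest sha256 = MessageDigest.getInstance("SHA-256");
            byte[] hash = sha256.digest(serialized.toString().getBytes(StandardCharsets.UTF_8));
            // Prefix with the connector name to avoid cross-connector collisions.
            StringBuilder id = new StringBuilder(connectorName).append('-');
            for (byte b : hash) {
                id.append(String.format("%02x", b));
            }
            return id.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 should always be available", e);
        }
    }

    public static void main(String[] args) {
        String id = transactionalId("jdbc-source", Map.of("table", "users"));
        System.out.println(id); // "jdbc-source-" followed by 64 hex characters
    }
}
```

Even in this simplified form, the resulting IDs are opaque hex strings that a user granting ACLs on transactional IDs could not predict without reproducing the connector's partition structure, which is exactly the operational burden described above.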

...