...

SourceTask#updateOffsets (Java)
    /**
     * Hook to update the offsets for source partitions before offsets are committed. Source tasks can use this
     * hook to update the offsets for any source partition which isn't part of the offsets about to be committed.
     * If any source partition is dropped, then it has no effect on the offsets committed. However, if the offset
     * is set to a null value for any source partition, a tombstone record would be written to the offsets topic.
     *
     * @param offsets the offsets that are about to be committed
     *
     * @return A map consisting of offsets that the task wants to update. The default implementation returns {@code null}
     * which means there would not be any changes to the offsets about to be committed.
     * It is important to note that tasks shouldn't modify the original about to be committed offsets map passed as an argument.
     */
    public Map<Map<String, Object>, Map<String, Object>> updateOffsets(Map<Map<String, Object>, Map<String, Object>> offsets) {
        return null;
    }
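
As an illustration of how a task might use this hook, the sketch below fills in offsets for source partitions that are absent from the map about to be committed. It is a minimal, standalone example: it deliberately does not extend the real SourceTask so that it compiles without Kafka on the classpath, and the class name FileTaskOffsetsSketch, the lastKnownPositions field, the recordPosition helper, and the "filename"/"position" keys are all assumptions made for this example, not part of the KIP.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class FileTaskOffsetsSketch {
    // Hypothetical task state: the latest position seen for each source partition.
    private final Map<Map<String, Object>, Map<String, Object>> lastKnownPositions = new HashMap<>();

    // Hypothetical helper a task could call while reading a file.
    public void recordPosition(String filename, long position) {
        lastKnownPositions.put(
                Collections.<String, Object>singletonMap("filename", filename),
                Collections.<String, Object>singletonMap("position", position));
    }

    // Same signature as the proposed hook. The input map is only read, never modified.
    public Map<Map<String, Object>, Map<String, Object>> updateOffsets(
            Map<Map<String, Object>, Map<String, Object>> offsets) {
        Map<Map<String, Object>, Map<String, Object>> updates = new HashMap<>();
        for (Map.Entry<Map<String, Object>, Map<String, Object>> entry : lastKnownPositions.entrySet()) {
            // Only add partitions that are missing from the offsets about to be committed.
            if (!offsets.containsKey(entry.getKey())) {
                updates.put(entry.getKey(), entry.getValue());
            }
        }
        // Returning null mirrors the default implementation: nothing to change.
        return updates.isEmpty() ? null : updates;
    }
}
```

Building a fresh map for the return value keeps the task from touching the map passed in, which the Javadoc above asks tasks to avoid.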

...

  1. If a source partition is missing from the offsets map, the task can add that source partition, along with the offsets that the task thinks it should commit, to the output map.
  2. It is also possible that a task might choose to send a tombstone record (a null offset) for a partition. In such cases, a tombstone record would be written to the offsets topic. A connector can use this mechanism to remove offsets for partitions which it knows won't need to be processed anymore. This logic would be very specific to the connector and is not a substitute for KIP-875: First-class offsets support in Kafka Connect, which allows removal of offsets by cluster admins. However, it can be very useful in certain connectors (such as file- or object-based connectors, where a file once processed won't need to be reprocessed and can be safely removed from the offsets topic), and hence this feature is being added as part of this KIP.
  3. If a task returns an empty map, the behaviour would be disabled.
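
The three rules above can be sketched as a merge step. This is only an assumption about how a runtime could apply the task's return value, not the actual Kafka Connect implementation; the class OffsetUpdateRulesSketch and the methods applyUpdates and tombstonedPartitions are hypothetical names introduced for this example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OffsetUpdateRulesSketch {

    // Merges the task's updates into a copy of the offsets about to be committed.
    public static Map<Map<String, Object>, Map<String, Object>> applyUpdates(
            Map<Map<String, Object>, Map<String, Object>> toCommit,
            Map<Map<String, Object>, Map<String, Object>> updates) {
        Map<Map<String, Object>, Map<String, Object>> merged = new HashMap<>(toCommit);
        // Rule 3 (and the null default): nothing returned means nothing changes.
        if (updates == null || updates.isEmpty()) {
            return merged;
        }
        // Rule 1: partitions missing from toCommit are added.
        // Rule 2: a null offset is kept as a null value and stands for a tombstone.
        merged.putAll(updates);
        return merged;
    }

    // Lists the partitions whose merged offset is null, i.e. tombstone candidates.
    public static List<Map<String, Object>> tombstonedPartitions(
            Map<Map<String, Object>, Map<String, Object>> merged) {
        List<Map<String, Object>> result = new ArrayList<>();
        for (Map.Entry<Map<String, Object>, Map<String, Object>> entry : merged.entrySet()) {
            if (entry.getValue() == null) {
                result.add(entry.getKey());
            }
        }
        return result;
    }
}
```

Because the merge starts from a copy of the offsets about to be committed, a partition that the task leaves out of its return value is still committed as before, which matches the Javadoc note that dropping a partition has no effect.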

...

  • Allow writing tombstone offsets to the offsets topic when a task sends a null offset for a partition. Currently, even though a task might choose to send a null offset for a partition, that won't have any effect on the offsets being flushed.
  • KAFKA-3821 has been open for a long time and discusses a lot of ideas. Some of the ideas are:
    • a SourceRecord object with a null (or special) topic, or a null key and value
      • Didn't want to rely on null semantics to avoid confusion.
      • Another topic adds to the operational burden.
    • a SourceRecord object with a null key and value
      • Same, didn't want to rely on null semantics.
    • a subclass of SourceRecord that denotes this particular kind of request. 
      • This could have been passed as part of the output of the poll() method, but SourceRecord currently has no subclasses and yet is specialised, so extending it doesn't seem the right thing to do in this case. This would also have required throwing a ClassNotFoundException for older runtimes, which is already being done, but it won't be needed if the approach from the KIP is followed.
    • Changing the signature of poll() to accept a SourceRecordReceiver parameter
      • This breaks backward compatibility
    • Creating a dedicated topic to which records are sent at periodic intervals.
      • This was the original approach prescribed in the KIP but it was shelved because:
        • Another topic adds to the operational burden.
        • Since the mechanism was akin to heartbeat records, the general consensus was to have a heartbeat mechanism that does not need to produce records explicitly.
    • That ticket has a lot of comments and suggestions, and some open PRs as well. This KIP chose the option which seemed the least invasive.

Future Work
