
Status

Current state: Draft

Discussion thread: TBD

JIRA: N/A

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


Motivation 

Source connectors in Kafka Connect poll for data from sources like databases, convert it into SourceRecords, and push them to a Kafka topic. In this process, offsets are written at regular intervals (driven by `offset.flush.interval.ms`) by the Connect framework to an offsets topic. These offsets are critical for source connectors since they act as checkpoints signifying the point from which to resume on the source side when a connector is restarted. Offsets could be things like an id from a specific database table, or some identifier in the change logs of a database for CDC-based connectors. Keeping these offsets updated is important for two reasons:

  • It keeps the number of events that need to be reprocessed on connector restart as small as possible.
  • For CDC-based connectors in particular, regularly updated offsets mean the database doesn't need to retain change logs for a long duration. If the offset from which the connector wants to resume (like a binlog position in a binlog file for MySQL, or an LSN in the Postgres WAL) no longer exists because of a short change-log retention period on the database, the connector can't proceed without data loss and would need to start capturing from the beginning again.

The catch to all of this is that for the source connector offsets to be updated regularly, the connector must keep producing data regularly. However, a lot of the time the source from which change events are being captured may not receive changes regularly. Taking the example of databases again, a database might serve batch workloads in which a huge volume of data is ingested over 2-3 hours and there is no activity for the rest of the day. Another scenario where this can be a problem is when there are a lot of updates in the source, but the connector is tracking changes from a subset of the entities which isn't getting many updates. In such cases, if the offset gets purged on the source side, we run into issues similar to point 2 above.


Proposed Changes

This KIP aims to address the problem described in the Motivation section by providing the connector with an option within the framework to update offsets without necessarily producing actual source data. The aim of the KIP is to provide this capability in a way that is non-invasive to the connector's core functionality, backward compatible, and easy to toggle on and off.

To achieve this, this KIP would add the following capabilities to source connectors in the Connect framework:

  1. A new method would be added to SourceTask  giving connector tasks the opportunity to produce SourceRecords. Connectors implementing this method can return any list of SourceRecords that helps the connector move its offsets forward. Since this logic is very specific to each connector, it is kept open-ended so that each connector can write its own logic. The method would have a default implementation in SourceTask  returning an empty list, meaning there is no behavioural change for existing connectors.
  2. A new topic configuration would be added specifying where the heartbeat records produced via the newly introduced method (described above) would be written. This would exist as both a worker-level config and a connector-level config. If no connector-level config is supplied for a connector, the connector writes to the topic configured at the worker level.
  3. A new configuration would be added to control how frequently a task produces heartbeat records. This config would also be used to enable/disable the feature, which would be disabled by default.
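To make point 1 concrete, the hypothetical hook could take roughly the following shape. This is a sketch only: the method name `produceHeartbeatRecords` is a placeholder (the KIP defers actual names to the Public Interfaces section), and `SourceRecord` is stubbed out so the example is self-contained.

```java
import java.util.Collections;
import java.util.List;

// Minimal stand-in so the sketch compiles without the Connect runtime.
class SourceRecord { /* fields elided */ }

// Sketch of SourceTask with the hypothetical new hook.
abstract class SourceTask {
    // Default implementation returns an empty list, so existing
    // connectors see no behavioural change.
    public List<SourceRecord> produceHeartbeatRecords() {
        return Collections.emptyList();
    }
}

public class HeartbeatHookSketch {
    // A connector task could override the hook to emit a record that
    // carries only the offset it wants to advance to.
    static class MyTask extends SourceTask {
        @Override
        public List<SourceRecord> produceHeartbeatRecords() {
            return List.of(new SourceRecord());
        }
    }

    public static void main(String[] args) {
        SourceTask plain = new SourceTask() {};
        System.out.println(plain.produceHeartbeatRecords().size());        // 0
        System.out.println(new MyTask().produceHeartbeatRecords().size()); // 1
    }
}
```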

The actual names of the methods and configurations described above would be provided in the Public Interfaces section. Before that, let's review how this would work end to end.
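For illustration, the worker or connector configuration for points 2 and 3 might look like the following. The property names here are hypothetical placeholders, since the actual names would be defined in the Public Interfaces section:

```properties
# Topic to which heartbeat records are written (name hypothetical);
# a connector-level setting with the same key would override this.
heartbeat.records.topic=connect-heartbeats

# How often a task produces heartbeat records (name hypothetical);
# unset or non-positive disables the feature, which is the default.
heartbeat.records.interval.ms=60000
```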

Each connector task runs in its own thread managed by WorkerTask . The core logic to poll the source lives in the AbstractWorkerSourceTask#execute  method, which runs in an infinite loop until asked to stop. Before poll() , the method called repeatedly in that loop, prepareToPollTask()  is invoked; this is a useful hook for health checks, metrics, etc. This KIP chooses the prepareToPollTask()  hook to check whether the heartbeat timer has expired. If it has, the newly introduced method on SourceTask  would be invoked to obtain heartbeat records, which would then be written to the configured heartbeat topic.
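The timer check described above could be sketched as follows. This is illustrative only: the class and method names are stand-ins, not the actual Connect internals, and the disable-by-default behaviour is modelled as a non-positive interval.

```java
// Sketch of the heartbeat-timer check that a prepareToPollTask()-style
// hook could perform each time around the poll loop (names illustrative).
public class HeartbeatTimerSketch {
    private final long heartbeatIntervalMs;
    private long lastHeartbeatMs;

    public HeartbeatTimerSketch(long heartbeatIntervalMs, long startMs) {
        this.heartbeatIntervalMs = heartbeatIntervalMs;
        this.lastHeartbeatMs = startMs;
    }

    // Returns true when the heartbeat timer has expired, meaning the
    // task should invoke the new SourceTask hook and write the returned
    // records to the heartbeat topic; resets the timer when it fires.
    public boolean heartbeatTimerExpired(long nowMs) {
        if (heartbeatIntervalMs <= 0) {
            return false; // feature disabled (the default)
        }
        if (nowMs - lastHeartbeatMs >= heartbeatIntervalMs) {
            lastHeartbeatMs = nowMs;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        HeartbeatTimerSketch timer = new HeartbeatTimerSketch(1000, 0);
        System.out.println(timer.heartbeatTimerExpired(500));  // false: not yet due
        System.out.println(timer.heartbeatTimerExpired(1500)); // true: interval elapsed
        System.out.println(timer.heartbeatTimerExpired(1600)); // false: timer was reset
    }
}
```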



