
Status

Current state: Draft

Discussion thread: TBD

JIRA: N/A

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


Motivation 

Source connectors in Kafka Connect poll for data from sources such as databases, convert it into SourceRecords, and push the records to a Kafka topic. In this process, offsets are written at regular intervals (driven by `offset.flush.interval.ms`) by the Connect framework to an offsets topic. These offsets are critical for source connectors since they act as checkpoints, signifying the point on the source side from which to resume when a connector is restarted. Offsets could be things like the id of a row in a specific database table, or some identifier in the change logs of databases for CDC-based connectors. Keeping these offsets updated is important for two reasons:

  • It keeps the number of events that need to be reprocessed on connector restarts as small as possible.
  • For CDC-based connectors in particular, regularly updated offsets mean the database doesn't need to retain change logs for a long duration. If the offset from which the connector wants to resume (like a binlog position in a binlog file for MySQL, or an LSN in the Postgres WAL) no longer exists due to a short change-log retention period on the database, the connector can't proceed without data loss, or would need to start capturing from the beginning again.

The catch to all of this is that for the source connector offsets to be updated regularly, the connector must keep producing data regularly. However, a lot of the time the source from which change events are being captured may not be receiving changes regularly. Taking the example of databases again: a database may be used for batch workloads where a huge volume of data is ingested for 2-3 hours and then there's no activity for the rest of the day. Another scenario where this might be a problem is when there are a lot of updates in the source, but the connector is tracking changes from a subset of entities that aren't getting many updates. In such cases, if the offset gets purged on the source side, we could land in issues similar to point 2 above.


Proposed Changes

This KIP aims to address the problem described in the Motivation section by giving the connector an option, within the framework, to update offsets without necessarily producing actual source data. The aim of the KIP is to provide this capability in a way that is non-invasive to the connector's core functionality, backward compatible, and easy to toggle on/off.

To achieve this, this KIP would add the following capabilities to source connectors in the Connect framework:

  1. A new method would be added to SourceTask which gives connector tasks the opportunity to produce SourceRecords. Connectors implementing this method can return any list of SourceRecords that helps the connector move its offset forward. Since this logic is very specific to each connector, it is kept open-ended so that each connector can supply its own logic. The method would also have a default implementation in SourceTask returning an empty list, meaning there is no behavioural change for existing connectors.
  2. A new topic configuration would be added naming the topic to which the heartbeat records produced via the newly introduced method (described above) are pushed. This would exist as both a worker-level config and a connector-level config. If no connector-level config is supplied for a connector, the connector would write to the topic configured at the worker level.
  3. A new configuration would be added to control how frequently each task produces heartbeat records. This config would also be used to enable/disable the feature, which would be disabled by default (see the configuration sketch after this list).
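
To make the configuration surface concrete, a worker and connector setup might look like the sketch below. The property names (heartbeat.records.topic, heartbeat.interval.ms) are placeholders for illustration only; the actual names would be settled in the Public Interfaces section.

Configuration sketch (hypothetical property names)
# Worker level (e.g. connect-distributed.properties): default heartbeat topic
# for all connectors on this worker.
heartbeat.records.topic=connect-heartbeats

# Connector level: overrides the worker-level topic for this connector, and
# sets how often each task produces heartbeat records. Leaving the interval
# unset keeps the feature disabled (the default).
heartbeat.records.topic=orders-cdc-heartbeats
heartbeat.interval.ms=60000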

The actual names of the methods and configurations described above would be provided in the Public Interfaces section. Before that, let's review how this would work end to end.

Each connector task runs in its own thread managed by WorkerTask. The core logic for polling the source lives in the AbstractWorkerSourceTask#execute method, which runs in an infinite loop until asked to stop, repeatedly invoking poll(). Before invoking poll(), this KIP would add another step to produce heartbeat records. This is how the flow would look:


  1. In the AbstractWorkerSourceTask#execute method, before invoking poll(), we would check whether the heartbeat mechanism is enabled for this task. If it is not enabled, the rest of the steps are a no-op.
  2. If the heartbeat mechanism is enabled, we would check whether the heartbeat timer has expired. Expiry of the timer means that the current time has passed the next scheduled time for heartbeat record production for this task.
  3. If the heartbeat timer hasn't expired, nothing happens and the rest of the flow executes as is.
  4. If the heartbeat timer has expired, the task's produceHeartbeatRecords method would be invoked. Note that this method by default returns an empty list of SourceRecords; only if a task has overridden it would the task's own logic run. Either way, the heartbeat timer would be reset to the next scheduled interval, calculated as currentTimeMs + heartbeatIntervalMs. (See the sketch after this list.)
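
As a minimal sketch of the steps above, the check could sit in the AbstractWorkerSourceTask#execute loop roughly as follows. The names heartbeatsEnabled, heartbeatIntervalMs, nextHeartbeatMs, and sendRecords are simplified stand-ins for the framework's internal state and plumbing, not actual framework fields:

AbstractWorkerSourceTask#execute (simplified sketch)
while (!isStopping()) {
    // Steps 1-2: heartbeat mechanism enabled and timer expired?
    if (heartbeatsEnabled && System.currentTimeMillis() >= nextHeartbeatMs) {
        // Step 4: invoke the task's hook; the default implementation returns an empty list.
        List<SourceRecord> heartbeats = task.produceHeartbeatRecords();
        if (!heartbeats.isEmpty()) {
            sendRecords(heartbeats);  // routed to the configured heartbeat topic
        }
        // Reset the timer whether or not any records were produced.
        nextHeartbeatMs = System.currentTimeMillis() + heartbeatIntervalMs;
    }
    // Step 3: otherwise the normal flow executes as is.
    List<SourceRecord> records = task.poll();
    if (records != null) {
        sendRecords(records);
    }
}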

Public Interfaces

SourceTask would be enhanced to add a new method called produceHeartbeatRecords.

SourceTask#produceHeartbeatRecords
public List<SourceRecord> produceHeartbeatRecords() {
    // Default implementation: no heartbeat records, so existing connectors see no behavioural change.
    return new ArrayList<>();
}
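
As a usage example, a CDC-style task could override the new hook to emit a record that carries only its latest source offset. Everything below (the offset keys, placeholder values, and the connect-heartbeats topic name) is hypothetical and for illustration only:

Hypothetical connector override (sketch)
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.source.SourceRecord;

// Inside a SourceTask implementation tracking a MySQL binlog.
@Override
public List<SourceRecord> produceHeartbeatRecords() {
    // Placeholder values; a real task would use the position it tracks in poll().
    Map<String, ?> sourcePartition = Collections.singletonMap("server", "mysql-1");
    Map<String, ?> sourceOffset = Map.of("file", "binlog.000042", "pos", 1024L);

    // A key-less, value-less record whose only purpose is to advance the stored offset.
    return Collections.singletonList(new SourceRecord(
            sourcePartition, sourceOffset,
            "connect-heartbeats",   // hypothetical heartbeat topic
            null, null));
}

Because such a record still carries a sourcePartition and sourceOffset, the framework would persist the new offset through its existing commit path, just as it does for regular records.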





