Status
Current state: Under Discussion
Discussion thread: here
JIRA:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Source Connectors in Kafka Connect poll for data from sources like databases etc, convert them into a SourceRecord and push them to a Kafka topic. In this process, offsets are written periodically by the connect framework to an offsets topic. These offsets are critical for source connectors since these act as checkpoints signifying the point from where to start on the source side when a connector is restarted. Offsets could be things like id from a specific database table or some identifier in the change logs of databases for CDC based connectors. Keeping this offsets updated is important for 2 reasons:
- It allows the number of events that need to be reprocessed on connector restarts to remain as minimal as possible.
- For CDC based connectors in particular, if the offsets are updated regularly then the database doesn't need to retain change logs for a higher duration. Because in the event that the offset(like BinlogPosition in a BinlogFile in case of MySQL or LSN in Postgres WAL) from which the connector wants to start doesn't exist anymore due to a shorter retention time of change logs on the database, then the connector can't proceed without dataloss or would need to start capturing from the beginning again.
The catch to all of this is that for the source connector offsets to be able to updated regularly, then the connector should keep producing data regularly. However lot of times, the source from which the change events are being captured mayn't be receiving changes regularly. Taking the example of databases again, if the database is being used for batch workloads where in a huge volume of data is ingested for 2-3 hours and then there's no activity for the rest of the day. Another possible scenario where this might be a problem is when there are a lot of updates in the source but the connector is tracking changes from a subset of the entities in the source which aren't getting a lot of updates. In such cases, if the offset gets purged from the source side then we could land into issues similar to point number 2 above.
Proposed Changes
This KIP aims to address the problem described in the motivation section by providing the connector with an option within the framework to be able to update offsets without it needing to necessarily produce actual source data. The aim of the KIP is to provide this capability in a way which is non-invasive to the connector's core functionality, backward compatible, easy to toggle on/off.
To achieve this, this KIP would add the following capabilities to the source connector in the connect framework:
- A new method would be added to the
SourceTask
which would provide an opportunity to the connector tasks to update their source offsets. This new method would be supplying a Map of Source partition and Source offsets to the task and the connector task can return a map with source partition/offset mapping. By default, the method would return an empty map for backward compatibility reasons.
Note on the usage of word heartbeat. The heartbeat records being sent in this case is different from the heartbeats in the Kafka consumer. Also, there are some connectors (like Debezium) which provide this functionality from within the connector and it is called heartbeat records in case of Debezium as well. Hopefully with this KIP, connectors don't need to bake in a lot of logic into connectors and should be able to get around the problems described above by help from the Connect framework.
The actual names of the methods and configurations described above would be provided in the Public Interfaces section. Before that, let's review how this would work end to end.
Each connector task runs in it's own thread managed by WorkerTask
. The core logic to poll the source is in AbstractWorkerSourceTask#execute
method which runs in an infinite loop until asked to stop. Before invoking poll()
, which is the method called repeatedly in the infinite loop, this KIP would add another method to produce heartbeat records. This is how the flow would look like:
- In
AbstractWorkerSourceTask#execute
method, before invoking poll, we would check if heartbeat mechanism is enabled for this task and if the timer has expired. If this mechanism is not enabled, then rest of the steps are no-op. - If the heartbeat mechanism is enabled, then it is checked if the heartbeat timer has expired. Expiry of the timer essentially means that the current time has exceeded the next scheduled time for heartbeat record production for this task.
- If the heartbeat timer hasn't expired, then nothing happens and the rest of the flow executes as is.
- If the heartbeat timer has expired then the
produceHeartbeatRecords
method of the task would be invoked. Note that this method by default returns an empty list of SourceRecords. If a task has overridden this method, only then the tasks' method would be invoked. Either ways, the heartbeat timer would be reset to the next scheduled interval which is calculated as currentTimeMs + heartbeatIntervalMs.
Public Interfaces
SourceTask
class would be enhanced to add a new method called producedHeartbeatRecords()
.
/** * Heartbeat Records provide a mechanism for the connector's task to move the source offsets without necessarily * needing to produce data via the {@link #poll()} method. This method would be called at regular intervals * if the heartbeat mechanism has been enabled. * Note that, by default this method returns an empty list of {@link SourceRecord} which means that even if the * heartbeat mechanism is enabled for the connector task via the config, the task would still not emit any heartbeat * records. A connector task would need to override this method based on the logic relevant for * that source to be able to send heartbeat records without needing to produce records. * * @return a list of source records */ public List<SourceRecord> produceHeartbeatRecords() { return new ArrayList<>(); }
heartbeat.records.topic
A new configuration specifying the topic where heartbeat records would be stored if the heartbeat mechanism is enabled. This configuration is at a worker level and would be applicable for all the connector tasks running on that cluster unless overridden at a connector level.
- Type: String
- Valid Values: Non-empty string
- Default Value: connect-heartbeats
- Importance: low
heartbeat.records.topic
This configuration would be available at a source connector level and would override the topic to which the heartbeat records would be sent only for this connector. If the value of this config is empty or is not specified, then the global heartbeat topic would be used to send the heartbeat records for this connector if the heartbeat mechanism is enabled. If specified, the heartbeats topic will be created if it does not already exist on the Kafka cluster targeted by this connector (which may be different from the one used for the worker’s global heartbeats topic if the bootstrap.servers property of the connector’s producer has been overridden from the worker’s). Note that, this description is similar to the offsets.storage.topic
configuration.
- Type: String
- Valid Values: Non-empty string
- Default Value: null
- Importance: low
heartbeat.interval.ms
The interval after which heartbeat records would be sent. By default this value would be 0 in which case no heartbeat records would be sent. This configuration is at a worker level and is applicable for all connector tasks running on that worker. When set, it should be set to a value in the order of minutes to hours to avoid a lot of records being sent.
- Type: int
- Valid Values: Positive integer
- Default Value: 0
- Importance: low
heartbeat.interval.ms
The interval after which heartbeat records would be sent for this connector task. By default this value would be 0 in which case no heartbeat records would be sent. This configuration is at a connector level and overrides the worker level property. When set, it should be set to a value in the order of minutes to hours to avoid a lot of records being sent.
- Type: int
- Valid Values: Positive integer
- Default Value: 0
- Importance: low
Migration Plan and Compatibility
The changes are totally backward compatible since the changes are off by default. Only if a connector chooses to override the produceHeartbeatRecords
is when the connector becomes eligible for sending heartbeat records. None of the existing connectors should be impacted even when this feature is available and deployed.
Rejected Alternatives
- a SourceRecord object with a null (or special) topic, or a null key and value
- The special topic is already being used but didn't want to rely on null semantics to avoid confusion.
- a SourceRecord object with a null key and value
- Same, didn't want to rely on null semantics.
- a subclass of SourceRecord that denotes this particular kind of request
- This could have been taken but this KIP chose a more non-invasive option at this point as per me.
- Changing the signature of
poll()
to accept aSourceRecordReceiver
parameter- This breaks backward compatibility
- That ticket has a lot of comments/suggestions and some open PRs as well. This KIP chose to use the option which seemed the most non-invasive.
has been open for a long time and discusses a lot of ideas. Some of the ideas are:- a SourceRecord object with a null (or special) topic, or a null key and value