Status
Current state: Under Discussion
Discussion thread: here
JIRA:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Source connectors in Kafka Connect poll for data from sources such as databases, convert it into SourceRecords and push them to a Kafka topic. In this process, offsets are written periodically by the Connect framework to an offsets topic. These offsets are critical for source connectors since they act as checkpoints signifying the point from which to resume on the source side when a connector is restarted. Offsets could be things like an id from a specific database table, or some identifier in the change logs of databases for CDC-based connectors. Keeping these offsets updated is important for two reasons:
- It keeps the number of events that need to be reprocessed on connector restarts as small as possible.
- For CDC-based connectors in particular, if the offsets are updated regularly then the database doesn't need to retain change logs for a long duration. If the offset (like the binlog position in a binlog file in the case of MySQL, or the LSN in the Postgres WAL) from which the connector wants to resume no longer exists due to a shorter retention time of change logs on the database, then the connector can't proceed without data loss, or would need to start capturing from the beginning again.
The catch to all of this is that for the source connector offsets to be updated regularly, the connector needs to keep producing data regularly. However, a lot of the time the source from which the change events are being captured may not be receiving changes regularly. Taking the example of databases again, the database may be used for batch workloads wherein a huge volume of data is ingested for 2-3 hours and then there's no activity for the rest of the day. Another scenario where this can be a problem is when there are a lot of updates in the source but the connector tracks changes from only a subset of the entities in the source, and that subset isn't getting many updates. In such cases, if the offset gets purged on the source side then we could run into issues similar to point 2 above.
Proposed Changes
This KIP aims to address the problem described in the motivation section by giving connectors an option within the framework to update offsets without necessarily producing actual source data. The aim of the KIP is to provide this capability in a way that is non-invasive to the connector's core functionality, backward compatible, and easy to toggle on/off.
To achieve this, this KIP proposes to add a new method, updateOffsets(Map<Map<String, Object>, Map<String, Object>> offsets), in SourceTask which would give tasks an opportunity to update their source offsets. A more detailed description of this new method is provided in the Javadocs, but one important thing to note is that this method would be supplied all the offsets that can be committed. Note that this could mean different things based upon the mode in which the connector is running, as discussed in detail below.
By default, this method would return Optional.empty(), meaning no offsets would be updated. The tasks are free to modify the offsets based on any logic they have. What does that mean?
- If a source partition is missing in the offsets map, the task can add that source partition, along with the offsets it thinks should be committed for it, to the map.
- If the task wants to update the source offset for a given source partition, it can do so in the offsets map.
- A task can in theory even remove source partition(s) from the offsets map. Since this doesn't seem like a practical use case, it would be a no-op, i.e. any partitions removed from the passed offsets map wouldn't have any impact on the offsets that get committed. Either way, when the size of the input map is greater than the size of the returned map, a warning would be logged highlighting the missing partitions.
- If a task returns an Optional of a null object or an Optional of an empty map, the behaviour would likewise be as if the feature were disabled, i.e. no offsets would be updated.
The offsets passed to the updateOffsets method would be the offset from the latest source record amongst all source records per partition. This way, if the source offset for a given source partition is updated, that updated offset is the one that gets committed for the source partition. A minimal sketch of a task overriding this method follows below.
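To illustrate these semantics, here is a minimal, hypothetical sketch of a task overriding the proposed method (assuming the method exists on SourceTask as proposed in this KIP). The heartbeat partition and the lastPollMs offset field are made up purely for illustration and are not part of the proposal:

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

import org.apache.kafka.connect.source.SourceTask;

public abstract class SampleSourceTask extends SourceTask {

    // Hypothetical source partition, used purely for illustration.
    private static final Map<String, Object> HEARTBEAT_PARTITION = Map.of("partition", "heartbeat");

    @Override
    protected Optional<Map<Map<String, Object>, Map<String, Object>>> updateOffsets(
            Map<Map<String, Object>, Map<String, Object>> offsets) {
        Map<Map<String, Object>, Map<String, Object>> updated = new HashMap<>(offsets);
        // Add a source partition missing from the offsets about to be committed,
        // so that its offset advances even when poll() produced no records for it.
        updated.putIfAbsent(HEARTBEAT_PARTITION, Map.of("lastPollMs", System.currentTimeMillis()));
        return Optional.of(updated);
    }
}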
Invoking updateOffsets
Source connectors run in two modes: at-least-once and exactly-once. Depending on which mode is in use, the updateOffsets method would be invoked from different parts of the code. Let's look at them one by one.
At Least Once Semantics
In this mode, the connector uses the offset.flush.interval.ms config to decide when to try committing offsets. Currently, within the execute method in AbstractWorkerSourceTask, which runs in an infinite loop, beginSendIteration is invoked at the beginning of every iteration. This method acts as a hook for any custom periodic behaviour to be performed at the start of each iteration, and it is here that WorkerSourceTask, when run in ALOS mode, invokes the updateCommittableOffsets method to find which offsets are committable and to update a snapshot of them. This computed snapshot of committableOffsets is what SourceTaskOffsetCommitter uses to commit offsets periodically. So, for the offsets of partitions for which no records were produced to be committed, they must be part of this snapshot. Since the committableOffsets are found by iterating through a map of source partitions and the list of submitted records against each of them, the new updateOffsets method would be invoked just before the committableOffsets are computed. This way, any source partitions it returns are automatically considered for committing offsets.
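For reference, the flush cadence in this mode comes from the existing worker configuration; for example (the value below is illustrative, the framework default being 60000):

offset.flush.interval.ms=10000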
One important aspect of the committableOffsets computation is that a source partition is eligible for an offset commit only if at least one of the records belonging to it has been acknowledged by the Kafka producer. A record is acknowledged either when it was sent successfully or when it failed to be sent; the point is, only source partitions having acknowledged messages are considered committable (a small model of this rule is sketched at the end of this sub-section). The number of acknowledged messages is also tracked in a synchronised manner by the framework to avoid double-acking of messages. Keeping all of this in mind, the offsets returned by the updateOffsets method would be:
- Pushed directly to the records map in SubmittedRecords.
- Considered acknowledged automatically, so that whenever an offset flush is about to happen, these offsets get committed.
These details would also be explained in the Javadocs of the updateOffsets method. Note that it is safe to invoke updateOffsets from beginSendIteration since that hook is a no-op for EOS and only has an effect in at-least-once cases.
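To make the acknowledgment rule above concrete, here is a small self-contained model of the committable-offset selection per partition. This is hypothetical and is not the real SubmittedRecords class: the committable offset is the offset of the latest record in an unbroken acknowledged prefix, while offsets injected via updateOffsets would enter this structure already acknowledged.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model, not runtime code: committable-offset selection per partition.
public class CommittableOffsetsSketch {

    record Submitted(Map<String, Object> offset, boolean acked) {}

    static Map<Map<String, Object>, Map<String, Object>> committable(
            Map<Map<String, Object>, List<Submitted>> recordsByPartition) {
        Map<Map<String, Object>, Map<String, Object>> result = new HashMap<>();
        recordsByPartition.forEach((partition, records) -> {
            Map<String, Object> latestAcked = null;
            for (Submitted record : records) {
                // Stop at the first unacknowledged record; everything before it
                // is safe to commit.
                if (!record.acked())
                    break;
                latestAcked = record.offset();
            }
            // A partition with no acknowledged records is not committable.
            if (latestAcked != null)
                result.put(partition, latestAcked);
        });
        return result;
    }
}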
Exactly Once Semantics
EOS mode for source connectors has 3 different mechanisms for committing transactions, defined by the transaction.boundary config with possible values POLL, INTERVAL and CONNECTOR. These modes define when a producer transaction should be started and committed. What is relevant for this KIP is the fact that the offsets written to the offsets topic are written in the same transaction. Also, the offsets committed are written to the worker's global offsets topic using a non-transactional producer and the worker principal.
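As an illustration of the existing exactly-once configuration surface (the property names come from KIP-618; the values shown are examples):

# worker configuration
exactly.once.source.support=enabled

# connector configuration
transaction.boundary=interval
transaction.boundary.interval.ms=60000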
What the above means is that the invocation of the updateOffsets method should also happen in a way that honours the transactional boundaries. In EOS mode, whenever a record or a batch of records is transformed, converted and dispatched successfully to the producer, either the recordDispatched or the batchDispatched method is invoked to ensure these records are not processed again. It is also in these methods that the framework checks whether it is safe to commit the transaction, based upon the transaction boundary. As already discussed, since the offsets supplied should honour the transaction boundary, the updateOffsets method would be invoked just before an offset flush is attempted. Note that the semantics of the input to updateOffsets change in EOS mode: while in ALOS mode it means the mapping between source partitions and their corresponding source offsets as per the output of the poll() method, in EOS mode it means the mapping between source partitions and their corresponding source offsets in the current transactional context. Also:
- The offsets to be sent to updateOffsets from commitTransaction would be retrieved by getting the unflushed offsets so far from the OffsetStorageWriter.
- Any offsets supplied back by the tasks would be written directly to the unflushed offsets (see the sketch below).
These details would also be explained in the Javadocs of the updateOffsets method. Note that it is safe to invoke updateOffsets from commitTransaction, as that is invoked from recordDispatched and batchDispatched in EOS mode, both of which are no-ops for ALOS.
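A rough, self-contained sketch of the ordering described above. The OffsetWriter and Task interfaces below are stand-ins modelled after this description, not the real runtime classes; in particular, the real OffsetStorageWriter does not expose an unflushedOffsets() method.

import java.util.Map;
import java.util.Optional;

// Hypothetical sketch, not runtime code: where updateOffsets would slot into
// the EOS commit path.
public class EosCommitSketch {

    // Stand-in for the runtime's offset writer; unflushedOffsets() is modelled
    // after the behaviour described above and is not the real class's API.
    interface OffsetWriter {
        Map<Map<String, Object>, Map<String, Object>> unflushedOffsets();
        void offset(Map<String, Object> partition, Map<String, Object> offset);
    }

    // Stand-in for a task exposing the method proposed in this KIP.
    interface Task {
        Optional<Map<Map<String, Object>, Map<String, Object>>> updateOffsets(
                Map<Map<String, Object>, Map<String, Object>> offsets);
    }

    // Invoked from the commitTransaction path (reached via recordDispatched or
    // batchDispatched) just before the transaction's offset flush is attempted.
    static void beforeOffsetFlush(Task task, OffsetWriter writer) {
        task.updateOffsets(writer.unflushedOffsets())
            .ifPresent(updated -> updated.forEach(writer::offset));
    }
}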
Public Interfaces
The SourceTask class would be enhanced with a new method, updateOffsets(Map<Map<String, Object>, Map<String, Object>> offsets):
/**
 * Hook to update the offsets for source partitions before offsets are committed. Source tasks can use this
 * hook to add the offsets for any source partition which isn't part of the offsets about to be committed, or to
 * update the offsets for any source partition that is. If any source partition is dropped, it has no effect on
 * the offsets committed. The offsets passed as input per partition would be those of the latest SourceRecord
 * amongst all the records accumulated for it.
 *
 * @param offsets the offsets that are about to be committed. Note that this could mean different things based
 *                upon the mode:
 *                At Least Once mode: the offsets that are about to be committed based on the previous poll.
 *                EOS: the offsets that are about to be committed based on the transaction boundary.
 *
 * @return an optional map of updated source partitions and offsets
 */
protected Optional<Map<Map<String, Object>, Map<String, Object>>> updateOffsets(Map<Map<String, Object>, Map<String, Object>> offsets) {
    return Optional.empty();
}
Migration Plan and Compatibility
The changes are fully backward compatible since the feature is off by default. Only if a connector chooses to override updateOffsets does it become eligible for updating offsets. None of the existing connectors should be impacted even when this feature is available and deployed.
Rejected Alternatives
The associated JIRA ticket has been open for a long time and discusses a lot of ideas. Some of the ideas are:
- A SourceRecord object with a null (or special) topic, or a null key and value
  - Didn't want to rely on null semantics, to avoid confusion.
- A SourceRecord object with a null key and value
  - Same as above: didn't want to rely on null semantics.
- A subclass of SourceRecord that denotes this particular kind of request
  - This could have been passed as part of the output of the poll() method, but SourceRecord has no sub-classes as such and is already specialised; extending it doesn't seem like the right thing to do in this case. It would also have needed handling of a ClassNotFoundException for older runtimes, something that is already done elsewhere but won't be needed if the approach from this KIP is followed.
- Changing the signature of poll() to accept a SourceRecordReceiver parameter
  - This breaks backward compatibility.
- Creating a dedicated topic to which records are sent at periodic time intervals
  - This was the original approach prescribed in the KIP but it was shelved because:
    - Another topic adds to the operational burden.
    - Since the mechanism was akin to heartbeat records, the general consensus was to have a heartbeat mechanism without needing to produce records explicitly.

That ticket has a lot of comments/suggestions and some open PRs as well. This KIP chose the option which seemed the most non-invasive.