ID | IEP-104 |
Author | |
Sponsor | |
Created | 26/05/2023 |
Status | |
Motivation
IEP-59 Change Data Capture defines CDC that runs in near realtime. The background process ignite-cdc waits for WAL segments to be archived before capturing data. This waiting introduces a lag between the moment an event happens and the moment the consumer is notified about it. The lag can be relatively large (from seconds to tens of seconds). This proposal adds the ability to capture data and notify consumers directly from the Ignite node process. It minimizes the lag at the cost of additional memory usage.
Description
User paths
Enable realtime CDC on cluster:
- Configure CDC in Ignite (cdcEnabled=true, set up CdcConsumer)
- Start Ignite node
- Start the background process ignite-cdc (it automatically switches to the PASSIVE mode)
Ignite node restart after failure:
- Start Ignite node as usual (Ignite automatically recovers the CDC state)
Run CDC with the ignite-cdc.sh process only:
./control.sh --cdc realtime off
The command stops the internal CDC process in Ignite. CDC then relies on ignite-cdc only (it automatically switches to the ACTIVE mode).
Try to restart realtime CDC after working with ignite-cdc.sh only:
./control.sh --cdc realtime on
The command returns immediately, but this does not guarantee that the switch succeeds. Ignite might fall back to using ignite-cdc only again. The user should check logs and metrics.
User interface
Ignite
IgniteConfiguration#CdcConfiguration
- CdcConsumer, keepBinary.
DataStorageConfiguration#cdcBufSize
- By default (walSegments * walSegmentSize), which is currently 640 MB.
- All non-archived segments fit in memory. If realtime CDC requires more space than that, the ordinary CDC process should probably be used instead.
- Logs:
- initialization (amount of records read during the restore)
- failure
- buffer is full
- switch between modes.
- Metrics:
- ordinary cdc metrics (count of wal segments, wal entries)
- current buffer size
- mode of CDC
- last committed WALPointer
- lag between buffer and WAL archive (segments)
- lag between WAL and CDC consumer (milliseconds).
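As a quick check of the default buffer size mentioned above, the following sketch multiplies the number of WAL segments by the segment size. The constants mirror the Ignite defaults assumed here (10 segments of 64 MB each); the class and method names are hypothetical and only illustrate the arithmetic, not an existing API.

```java
/** Hypothetical helper that shows how a default cdcBufSize could be derived. */
final class CdcBufferDefaults {
    /** Assumed default number of WAL segments in the work directory. */
    static final int DFLT_WAL_SEGMENTS = 10;

    /** Assumed default WAL segment size in bytes (64 MB). */
    static final long DFLT_WAL_SEGMENT_SIZE = 64L * 1024 * 1024;

    private CdcBufferDefaults() {
        // No instances: static helpers only.
    }

    /** Returns walSegments * walSegmentSize, the proposed default buffer size in bytes. */
    static long defaultCdcBufSize(int walSegments, long walSegmentSize) {
        return walSegments * walSegmentSize;
    }

    /** Converts bytes to whole megabytes, for logging. */
    static long toMegabytes(long bytes) {
        return bytes / (1024 * 1024);
    }
}
```

With the assumed defaults, `CdcBufferDefaults.toMegabytes(CdcBufferDefaults.defaultCdcBufSize(10, 64L * 1024 * 1024))` evaluates to 640, which matches the figure quoted above.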
ignite-cdc:
- Logs:
- clearing cdc dir, switch state.
- Metrics:
- current state.
control.sh
- CdcRealtime subcommand
- ./control.sh --cdc realtime [ on | off ]
Segments
Note that the word “segment” is used in two different senses:
- WAL segments are represented as numbered files. The size of a WAL segment is configured with DataStorageConfiguration#walSegmentSize.
- ReadSegment is a slice of the mmap WAL segment. It contains WAL records to sync with the actual file. Its size varies over time, and its maximum can be configured with DataStorageConfiguration#walBuffSize.
Initialization
On Ignite start during memory restore (in the main thread):
- If CdcConfiguration#cdcConsumer is not null, then create CdcProcessor.
- CdcProcessor reads the last persisted CdcConsumerState from the Metastorage.
- If CdcState#enabled is false then skip initialization.
- If CdcState == null then initialize.
- Initialization: collect logical updates from CdcState#committedPtr until the end of WAL. See GridCacheDatabaseSharedManager#performBinaryMemoryRestore.
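The start-up decision described above can be sketched as a small pure function. This is only an illustration: `PersistedCdcState` and `CdcStartup` are hypothetical names, the WAL pointer is reduced to a `long`, and the sketch assumes that a persisted state with enabled=true also leads to initialization (the text states this only for the null case).

```java
/** Hypothetical, simplified view of the persisted CDC state. */
final class PersistedCdcState {
    final boolean enabled;
    final long committedPtr; // Stand-in for a WALPointer.

    PersistedCdcState(boolean enabled, long committedPtr) {
        this.enabled = enabled;
        this.committedPtr = committedPtr;
    }
}

/** Hypothetical decision logic executed during memory restore. */
final class CdcStartup {
    enum Action { NO_PROCESSOR, SKIP_INITIALIZATION, INITIALIZE }

    private CdcStartup() {
        // No instances: static helpers only.
    }

    static Action onStart(boolean consumerConfigured, PersistedCdcState state) {
        // Without a configured consumer no CdcProcessor is created at all.
        if (!consumerConfigured)
            return Action.NO_PROCESSOR;

        // Realtime CDC was switched off earlier: leave the work to ignite-cdc.
        if (state != null && !state.enabled)
            return Action.SKIP_INITIALIZATION;

        // No state yet, or (assumption) state with enabled=true: collect logical
        // updates from the committed pointer until the end of WAL.
        return Action.INITIALIZE;
    }
}
```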
Realtime capturing of WALRecords
The entry point for WALRecords to be captured by CDC. Options are:
- During the read of SegmentedRingByteBuffer, after fsync is invoked. It is a multi-producer/single-consumer data structure, so the only place to hook in is the read operation (invoked at the moment of fsync).
- + Relying on the consumer workflow, we can guarantee the order of events.
- + The consumer is a background thread, so capturing records doesn't affect the performance of transactional threads.
- - No opportunity to filter physical records at the entry point (might waste buffer space). They will be filtered before actual sending.
- - The consumer is triggered on a schedule, every 500 ms by default.
- - The logic differs depending on the WAL settings (mmap true/false, FULL_SYNC).
- Capturing in FileWriteAheadLogManager#log(WALRecord).
- + Captures logical records only.
- + Common logic for all WAL settings.
- - Captures records into the buffer in transactional threads, which might affect performance.
- - The CDC process must sort events by WALPointer itself: maintain a concurrent ordered data structure and wait for WAL gaps to close before sending.
- - Sends events before they are actually flushed on the local Ignite node, which can lead to inconsistency between the main and stand-by clusters.
The first option is proposed.
CdcWorker
CdcWorker is a thread responsible for collecting WAL records and submitting them to a CdcConsumer. The worker collects records in a queue.
Capturing from the buffer (wal-sync-thread):
- wal-sync-thread (the only reader of the mmap WAL) runs under the lock that synchronizes preparing a ReadSegment and rolling over the WAL segment, which guarantees that the underlying buffer does not change.
- It offers a deep copy of the ReadSegments being flushed to the CdcWorker.
- CdcWorker checks the remaining capacity and the buffer size:
- If the size fits the capacity, store the offered buffer data in the queue.
- Otherwise, stop realtime CDC:
- Persist the actual CdcConsumerState with (enabled=false, last sent WALPointer).
- Write StopRealtimeCdcRecord into WAL (use the prepared CdcConsumerState).
- Clear the buffer, stop CdcWorker.
- Optimization: the thread might filter ReadSegments by record type and store only logical records.
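The capacity check and the overflow path above can be sketched with a bounded, byte-counted queue. This is a minimal illustration under stated assumptions: `CdcSegmentBuffer` is a hypothetical name, a ReadSegment is reduced to a `byte[]`, and persisting the state and writing StopRealtimeCdcRecord are represented only by a comment.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Queue;

/** Hypothetical bounded buffer between wal-sync-thread and the CDC worker. */
final class CdcSegmentBuffer {
    private final long maxBytes;
    private final Queue<byte[]> queue = new ArrayDeque<>();
    private long usedBytes;
    private boolean stopped;

    CdcSegmentBuffer(long maxBytes) {
        this.maxBytes = maxBytes;
    }

    /** Called by the producer. Returns false if realtime CDC is (or becomes) stopped. */
    synchronized boolean offer(byte[] segment) {
        if (stopped)
            return false;

        if (usedBytes + segment.length > maxBytes) {
            // Overflow. In the proposal this is where CdcConsumerState (enabled=false)
            // is persisted and StopRealtimeCdcRecord is written to WAL.
            queue.clear();
            usedBytes = 0;
            stopped = true;

            return false;
        }

        // Deep copy: the underlying WAL buffer may be reused after the lock is released.
        queue.add(Arrays.copyOf(segment, segment.length));
        usedBytes += segment.length;

        return true;
    }

    /** Called by the worker. Returns null if the queue is empty. */
    synchronized byte[] poll() {
        byte[] seg = queue.poll();

        if (seg != null)
            usedBytes -= seg.length;

        return seg;
    }

    synchronized boolean stopped() {
        return stopped;
    }

    synchronized long usedBytes() {
        return usedBytes;
    }
}
```

Counting bytes rather than queue elements follows the text, where capacity is expressed as a buffer size and ReadSegments vary in length.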
Body loop (cdc-worker-thread):
- Checks metadata (mappings, binary_meta, caches; this can be checked inside Ignite without reading files) and prepares updates, if any.
- Polls the queue, transforms ReadSegment data into an Iterator<CdcEvent>, and pushes the events to CdcConsumer.
- If CdcConsumer#onEvents returns true:
- Persists CdcConsumerState.
- Writes a RealtimeCdcRecord to WAL with the WALPointer.
- Optimization: transform segment buffers into CdcEvents in the background (to reduce buffer usage). Should CdcConsumer be async then?
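One iteration of the body loop above could look roughly like the sketch below. All names are hypothetical: `EventSink` stands in for CdcConsumer#onEvents, events are plain strings, the WAL pointer is a `long`, and persisting CdcConsumerState plus writing RealtimeCdcRecord is reduced to updating a field.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Queue;

/** Hypothetical stand-in for CdcConsumer#onEvents: true means the position may be committed. */
interface EventSink {
    boolean onEvents(Iterator<String> events);
}

/** Hypothetical batch of events decoded from one captured ReadSegment. */
final class CapturedBatch {
    final long lastPtr; // Stand-in for the WALPointer of the last record in the batch.
    final List<String> events;

    CapturedBatch(long lastPtr, List<String> events) {
        this.lastPtr = lastPtr;
        this.events = events;
    }
}

/** Hypothetical single-threaded body of the CDC worker. */
final class CdcWorkerLoop {
    private final Queue<CapturedBatch> queue;
    private final EventSink consumer;
    private long committedPtr = -1;

    CdcWorkerLoop(Queue<CapturedBatch> queue, EventSink consumer) {
        this.queue = queue;
        this.consumer = consumer;
    }

    /** Runs one iteration. Returns false if there was nothing to process. */
    boolean step() {
        CapturedBatch batch = queue.poll();

        if (batch == null)
            return false;

        // Push events to the consumer; commit only when it asks for it.
        if (consumer.onEvents(batch.events.iterator())) {
            // In the proposal: persist CdcConsumerState, then write RealtimeCdcRecord to WAL.
            committedPtr = batch.lastPtr;
        }

        return true;
    }

    long committedPtr() {
        return committedPtr;
    }
}
```

In this sketch a batch whose `onEvents` call returns false is still consumed; its position is covered by the next successful commit.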
Try to switch to the realtime mode:
- The user sends the command to switch modes.
- Ignite performs initialization (CdcWorker, buffer).
- Ignite writes TryStartRealtimeCdcRecord into WAL and rolls over the current segment (starting from this record, realtime CDC becomes active again).
- Ignite monitors the CDC directory and waits until the segment with the record is cleaned. This means that ignite-cdc.sh has reached the record and stopped capturing data.
- If the buffer has not overflowed by this moment, Ignite enables CdcConsumer and starts sending the records.
- Otherwise, the ordinary stop is invoked (with writing StopRealtimeCdcRecord).
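class RealtimeCdcRecord extends WALRecord {
    // WALPointer written after CdcConsumer#onEvents returned true.
    private WALPointer last;
}
class StopRealtimeCdcRecord extends WALRecord {
    // Last sent WALPointer, taken from the persisted CdcConsumerState.
    private WALPointer last;
}
class TryStartRealtimeCdcRecord extends WALRecord {
    // Marker record without payload.
}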
ignite-cdc in PASSIVE mode
- Parses WAL records, looking for RealtimeCdcRecord and StopRealtimeCdcRecord.
- For RealtimeCdcRecord: clears obsolete links from the CDC directory.
- For StopRealtimeCdcRecord: switches to ACTIVE mode and starts capturing from the last WALPointer (from the previous RealtimeCdcRecord).
ignite-cdc in ACTIVE mode
- Captures WAL records.
- Looks for TryStartRealtimeCdcRecord. After reaching it, persists CdcConsumerState locally and switches to PASSIVE mode.
- Realtime CDC - ON / OFF
- Committed pointer (confirmed by CdcConsumer).
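The PASSIVE and ACTIVE behaviour of ignite-cdc described above amounts to a two-state machine driven by WAL record types. The sketch below illustrates it under simplifying assumptions: `IgniteCdcModes` and the `RecordType` values are hypothetical names, ordinary data records are counted instead of being sent to a consumer, and clearing links or persisting state is represented by comments.

```java
/** Hypothetical model of the ignite-cdc mode switch driven by WAL records. */
final class IgniteCdcModes {
    enum Mode { PASSIVE, ACTIVE }

    enum RecordType { DATA, REALTIME_CDC, STOP_REALTIME_CDC, TRY_START_REALTIME_CDC }

    private Mode mode = Mode.PASSIVE;
    private int captured;

    /** Processes the next WAL record according to the current mode. */
    void onRecord(RecordType type) {
        switch (mode) {
            case PASSIVE:
                if (type == RecordType.STOP_REALTIME_CDC) {
                    // Realtime CDC gave up: start capturing from the last known pointer.
                    mode = Mode.ACTIVE;
                }
                // REALTIME_CDC: only obsolete links in the CDC directory would be cleared.
                // DATA: ignored, the Ignite node delivers these events itself.
                break;

            case ACTIVE:
                if (type == RecordType.TRY_START_REALTIME_CDC) {
                    // Persist the consumer state locally, then hand over to the node.
                    mode = Mode.PASSIVE;
                }
                else if (type == RecordType.DATA)
                    captured++;
                break;
        }
    }

    Mode mode() {
        return mode;
    }

    int captured() {
        return captured;
    }
}
```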
class CdcWorker {
    private final CdcConsumer consumer;
    private final long checkFreq;

    // Invoked in wal-sync-thread.
    public void offer(ReadSegment seg) {
        // Check capacity, add the segment to the queue.
    }

    // Invoked in online-cdc-thread.
    public void body() {
        // Poll the queue, push events to CdcConsumer, write CdcState to MetaStorage.
    }
}
Risks and Assumptions
Discussion Links
// Links to discussions on the devlist, if applicable.
Reference Links
// Links to various reference documents, if applicable.
Tickets