...

ID IEP-104
Author
Sponsor
Created 26/05/2023
Status DRAFT


Motivation

IEP-59 Change Data Capture defines CDC that runs in near real time. The background process ignite-cdc waits for WAL segments to be archived before it captures data. This wait introduces a lag between the moment an event happens and the moment the consumer is notified about it. The lag can be relatively large (from seconds to tens of seconds). This proposal adds the ability to capture data and notify consumers directly from the Ignite node process, which minimizes the lag at the cost of additional memory usage.

Description

User paths

Enable realtime CDC on the cluster:

...

    ./control.sh --cdc realtime on

The command returns immediately, but this does not guarantee that the switch succeeded: the node might fall back to using only ignite-cdc again. The user should check the logs and metrics to confirm the current mode.

User interface

Ignite

  1. IgniteConfiguration#CdcConfiguration - CdcConsumer, keepBinary.
  2. DataStorageConfiguration#cdcBufSize - defaults to walSegments * walSegmentSize, which is 640 MB with the current defaults.
    1. With this default, all non-archived segments fit in memory. If realtime CDC requires more space than that, the ordinary CDC process should probably be used instead.
  3. Logs:
    1. initialization (number of records read during the restore)
    2. failure
    3. buffer is full
    4. switch between modes.
  4. Metrics:
    1. ordinary CDC metrics (count of WAL segments, WAL entries)
    2. current buffer size
    3. mode of CDC
    4. last committed WALPointer
    5. lag between buffer and WAL archive (segments)
    6. lag between WAL and CDC consumer (milliseconds).
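The 640 MB figure follows from Ignite's default WAL settings (10 segments of 64 MB each). A minimal sketch of that arithmetic is shown below; the class and the helper `defaultCdcBufSize` are hypothetical names used only for illustration, and `cdcBufSize` itself is a proposed property, not an existing one.

```java
/** Sketch: proposed default for the realtime CDC buffer, derived from WAL settings. */
final class CdcBufferSizeSketch {
    /** Ignite's default number of WAL segments. */
    static final int DFLT_WAL_SEGMENTS = 10;

    /** Ignite's default WAL segment size: 64 MB. */
    static final long DFLT_WAL_SEGMENT_SIZE = 64L * 1024 * 1024;

    /** Hypothetical helper: walSegments * walSegmentSize, in bytes. */
    static long defaultCdcBufSize(int walSegments, long walSegmentSize) {
        // multiplyExact throws ArithmeticException on overflow instead of wrapping silently.
        return Math.multiplyExact((long) walSegments, walSegmentSize);
    }

    public static void main(String[] args) {
        long bytes = defaultCdcBufSize(DFLT_WAL_SEGMENTS, DFLT_WAL_SEGMENT_SIZE);

        System.out.println(bytes / (1024 * 1024) + " MB"); // prints "640 MB"
    }
}
```

If either WAL setting is changed, the default buffer size would change with it, so the memory cost of realtime CDC should be re-evaluated together with the WAL configuration.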

...

  1. CdcRealtime subcommand
    1. ./control.sh --cdc realtime [ on | off ] 

Segments

Note that the word “segment” is used in two different senses:

  1. WAL segments are represented as numbered files. The size of a WAL segment is configured with DataStorageConfiguration#walSegmentSize.
  2. ReadSegment is a slice of the mmap WAL segment. It contains WAL records to sync with the actual file. The size of a ReadSegment varies, and its maximum can be configured with DataStorageConfiguration#walBuffSize.

Initialization

On Ignite start, during memory restore (in the main thread):

  1. If CdcConfiguration#cdcConsumer is not null, create CdcProcessor.
  2. CdcProcessor reads the last persisted CdcConsumerState from the Metastorage.
    1. If CdcState#enabled is false, skip initialization.
    2. If CdcState == null, initialize.
  3. Initialization: collect logical updates from CdcState#committedPtr until the end of the WAL. See GridCacheDatabaseSharedManager#performBinaryMemoryRestore.
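The start-up decision in the steps above can be sketched as a small pure function. Everything here is an assumption made for illustration: the `Action` enum, the simplified `CdcState` holder, and the use of a plain `long` in place of a real WALPointer are not part of the proposal.

```java
/** Sketch of the start-up decision; all types here are hypothetical stand-ins. */
final class CdcInitSketch {
    /** Simplified stand-in for the state persisted in the Metastorage. */
    static final class CdcState {
        final boolean enabled;
        final long committedPtr; // Simplification: a real WALPointer is not a plain long.

        CdcState(boolean enabled, long committedPtr) {
            this.enabled = enabled;
            this.committedPtr = committedPtr;
        }
    }

    enum Action { NO_PROCESSOR, SKIP_INITIALIZATION, INITIALIZE }

    /**
     * @param consumerConfigured whether a CdcConsumer is set in the configuration.
     * @param persisted last persisted state, or null if none was ever written.
     */
    static Action onStart(boolean consumerConfigured, CdcState persisted) {
        if (!consumerConfigured)
            return Action.NO_PROCESSOR; // Step 1: no consumer, no CdcProcessor.

        if (persisted == null)
            return Action.INITIALIZE; // Step 2.2: nothing persisted yet.

        // Step 2.1: realtime CDC was switched off, so skip initialization.
        return persisted.enabled ? Action.INITIALIZE : Action.SKIP_INITIALIZATION;
    }

    public static void main(String[] args) {
        System.out.println(onStart(true, new CdcState(false, 0L))); // prints "SKIP_INITIALIZATION"
    }
}
```

Keeping the decision free of side effects makes it easy to check each branch in isolation before wiring it into the memory restore path.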

Realtime capturing of WALRecords

Entry point for WALRecords to be captured by CDC. The options are:

...

The first option is proposed.

CdcWorker

CdcWorker is a thread responsible for collecting WAL records and submitting them to a CdcConsumer. The worker collects the records in a queue.

...

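WAL records (Java):

    class RealtimeCdcRecord extends WALRecord {
        // Last WALPointer reported by realtime CDC.
        private WALPointer last;
    }

    class StopRealtimeCdcRecord extends WALRecord {
        // Last WALPointer reported before realtime CDC stopped.
        private WALPointer last;
    }

    // Marker record, carries no payload.
    class TryStartRealtimeCdcRecord extends WALRecord {
    }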

ignite-cdc in PASSIVE mode

  1. Parses WAL records, looking for RealtimeCdcRecord and StopRealtimeCdcRecord.
  2. For RealtimeCdcRecord: clears obsolete links from the CDC directory.
  3. For StopRealtimeCdcRecord: switches to ACTIVE mode and starts capturing from the last WALPointer (taken from the previous RealtimeCdcRecord).

ignite-cdc in ACTIVE mode

  1. Captures WAL records.
  2. Looks for TryStartRealtimeCdcRecord; after reaching it, persists CdcConsumerState locally and switches to PASSIVE mode.
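The two modes form a small state machine driven by the marker records. The sketch below illustrates the transitions only; it is not the ignite-cdc implementation. The `RecordType` enum, the `long` pointers, and the method names are assumptions, and side effects such as clearing links or persisting state are reduced to comments.

```java
/** Sketch of the ignite-cdc mode switch; not the actual ignite-cdc implementation. */
final class IgniteCdcModeSketch {
    enum Mode { ACTIVE, PASSIVE }

    /** Hypothetical record kinds; DATA stands for any ordinary data record. */
    enum RecordType { DATA, REALTIME_CDC, STOP_REALTIME_CDC, TRY_START_REALTIME_CDC }

    private Mode mode;
    private long lastRealtimePtr = -1; // Pointer from the latest RealtimeCdcRecord.
    private long captureFrom = -1;     // Where ACTIVE capturing resumes.

    IgniteCdcModeSketch(Mode initial) {
        mode = initial;
    }

    /**
     * @param type kind of the parsed record.
     * @param last pointer carried by the record (ignored for kinds without one).
     * @return true if the record should be passed to the consumer.
     */
    boolean onRecord(RecordType type, long last) {
        if (mode == Mode.PASSIVE) {
            if (type == RecordType.REALTIME_CDC)
                lastRealtimePtr = last; // Obsolete links would be cleared here.
            else if (type == RecordType.STOP_REALTIME_CDC) {
                captureFrom = lastRealtimePtr; // Resume from the previous RealtimeCdcRecord.
                mode = Mode.ACTIVE;
            }

            return false; // Nothing is captured in PASSIVE mode.
        }

        if (type == RecordType.TRY_START_REALTIME_CDC) {
            mode = Mode.PASSIVE; // The consumer state would be persisted locally first.

            return false;
        }

        return type == RecordType.DATA;
    }

    Mode mode() {
        return mode;
    }

    long captureFrom() {
        return captureFrom;
    }
}
```

In this sketch a StopRealtimeCdcRecord only records where capturing should resume; re-reading the WAL from that pointer is left out.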

Meta Storage

  1. Realtime CDC - ON / OFF
  2. Committed pointer (confirmed by CdcConsumer).

...

CdcWorker (Java):

    class CdcWorker {
        private final CdcConsumer consumer;

        private final long checkFreq;

        CdcWorker(CdcConsumer consumer, long checkFreq) {
            this.consumer = consumer;
            this.checkFreq = checkFreq;
        }

        // Invoked in wal-sync-thread.
        public void offer(ReadSegment seg) {
            // Check capacity, then add the segment to the queue.
        }

        // Runs in online-cdc-thread.
        public void body() {
            // Poll the queue, push records to CdcConsumer, write CdcState to MetaStorage.
        }
    }
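The capacity check in offer and the polling in body could look roughly like the following byte-bounded queue. This is a sketch under assumptions, not the proposed implementation: segments are modelled as plain byte arrays, and the class name, `drainTo`, and `usedBytes` are hypothetical. When offer returns false, the caller is expected to react to the full buffer, for example by logging it and falling back to ignite-cdc.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

/** Sketch of a byte-bounded buffer between the WAL sync thread and the CDC thread. */
final class BoundedCdcBufferSketch {
    private final Queue<byte[]> queue = new ConcurrentLinkedQueue<>();
    private final AtomicLong usedBytes = new AtomicLong();
    private final long maxBytes;

    BoundedCdcBufferSketch(long maxBytes) {
        this.maxBytes = maxBytes;
    }

    /** Producer side. Returns false, without queueing, if the segment does not fit. */
    boolean offer(byte[] seg) {
        // Reserve the space first, and roll the reservation back if the limit is exceeded.
        if (usedBytes.addAndGet(seg.length) > maxBytes) {
            usedBytes.addAndGet(-seg.length);

            return false;
        }

        queue.add(seg);

        return true;
    }

    /** Consumer side. Passes all queued segments to the sink and returns their count. */
    int drainTo(Consumer<byte[]> sink) {
        int cnt = 0;
        byte[] seg;

        while ((seg = queue.poll()) != null) {
            sink.accept(seg);
            usedBytes.addAndGet(-seg.length); // Free the space only after the sink has seen it.
            cnt++;
        }

        return cnt;
    }

    long usedBytes() {
        return usedBytes.get();
    }
}
```

Bounding the buffer by bytes rather than by element count matches the way cdcBufSize is expressed, since ReadSegment sizes vary.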

Risks and Assumptions

// Describe project risks, such as API or binary compatibility issues, major protocol changes, etc.

Discussion Links

// Links to discussions on the devlist, if applicable.

Reference Links

// Links to various reference documents, if applicable.

Tickets

// Links or report with relevant JIRA tickets.