ID IEP-104
Author
Sponsor
Created 26/05/2023
Status DRAFT / ACTIVE


Motivation

IEP-59 Change Data Capture defines CDC that runs in near realtime. The background process ignite-cdc waits for WAL segments to be archived before it captures data. This waiting introduces a lag between the moment an event happens and the moment the consumer is notified about it. The lag can be relatively large (from seconds to tens of seconds). It is proposed to make it possible to capture data and notify consumers directly from the Ignite node process. This minimizes the lag at the cost of additional memory usage.

Description

User paths

Enable realtime CDC on cluster:

  1. Configure CDC in Ignite (cdcEnabled=true, set up CdcConsumer)
  2. Start Ignite node
  3. Start background process ignite-cdc (it automatically switches to the PASSIVE mode)

Ignite node restart after failure:

  1. Start Ignite node as usual (Ignite automatically recovers the CDC state)

Run CDC with only ignite-cdc.sh process:

    ./control.sh --cdc realtime off

The command stops the internal CDC process in Ignite. CDC then relies on ignite-cdc only (it automatically switches to the ACTIVE mode).

Try to restart realtime CDC after working with ignite-cdc.sh online:

...

User interface

Ignite

  1. IgniteConfiguration#CdcConfiguration - CdcConsumer, keepBinary.
  2. DataStorageConfiguration#cdcBufSize - defaults to walSegments * walSegmentSize, which is 640 MB with the current default values.
    1. With this size, all non-archived segments fit in memory. If realtime CDC requires more space than that, the ordinary CDC process should probably be used instead.
  3. Logs: 
    1. initialization (amount of records read during the restore)
    2. failure 
    3. buffer is full
    4. switch between modes.
  4. Metrics: 
    1. ordinary cdc metrics (count of wal segments, wal entries)
    2. current buffer size
    3. mode of CDC
    4. last committed WALPointer
    5. lag between buffer and WAL archive (segments)
    6. lag between WAL and CDC consumer (milliseconds).
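
The 640 MB figure for the proposed cdcBufSize default can be checked with a small sketch. It assumes the current Ignite defaults of 10 WAL segments of 64 MB each. The class CdcBufferSizing and its method are hypothetical names used for illustration only, and the constants are declared locally so that the example has no Ignite dependency.

```java
// Sketch only: CdcBufferSizing is a hypothetical helper, not an Ignite class.
final class CdcBufferSizing {
    /** Assumed default number of WAL segments kept in the work directory. */
    static final int DFLT_WAL_SEGMENTS = 10;

    /** Assumed default size of a single WAL segment, in bytes (64 MB). */
    static final long DFLT_WAL_SEGMENT_SIZE = 64L * 1024 * 1024;

    /** Proposed default for cdcBufSize: room for every non-archived segment. */
    static long defaultCdcBufSize(int walSegments, long walSegmentSize) {
        if (walSegments <= 0 || walSegmentSize <= 0)
            throw new IllegalArgumentException("WAL settings must be positive");

        // multiplyExact fails loudly instead of silently overflowing.
        return Math.multiplyExact((long) walSegments, walSegmentSize);
    }
}
```

Raising walSegments or walSegmentSize raises the memory that realtime CDC may hold by the same factor, which is the trade-off named in the Motivation section.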

ignite-cdc:

  1. Logs: 
    1. clearing cdc dir, switch state.
  2. Metrics:
    1. current state.

control.sh

  1. CdcRealtime subcommand
    1. ./control.sh --cdc realtime [ on | off ] 

Segments

Note that the word “segment” is used in two different senses:

  1. WAL segments are numbered files. The size of a WAL segment is configured with DataStorageConfiguration#walSegmentSize.
  2. ReadSegment is a slice of the mmap WAL segment. It contains the WAL records to sync with the actual file. Its size varies from read to read, and its maximum can be configured with DataStorageConfiguration#walBufferSize.

Initialization

On Ignite start during memory restore (in the main thread):

  1. If CdcConfiguration#cdcConsumer is not null, then create CdcProcessor.
  2. CdcProcessor reads the last persisted CdcConsumerState from the Metastorage.
    1. If CdcState#enabled is false, then skip initialization.
    2. If CdcState == null, then initialize.
  3. Initialization: collect logical updates from CdcState#committedPtr until the end of WAL. See GridCacheDatabaseSharedManager#performBinaryMemoryRestore.
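
The decision made in the steps above can be sketched as a small pure function. This is an illustration under stated assumptions, not the proposed implementation: CdcInitSketch, Action and the simplified CdcState (a flag plus a single long instead of a WALPointer) are hypothetical stand-ins.

```java
// Sketch only: all types here are hypothetical stand-ins, not Ignite classes.
final class CdcInitSketch {
    /** What the node does with realtime CDC during memory restore. */
    enum Action { NO_PROCESSOR, SKIP, INITIALIZE }

    /** Minimal model of the state persisted in the Metastorage. */
    static final class CdcState {
        final boolean enabled;
        final long committedPtr; // Simplified: a real WALPointer is not a single long.

        CdcState(boolean enabled, long committedPtr) {
            this.enabled = enabled;
            this.committedPtr = committedPtr;
        }
    }

    /** Mirrors steps 1 and 2 of the list above. */
    static Action decide(boolean consumerConfigured, CdcState persisted) {
        if (!consumerConfigured)
            return Action.NO_PROCESSOR; // Step 1: no consumer, no CdcProcessor.

        if (persisted == null)
            return Action.INITIALIZE;   // Step 2.2: nothing persisted yet.

        if (!persisted.enabled)
            return Action.SKIP;         // Step 2.1: realtime CDC is switched off.

        return Action.INITIALIZE;       // Step 3: collect updates from committedPtr.
    }
}
```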

Realtime capturing of WALRecords

There are two options for the entry point at which CDC captures WALRecords:

  1. While SegmentedRingByteBuffer is read when fsync is invoked. The buffer is a multi-producer/single-consumer data structure, so the only place to hook in is the read operation (invoked at the moment of fsync).
    1. + By relying on the consumer workflow we can guarantee the order of events.
    2. + The consumer is a background thread, so capturing records doesn't affect the performance of transactional threads.
    3. - There is no opportunity to filter physical records at the entry point (this might waste buffer space). They will be filtered before the actual sending.
    4. - The consumer is triggered on a schedule, every 500 ms by default.
    5. - The logic differs somewhat depending on the WAL settings (mmap true/false, FULL_SYNC).
  2. Capturing in FileWriteAheadLogManager#log(WALRecord).
    1. + Captures logical records only.
    2. + Common logic for all WAL settings.
    3. - Records are put into the buffer in transactional threads, which might affect performance.
    4. - The CDC process must sort events by WALPointer itself: it has to maintain a concurrent ordered data structure and wait for WAL gaps to close before sending.
    5. - Events are sent before they are actually flushed on the local Ignite node, which can lead to inconsistency between the main and stand-by clusters.

The first option is proposed.

CdcWorker

CdcWorker is a thread responsible for collecting WAL records and submitting them to a CdcConsumer. The worker collects records in the queue.

Capturing from the buffer (wal-sync-thread):

...

Otherwise, stop realtime CDC:

...

Body loop (cdc-worker-thread):

  1. Checks metadata (mappings, binary_meta, caches; this can be checked inside Ignite without reading files) and prepares updates, if any.
  2. Polls the Queue, transforms ReadSegment data to Iterator<CdcEvent>, and pushes the events to CdcConsumer.
  3. If CdcConsumer#onEvents returns true:
    1. Persists CdcConsumerState.
    2. Writes a RealtimeCdcRecord to WAL with the WALPointer.
  4. Optimization: transform segment buffers to CdcEvents in the background (to reduce the buffer usage). Should CdcConsumer be async then?
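
Steps 2 and 3 of the body loop can be sketched as follows. This is a minimal model under stated assumptions: CdcWorkerLoopSketch, Segment and Consumer are hypothetical types, events are plain strings, a WAL pointer is a single long, and the two fields only stand in for persisting CdcConsumerState and writing RealtimeCdcRecord. Metadata checks (step 1) are left out.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Sketch only: all types here are hypothetical stand-ins, not Ignite classes.
final class CdcWorkerLoopSketch {
    /** Captured slice of WAL data: already decoded events and the pointer of the last one. */
    static final class Segment {
        final List<String> events;
        final long lastPtr;

        Segment(List<String> events, long lastPtr) {
            this.events = events;
            this.lastPtr = lastPtr;
        }
    }

    /** Same contract as described above: returning true commits the consumed position. */
    interface Consumer {
        boolean onEvents(Iterator<String> events);
    }

    /** Filled by the capturing thread, drained by the worker thread. */
    private final Queue<Segment> queue = new ConcurrentLinkedQueue<>();
    private final Consumer consumer;

    long committedPtr = -1;   // Stands in for the persisted CdcConsumerState.
    long lastWalRecordPtr = -1; // Stands in for the RealtimeCdcRecord written to WAL.

    CdcWorkerLoopSketch(Consumer consumer) {
        this.consumer = consumer;
    }

    void offer(Segment seg) {
        queue.add(seg);
    }

    /** One iteration of the body loop; returns false when the queue was empty. */
    boolean runOnce() {
        Segment seg = queue.poll();

        if (seg == null)
            return false;

        // Step 3: commit only when the consumer confirms the batch.
        if (consumer.onEvents(seg.events.iterator())) {
            committedPtr = seg.lastPtr;
            lastWalRecordPtr = seg.lastPtr;
        }

        return true;
    }
}
```

In this sketch a consumer that returns false simply leaves the committed pointer where it was, so the next confirmed batch moves it forward past everything delivered so far.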

Try to switch to the realtime mode:

...

FileWriteAheadLogManager logs records into mmap files, each of which is represented as a byte buffer FileWriteHandleImpl#SegmentedRingByteBuffer. The buffer is designed for multiple writers and a single reader.

The reader is a thread that is responsible for fsync'ing the file content to disk. This role is performed by one of the following threads: wal-segment-syncer, db-checkpoint-thread, or a user thread when a WAL segment rolls over.

It's guaranteed that the reader reads the buffer sequentially, from the first byte until the buffer is full. It is therefore safe to notify CDC about new events from the reader.

Performance suggestions:

  1. The reader reads the buffer regularly; the default period is 500 ms. The period can be configured with IGNITE_WAL_SEGMENT_SYNC_TIMEOUT.
  2. A single thread prepares events. The reader buffer is already ordered, so no additional resources are spent on sorting events.
  3. Handling events as ByteBuffer representations is memory efficient: no additional heap usage is required.
  4. Iterating and filtering within such a buffer is fast, as only a few bytes have to be read per record for its type and offset.
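
Points 3 and 4 can be illustrated with a short sketch that walks a ByteBuffer and returns slices of the logical records without copying payloads. The record layout used here (1 byte of type followed by a 4-byte total length) and the type codes are assumptions made for the example; they are not the actual Ignite WAL format. WalBufferScanSketch is a hypothetical class.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Sketch only: the record layout and type codes are hypothetical.
final class WalBufferScanSketch {
    static final byte LOGICAL = 1;
    static final byte PHYSICAL = 2;

    /** Header: 1 byte of type + 4 bytes of total record length (header included). */
    static final int HEADER = 1 + Integer.BYTES;

    /** Test helper: appends one record in the assumed layout. */
    static void put(ByteBuffer buf, byte type, byte[] payload) {
        buf.put(type).putInt(HEADER + payload.length).put(payload);
    }

    /** Returns read-only slices of the logical records; payload bytes are never copied. */
    static List<ByteBuffer> logicalRecords(ByteBuffer segment) {
        List<ByteBuffer> res = new ArrayList<>();

        // Independent position and limit, so the caller's buffer is left untouched.
        ByteBuffer buf = segment.asReadOnlyBuffer();

        while (buf.remaining() >= HEADER) {
            int start = buf.position();

            // Only the header is read to decide whether to keep or skip the record.
            byte type = buf.get(start);
            int len = buf.getInt(start + 1);

            if (len < HEADER || len > buf.remaining())
                break; // Truncated or corrupted tail: stop scanning.

            if (type == LOGICAL) {
                ByteBuffer rec = buf.duplicate();

                rec.position(start);
                rec.limit(start + len);

                res.add(rec.slice());
            }

            buf.position(start + len); // Jump straight to the next record.
        }

        return res;
    }
}
```

Physical records are skipped by advancing the position by the stored length, which matches the intent of filtering them just before sending rather than at the entry point.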

Restoring state after a node restart

During startup the node performs memory restore based on WAL: it restores the physical state and replays logical updates. At this point CDC should collect events from WAL, starting at CdcConsumerState#walState and ending at the restored pointer.

The state must be restored before any new events happen.

User paths

Enable realtime CDC on cluster:

  1. Configure CDC in Ignite (cdcEnabled=true, provide implementation of CdcManager)
  2. Start Ignite node
  3. Start background process ignite-cdc (it starts by default in the PASSIVE mode)

Ignite node restart after failure:

  1. Start Ignite node as usual (Ignite should automatically recover the CDC state)

User interface

Ignite

  1. CdcManager interface that provides 

CdcWorker

...

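WAL records

// Written after CdcConsumer confirms events; carries the committed pointer.
class RealtimeCdcRecord extends WALRecord {
	private WALPointer last;
}

// Signals ignite-cdc to switch to ACTIVE mode.
class StopRealtimeCdcRecord extends WALRecord {
	private WALPointer last;
}

// Signals ignite-cdc to persist its state and switch to PASSIVE mode.
class TryStartRealtimeCdcRecord extends WALRecord {
}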

ignite-cdc in PASSIVE mode

  1. Parses WAL records, looking for RealtimeCdcRecord and StopRealtimeCdcRecord
  2. For RealtimeCdcRecord - clears obsolete links from CDC directory
  3. For StopRealtimeCdcRecord - switches to ACTIVE mode and starts capturing from the last WALPointer (taken from the previous RealtimeCdcRecord).

ignite-cdc in ACTIVE mode

  1. Captures WAL records.
  2. Looks for TryStartRealtimeCdcRecord. After reaching it, persists CdcConsumerState locally and switches to PASSIVE mode.
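
The two ignite-cdc modes form a small state machine driven by the WAL records listed earlier. The sketch below models only the transitions. IgniteCdcModeSketch and its enums are hypothetical, a WAL pointer is simplified to a long, and the side effects (clearing links in the CDC directory, persisting CdcConsumerState, delivering events) are reduced to fields and comments.

```java
// Sketch only: a hypothetical model of the mode switch, not ignite-cdc code.
final class IgniteCdcModeSketch {
    enum Mode { PASSIVE, ACTIVE }

    enum Rec { DATA, REALTIME_CDC, STOP_REALTIME_CDC, TRY_START_REALTIME_CDC }

    Mode mode = Mode.PASSIVE; // ignite-cdc starts in PASSIVE mode.
    long lastCommitted = -1;  // Pointer carried by the latest RealtimeCdcRecord.
    long captureFrom = -1;    // Where capturing started after the last switch to ACTIVE.
    int captured;             // Number of data records handled in ACTIVE mode.

    /** Handles one WAL record; {@code last} is the pointer carried by marker records. */
    void onRecord(Rec type, long last) {
        if (mode == Mode.PASSIVE) {
            if (type == Rec.REALTIME_CDC)
                lastCommitted = last; // Real process: clear obsolete links here.
            else if (type == Rec.STOP_REALTIME_CDC) {
                mode = Mode.ACTIVE;
                captureFrom = lastCommitted;
            }
            // Data records are ignored: the Ignite node delivers them itself.
        }
        else {
            if (type == Rec.TRY_START_REALTIME_CDC)
                mode = Mode.PASSIVE; // Real process: persist CdcConsumerState first.
            else if (type == Rec.DATA)
                captured++;
        }
    }
}
```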

Meta Storage

  1. Realtime CDC - ON / OFF
  2. Committed pointer (confirmed by CdcConsumer).

...


Risks and Assumptions

// Describe project risks, such as API or binary compatibility issues, major protocol changes, etc.

Discussion Links

// Links to discussions on the devlist, if applicable.

Reference Links

// Links to various reference documents, if applicable.

Tickets

...

ASF JIRA query: labels = IEP-104
// Links or report with relevant JIRA tickets.