...

  • The CDC utility is started, and automatically restarted on failure, by the OS or external tools to provide stable change event processing.
  • The CDC feature may be used for a deployment that has WAL only.
  • When CDC starts, the first consumed event is the first event available in the WAL archive.
  • The lag between a record change and the CDC consumer notification depends on the segment archiving timeout and requires additional configuration by the user.
  • CDC failover depends on the WAL archive segment count. If the CDC application is down for a relatively long time, Ignite may delete some archive segments;
    in that case the consumer can't continue receiving changed records and must restart from the existing segments.

Online (real-time) CDC

When a separate process captures data changes from WAL archives, the lag between a CDC event and the consumer being notified about it is relatively large. It is proposed to allow capturing data and notifying consumers directly from the Ignite process. This minimizes the lag at the cost of additional memory usage.

User paths

Enable OnlineCDC on cluster:

  1. Configure Ignite to enable OnlineCDC.
  2. Configure ignite-cdc process (set BACKUP mode in CdcConfiguration)
  3. Start Ignite cluster in ACTIVE_READ_ONLY mode.
  4. Start background process ignite-cdc in BACKUP mode.
    1. ./ignite-cdc.sh cdc-config.xml
  5. Explicitly start OnlineCDC on Ignite:
    1. ./control.sh --cdc online --start
  6. Move Ignite cluster to ACTIVE state.

Note that ignite-cdc.sh can run in two modes, BACKUP and ACTIVE (default):

  1. BACKUP: the process acts as a backup for OnlineCDC and may fetch the CDC configuration from IgniteConfiguration. Use case: async replication between master and stand-by clusters.
  2. ACTIVE: the process is independent, doesn't rely on OnlineCDC, and has its own configuration. Use case: filling a cold data lake.

Ignite node restart after failure:

...

Stop OnlineCDC and use ignite-cdc instead:

...

Stop both CDC - Online and ignite-cdc:

...

User interface

Ignite

  1. IgniteConfiguration#OnlineCdcConfiguration - CdcConsumer, keepBinary.
  2. DataStorageConfiguration#onlineCdcBufSize - defaults to (walSegments * walSegmentSize), which is currently 640 MB.
    1. With this default, all non-archived segments fit in memory. If OnlineCDC requires more space than that, the ordinary CDC process should probably be used instead.
  3. DataRegionConfiguration#cdcMode - BACKGROUND, ONLINE (default is BACKGROUND)
    1. BACKGROUND - makes hard links of archived segments in the cdc directory, which is watched by the background ignite-cdc process.
    2. ONLINE - OnlineCDC is enabled and the BACKGROUND job is also done (ignite-cdc runs in BACKUP mode).
  4. Logs: 
    1. initialization (amount of records read during the restore)
    2. failure 
    3. buffer is full
  5. Metrics: 
    1. ordinary cdc metrics (count of wal segments, wal entries)
    2. current buffer size
    3. status of CDC (on/off)
    4. last committed WALPointer
    5. lag between buffer and WAL archive (segments)
    6. lag between WAL and CDC consumer (milliseconds).
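
The default for onlineCdcBufSize above is plain arithmetic over the WAL settings. A minimal sketch, assuming the current WAL defaults of 10 segments of 64 MB each; the class and the exceedsBuffer helper are hypothetical and only illustrate the sizing rule:

```java
final class OnlineCdcBufferDefaults {
    // Assumption: mirrors the current WAL defaults (10 segments, 64 MB each).
    static final int WAL_SEGMENTS = 10;
    static final long WAL_SEGMENT_SIZE = 64L * 1024 * 1024;

    /** Proposed default: room for every non-archived WAL segment. */
    static long defaultOnlineCdcBufSize(int walSegments, long walSegmentSize) {
        // multiplyExact fails loudly instead of silently overflowing.
        return Math.multiplyExact((long) walSegments, walSegmentSize);
    }

    /** Hypothetical check: true when pending data no longer fits the buffer. */
    static boolean exceedsBuffer(long pendingBytes, long bufSize) {
        return pendingBytes > bufSize;
    }
}
```

With the assumed defaults, defaultOnlineCdcBufSize(WAL_SEGMENTS, WAL_SEGMENT_SIZE) gives 671088640 bytes, which is the 640 MB mentioned above.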

ignite-cdc

  1. CdcConfiguration#mode - ACTIVE, BACKUP (default ACTIVE)
  2. Logs: 
    1. clearing cdc dir, switch state.
  3. Metrics:
    1. current state.

control.sh

  1. CdcOnline subcommand - enable/disable.

Segments

Note that the word “segment” is used in two different senses:

  1. WAL segments are numbered files. The size of a WAL segment is configured with DataStorageConfiguration#walSegmentSize.
  2. ReadSegment is a slice of the mmap WAL segment. It contains WAL records to sync with the actual file. Its size varies, and the maximum can be configured with DataStorageConfiguration#walBuffSize.

Initialization

On Ignite start during memory restore (in the main thread):

  1. If DataRegionConfiguration#cdcMode == ONLINE, then create CdcProcessor.
  2. CdcProcessor reads the last persisted CdcConsumerState from the Metastorage.
    1. If CdcState == null OR CdcState#enabled is false, then skip initialization.
  3. Initialization - collect logical updates from CdcState#committedPtr until the end of the WAL. See GridCacheDatabaseSharedManager#performBinaryMemoryRestore.
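
The initialization steps above can be sketched as a small filter over the WAL. This is only an illustration: Rec and State are simplified stand-ins for WAL records and the persisted CDC state, pointers are modelled as plain numbers, and treating the committed pointer as already delivered (exclusive bound) is an assumption, not something the proposal states.

```java
import java.util.ArrayList;
import java.util.List;

final class OnlineCdcInitSketch {
    /** Simplified stand-in for a WAL record: a numeric position and a logical/physical flag. */
    static final class Rec {
        final long ptr;
        final boolean logical;

        Rec(long ptr, boolean logical) {
            this.ptr = ptr;
            this.logical = logical;
        }
    }

    /** Simplified stand-in for the state persisted in the Metastorage. */
    static final class State {
        final boolean enabled;
        final long committedPtr;

        State(boolean enabled, long committedPtr) {
            this.enabled = enabled;
            this.committedPtr = committedPtr;
        }
    }

    /** Returns the logical records to replay, or an empty list when initialization is skipped. */
    static List<Rec> recordsToRestore(State state, List<Rec> wal) {
        List<Rec> res = new ArrayList<>();

        // Step 2.1: no state or CDC disabled means nothing to restore.
        if (state == null || !state.enabled)
            return res;

        for (Rec r : wal) {
            // Assumption: the record at committedPtr itself was already delivered.
            if (r.logical && r.ptr > state.committedPtr)
                res.add(r);
        }

        return res;
    }
}
```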

Online capturing of WALRecords

  1. Entry point for WALRecords to be captured by CDC. The options are:
    1. SegmentedRingByteBuffer (represents a WAL segment), a multi-producer/single-consumer data structure.
      1. + Relying on the consumer workflow, we can guarantee the order of events.
      2. + The consumer is a background thread, so capturing records doesn't affect the performance of transactional threads.
      3. - Can't filter physical records at the entry point (might waste buffer space). They must be deserialized and filtered later, before actually being sent to a CdcConsumer.
      4. - The consumer is triggered on a schedule, every 500 ms by default.
      5. - The logic differs somewhat depending on the WAL settings (mmap true/false, FULL_SYNC).
    2. Capturing in FileWriteAheadLogManager#log(WALRecord).
      1. + Captures logical records only.
      2. + Common logic for all WAL settings.
      3. - Captures records into the buffer in transactional threads, which might affect performance.
      4. - The CDC process must sort events by WALPointer itself: maintain a concurrent ordering data structure and wait for WAL gaps to close before sending.
      5. - Sends events before they are actually flushed on the local Ignite node, which can lead to inconsistency between the main and stand-by clusters.
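
To illustrate the ordering cost of option 2, here is a minimal sketch of a reorder buffer that releases records only once there is no gap before them. It is not part of the proposal: WalReorderBuffer is a hypothetical name, and a WALPointer is modelled as a single byte offset plus a record length within one logical WAL stream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

final class WalReorderBuffer {
    /** Captured records keyed by start offset; the value is the record length. */
    private final ConcurrentSkipListMap<Long, Integer> pending = new ConcurrentSkipListMap<>();

    /** Offset of the next record that may be sent; everything before it was released. */
    private long next;

    WalReorderBuffer(long startOffset) {
        next = startOffset;
    }

    /** Called from transactional threads, possibly out of order. */
    void capture(long offset, int len) {
        pending.put(offset, len);
    }

    /** Called from the sender thread: returns start offsets of the gap-free prefix, in order. */
    synchronized List<Long> drainContiguous() {
        List<Long> ready = new ArrayList<>();
        Map.Entry<Long, Integer> e;

        // Stop at the first gap: the smallest pending offset is not the expected one.
        while ((e = pending.firstEntry()) != null && e.getKey() == next) {
            pending.remove(e.getKey());
            ready.add(e.getKey());
            next += e.getValue();
        }

        return ready;
    }
}
```

For example, after capture(10, 5) alone nothing is released, because the record at offset 0 is still missing; once capture(0, 10) arrives, both records are released in offset order.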

CdcWorker

CdcWorker is a thread responsible for collecting WAL records, transforming them to CdcEvents and submitting them to a CdcConsumer. The worker collects records in the queue.

Capturing from the buffer (wal-sync-thread):

...

Otherwise, stop online CDC:

...

Body loop (cdc-worker-thread):

  1. Checks metadata (mappings, binary_meta, caches) and prepares updates, if any.
  2. Polls the queue, transforms ReadSegment data into an Iterator<CdcEvent>, and pushes the events to the CdcConsumer.
  3. If CdcConsumer#onEvents returns true:
    1. Persists CdcConsumerState.
    2. Writes an OnlineCdcRecord to the WAL with the WALPointer.
  4. Optimization: transform segment buffers into CdcEvents in the background (to reduce buffer usage). Should CdcConsumer be async then?

WAL records (Java):

class OnlineCdcRecord extends WALRecord {
	// WALPointer written after CdcConsumer#onEvents returns true.
	private WALPointer last;
}

class StopOnlineCdcRecord extends WALRecord {
	private WALPointer last;
}

ignite-cdc in BACKUP mode

  1. Parses WAL records, looking for OnlineCdcRecord and StopOnlineCdcRecord.
  2. For OnlineCdcRecord - clears obsolete links from the CDC directory.
  3. For StopOnlineCdcRecord - switches to ACTIVE mode and starts capturing from the last WALPointer (from the previous OnlineCdcRecord).
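
The BACKUP mode handling above amounts to a small state machine. A rough sketch under simplifying assumptions: pointers are reduced to WAL segment indexes, hard links are modelled as a set of segment indexes, and a link counts as obsolete when its segment precedes the last committed one. BackupModeSketch and all its members are hypothetical names.

```java
import java.util.TreeSet;

final class BackupModeSketch {
    enum Mode { ACTIVE, BACKUP }

    enum Type { DATA, ONLINE_CDC, STOP_ONLINE_CDC }

    /** Simplified WAL record; lastSegment is meaningful for ONLINE_CDC only. */
    static final class Rec {
        final Type type;
        final long lastSegment;

        Rec(Type type, long lastSegment) {
            this.type = type;
            this.lastSegment = lastSegment;
        }
    }

    Mode mode = Mode.BACKUP;

    /** Segment indexes that currently have a hard link in the CDC directory. */
    final TreeSet<Long> links = new TreeSet<>();

    long lastCommitted = -1;
    long resumeFrom = -1;

    void onRecord(Rec r) {
        if (mode != Mode.BACKUP)
            return;

        switch (r.type) {
            case ONLINE_CDC:
                lastCommitted = r.lastSegment;
                // Assumption: links strictly before the committed segment are obsolete.
                links.headSet(lastCommitted).clear();
                break;

            case STOP_ONLINE_CDC:
                // Take over from the pointer of the previous OnlineCdcRecord.
                mode = Mode.ACTIVE;
                resumeFrom = lastCommitted;
                break;

            default:
                break; // Data records are ignored while in BACKUP mode.
        }
    }
}
```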

Meta Storage

  1. Online CDC status - ON / OFF
  2. Committed pointer (confirmed by CdcConsumer).

CdcWorker (Java):

class CdcWorker {
	private final CdcConsumer consumer;

	private final long checkFreq;

	CdcWorker(CdcConsumer consumer, long checkFreq) {
		this.consumer = consumer;
		this.checkFreq = checkFreq;
	}

	// Invoked in wal-sync-thread.
	public void offer(ReadSegment seg) {
		// Checks capacity, adds the segment to the queue.
	}

	// Invoked in online-cdc-thread.
	public void body() {
		// Polls the queue, pushes to CdcConsumer, writes CdcState to MetaStorage.
	}
}
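
The interaction between the two threads can be sketched with a bounded queue. This is a simplified, self-contained model rather than the proposed implementation: a segment is a list of strings, the CdcConsumer is replaced by a Predicate whose result plays the role of CdcConsumer#onEvents, and the committed counter stands in for persisting the state and the WALPointer. What the caller does when offer returns false is left out.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Predicate;

final class CdcWorkerSketch {
    private final BlockingQueue<List<String>> queue;

    /** Stand-in for CdcConsumer#onEvents: returns true when the events are committed. */
    private final Predicate<List<String>> consumer;

    /** Stand-in for the persisted state: number of committed segments. */
    private long committed;

    CdcWorkerSketch(int capacity, Predicate<List<String>> consumer) {
        this.queue = new ArrayBlockingQueue<>(capacity);
        this.consumer = consumer;
    }

    /** Producer side (wal-sync-thread in the proposal): never blocks, false when full. */
    boolean offer(List<String> seg) {
        return queue.offer(seg);
    }

    /** One iteration of the body loop; returns false when the queue was empty. */
    boolean pollOnce() {
        List<String> seg = queue.poll();

        if (seg == null)
            return false;

        // Only a positive answer from the consumer advances the committed state.
        if (consumer.test(seg))
            committed++;

        return true;
    }

    long committed() {
        return committed;
    }
}
```

Using a non-blocking offer keeps the producer thread from stalling on a slow consumer, at the price of having to handle the "buffer is full" case explicitly.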

Discussion Links

http://apache-ignite-developers.2346864.n4.nabble.com/DISCUSSION-IEP-59-CDC-Capture-Data-Change-tc49677.html

...