...

  1. If DataRegionConfiguration#cdcMode == ONLINE, then create CdcProcessor.
  2. CdcProcessor reads the last persisted CdcConsumerState from the Metastorage.
    1. If CdcConsumerState == null OR CdcConsumerState#enabled is false, then skip initialization.
  3. Initialization: collect logical updates from CdcConsumerState#committedPtr until the end of the WAL. See GridCacheDatabaseSharedManager#performBinaryMemoryRestore.
Code Block
languagejava
titleCdcState
class CdcProcessor implements MetastorageLifecycleListener {
	private final Queue<ReadSegment> queue;

	void onReadyForRead() {
		// Read CdcConsumerState; if enabled, replay the WAL here to collect logical records.
	}

	void onKernalStart() {
		// Start CdcWorker.
	}
}

class CdcConsumerState {
    private final WALPointer committedPtr;

    private final boolean enabled;
}
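
The initialization steps above can be sketched as follows. This is only an illustration under stated assumptions: `WalRecordSketch`, `ConsumerStateSketch` and `CdcInitSketch` are hypothetical names, a plain `long` stands in for `WALPointer`, an in-memory list stands in for the WAL, and treating `committedPtr` as exclusive is an assumption rather than something the proposal specifies.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a WAL record: a position plus a "logical" flag. */
final class WalRecordSketch {
    final long ptr;
    final boolean logical;

    WalRecordSketch(long ptr, boolean logical) {
        this.ptr = ptr;
        this.logical = logical;
    }
}

/** Simplified CdcConsumerState; a long replaces WALPointer for illustration. */
final class ConsumerStateSketch {
    final long committedPtr;
    final boolean enabled;

    ConsumerStateSketch(long committedPtr, boolean enabled) {
        this.committedPtr = committedPtr;
        this.enabled = enabled;
    }
}

final class CdcInitSketch {
    /**
     * Returns the logical records located after the committed pointer.
     * An empty list means initialization was skipped or nothing is pending.
     */
    static List<WalRecordSketch> collectForReplay(ConsumerStateSketch state, List<WalRecordSketch> wal) {
        List<WalRecordSketch> res = new ArrayList<>();

        // Step 2.1: no persisted state, or online CDC disabled: skip initialization.
        if (state == null || !state.enabled)
            return res;

        // Step 3: walk from committedPtr (assumed exclusive) to the end of the WAL,
        // keeping logical records only.
        for (WalRecordSketch rec : wal) {
            if (rec.ptr > state.committedPtr && rec.logical)
                res.add(rec);
        }

        return res;
    }
}
```

In the real processor the replay would go through the WAL iterator rather than a list; the sketch only shows the skip condition and the replay range.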


Online capturing of WALRecords

...

  1. wal-sync-thread (the only reader of mmap WAL) runs under the lock that synchronizes preparing a ReadSegment and rolling the WAL segment, which guarantees there are no changes in the underlying buffer.
  2. It offers a deep copy of the flushing ReadSegments to the CdcWorker.
  3. CdcWorker checks the remaining capacity against the buffer size:
    1. If the size fits the capacity, store the offered buffer data in the Queue.
    2. Otherwise, stop online CDC:
      1. Persist the actual CdcConsumerState with (enabled=false, last sent WALPointer).
      2. Write StopOnlineCdcRecord into WAL (use the prepared CdcConsumerState).
      3. Clear the buffer, stop CdcWorker.
  4. Optimization: the thread might filter ReadSegments by record type and store only logical records.
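
The capacity check and the stop-on-overflow path described above might look roughly like this. It is a minimal sketch, not the proposed implementation: `CaptureBufferSketch` is a hypothetical name, `byte[]` stands in for `ReadSegment` data, and persisting the state and writing `StopOnlineCdcRecord` are reduced to a comment.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical bounded buffer between wal-sync-thread and CdcWorker. */
final class CaptureBufferSketch {
    private final Queue<byte[]> queue = new ArrayDeque<>();
    private final long capacityBytes;
    private long usedBytes;
    private boolean online = true;

    CaptureBufferSketch(long capacityBytes) {
        this.capacityBytes = capacityBytes;
    }

    /** Called by the capturing thread. Returns false if the data was not stored. */
    synchronized boolean offer(byte[] segmentData) {
        if (!online)
            return false;

        if (usedBytes + segmentData.length > capacityBytes) {
            // Overflow: the design persists CdcConsumerState(enabled=false, last sent pointer)
            // and writes StopOnlineCdcRecord here; the sketch only clears the buffer and stops.
            online = false;
            queue.clear();
            usedBytes = 0;

            return false;
        }

        // Deep copy, so later changes in the WAL buffer do not affect queued data.
        queue.add(segmentData.clone());
        usedBytes += segmentData.length;

        return true;
    }

    /** Called by the worker. Returns null when the queue is empty. */
    synchronized byte[] poll() {
        byte[] data = queue.poll();

        if (data != null)
            usedBytes -= data.length;

        return data;
    }

    synchronized boolean isOnline() {
        return online;
    }
}
```

Counting capacity in bytes rather than in segments is a choice made for the sketch, since segment sizes vary.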

...

  1. Checks metadata (mappings, binary_meta, caches), prepares updates if any.
  2. Polls the Queue, transforms ReadSegment data to Iterator<CdcEvent>, pushes them to CdcConsumer.
  3. If CdcConsumer#onEvents returns true:
    1. Persists CdcConsumerState.
    2. Writes OnlineCdcRecord to WAL with the WALPointer.
  4. Optimization: transform segment buffers to CdcEvents in the background (to reduce the buffer usage). CdcConsumer should be async then?
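
One iteration of the worker loop above can be sketched as follows. All names are hypothetical (`CapturedSegmentSketch`, `WorkerStepSketch`), strings stand in for `CdcEvent`, a `long` stands in for `WALPointer`, and a `Predicate` stands in for `CdcConsumer#onEvents`; the metadata check, state persistence and the `OnlineCdcRecord` write are reduced to comments.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.function.Predicate;

/** Hypothetical captured segment: already decoded events plus the pointer of its end. */
final class CapturedSegmentSketch {
    final long endPtr;
    final List<String> events;

    CapturedSegmentSketch(long endPtr, List<String> events) {
        this.endPtr = endPtr;
        this.events = events;
    }
}

final class WorkerStepSketch {
    /** Stands in for CdcConsumerState#committedPtr; -1 means nothing committed yet. */
    long committedPtr = -1;

    /**
     * Performs one poll/consume/commit step.
     * Returns true only if the consumer asked to commit and the state was advanced.
     */
    boolean step(Queue<CapturedSegmentSketch> queue, Predicate<Iterator<String>> consumer) {
        // Step 1 (metadata check) is omitted in this sketch.
        CapturedSegmentSketch seg = queue.poll();

        if (seg == null)
            return false; // Nothing captured yet.

        // Step 2: push the events to the consumer.
        boolean commit = consumer.test(seg.events.iterator());

        if (commit) {
            // Step 3: the design persists CdcConsumerState and then writes OnlineCdcRecord.
            committedPtr = seg.endPtr;
        }

        return commit;
    }
}
```

When the consumer returns false, the events are still considered delivered but the committed pointer stays where it was, so a later replay would start from the older position.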


Code Block
languagejava
titleWAL records
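// Written after CdcConsumerState is persisted; `last` is the committed WALPointer.
class OnlineCdcRecord extends WALRecord {
	private WALPointer last;
}

// Written when online CDC stops; `last` is the last sent WALPointer.
class StopOnlineCdcRecord extends WALRecord {
	private WALPointer last;
}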

ignite-cdc in BACKUP mode

...