...

When a separate process captures data changes from WAL archives, the lag between a CDC event happening and the consumer being notified about it is relatively large. It is proposed to provide the ability to capture data and notify consumers directly from the Ignite process. This minimizes the lag at the cost of additional memory usage.

Issues to solve

  1. Entrypoint for WALRecords to be captured by CDC. Options are:
    1. SegmentedRingByteBuffer (represents a WAL segment) is a multi-producer/single-consumer data structure.
      1. + Relying on the consumer workflow, we can guarantee the order of events.
      2. + The consumer is a background thread, so capturing records doesn't affect the performance of transactional threads.
      3. - Can't filter physical records at the entrypoint (might waste buffer space). They must be deserialized and filtered later, before actually being sent to a CdcConsumer.
      4. - The consumer is triggered on a schedule - every 500 ms by default.
      5. - The logic differs depending on the WAL settings (mmap true/false, FULL_SYNC).
    2. Capturing in FileWriteAheadLogManager#log(WALRecord).
      1. + Captures logical records only.
      2. + Common logic for all WAL settings.
      3. - Captures records into the buffer in transactional threads - might affect performance.
      4. - The CDC process must sort events by WALPointer itself - maintaining a concurrent ordering data structure and waiting for WAL gaps to close before sending.
      5. - Events are sent before they are actually flushed on the local Ignite node - this can lead to inconsistency between the main and stand-by clusters.
  2. Behavior after the CDC buffer is full. Options are:
    1. Stop online CDC and delegate capturing to the ignite-cdc.sh process (CdcMain, based on WAL archives).
    2. Temporarily switch to CdcMain, and switch back to online CDC after the gap is closed.
  3. From which point online CDC starts capturing:
    1. Check the locally persisted OnlineCdcConsumerState - find the last captured WALPointer.
    2. What if the pointer is less than any pointer recovered during Ignite node startup?
    3. What if ignite-cdc.sh streamed before node stop?
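The start-point question in item 3 can be illustrated with a minimal sketch. WAL pointers are modeled as plain longs, and the rule applied here (refuse to resume online when the persisted pointer is older than anything recoverable) is an assumption for illustration, not the decided behavior:

```java
// Minimal sketch of the start-point decision (issue 3). WAL pointers are
// modeled as plain long indices for brevity; -1 means "absent".
public class CdcStartPoint {
    /**
     * @param committedPtr Last pointer persisted in OnlineCdcConsumerState, or -1 if none.
     * @param recoveryPtr  Earliest pointer recovered during Ignite node startup.
     * @return Pointer to resume capturing from, or -1 if online CDC can't resume
     *         (e.g. the persisted state points before anything recoverable).
     */
    public static long startPointer(long committedPtr, long recoveryPtr) {
        if (committedPtr < 0)
            return recoveryPtr;  // No persisted state: start from the recovery point.

        if (committedPtr < recoveryPtr)
            return -1;           // Persisted pointer is older than anything recoverable: gap.

        return committedPtr;     // Resume right after the last captured pointer.
    }
}
```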

User interface

User paths

Enable OnlineCDC on cluster:

  1. Configure Ignite to enable OnlineCDC.
  2. Configure ignite-cdc process.
  3. Start Ignite cluster in ACTIVE_READ_ONLY mode.
  4. Start the background ignite-cdc process in BACKUP mode.
    1. ./ignite-cdc.sh --backup
  5. Explicitly start OnlineCDC on Ignite:
    1. ./control.sh --cdc online --start
  6. Move Ignite cluster to ACTIVE state.

Ignite node restart after failure:

  1. Start the Ignite node as usual (OnlineCDC recovers automatically; ignite-cdc waits for the recovery).

Stop OnlineCDC and use ignite-cdc instead:

  1. Explicitly stop OnlineCDC on Ignite (ignite-cdc automatically switches to the active mode and starts capturing):
    1. ./control.sh --cdc online --stop

Stop both CDC - Online and ignite-cdc:

  1. Explicitly stop ignite-cdc.sh.
  2. Explicitly stop OnlineCDC.

User interface

Ignite

  1. IgniteConfiguration#CdcConsumer - from extensions (kafka, thin client).
  2. DataStorageConfiguration#onlineCdcBufSize - defaults to (walSegments * walSegmentSize), which is 640 MB with the default settings.
    1. All non-archived segments fit in memory. If OnlineCDC requires more space than that, the ordinary CDC process should probably be used instead.
  3. DataStorageConfiguration#onlineCdcKeepBinary - default true.
  4. DataRegionConfiguration#cdcMode - BACKGROUND, ONLINE (default BACKGROUND)
    1. BACKGROUND - hard links of archived segments are created in the cdc directory, which is watched by the background ignite-cdc process.
    2. ONLINE - OnlineCDC enabled + do BACKGROUND job (ignite-cdc runs in BACKUP mode).
  5. Logs: 
    1. initialization (amount of records read during the restore)
    2. failure 
    3. buffer is full
  6. Metrics: 
    1. ordinary cdc metrics (count of wal segments, wal entries)
    2. current buffer size
    3. status of CDC (on/off)
    4. last committed WALPointer
    5. lag
  7. IgniteConfiguration#cdcConsumer - implementation of the CdcConsumer interface.
  8. IgniteConfiguration#cdcBufSize - size of the buffer used by CDC to store captured changes. Default is (walSegCount * walSegSize); for the default values it is 640 MB.
  9. Logs: 
    1. Initialization info.
    2. Switch between working modes.
  10. Metrics: 
    1. Ordinary CDC metrics (count of captured WAL segments and entries).
    2. Current working mode.
    3. Used buffer space.
    4. Lag between the buffer and the WAL archive (segments).
    5. Lag between writing to the WAL and capturing by the CDC consumer (milliseconds).
    6. Last captured WALPointer.
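Taken together, the properties above might be wired up as follows. This is a hypothetical sketch: the setters setCdcConsumer, setCdcBufferSize and setCdcMode, the CdcMode enum, and KafkaCdcConsumer are illustrative assumptions - the final API may differ.

```java
// Hypothetical configuration sketch; setter names and CdcMode are assumptions.
IgniteConfiguration cfg = new IgniteConfiguration();

DataStorageConfiguration dsCfg = new DataStorageConfiguration();

// ONLINE = capture directly from the node + keep BACKGROUND hard links
// for the stand-by ignite-cdc process.
dsCfg.getDefaultDataRegionConfiguration().setCdcMode(CdcMode.ONLINE);

cfg.setDataStorageConfiguration(dsCfg);
cfg.setCdcConsumer(new KafkaCdcConsumer());   // CdcConsumer impl from extensions.
cfg.setCdcBufferSize(640L * 1024 * 1024);     // Default: walSegCount * walSegSize.
```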

Segments

Note that the word “segment” is used ambiguously:

  1. WAL segments are represented as numbered files. The size of WAL segments is configured with DataStorageConfiguration#walSegmentSize.
  2. ReadSegment is a slice of the mmap WAL segment. It contains WAL records to sync with the actual file. The segment size varies over time; its maximum can be configured with DataStorageConfiguration#walBuffSize.

CdcWorker

CdcWorker is a thread responsible for collecting WAL records, transforming them into CDC events, and submitting them to the CdcConsumer. The worker has two modes:

  1. BUFFER_MODE - consumes WAL records from the CdcBufferQueue, which is filled directly from the WAL manager.
  2. ARCHIVE_MODE - consumes WAL records from archived WAL segments.
    1. Note that the CdcBufferQueue keeps being filled in the background in this mode.

Initialization:

  1. CdcWorker is initialized with CdcConsumerState#loadWalState.
  2. The initial mode is ARCHIVE_MODE. The worker switches to the CdcBufferQueue once:
    1. the loaded pointer is no longer reachable in the archive,
    2. OR the head of the buffer queue is less than the loaded pointer.
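The mode decision above can be sketched as follows. This is a simplified model under assumptions: pointers are plain longs, lastArchived is the last pointer available in archived segments, and condition 1 is read as "the loaded pointer lies beyond the archive":

```java
public class CdcModeSwitch {
    enum Mode { ARCHIVE_MODE, BUFFER_MODE }

    /**
     * Decides whether the worker may leave ARCHIVE_MODE.
     *
     * @param loadedPtr    Pointer loaded from CdcConsumerState.
     * @param lastArchived Last pointer available in the archived segments.
     * @param bufferHead   Head pointer of the in-memory CdcBufferQueue.
     */
    static Mode nextMode(long loadedPtr, long lastArchived, long bufferHead) {
        if (loadedPtr > lastArchived)   // Condition 1: archive doesn't reach the loaded pointer.
            return Mode.BUFFER_MODE;

        if (bufferHead < loadedPtr)     // Condition 2: buffer head is less than the loaded pointer.
            return Mode.BUFFER_MODE;

        return Mode.ARCHIVE_MODE;       // Otherwise keep reading the archive.
    }
}
```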

Capturing from the buffer (wal-sync-thread):

  1. Runs in wal-sync-thread (the only reader of the mmap WAL), under the lock that synchronizes preparing a ReadSegment and rolling the WAL segment, to guarantee there are no changes in the underlying buffer.
  2. Offers a deep copy of flushing ReadSegments to the CdcWorker.
  3. CdcWorker checks remaining capacity and the buffer size.
  4. If the size fits the capacity, the offered buffer data is stored into the Queue. 
  5. Otherwise: 
    1. Remove segments from the queue tail to free space for the offered buffer.
    2. Store the head of the offered buffer as nextHead (WALPointer).
    3. The worker keeps capturing data from the Queue until nextHead is reached.
    4. Switch to ARCHIVE_MODE.
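One possible reading of the overflow handling in step 5, as a simplified runnable model: segments are (pointer, size) pairs, the queue is bounded by a byte budget, and the exact eviction policy (dropping the newest tail segments) is an assumption for illustration:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified model of CdcWorker#offer (steps 3-5). Segments are (pointer, size)
// pairs and the queue is bounded by a byte budget.
public class CdcBufferModel {
    static final class Seg {
        final long ptr;
        final int size;

        Seg(long ptr, int size) { this.ptr = ptr; this.size = size; }
    }

    final Deque<Seg> queue = new ArrayDeque<>();
    final long maxBytes;
    long used;
    long nextHead = -1;  // Gap start: worker drains up to it, then goes to ARCHIVE_MODE.

    CdcBufferModel(long maxBytes) { this.maxBytes = maxBytes; }

    /** Invoked by wal-sync-thread with a deep copy of a flushed ReadSegment. */
    boolean offer(Seg seg) {
        if (used + seg.size <= maxBytes) {
            queue.addLast(seg);  // Step 4: fits - stay in buffer mode.
            used += seg.size;
            return true;
        }

        // Step 5: evict the newest segments from the tail; the first pointer
        // no longer covered by the queue becomes nextHead.
        long gapStart = seg.ptr;

        while (!queue.isEmpty() && used + seg.size > maxBytes) {
            Seg dropped = queue.pollLast();
            used -= dropped.size;
            gapStart = dropped.ptr;
        }

        nextHead = gapStart;

        return false;  // Worker drains the queue up to nextHead, then ARCHIVE_MODE.
    }
}
```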

Body loop (cdc-worker-thread):

...

  1. Polls the Queue, transforms ReadSegment data into Iterator<CdcEvent>, and pushes the events to the CdcConsumer.
  2. Optimization: transform segment buffers into CDC events in the background (to reduce buffer usage). Should the CdcConsumer be asynchronous then?

...

  1. Similar to CdcMain - awaits archived segments.
  2. Submits the read WAL records to the CdcConsumer.
  3. For every segment/record, checks the condition to switch to BUFFER_MODE:
    1. check against the loaded WALPointer after initialization,
    2. OR while nextHead has not been reached.

...

ignite-cdc

  1. CdcConfiguration#modeOnStart - ACTIVE, BACKUP (default ACTIVE).
    1. ACTIVE - ignite-cdc sends data from the cdc directory to the consumer.
    2. BACKUP - ignite-cdc polls Ignite OnlineCDC metrics, clears hard links from the cdc directory, and starts working in ACTIVE mode after OnlineCDC fails. If the Ignite node fails, ignite-cdc waits for it to start again in order to check the OnlineCDC status.
  2. CdcConfiguration#checkOnlineFrequency - the period of polling OnlineCDC metrics from Ignite to make sure it still works, and of cleaning obsolete links from the cdc directory.
  3. Logs: 
    1. checks of OnlineCDC metrics, cleaning of the cdc directory, state switches.
  4. Metrics:
    1. current state.
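The BACKUP-mode behavior can be sketched as a polling decision. This is a simplified model: the StatusSource interface stands in for reading the OnlineCDC status metric, and the one-way BACKUP-to-ACTIVE switch is an assumption based on the description above:

```java
// Sketch of the ignite-cdc BACKUP-mode decision. StatusSource models polling
// the OnlineCDC status metric from the Ignite node (null = node unreachable).
public class BackupModePoller {
    interface StatusSource {
        Boolean onlineCdcUp();
    }

    enum Mode { BACKUP, ACTIVE }

    static Mode poll(StatusSource src, Mode current) {
        if (current == Mode.ACTIVE)
            return Mode.ACTIVE;      // Assumption: no automatic switch back to BACKUP.

        Boolean up = src.onlineCdcUp();

        if (up == null)
            return Mode.BACKUP;      // Node is down: wait for it to restart.

        return up ? Mode.BACKUP : Mode.ACTIVE;  // Take over once OnlineCDC fails.
    }
}
```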

control.sh

  1. CdcOnline subcommand - enable/disable.



Initialization

On Ignite start, during memory restore (in the main thread):

  1. If DataRegionConfiguration#cdcMode == ONLINE, create a CdcProcessor.
  2. The CdcProcessor reads the last persisted CdcConsumerState from the Metastorage.
    1. If CdcState == null OR CdcState#enabled is false, skip initialization.
  3. Initialization - collect logical updates from CdcState#committedPtr until the end of the WAL. See GridCacheDatabaseSharedManager#performBinaryMemoryRestore.


Code Block
languagejava
titleCdcState
class CdcProcessor implements MetastorageLifecycleListener {
	private final Queue<ReadSegment> queue;

	void onReadyForRead() {
		// Read CdcState, if enabled then replay WAL here for collecting logical records.
	}

	void onKernalStart() {
		// start CdcWorker.
	}
}

class CdcConsumerState {
    private final WALPointer committedPtr;

    private final boolean enabled;
}


CdcWorker

CdcWorker is a thread responsible for collecting WAL records, transforming them into CdcEvents, and submitting them to a CdcConsumer. The worker collects records in a queue.

Capturing from the buffer (wal-sync-thread):

  1. Runs in wal-sync-thread (the only reader of the mmap WAL), under the lock that synchronizes preparing a ReadSegment and rolling the WAL segment, to guarantee there are no changes in the underlying buffer.
  2. Offers a deep copy of flushing ReadSegments to the CdcWorker.
  3. CdcWorker checks the remaining capacity and the buffer size:
    1. If the size fits the capacity, the offered buffer data is stored into the Queue. 
    2. Otherwise, stop online CDC.

Body loop (cdc-worker-thread):

  1. Polls the queue, transforms ReadSegment data into Iterator<CdcEvent>, and pushes the events to the CdcConsumer.
  2. Optimization: transform segment buffers into CdcEvents in the background (to reduce buffer usage). Should the CdcConsumer be asynchronous then?
  3. Persists CdcConsumerState. Progress is committed when CdcConsumer#onEvents returns true.
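The commit rule in step 3 can be sketched as a minimal model: events are reduced to their WAL pointers, and committedPtr stands for the value persisted in CdcConsumerState:

```java
import java.util.Iterator;
import java.util.List;

// Minimal sketch of the commit rule: the pointer is persisted only after
// CdcConsumer#onEvents acknowledges the batch by returning true.
public class CommitLoop {
    interface CdcConsumer {
        boolean onEvents(Iterator<Long> events);
    }

    long committedPtr = -1;  // Stands for the value persisted in CdcConsumerState.

    void processBatch(List<Long> batch, CdcConsumer consumer) {
        if (batch.isEmpty())
            return;

        // Push the batch; persist CdcConsumerState only on acknowledgment.
        if (consumer.onEvents(batch.iterator()))
            committedPtr = batch.get(batch.size() - 1);
    }
}
```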

Meta Storage

  1. Online CDC status - ON / OFF
  2. Committed pointer (confirmed by CdcConsumer).


Code Block
languagejava
titleCdcWorker
class CdcWorker {
	private final CdcConsumer consumer;
	
	private final long checkFreq;
	
	// Invoked in wal-sync-thread.
	public void offer(ReadSegment seg) {
		// Check capacity, adding segment to the queue.
	} 

	// online-cdc-thread
	public void body() {
		// Polling queue, push to CdcConsumer, writing CdcState to MetaStorage.
	}
}


Discussion Links

http://apache-ignite-developers.2346864.n4.nabble.com/DISCUSSION-IEP-59-CDC-Capture-Data-Change-tc49677.html

...