...

When a separate process captures data changes from WAL archives, the lag between a CDC event happening and the consumer being notified about it is relatively large. It is proposed to provide a way to capture data and notify consumers directly from the Ignite process. This minimizes the lag at the cost of additional memory usage.

User interface:

  1. IgniteConfiguration#cdcConsumer - implementation of the CdcConsumer interface.
  2. IgniteConfiguration#cdcBufSize - size of the buffer used by CDC to store captured changes. Default is (walSegCount * walSegSize); with the default values this is 640 MB.
  3. Logs:
    1. Initialization info.
    2. Switch between working modes.
  4. Metrics:
    1. Ordinary CDC metrics (count of captured WAL segments and entries).
    2. Current working mode.
    3. Used buffer space.
    4. Lag between buffer and WAL archive (segments).
    5. Lag between writing to WAL and capturing by CDC (milliseconds).
    6. Last captured WALPointer.
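
As a quick check of the default stated for cdcBufSize, the arithmetic can be sketched as below. The class and method names are hypothetical and exist only for illustration; the constants assume Ignite's default WAL settings of 10 segments of 64 MB each, which is what yields the 640 MB figure.

```java
/** Hypothetical helper, not part of Ignite: derives the proposed default CDC buffer size. */
final class CdcBufferDefaults {
    /** Assumed default number of WAL segments. */
    static final int DFLT_WAL_SEGMENTS = 10;

    /** Assumed default WAL segment size in bytes (64 MB). */
    static final long DFLT_WAL_SEGMENT_SIZE = 64L * 1024 * 1024;

    /** Buffer size = walSegCount * walSegSize; fails loudly on overflow instead of wrapping. */
    static long defaultCdcBufSize(int walSegCnt, long walSegSize) {
        return Math.multiplyExact((long) walSegCnt, walSegSize);
    }
}
```

With the assumed defaults, defaultCdcBufSize(DFLT_WAL_SEGMENTS, DFLT_WAL_SEGMENT_SIZE) gives 640 * 1024 * 1024 bytes.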

Segments:

Note that the word “segment” is used in two different senses:

  1. WAL segments are represented as numbered files. The size of a WAL segment is configured with DataStorageConfiguration#walSegmentSize.
  2. ReadSegment is a slice of the mmap WAL segment. It contains WAL records to sync with the actual file. The size of a ReadSegment varies, and its maximum can be configured with DataStorageConfiguration#walBuffSize.

CdcWorker:

CdcWorker is a thread responsible for collecting WAL records, transforming them into CDC events, and submitting them to the CdcConsumer. The worker has two modes:

  1. BUFFER_MODE - consumes WAL records from the CdcBufferQueue, which is filled directly from the WAL manager.
  2. ARCHIVE_MODE - consumes WAL records from archived WAL segments.
    1. Note that the CdcBufferQueue keeps being filled in the background in this mode.

Initialization:

  1. CdcWorker is initialized with CdcConsumerState#loadWalState.
  2. Initial mode is ARCHIVE_MODE. It switches to the CdcBufferQueue when:
    1. The loaded pointer is not reached in the archive.
    2. OR the head of the buffer queue is less than the loaded pointer.

Capturing from the buffer (wal-sync-thread):

  1. Runs in wal-sync-thread (the only reader of the mmap WAL), under the lock that synchronizes preparing a ReadSegment and rolling the WAL segment; this guarantees there are no changes in the underlying buffer.
  2. Offers a deep copy of the ReadSegments being flushed to the CdcWorker.
  3. CdcWorker checks the remaining capacity against the size of the offered buffer.
  4. If the size fits the capacity, store the offered buffer data in the Queue.
  5. Otherwise:
    1. Remove segments from the tail of the Queue to free space for the offered buffer.
    2. Store the head of the offered buffer as nextHead (WALPointer).
    3. Keep capturing data from the Queue until nextHead is reached.
    4. Switch to ARCHIVE_MODE.
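
The offer and overflow steps above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the proposed implementation: the class CdcBufferQueueSketch, its Segment type, and the use of a plain long in place of a WALPointer are all hypothetical. The source does not say what happens when a second overflow occurs while a gap is still pending; the sketch assumes the queue keeps at most one gap by first dropping everything from the old nextHead onward.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical sketch of the bounded CDC buffer; not Ignite's actual implementation. */
final class CdcBufferQueueSketch {
    /** Stand-in for a copied ReadSegment; ptr plays the role of a WALPointer. */
    static final class Segment {
        final long ptr;
        final byte[] data;

        Segment(long ptr, byte[] data) {
            this.ptr = ptr;
            this.data = data;
        }
    }

    private final long capacity;
    private final Deque<Segment> queue = new ArrayDeque<>();
    private long used;
    private long nextHead = -1; // -1 means no gap is pending

    CdcBufferQueueSketch(long capacity) {
        this.capacity = capacity;
    }

    /** Called by the WAL sync thread: stores a deep copy, dropping tail segments on overflow. */
    synchronized void offer(long ptr, byte[] src) {
        if (src.length > capacity)
            throw new IllegalArgumentException("Segment is larger than the whole buffer");

        byte[] copy = src.clone(); // deep copy: the source buffer may be reused by the WAL

        if (copy.length > capacity - used) {
            // Assumption: keep a single gap, so drop everything from the old nextHead onward first.
            while (nextHead >= 0 && !queue.isEmpty() && queue.peekLast().ptr >= nextHead)
                dropTail();

            // Free space for the offered segment by removing from the tail.
            while (copy.length > capacity - used)
                dropTail();

            nextHead = ptr; // data between the remaining queue and ptr must come from the archive
        }

        queue.addLast(new Segment(ptr, copy));
        used += copy.length;
    }

    /** Called by the worker: returns null when the queue is empty or nextHead is reached. */
    synchronized Segment poll() {
        Segment head = queue.peekFirst();

        if (head == null || head.ptr == nextHead)
            return null;

        queue.pollFirst();
        used -= head.data.length;

        return head;
    }

    /** True when the worker has reached nextHead and should switch to ARCHIVE_MODE. */
    synchronized boolean gapReached() {
        Segment head = queue.peekFirst();

        return head != null && head.ptr == nextHead;
    }

    /** Called once the archive reader has caught up with nextHead. */
    synchronized void gapClosed() {
        nextHead = -1;
    }

    synchronized long nextHead() {
        return nextHead;
    }

    private void dropTail() {
        used -= queue.removeLast().data.length;
    }
}
```

In this sketch the worker polls until poll() returns null, checks gapReached() to decide whether to switch to ARCHIVE_MODE, and calls gapClosed() after the archive reader reaches nextHead.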

Body loop (cdc-worker-thread):

  1. BUFFER_MODE:
    1. Polls the Queue, transforms ReadSegment data to Iterator<CdcEvent>, pushes them to CdcConsumer.
    2. Optimization: transform segment buffers to CDC events in the background (to reduce buffer usage). CdcConsumer should be async then?
  2. ARCHIVE_MODE:
    1. Similar to CdcMain - awaits archived segments.
    2. Submits the read WAL records to the CdcConsumer.
    3. For every segment/record, checks a condition to switch to BUFFER_MODE:
      1. Check the loaded WALPointer after initialization.
      2. OR while nextHead is not reached.
  3. In both modes it persists CdcConsumerState. Policy for committing the progress: by WAL segment.
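
The mode switching in the body loop can be illustrated with a heavily simplified simulation. Everything here is an assumption made for the sketch: the class CdcWorkerLoopSketch is hypothetical, WAL positions are modelled as consecutive long values, the archive and the buffer are plain collections instead of blocking sources, and progress is committed per record rather than per WAL segment to keep the example short.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical simulation of the CdcWorker body loop; not Ignite's actual implementation. */
final class CdcWorkerLoopSketch {
    enum Mode { BUFFER_MODE, ARCHIVE_MODE }

    /** Positions pushed to the (imaginary) CdcConsumer, in delivery order. */
    final List<Long> consumed = new ArrayList<>();

    /** Last committed position; stands in for the persisted CdcConsumerState. */
    long committed = -1;

    /** The worker starts in ARCHIVE_MODE, as described in the Initialization section. */
    Mode mode = Mode.ARCHIVE_MODE;

    /**
     * @param archive Positions available in the WAL archive.
     * @param buffer Positions currently held in the buffer queue, ascending; may contain a gap.
     * @param loaded Position restored on startup: the first record not yet delivered.
     */
    void run(List<Long> archive, Deque<Long> buffer, long loaded) {
        long next = loaded;

        while (true) {
            if (mode == Mode.ARCHIVE_MODE) {
                // Buffered records that were already delivered are of no use anymore.
                while (!buffer.isEmpty() && buffer.peekFirst() < next)
                    buffer.pollFirst();

                // The buffer head caught up with the position we need: switch modes.
                if (!buffer.isEmpty() && buffer.peekFirst() == next) {
                    mode = Mode.BUFFER_MODE;
                    continue;
                }

                if (!archive.contains(next))
                    return; // a real worker would wait for the next archived segment here
            }
            else {
                if (buffer.isEmpty())
                    return; // a real worker would block on the queue here

                // A gap in the buffer (nextHead reached): fall back to the archive.
                if (buffer.peekFirst() != next) {
                    mode = Mode.ARCHIVE_MODE;
                    continue;
                }

                buffer.pollFirst();
            }

            consumed.add(next); // push the event to the consumer
            committed = next;   // simplification: commit per record, not per WAL segment
            next++;
        }
    }
}
```

For example, with archived positions 0..7, buffered positions 3, 4, 8, 9 and a loaded position of 3, the simulation delivers 3 and 4 from the buffer, 5 to 7 from the archive, and 8 and 9 from the buffer again.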

...