
IDIEP-59
Author:
Sponsor: Anton Vinogradov
Created: 14.10.2020
Status: DRAFT


Motivation

Many use cases are built on observing and processing changed records.

These use cases include, but are not limited to:

  • Export data into some warehouse, full-text search system, or distributed log system.
  • Online statistics and analytics
  • Wait and respond to some specific events or data changes.

For now, such scenarios are hard to implement in Ignite.

The only existing solution that can help with this is Continuous Queries (CQ).

Disadvantages of CQ in the described scenarios:

  • CQ requires data to be sent over the network.
  • CQ parts (filter, transformer) live inside the server node JVM, so issues in them may affect server node stability.
  • A slow CQ listener increases memory consumption on the server node.
  • Failures of the CQ listener lead to the loss of events.

A convenient solution should provide:

  • Independence from the server node process (JVM) - issues and failures of the consumer shouldn't lead to server node instability.
  • Notification guarantees and failover - i.e. track and persist a pointer to the last consumed record and continue notifications from this pointer after a restart.
  • Resilience to a slow consumer - it is not an issue when a consumer temporarily consumes slower than data appears.

Description:

Ignite CDC is a new utility that runs on the server node host. The CDC utility watches for the appearance of new WAL archive segments.

When a segment is archived, the utility iterates over it using the existing WAL Iterator and notifies the CDC Consumer of each record in the segment.


Design choices:

  • The CDC application works as a separate process.
  • CDC relies on an existing Ignite mechanism - the WAL.
  • IEP scope - deliver local data change events to a local consumer.
  • CDC keeps the consumer offset in a special file.
    WAL processing will resume from this offset on restart.
  • To prevent interference between the WAL archive process and CDC, Ignite will create a hard link to each newly archived segment in a special folder.
    After successful processing, CDC will delete this link.
    Note that data is removed from the disk only after both CDC and Ignite have removed their links to the segment from the corresponding folders.
  • To bound the gap between a data change and its CDC notification, a new configuration timeout is introduced - WalForceArchiveTimeout (see the configuration sketch after this list).
  • A flag is introduced to distinguish DataEntry records on primary and backup nodes.
  • All public APIs are marked with @IgniteExperimental so they can be improved based on real-world usage feedback.
  • The CDC consumer will be notified about binary metadata changes (Phase 2).
  • A "Maximum CDC folder size" configuration parameter will be implemented to prevent exceeding the disk volume.
  • The CDC folder is resolved using the same logic as the Ignite node uses.
  • The CDC application should be restarted by an OS mechanism in case of any error (destination unavailability, for example).
  • Initially, a single CDC consumer is supported. Support for several concurrently running consumers will be implemented in Phase 2.
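
A minimal configuration sketch for the WAL settings mentioned above. setWalForceArchiveTimeout corresponds to the proposed WalForceArchiveTimeout; the path and timeout values are illustrative only.

WAL configuration sketch
// Sketch, assuming the proposed WalForceArchiveTimeout setting; values are illustrative.
IgniteConfiguration cfg = new IgniteConfiguration();

DataStorageConfiguration storageCfg = new DataStorageConfiguration();

// WAL archive folder whose segments (via hard links) are watched by the CDC utility.
storageCfg.setWalArchivePath("/path/to/wal/archive");

// Force segment archiving after 5 seconds even if the segment is not full,
// so the CDC consumer sees changes with a bounded delay.
storageCfg.setWalForceArchiveTimeout(5_000);

cfg.setDataStorageConfiguration(storageCfg);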


Public API:


Java API
/** Consumer of data change events. */
@IgniteExperimental
public interface ChangeDataCaptureConsumer {
    /**
     * Starts the consumer.
     */
    public void start();

    /**
     * Handles entry change events.
     * If this method returns {@code true} then the current offset will be stored
     * and notifications after a CDC application failure/restart will continue from it.
     *
     * @param events Entry change events.
     * @return {@code True} if the current offset should be saved to disk
     * to continue from it in case of a failure or restart.
     */
    public boolean onEvents(Iterator<ChangeDataCaptureEvent> events);

    /**
     * Stops the consumer.
     * This method can be invoked only after {@link #start()}.
     */
    public void stop();
}

/**
 * Event of a single entry change.
 * An instance represents the new value of the modified entry.
 *
 * @see IgniteCDC
 * @see ChangeDataCaptureConsumer
 */
@IgniteExperimental
public interface ChangeDataCaptureEvent extends Serializable {
    /**
     * @return Key for the changed entry.
     */
    public Object key();

    /**
     * @return Value for the changed entry or {@code null} in case of entry removal.
     */
    @Nullable public Object value();

    /**
     * @return {@code True} if event fired on primary node for partition containing this entry.
     * @see <a href="
     * https://ignite.apache.org/docs/latest/configuring-caches/configuring-backups#configuring-partition-backups">
     * Configuring partition backups.</a>
     */
    public boolean primary();

    /**
     * Ignite splits the dataset into smaller chunks to distribute them across the cluster.
     * {@link ChangeDataCaptureConsumer} implementations can use {@link #partition()} to split the processing of changes
     * in the same way as it is done for the cache.
     *
     * @return Partition number.
     * @see Affinity#partition(Object)
     * @see Affinity#partitions()
     * @see <a href="https://ignite.apache.org/docs/latest/data-modeling/data-partitioning">Data partitioning</a>
     * @see <a href="https://ignite.apache.org/docs/latest/data-modeling/affinity-collocation">Affinity collocation</a>
     */
    public int partition();

    /**
     * @return Version of the entry.
     */
    public CacheEntryVersion version();

    /**
     * @return Cache ID.
     * @see org.apache.ignite.internal.util.typedef.internal.CU#cacheId(String)
     * @see CacheView#cacheId()
     */
    public int cacheId();
}

/**
 * Entry event order.
 * Two concurrent updates of the same entry can be ordered based on {@link CacheEntryVersion} comparison.
 * A greater value means that the event occurred later.
 */
@IgniteExperimental
public interface CacheEntryVersion extends Comparable<CacheEntryVersion>, Serializable {
    /**
     * Order of the update. The value is an incremental counter. The scope of the counter is a single node.
     * @return Version order.
     */
    public long order();

    /** @return Node order on which this version was assigned. */
    public int nodeOrder();

    /**
     * Cluster id is a value to distinguish updates in case the user wants to aggregate and sort updates from several
     * Ignite clusters. {@code clusterId} can be set for the node using
     * {@link GridCacheVersionManager#dataCenterId(byte)}.
     *
     * @return Cluster id.
     */
    public byte clusterId();

    /** @return Topology version plus number of seconds from the start time of the first grid node. */
    public int topologyVersion();

    /**
     * If the source of the update is the "local" cluster then {@code null} will be returned.
     * If the update comes from another cluster via {@link IgniteInternalCache#putAllConflict(Map)}
     * then the entry version set by the other cluster is returned.
     * @return Replication version.
     * @see IgniteInternalCache#putAllConflict(Map)
     * @see IgniteInternalCache#removeAllConflict(Map)
     */
    public CacheEntryVersion otherClusterVersion();
}
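
A minimal consumer sketch against the interfaces above; the class name and the stdout destination are illustrative only, not part of the proposed API.

Consumer example
import java.util.Iterator;

/** Illustrative consumer that writes every change event to stdout. */
public class LoggingCdcConsumer implements ChangeDataCaptureConsumer {
    /** {@inheritDoc} */
    @Override public void start() {
        // Open connections to the target system (warehouse, search index, etc.) here.
    }

    /** {@inheritDoc} */
    @Override public boolean onEvents(Iterator<ChangeDataCaptureEvent> events) {
        while (events.hasNext()) {
            ChangeDataCaptureEvent evt = events.next();

            System.out.println("cacheId=" + evt.cacheId() + ", partition=" + evt.partition() +
                ", primary=" + evt.primary() + ", key=" + evt.key() + ", value=" + evt.value());
        }

        // Returning true makes CDC persist the current offset, so after a failure
        // or restart notifications continue from this point.
        return true;
    }

    /** {@inheritDoc} */
    @Override public void stop() {
        // Release resources of the target system here.
    }
}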

Risks and Assumptions

  • The CDC utility will be started, and automatically restarted in case of failure, by the OS or some external tool to provide stable change event processing.
  • The CDC feature can only be used for deployments that have the WAL enabled.
  • At CDC start, the first consumed event will be the first event available in the WAL archive.
  • The lag between a record change and the CDC consumer notification depends on the segment archiving timeout and requires additional configuration by the user.
  • CDC failover depends on the WAL archive segment count. If the CDC application is down for a relatively long time, Ignite may delete some archive segments;
    in that case the consumer can't continue to receive changed records and must restart from the existing segments.

Online (real-time) CDC

Using a separate process to capture data changes from WAL archives makes the lag between a CDC event happening and the consumer being notified about it relatively big. It is proposed to provide the opportunity to capture data and notify consumers directly from the Ignite process. This minimizes the lag at the cost of additional memory usage.

User paths

Enable OnlineCDC on cluster:

  1. Configure Ignite to enable OnlineCDC.
  2. Configure ignite-cdc process.
  3. Start Ignite cluster in ACTIVE_READ_ONLY mode.
  4. Start background process ignite-cdc in BACKUP mode.
    1. ./ignite-cdc.sh --backup
  5. Explicitly start OnlineCDC on Ignite:
    1. ./control.sh --cdc online --start
  6. Move Ignite cluster to ACTIVE state.

Ignite node restart after failure:

  1. Start the Ignite node as usual (OnlineCDC automatically recovers itself; ignite-cdc waits for the recovery).

Stop OnlineCDC and use ignite-cdc instead:

  1. Explicitly stop OnlineCDC on Ignite (ignite-cdc automatically switches to ACTIVE mode and starts capturing).
    1. ./control.sh --cdc online --stop

Stop both CDC - Online and ignite-cdc:

  1. Explicitly stop ignite-cdc.sh.
  2. Explicitly stop OnlineCDC.

User interface

Ignite

  1. IgniteConfiguration#CdcConsumer - the consumer implementation comes from extensions (kafka, thin client); see the configuration sketch after this list.
  2. DataStorageConfiguration#onlineCdcBufSize - defaults to (walSegments * walSegmentSize), which is 640 MB with the current defaults.
    1. All non-archived segments fit in memory. If OnlineCDC requires more space than that, the ordinary CDC process should probably be used instead.
  3. DataStorageConfiguration#onlineCdcKeepBinary - default true.
  4. DataRegionConfiguration#cdcMode - BACKGROUND, ONLINE (default BACKGROUND).
    1. BACKGROUND - hard links of archived segments are created in the cdc directory, which is watched by the background ignite-cdc process.
    2. ONLINE - OnlineCDC is enabled and the BACKGROUND job is also done (ignite-cdc runs in BACKUP mode).
  5. Logs: 
    1. initialization (amount of records read during the restore)
    2. failure 
    3. buffer is full
  6. Metrics: 
    1. ordinary cdc metrics (count of wal segments, wal entries)
    2. current buffer size
    3. status of CDC (on/off)
    4. last committed WALPointer
    5. lag between buffer and WAL archive (segments)
    6. lag between WAL and CDC consumer (milliseconds).
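
A sketch of how the proposed node-side settings above might be expressed in code. None of these setters exist yet; the names (setOnlineCdcBufSize, setOnlineCdcKeepBinary, setCdcMode, setCdcConsumer) and the CdcMode enum simply mirror the proposed properties.

Proposed node configuration sketch
// Sketch only: setter names mirror the proposed properties and are not a final API.
IgniteConfiguration cfg = new IgniteConfiguration();

DataStorageConfiguration storageCfg = new DataStorageConfiguration();

// Proposed: buffer for OnlineCDC; defaults to walSegments * walSegmentSize (640 MB with defaults).
storageCfg.setOnlineCdcBufSize(640L * 1024 * 1024);

// Proposed: keep entries in binary form when notifying the consumer (default true).
storageCfg.setOnlineCdcKeepBinary(true);

// Proposed: per-region CDC mode - BACKGROUND (ignite-cdc process) or ONLINE.
storageCfg.getDefaultDataRegionConfiguration().setCdcMode(CdcMode.ONLINE);

cfg.setDataStorageConfiguration(storageCfg);

// Proposed: consumer implementation provided by an extension (kafka, thin client);
// here the illustrative LoggingCdcConsumer from the Public API section is reused.
cfg.setCdcConsumer(new LoggingCdcConsumer());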

ignite-cdc

  1. CdcConfiguration#modeOnStart - ACTIVE, BACKUP (default ACTIVE); see the configuration sketch after this list.
    1. ACTIVE - ignite-cdc sends data from the cdc dir to the consumer.
    2. BACKUP - ignite-cdc polls Ignite OnlineCDC metrics, clears hard links from the cdc dir, and starts working in ACTIVE mode after OnlineCDC fails. If the Ignite node fails, ignite-cdc waits for it to start in order to check the OnlineCDC status.
  2. CdcConfiguration#checkOnlineFrequency - the period for polling OnlineCDC metrics from Ignite to make sure it still works, and for cleaning obsolete links from the cdc dir.
  3. Logs: 
    1. checks of OnlineCDC metrics, clearing of the cdc dir, state switches.
  4. Metrics:
    1. current state.
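
A similar sketch for the proposed ignite-cdc process settings; these setters and the ModeOnStart enum are likewise only illustrations of the proposed properties, not an existing API.

Proposed ignite-cdc configuration sketch
// Sketch only: CdcConfiguration properties proposed in this IEP, not a final API.
CdcConfiguration cdcCfg = new CdcConfiguration();

// Proposed: start in BACKUP mode - poll OnlineCDC metrics and take over capturing only if OnlineCDC fails.
cdcCfg.setModeOnStart(ModeOnStart.BACKUP);

// Proposed: poll OnlineCDC metrics and clean obsolete hard links from the cdc dir every 5 seconds.
cdcCfg.setCheckOnlineFrequency(5_000L);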

control.sh

  1. CdcOnline subcommand - enable/disable.


Segments

Note that the word “segment” is used with two different meanings:

  1. WAL segments are represented as numbered files. The size of a WAL segment is configured with DataStorageConfiguration#walSegmentSize.
  2. ReadSegment is a slice of the mmap'ed WAL segment. It contains WAL records to be synced with the actual file. The size of such a segment varies over time; its maximum is configured with DataStorageConfiguration#walBuffSize.

Initialization

On Ignite start during memory restore (in the main thread):

  1. If DataRegionConfiguration#cdcMode == ONLINE, then create CdcProcessor.
  2. CdcProcessor reads the last persisted CdcConsumerState from the Metastorage.
    1. If CdcState == null OR CdcState#enabled is false then skip initialization.
  3. Initialization - collect logical updates from the CdcState#committedPtr until the end of WAL. See GridCacheDatabaseSharedManager#performBinaryMemoryRestore.
CdcState
class CdcProcessor implements MetastorageLifecycleListener {
	private final Queue<ReadSegment> queue;

	void onReadyForRead() {
		// Read CdcState, if enabled then replay WAL here for collecting logical records.
	}

	void onKernalStart() {
		// start CdcWorker.
	}
}

class CdcConsumerState {
    private final WALPointer committedPtr;

    private final boolean enabled;
}

Online capturing of WALRecords

  1. Entry point for WALRecords to be captured by CDC. The options are:
    1. SegmentedRingByteBuffer (represents a WAL segment) is a multi-producer/single-consumer data structure.
      1. + Relying on the consumer workflow, the order of events can be guaranteed.
      2. + The consumer is a background thread, so capturing records doesn't affect the performance of transactional threads.
      3. - Physical records can't be filtered at the entry point (which might waste buffer space). They must be deserialized and filtered later, before actually being sent to a CdcConsumer.
      4. - The consumer is triggered on a schedule - every 500 ms by default.
      5. - The logic differs somewhat depending on the WAL settings (mmap true/false, FULL_SYNC).
    2. Capturing in FileWriteAheadLogManager#log(WALRecord).
      1. + Captures logical records only.
      2. + Common logic for all WAL settings.
      3. - Captures records into the buffer in transactional threads - might affect performance.
      4. - The CDC process must order events by WALPointer itself - maintain a concurrent ordering data structure and implement waiting for WAL gaps to close before sending.
      5. - Events may be sent before they are actually flushed on the local Ignite node - this can lead to inconsistency between the main and stand-by clusters.

CdcWorker

CdcWorker is a thread responsible for collecting WAL records, transforming them into CdcEvents, and submitting them to a CdcConsumer. The worker collects records in a queue.

Capturing from the buffer (wal-sync-thread):

  1. The wal-sync-thread (the only reader of the mmap WAL) works under the lock that synchronizes preparing a ReadSegment and rolling the WAL segment, which guarantees there are no changes in the underlying buffer.
  2. It offers a deep copy of the flushing ReadSegments to the CdcWorker.
  3. The CdcWorker checks the remaining capacity and the buffer size:
    1. If the size fits the capacity, the offered buffer data is stored in the queue.
    2. Otherwise, online CDC is stopped.

Body loop (cdc-worker-thread):

  1. Polls the queue, transforms ReadSegment data into Iterator<CdcEvent>, and pushes the events to the CdcConsumer.
  2. Possible optimization: transform segment buffers into CdcEvents in the background (to reduce buffer usage). Should the CdcConsumer be asynchronous then?
  3. Persists CdcConsumerState. The progress is committed when CdcConsumer#onEvents returns true.

Meta Storage

  1. Online CDC status - ON / OFF
  2. Committed pointer (confirmed by CdcConsumer).


CdcWorker
class CdcWorker {
	private final CdcConsumer consumer;
	
	private final long checkFreq;
	
	// Invoked in wal-sync-thread.
	public void offer(ReadSegment seg) {
		// Check capacity, adding segment to the queue.
	} 

	// online-cdc-thread
	public void body() {
		// Polling queue, push to CdcConsumer, writing CdcState to MetaStorage.
	}
}


Discussion Links

http://apache-ignite-developers.2346864.n4.nabble.com/DISCUSSION-IEP-59-CDC-Capture-Data-Change-tc49677.html

Reference Links

https://dev.mysql.com/doc/refman/8.0/en/mysqlbinlog.html

https://debezium.io/documentation/reference/1.2/architecture.html

https://jdbc.postgresql.org/documentation/head/replication.html

https://www.oracle.com/middleware/technologies/goldengate.html

https://docs.microsoft.com/ru-ru/sql/relational-databases/track-changes/track-data-changes-sql-server?view=sql-server-ver15

Tickets
