ID      | IEP-59
Author  |
Sponsor | Anton Vinogradov
Created | 14.10.2020
Status  | DRAFT
Many use-cases build on observing and processing changed records. These use-cases include, but are not limited to:

For now, such scenarios are hard to implement in Ignite. The only solution that can help with them, for now, is a Continuous Query (CQ). Disadvantages of the CQ in the described scenarios:

The convenient solution should be:
Ignite CDC is a new utility that should be run on the server node host. The CDC utility watches for the appearance of WAL archive segments. When a segment is archived, the utility iterates over it using the existing WAL Iterator and notifies the CDC Consumer of each record from the segment.
Public API:

```java
/** Consumer of data change events. */
@IgniteExperimental
public interface ChangeDataCaptureConsumer {
    /** Starts the consumer. */
    public void start();

    /**
     * Handles entry change events.
     * If this method returns {@code true} then the current offset will be stored
     * and ongoing notifications after CDC application fail/restart will be started from it.
     *
     * @param events Entry change events.
     * @return {@code True} if the current offset should be saved on the disk
     * to continue from it in case of any failure or restart.
     */
    public boolean onEvents(Iterator<ChangeDataCaptureEvent> events);

    /**
     * Stops the consumer.
     * This method can be invoked only after {@link #start()}.
     */
    public void stop();
}

/**
 * Event of a single entry change.
 * An instance presents the new value of the modified entry.
 *
 * @see IgniteCDC
 * @see ChangeDataCaptureConsumer
 */
@IgniteExperimental
public interface ChangeDataCaptureEvent extends Serializable {
    /** @return Key for the changed entry. */
    public Object key();

    /** @return Value for the changed entry or {@code null} in case of entry removal. */
    @Nullable public Object value();

    /**
     * @return {@code True} if the event was fired on the primary node for the partition containing this entry.
     * @see <a href="
     * https://ignite.apache.org/docs/latest/configuring-caches/configuring-backups#configuring-partition-backups">
     * Configuring partition backups.</a>
     */
    public boolean primary();

    /**
     * Ignite splits a dataset into smaller chunks to distribute them across the cluster.
     * {@link ChangeDataCaptureConsumer} implementations can use {@link #partition()} to split change processing
     * in the same way as it is done for the cache.
     *
     * @return Partition number.
     * @see Affinity#partition(Object)
     * @see Affinity#partitions()
     * @see <a href="https://ignite.apache.org/docs/latest/data-modeling/data-partitioning">Data partitioning</a>
     * @see <a href="https://ignite.apache.org/docs/latest/data-modeling/affinity-collocation">Affinity collocation</a>
     */
    public int partition();

    /** @return Version of the entry. */
    public CacheEntryVersion version();

    /**
     * @return Cache ID.
     * @see org.apache.ignite.internal.util.typedef.internal.CU#cacheId(String)
     * @see CacheView#cacheId()
     */
    public int cacheId();
}

/**
 * Entry event order.
 * Two concurrent updates of the same entry can be ordered based on {@link CacheEntryVersion} comparison.
 * A greater value means that the event occurred later.
 */
@IgniteExperimental
public interface CacheEntryVersion extends Comparable<CacheEntryVersion>, Serializable {
    /**
     * Order of the update. The value is an incremental counter value. The scope of the counter is the node.
     * @return Version order.
     */
    public long order();

    /** @return Order of the node on which this version was assigned. */
    public int nodeOrder();

    /**
     * Cluster id is a value to distinguish updates in case the user wants to aggregate and sort updates from several
     * Ignite clusters. {@code clusterId} can be set for the node using
     * {@link GridCacheVersionManager#dataCenterId(byte)}.
     *
     * @return Cluster id.
     */
    public byte clusterId();

    /** @return Topology version plus the number of seconds from the start time of the first grid node. */
    public int topologyVersion();

    /**
     * If the source of the update is the "local" cluster, then {@code null} will be returned.
     * If the update comes from another cluster using {@link IgniteInternalCache#putAllConflict(Map)},
     * then the entry version for the other cluster is returned.
     *
     * @return Replication version.
     * @see IgniteInternalCache#putAllConflict(Map)
     * @see IgniteInternalCache#removeAllConflict(Map)
     */
    public CacheEntryVersion otherClusterVersion();
}
```
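For illustration, a minimal consumer built on this API could look like the sketch below. This is a hypothetical example, not part of the proposal: `CdcEventStub` is a simplified stand-in for `ChangeDataCaptureEvent` so the snippet compiles without Ignite on the classpath, and `CountingCdcConsumer` is an invented name.

```java
import java.util.Iterator;

/** Simplified stand-in for ChangeDataCaptureEvent (only the fields this sketch needs). */
interface CdcEventStub {
    Object key();
    Object value();
    boolean primary();
}

/** Hypothetical consumer: applies only primary-copy events and commits the offset after every batch. */
class CountingCdcConsumer {
    long applied;

    /** Mirrors ChangeDataCaptureConsumer#onEvents: the return value asks Ignite to persist the current offset. */
    public boolean onEvents(Iterator<CdcEventStub> events) {
        while (events.hasNext()) {
            CdcEventStub evt = events.next();

            // Each update is delivered on the primary node and on every backup; process the primary copy only.
            if (evt.primary())
                applied++;
        }

        return true; // Save the offset so that a restart resumes after this batch.
    }

    /** Helper to build a stub event. */
    static CdcEventStub event(Object key, Object val, boolean primary) {
        return new CdcEventStub() {
            @Override public Object key() { return key; }
            @Override public Object value() { return val; }
            @Override public boolean primary() { return primary; }
        };
    }
}
```

Filtering on `primary()` is the simplest way to process each change exactly once when backups are configured; a consumer that wants failover coverage could instead deduplicate by `version()`.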
Risks and Assumptions
Capturing data changes from WAL archives in a separate process makes the lag between the moment a CDC event happens and the moment the consumer is notified about it relatively big. It is proposed to provide the ability to capture data and notify consumers directly from the Ignite process. This minimizes the lag at the cost of additional memory usage.
Enable OnlineCDC on cluster:
1. Put the cluster into ACTIVE_READ_ONLY mode.
2. Run ignite-cdc.sh in BACKUP mode.
3. Put the cluster into ACTIVE state.

Note that ignite-cdc.sh can be run in 2 modes - BACKUP and ACTIVE:
- BACKUP is used as a backup process for OnlineCDC; such a process may fetch the CDC configuration from IgniteConfiguration. The use case is async replication between master and stand-by clusters.
- ACTIVE is used as an independent process that doesn't rely on OnlineCDC and has its own configuration. The use case is filling a cold data lake.

Ignite node restart after failure:
Stop OnlineCDC and use ignite-cdc instead:
Stop both CDC - Online and ignite-cdc:
Ignite:
- IgniteConfiguration#OnlineCdcConfiguration - CdcConsumer, keepBinary.
- DataStorageConfiguration#onlineCdcBufSize - by default (walSegments * walSegmentSize); that is 640 MB by default now.
- DataRegionConfiguration#cdcMode - BACKGROUND, ONLINE (default is BACKGROUND):
  - BACKGROUND - makes hard links of archived segments into the cdc directory, which is watched by the background ignite-cdc process.
  - ONLINE - OnlineCDC enabled + still does the BACKGROUND mode job.

ignite-cdc:
- CdcConfiguration#mode - ACTIVE, BACKUP (default is ACTIVE if OnlineCDC is not configured, and BACKUP otherwise).

control.sh:
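A hypothetical wiring of the settings above might look as follows. This is a non-runnable configuration sketch: the setter names (`setOnlineCdcConfiguration`, `setOnlineCdcBufSize`, `setCdcMode`) follow the proposed property names but do not exist in current Ignite releases, and `MyCdcConsumer` is a placeholder for a user-defined consumer.

```java
IgniteConfiguration cfg = new IgniteConfiguration();

// Proposed: register the CDC consumer directly in the node configuration.
OnlineCdcConfiguration cdcCfg = new OnlineCdcConfiguration();
cdcCfg.setConsumer(new MyCdcConsumer()); // Placeholder ChangeDataCaptureConsumer implementation.
cdcCfg.setKeepBinary(true);

cfg.setOnlineCdcConfiguration(cdcCfg);

DataStorageConfiguration storageCfg = new DataStorageConfiguration();

// Proposed default: walSegments * walSegmentSize, i.e. 640 MB with current defaults.
storageCfg.setOnlineCdcBufSize(640L * 1024 * 1024);

// ONLINE enables OnlineCDC and still performs the BACKGROUND-mode job.
storageCfg.getDefaultDataRegionConfiguration().setCdcMode(CdcMode.ONLINE);

cfg.setDataStorageConfiguration(storageCfg);
```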
Note that there is some confusion in the use of the word "segment":
- WAL segment is a file in the WAL archive; its size is configured with DataStorageConfiguration#walSegmentSize.
- ReadSegment is a slice of the mmap WAL segment. It contains WAL records to sync with the actual file. The size of this segment differs from time to time; its maximum can be configured with DataStorageConfiguration#walBuffSize.

On Ignite start during memory restore (in the main thread):
- If DataRegionConfiguration#cdcMode == ONLINE, then create CdcProcessor.
- CdcProcessor reads from the Metastorage the last persisted CdcConsumerState.
- If CdcState#enabled is false, then skip initialization.
- If CdcState == null, then initialize.
- GridCacheDatabaseSharedManager#performBinaryMemoryRestore.
Entrypoint for WALRecords to be captured by CDC. Options are:
- FileWriteAheadLogManager#log(WALRecord).

It is proposed to use the first option.
CdcWorker is a thread responsible for collecting WAL records, transforming them to CdcEvents and submitting them to a CdcConsumer. The worker collects records in a queue.
Capturing from the buffer (wal-sync-thread):
Otherwise, stop online CDC:
- write CdcConsumerState with (enabled=false, last sent WALPointer);
- write StopOnlineCdcRecord into WAL (use the prepared CdcConsumerState).

It's also possible to stop Online CDC using a command in control.sh. In this case it also writes StopOnlineCdcRecord.
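The start-up decision described earlier (initialize when no state has been persisted yet, skip when the persisted state is disabled) can be sketched with a self-contained stand-in for CdcConsumerState. All names here are placeholders for illustration; a numeric field plays the role of the last sent WALPointer.

```java
/** Simplified stand-in for the proposed CdcConsumerState persisted in the Metastorage. */
class CdcStateStub {
    final boolean enabled;
    final long lastPointer; // Stand-in for the last sent WALPointer.

    CdcStateStub(boolean enabled, long lastPointer) {
        this.enabled = enabled;
        this.lastPointer = lastPointer;
    }

    /**
     * Start-up decision for online CDC:
     * - no persisted state yet (first start): initialize;
     * - persisted state with enabled=false (written by the stop procedure): skip initialization;
     * - persisted state with enabled=true: initialize and resume from lastPointer.
     */
    static boolean shouldInitialize(CdcStateStub persisted) {
        if (persisted == null)
            return true;

        return persisted.enabled;
    }
}
```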
Body loop (cdc-worker-thread):
- write OnlineCdcRecord record to WAL with the WALPointer.

```java
class OnlineCdcRecord extends WALRecord {
    private WALPointer last;
}

class StopOnlineCdcRecord extends WALRecord {
    private WALPointer last;
}
```
Handling of OnlineCdcRecord and StopOnlineCdcRecord by ignite-cdc:
- OnlineCdcRecord - clears obsolete links from the CDC directory.
- StopOnlineCdcRecord - switches to ACTIVE mode, starts capturing from the last WALPointer (from the previous OnlineCdcRecord).

```java
class CdcWorker {
    private final CdcConsumer consumer;

    private final long checkFreq;

    // Invoked in wal-sync-thread.
    public void offer(ReadSegment seg) {
        // Check capacity, adding segment to the queue.
    }

    // online-cdc-thread
    public void body() {
        // Polling queue, push to CdcConsumer, writing CdcState to MetaStorage.
    }
}
```
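As an illustration only, the skeleton above could be fleshed out along the following lines. The types here are simplified stand-ins (a String plays the role of ReadSegment, `java.util.function.Consumer` the role of CdcConsumer), and the capacity check models the proposed onlineCdcBufSize limit; when the buffer overflows, the proposal stops online CDC and falls back to the background mode.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

/** Simplified sketch of the proposed CdcWorker: a bounded queue between wal-sync-thread and cdc-worker-thread. */
class CdcWorkerSketch {
    private final Consumer<String> consumer; // Stand-in for CdcConsumer.
    private final int capacity;              // Stand-in for the onlineCdcBufSize limit.
    private final Queue<String> queue = new ArrayDeque<>();

    CdcWorkerSketch(Consumer<String> consumer, int capacity) {
        this.consumer = consumer;
        this.capacity = capacity;
    }

    /** Invoked in wal-sync-thread: reject the segment when the buffer is full instead of blocking WAL sync. */
    public synchronized boolean offer(String segment) {
        if (queue.size() >= capacity)
            return false; // Overflow: here the real worker would stop online CDC and fall back to BACKGROUND mode.

        return queue.add(segment);
    }

    /** One iteration of the cdc-worker-thread body: drain the queue and push segments to the consumer. */
    public synchronized int drainOnce() {
        int processed = 0;

        for (String seg; (seg = queue.poll()) != null; processed++)
            consumer.accept(seg); // The real worker would transform WAL records to CdcEvents first.

        return processed;
    }
}
```

A real implementation would run `drainOnce` in a loop on a dedicated thread and periodically persist CdcConsumerState; the sketch keeps both sides synchronous so the back-pressure behavior is easy to see.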
https://dev.mysql.com/doc/refman/8.0/en/mysqlbinlog.html
https://debezium.io/documentation/reference/1.2/architecture.html
https://jdbc.postgresql.org/documentation/head/replication.html
https://www.oracle.com/middleware/technologies/goldengate.html