...
ID | IEP-59 | ||||||||
Author | |||||||||
Sponsor | Nikolay IzhikovAnton Vinogradov | ||||||||
Created | 14.10.2020 | ||||||||
Status |
|
Table of Contents |
---|
Many use-cases build on observation and processing changed records.
...
...
draw.io Diagram | ||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
IgniteCDC Ignite CDC is a new utility that should be run on the server node host. CDC utility watches by the appearance of the WAL archive segments.
On the segment archiving, the utility iterates it using the existing WALIterator and notificates CDCConsumer of each record from the segment.WAL Iterator and notifications CDC Consumer of each record from the segment.
Public APICDCConsumer public API interface:
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/** Consumer of data WALchange recordsevents. */ @IgniteExperimental public interface CDCConsumerChangeDataCaptureConsumer { /** * @returnStarts Consumerthe IDconsumer. */ public Stringvoid idstart(); /** * Handles entry changes events. * InitializeIf this consumer method return {@code true} then current offset will be stored * and ongoing notifications after CDC application fail/restart will be started from it. * * @param configuration Ignite configuration events Entry change events. * @return {@code True} if current offset should be saved on the disk * to continue from it in case any failures or restart. */ public boolean onEvents(Iterator<ChangeDataCaptureEvent> events); /** * Stops the consumer. * This methods can be invoked only after {@link #start()}. */ public void start(IgniteConfiguration configuration, IgniteLogger logstop(); } /** * Event of single entry change. * Instance presents new value of modified entry. * * @see IgniteCDC * @see CaptureDataChangeConsumer */ @IgniteExperimental public interface ChangeDataCaptureEvent extends Serializable { /** * @return Key for the changed entry. */ public Object key(); /** * @param record WAL record@return Value for the changed entry or {@code null} in case of entry removal. * @param <T> Record type. / @Nullable public Object value(); /** * @return {@code True} if state of the consumption should be saved. */ <T extends WALRecord> booolean onRecord(T record event fired on primary node for partition containing this entry. * @see <a href=" * https://ignite.apache.org/docs/latest/configuring-caches/configuring-backups#configuring-partition-backups"> * Configuring partition backups.</a> */ public boolean primary(); /** * Ignite split dataset into smaller chunks to distribute them across the cluster. * {@link ChangeDataCaptureConsumer} implementations can use {@link #partition()} to split changes processing * in the same way as it done for the cache. * * @return Partition number. * @see Affinity#partition(Object) * @see Affinity#partitions() * @see <a href="https://ignite.apache.org/docs/latest/data-modeling/data-partitioning">Data partitioning</a> * @see <a href="https://ignite.apache.org/docs/latest/data-modeling/affinity-collocation">Affinity collocation</a> */ public int partition(); /** * @return Version of the entry. */ public CacheEntryVersion version(); /** * @return Cache ID. * @see org.apache.ignite.internal.util.typedef.internal.CU#cacheId(String) * @see CacheView#cacheId() */ public int cacheId(); } /** * Entry event order. * Two concurrent updates of the same entry can be ordered based on {@link ChangeEventOrder} comparsion. * Greater value means that event occurs later. */ @IgniteExperimental public interface CacheEntryVersion extends Comparable<CacheEntryVersion>, Serializable { /** * Order of the update. Value is an incremental counter value. Scope of counter is node. * @return Version order. */ public long order(); /** @return Node order on which this version was assigned. */ public int nodeOrder(); /** * Stops this consumer. Cluster id is a value to distinguish updates in case user wants to aggregate and sort updates from several * This methods Ignite clusters. {@code clusterId} id can be invoked only after {@link #start(IgniteConfiguration, IgniteLogger)}. set for the node using * {@link GridCacheVersionManager#dataCenterId(byte)}. * * @return Cluster id. */ public byte clusterId(); /** @return Topology version plus number of seconds from the start time of the first grid node. */ public int topologyVersion(); /** * If source of the update is "local" cluster then {@code null} will be returned. * If updated comes from the other cluster using {@link IgniteInternalCache#putAllConflict(Map)} * then entry version for other cluster. * @return Replication version. * @see IgniteInternalCache#putAllConflict(Map) * @see IgniteInternalCache#removeAllConflict(Map) */ public voidCacheEntryVersion stopotherClusterVersion(); } |
Risks and Assumptions
// Links to discussions on the devlist, if applicable.http://apache-ignite-developers.2346864.n4.nabble.com/DISCUSSION-IEP-59-CDC-Capture-Data-Change-tc49677.html
https://dev.mysql.com/doc/refman/8.0/en/mysqlbinlog.html
...
Jira | ||||||||||
---|---|---|---|---|---|---|---|---|---|---|
|