...
ID | IEP-59 | ||||||||
Author | |||||||||
Sponsor | Nikolay IzhikovAnton Vinogradov | ||||||||
Created | 14.10.2020 | ||||||||
Status |
|
Table of Contents |
---|
Many use-cases build on observation and processing changed records.
...
...
draw.io Diagram | ||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
IgniteCDC Ignite CDC is a new utility that should be run on the server node host. CDC utility watches by the appearance of the WAL archive segments.
On the segment archiving, the utility iterates it using the existing WALIterator and notificates CDCConsumer of each record from the segment.WAL Iterator and notifications CDC Consumer of each record from the segment.
Public APICDCConsumer public API interface:
Code Block | ||||
---|---|---|---|---|
| ||||
/** Consumer of data change events. */ @IgniteExperimental public interface DataChangeListener<K, V> { /** * @return Consumer ID. */ChangeDataCaptureConsumer { String id(); /** * Starts the consumer. */ * @param configuration Ignite configuration. */ void start(IgniteConfiguration configuration, IgniteLogger log); /** * @return {@code True} if entry key and value should be keeped in binary format. */ boolean keepBinarypublic void start(); /** * Handles entry changes events. * If this method return {@code true} then current offset will be stored * and ongoing notifications after CDC application fail/restart * will be started from it. * * @param events Entry change events. * @return {@code True} if current offset should be commited saved on the disk * to continue from it in case any failures or restart. */ public boolean onChange(Iterable<EntryEvent<K, V>>onEvents(Iterator<ChangeDataCaptureEvent> events); /** * Stops the consumer. * This methods can be invoked only after {@link #start(IgniteConfiguration, IgniteLogger)}. */ public void stop(); } /** * Event forof single entry change. * * @param <K> Key type Instance presents new value of modified entry. * @param <V> Value type. * @see IgniteCDC * @see CaptureDataChangeConsumer */ @IgniteExperimental public interface ChangeDataCaptureEvent EntryEvent<K,extends V>Serializable { /** * @return Key for the changed entry. */ public KObject key(); /** * @return Value for the changed entry or {@code null} in case of entry removal. */ @Nullable public VObject value(); /** * @return {@code True} if event fired on primary node for partition containing this entry. * @see <a href=" * https://ignite.apache.org/docs/latest/configuring-caches/configuring-backups#configuring-partition-backups">Configuring> * Configuring partition backups.</a> */ public boolean primary(); /** * @return Operation typeIgnite split dataset into smaller chunks to distribute them across the cluster. */ EntryEventType operation(); {@link ChangeDataCaptureConsumer} implementations can use {@link #partition()} to split changes processing * in the same way as it done for the cache. /* * * @return CachePartition ID.number. * @see Affinity#partition(Object) * @see org.apache.ignite.internal.util.typedef.internal.CU#cacheId(String)Affinity#partitions() * @see <a href="https://ignite.apache.org/docs/latest/data-modeling/data-partitioning">Data partitioning</a> * @see <a href="https://ignite.apache.org/docs/latest/data-modeling/affinity-collocation">Affinity collocation</a> */ public int partition(); /** * @return Version of the entry. */ public longCacheEntryVersion cacheIdversion(); /** * @return Expire time. Cache ID. * @see org.apache.ignite.internal.util.typedef.internal.CU#cacheId(String) * @see CacheView#cacheId() */ public longint expireTimecacheId(); } /** * Event for single entry change. * * @param <K> Key type Entry event order. * Two concurrent updates of the same entry can be ordered based on {@link ChangeEventOrder} comparsion. * @param <V> Value typeGreater value means that event occurs later. */ @IgniteExperimental public interface CacheEntryVersion EntryEvent<Kextends Comparable<CacheEntryVersion>, V>Serializable { /** * @return Key for the changed entry Order of the update. Value is an incremental counter value. Scope of counter is node. * @return Version order. */ public Klong keyorder(); /** @return Node order on *which @returnthis Valueversion for the changed entry. was assigned. */ public Vint valuenodeOrder(); /** * Cluster id is a value to distinguish updates in case user wants to aggregate and sort updates from several * @returnIgnite clusters. {@code TrueclusterId} id ifcan eventbe firedset onfor primarythe node forusing partition containing* this entry{@link GridCacheVersionManager#dataCenterId(byte)}. * @see <a href="https://ignite.apache.org/docs/latest/configuring-caches/configuring-backups#configuring-partition-backups">Configuring partition backups.</a>* @return Cluster id. */ public booleanbyte primaryclusterId(); /** * @return Operation type. @return Topology version plus number of seconds from the start time of the first grid node. */ public EntryEventTypeint operationtopologyVersion(); /** * @return Cache ID If source of the update is "local" cluster then {@code null} will be returned. * @see org.apache.ignite.internal.util.typedef.internal.CU#cacheId(String) If updated comes from the other cluster using {@link IgniteInternalCache#putAllConflict(Map)} */ then entry version for other cluster. * long cacheId(); @return Replication version. /*** @see IgniteInternalCache#putAllConflict(Map) * @return Expire time.@see IgniteInternalCache#removeAllConflict(Map) */ public longCacheEntryVersion expireTimeotherClusterVersion(); } |
Risks and Assumptions
// Links to discussions on the devlist, if applicable.http://apache-ignite-developers.2346864.n4.nabble.com/DISCUSSION-IEP-59-CDC-Capture-Data-Change-tc49677.html
https://dev.mysql.com/doc/refman/8.0/en/mysqlbinlog.html
...
Jira | ||||||||||
---|---|---|---|---|---|---|---|---|---|---|
|