...

ID: IEP-59
Author: Nikolay Izhikov
Sponsor: Anton Vinogradov
Created: 14.10.2020
Status: DRAFT



Motivation

Many use cases are built on observing and processing changed records.

...

  • Independence from the server node process (JVM) - issues and failures of the consumer shouldn't lead to server node instability.
  • Notification guarantees and failover - i.e. track and save a pointer to the last consumed record and continue notification from this pointer in case of restart.
  • Resilience for the consumer - it's not an issue if the consumer temporarily consumes records more slowly than they appear.

Description:

[Diagram: CDC]

Ignite CDC is a new utility that should be run on the server node host. The CDC utility watches for the appearance of WAL archive segments.

When a segment is archived, the utility iterates over it using the existing WAL Iterator and notifies the CDC Consumer of each record in the segment.
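A minimal sketch of one pass of this processing loop, assuming (hypothetically) that the CDC folder holds one hard link per archived segment, named as a zero-padded segment index with a `.wal` suffix. `CdcSegmentPoller`, `pollOnce` and the `handler` callback are illustrative names, not part of Ignite; the handler stands in for iterating the segment with the WAL iterator and notifying the consumer. Segments are handled in index order, and a link is deleted only after its segment has been processed.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical sketch: one polling pass of the CDC utility over its folder of segment links. */
final class CdcSegmentPoller {
    /** Assumed file name format: zero-padded segment index followed by ".wal". */
    static long segmentIndex(Path segment) {
        String name = segment.getFileName().toString();
        return Long.parseLong(name.substring(0, name.length() - ".wal".length()));
    }

    /**
     * Processes segment links in index order and deletes each link after the handler succeeds.
     *
     * @return Number of segments processed and released in this pass.
     */
    static int pollOnce(Path cdcDir, Predicate<Path> handler) {
        List<Path> segments;
        try (Stream<Path> files = Files.list(cdcDir)) {
            segments = files
                .filter(p -> p.getFileName().toString().endsWith(".wal"))
                .sorted(Comparator.comparingLong(CdcSegmentPoller::segmentIndex))
                .collect(Collectors.toList());
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }

        int released = 0;
        for (Path segment : segments) {
            // Stand-in for "iterate the segment with the WAL iterator and notify the consumer".
            if (!handler.test(segment))
                break; // Keep the link and stop, so this segment is retried first on the next pass.
            try {
                // Removes only the CDC link; if the WAL archive still links the file, its data stays on disk.
                Files.delete(segment);
            }
            catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            released++;
        }
        return released;
    }
}
```

Stopping at the first failed segment keeps notifications in WAL order, at the cost of blocking on a segment that keeps failing. A real utility could react to new links with `java.nio.file.WatchService` instead of polling.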


Design choices:

  • CDC application works as a separate process.
  • CDC relies on an existing Ignite mechanism, the WAL.
  • IEP scope: deliver local data change events to a local consumer.
  • CDC keeps the consumer offset in a special file.
    WAL processing will resume from this offset on restart.
  • To prevent interference between the WAL archiving process and CDC, Ignite will create a hard link to the newly created archive segment in a special folder.
    After successful processing, CDC will delete this link.
    Note that data will be removed from the disk only after both CDC and Ignite have removed their links to the segment from the corresponding folders.
  • To control the minimal event gap, a new configuration timeout is introduced: WalForceArchiveTimeout.
  • A flag to distinguish DataEntry records on primary and backup nodes is introduced.
  • All public APIs are marked with @IgniteExperimental so that they can be improved based on real-world usage feedback.
  • The CDC consumer will be notified about binary metadata changes (Phase 2).
  • A configuration parameter "Maximum CDC folder size" will be implemented to prevent exceeding the disk volume.
  • The CDC folder is resolved using the same logic as the Ignite node uses.
  • The CDC application should be restarted by the OS mechanism in case of any error (destination unavailability, for example).
  • Initially, a single CDC consumer is supported. Support for several concurrently running consumers will be implemented in Phase 2.
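The offset file and the hard-link scheme from the list above can be sketched with plain `java.nio.file` calls. This is not Ignite's implementation: `CdcStateSketch`, `WalOffset` and the `segment:record` text format are hypothetical. The offset is written to a temporary file and renamed over the state file, so a crash during the write leaves the previous offset intact (assuming the file system supports atomic rename). The hard-link helper illustrates why deleting the CDC link does not remove data that the WAL archive still references.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Hypothetical sketch of CDC state handling: consumer offset file and segment hard links. */
final class CdcStateSketch {
    /** Position of the last committed record: WAL segment index plus record index inside it. */
    static final class WalOffset {
        final long segmentIdx;
        final int recordIdx;

        WalOffset(long segmentIdx, int recordIdx) {
            this.segmentIdx = segmentIdx;
            this.recordIdx = recordIdx;
        }
    }

    /** Writes the offset to a temp file, then atomically renames it over the state file. */
    static void saveOffset(Path stateFile, WalOffset off) {
        Path tmp = stateFile.resolveSibling(stateFile.getFileName() + ".tmp");
        try {
            Files.write(tmp, (off.segmentIdx + ":" + off.recordIdx).getBytes(StandardCharsets.UTF_8));
            Files.move(tmp, stateFile, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** @return Saved offset, or {@code null} if nothing was saved yet (start from the first available segment). */
    static WalOffset loadOffset(Path stateFile) {
        if (!Files.exists(stateFile))
            return null;
        try {
            String[] parts = new String(Files.readAllBytes(stateFile), StandardCharsets.UTF_8).trim().split(":");
            return new WalOffset(Long.parseLong(parts[0]), Integer.parseInt(parts[1]));
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Creates a hard link to an archived segment in the CDC folder; both names share the same data on disk. */
    static Path linkForCdc(Path archivedSegment, Path cdcDir) {
        try {
            return Files.createLink(cdcDir.resolve(archivedSegment.getFileName()), archivedSegment);
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```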


Public API:



```java
import java.io.Serializable;
import java.util.Iterator;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.lang.IgniteExperimental;
import org.jetbrains.annotations.Nullable;

/** Consumer of data change events. */
@IgniteExperimental
public interface ChangeDataCaptureConsumer {
    /**
     * Starts the consumer.
     *
     * @param log Logger.
     */
    public void start(IgniteLogger log);

    /**
     * Handles entry change events.
     * If this method returns {@code true} then the current offset will be stored
     * and ongoing notifications after a CDC application failure or restart
     * will be started from it.
     *
     * @param events Entry change events.
     * @return {@code True} if the current offset should be saved on the disk
     * to continue from it in case of any failure or restart.
     */
    public boolean onEvents(Iterator<ChangeDataCaptureEvent> events);

    /**
     * Stops the consumer.
     * This method can be invoked only after {@link #start(IgniteLogger)}.
     */
    public void stop();
}

/**
 * Event of a single entry change.
 * The instance represents the new value of the modified entry.
 *
 * @see IgniteCDC
 * @see ChangeDataCaptureConsumer
 */
@IgniteExperimental
public interface ChangeDataCaptureEvent extends Serializable {
    /**
     * @return Key for the changed entry.
     */
    public Object key();

    /**
     * @return Value for the changed entry or {@code null} in case of entry removal.
     */
    public @Nullable Object value();

    /**
     * @return {@code True} if the event was fired on the primary node for the partition containing this entry.
     * @see <a href="https://ignite.apache.org/docs/latest/configuring-caches/configuring-backups#configuring-partition-backups">
     * Configuring partition backups.</a>
     */
    public boolean primary();

    /**
     * Ignite splits the dataset into smaller chunks to distribute them across the cluster.
     * {@link ChangeDataCaptureConsumer} implementations can use {@link #partition()} to split change processing
     * in the same way as it is done for the cache.
     *
     * @return Partition number.
     * @see Affinity#partition(Object)
     * @see Affinity#partitions()
     * @see <a href="https://ignite.apache.org/docs/latest/data-modeling/data-partitioning">Data partitioning</a>
     * @see <a href="https://ignite.apache.org/docs/latest/data-modeling/affinity-collocation">Affinity collocation</a>
     */
    public int partition();

    /**
     * @return Version of the entry.
     */
    public CacheEntryVersion version();

    /**
     * @return Cache ID.
     * @see org.apache.ignite.internal.util.typedef.internal.CU#cacheId(String)
     * @see CacheView#cacheId()
     */
    public int cacheId();
}

/**
 * Entry event order.
 * Two concurrent updates of the same entry can be ordered based on {@link CacheEntryVersion} comparison.
 * Greater value means that the event occurred later.
 */
@IgniteExperimental
public interface CacheEntryVersion extends Comparable<CacheEntryVersion>, Serializable {
    /**
     * Order of the update. Value is an incremental counter value. Scope of the counter is the grid node.
     *
     * @return Version order.
     */
    public long order();

    /** @return Node order on which this version was assigned. */
    public int nodeOrder();

    /**
     * Cluster id is a value to distinguish updates in case the user wants to aggregate and sort updates from several
     * Ignite clusters. {@code clusterId} can be set for the node using
     * {@link GridCacheVersionManager#dataCenterId(byte)}.
     *
     * @return Cluster id.
     */
    public byte clusterId();

    /** @return Topology version plus number of seconds from the start time of the first grid node. */
    public int topologyVersion();

    /**
     * If the source of the update is the "local" cluster then {@code null} will be returned.
     * If the update comes from the other cluster using {@link IgniteInternalCache#putAllConflict(Map)}
     * then the entry version for the other cluster will be returned.
     *
     * @return Replication version.
     * @see IgniteInternalCache#putAllConflict(Map)
     * @see IgniteInternalCache#removeAllConflict(Map)
     */
    public CacheEntryVersion otherClusterVersion();
}
```
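For illustration, here is a hypothetical consumer written against simplified stand-ins for the interfaces above. `StubEvent`, `StubVersion` and `StubConsumer` are not Ignite types, and the logger argument is dropped so the example compiles without Ignite on the classpath. The consumer keeps the latest value per key, resolves concurrent updates by version comparison as the `CacheEntryVersion` javadoc describes, and returns `true` so the offset is committed after every batch. The field order used in `StubVersion.compareTo` (topology version, then order, then node order) is an assumption.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/** Stand-in for CacheEntryVersion; the comparison order below is an assumption. */
final class StubVersion implements Comparable<StubVersion> {
    final int topologyVersion;
    final long order;
    final int nodeOrder;

    StubVersion(int topologyVersion, long order, int nodeOrder) {
        this.topologyVersion = topologyVersion;
        this.order = order;
        this.nodeOrder = nodeOrder;
    }

    @Override public int compareTo(StubVersion o) {
        int res = Integer.compare(topologyVersion, o.topologyVersion);
        if (res != 0)
            return res;
        res = Long.compare(order, o.order);
        return res != 0 ? res : Integer.compare(nodeOrder, o.nodeOrder);
    }
}

/** Stand-in for ChangeDataCaptureEvent with only the fields this example needs. */
final class StubEvent {
    final Object key;
    final Object value; // null means the entry was removed
    final boolean primary;
    final StubVersion version;

    StubEvent(Object key, Object value, boolean primary, StubVersion version) {
        this.key = key;
        this.value = value;
        this.primary = primary;
        this.version = version;
    }
}

/** Stand-in for the consumer contract; the IgniteLogger argument of start() is omitted. */
interface StubConsumer {
    void start();

    boolean onEvents(Iterator<StubEvent> events);

    void stop();
}

/** Hypothetical consumer that keeps the latest state of every entry (last write wins by version). */
final class LastWriteWinsConsumer implements StubConsumer {
    /** Latest applied event per key; removals stay as tombstones so older updates cannot resurrect an entry. */
    private final Map<Object, StubEvent> latest = new HashMap<>();

    @Override public void start() {
        // A real consumer would connect to its destination here.
    }

    @Override public boolean onEvents(Iterator<StubEvent> events) {
        while (events.hasNext()) {
            StubEvent evt = events.next();
            if (!evt.primary)
                continue; // Assumes CDC runs on every node: the primary copy suffices, backups are duplicates.
            StubEvent prev = latest.get(evt.key);
            if (prev == null || evt.version.compareTo(prev.version) > 0)
                latest.put(evt.key, evt);
        }
        return true; // Ask CDC to save the offset after each processed batch.
    }

    @Override public void stop() {
        // A real consumer would release destination resources here.
    }

    /** @return Current value of the key, or {@code null} if it is absent or removed. */
    Object valueOf(Object key) {
        StubEvent evt = latest.get(key);
        return evt == null ? null : evt.value;
    }
}
```

Keeping removals as tombstones matters: without them, a late update with an older version could re-create an entry that was already deleted.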

Risks and Assumptions

  • The CDC utility will be started, and automatically restarted in case of failure, by the OS or some external tool to provide stable change event processing.
  • The CDC feature can be used only for deployments that have the WAL enabled.
  • At CDC start, the first consumed event will be the first event available in the WAL archive.
  • The lag between a record change and the CDC consumer notification depends on the segment archiving timeout and requires additional configuration from the user.
  • CDC failover depends on the WAL archive segment count. If the CDC application is down for a relatively long time, Ignite may delete some archive segments;
    the consumer then can't continue receiving changed records from its saved offset and must restart from the existing segments.
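The failover risk can be made explicit at restart by comparing the saved offset with the segments that still exist. A minimal sketch, assuming the saved offset is reduced to the index of the segment holding the next unconsumed record; `CdcResumeCheck` and `Resume` are hypothetical names, not Ignite API.

```java
import java.util.Collections;
import java.util.List;

/** Hypothetical restart check: where to resume, and whether changes were lost. */
final class CdcResumeCheck {
    /** Segment to start from and a flag telling whether some changes were never delivered. */
    static final class Resume {
        final long segmentIdx;
        final boolean gap;

        Resume(long segmentIdx, boolean gap) {
            this.segmentIdx = segmentIdx;
            this.gap = gap;
        }
    }

    /**
     * @param savedSegmentIdx Segment index from the offset file, or {@code null} on the first start.
     * @param availableSegmentIdxs Indexes of segments currently present on disk.
     */
    static Resume resume(Long savedSegmentIdx, List<Long> availableSegmentIdxs) {
        if (availableSegmentIdxs.isEmpty())
            throw new IllegalStateException("No WAL archive segments available");

        long oldest = Collections.min(availableSegmentIdxs);

        if (savedSegmentIdx == null)
            return new Resume(oldest, false); // First start: begin with the first event in the archive.

        if (savedSegmentIdx < oldest)
            return new Resume(oldest, true); // Saved segment was deleted: changes in between are lost.

        return new Resume(savedSegmentIdx, false); // Normal restart from the saved offset.
    }
}
```

When `gap` is `true`, the changes stored in the deleted segments were never delivered, so the destination has to be reconciled by other means before incremental events can be trusted again.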

Discussion Links

http://apache-ignite-developers.2346864.n4.nabble.com/DISCUSSION-IEP-59-CDC-Capture-Data-Change-tc49677.html

Reference Links

https://dev.mysql.com/doc/refman/8.0/en/mysqlbinlog.html

...

https://docs.microsoft.com/ru-ru/sql/relational-databases/track-changes/track-data-changes-sql-server?view=sql-server-ver15

Tickets

ASF JIRA issues labeled IEP-59 (JQL: labels = IEP-59 ORDER BY status ASC)