...

  • The CDC application runs as a separate process.
  • CDC relies on an existing Ignite mechanism: the WAL (write-ahead log).
  • IEP scope: deliver local data change events to a local consumer.
  • CDC stores the consumer offset in a special file.
    On restart, WAL processing resumes from this offset.
  • To prevent interference between the WAL archiving process and CDC, Ignite creates a hard link to each newly created segment in a special folder.
    After successful processing, CDC deletes this link.
    Note that data is removed from the disk only after both CDC and Ignite have removed their links to the segment from the corresponding folders.
  • To manage the minimal event gap, a new configuration timeout is introduced: WalForceArchiveTimeout.
  • A flag is introduced to distinguish DataEntry records logged on primary and backup nodes.
  • All public APIs are marked with @IgniteExperimental so that they can be improved based on real-world usage feedback.
  • The CDC consumer will be notified about binary metadata changes (Phase 2).
  • A configuration parameter, "Maximum CDC folder size", will be implemented to prevent CDC files from exhausting the disk volume.
  • The CDC folder is resolved using the same logic as the Ignite node uses.
  • The CDC application should be restarted by an OS mechanism in case of any error (for example, when the destination is unavailable).
  • Initially, a single CDC consumer is supported. Support for several concurrently running consumers will be implemented in Phase 2.
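The offset file and hard-link handshake described in the list above can be sketched with plain java.nio file operations. This is a minimal, hypothetical illustration, not the Ignite implementation: the class name CdcBookkeepingSketch, the folder layout, the "<index>.wal" segment naming, and the plain-text offset format are all assumptions, and WAL parsing is left as a placeholder.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * Hypothetical sketch of the CDC bookkeeping (not Ignite code).
 * Ignite side: hard-links each new WAL segment into a CDC folder.
 * CDC side: processes linked segments in index order, persists its offset, then deletes the link.
 */
class CdcBookkeepingSketch {
    private final Path cdcDir;      // assumed folder that holds hard links to WAL segments
    private final Path offsetFile;  // assumed file holding the last committed segment index

    CdcBookkeepingSketch(Path cdcDir, Path offsetFile) {
        this.cdcDir = cdcDir;
        this.offsetFile = offsetFile;
    }

    /** Ignite side: exposes a segment to CDC through a hard link, so no data is copied. */
    void linkSegment(Path segment) throws IOException {
        Files.createLink(cdcDir.resolve(segment.getFileName()), segment);
    }

    /** Writes the offset to a temp file, then renames it over the old one (fsync omitted for brevity). */
    void saveOffset(long segmentIdx) throws IOException {
        Path tmp = offsetFile.resolveSibling(offsetFile.getFileName() + ".tmp");
        Files.write(tmp, Long.toString(segmentIdx).getBytes(StandardCharsets.UTF_8));
        Files.move(tmp, offsetFile, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE);
    }

    /** Returns the last committed segment index, or -1 if nothing has been committed yet. */
    long loadOffset() throws IOException {
        if (!Files.exists(offsetFile))
            return -1;
        return Long.parseLong(new String(Files.readAllBytes(offsetFile), StandardCharsets.UTF_8).trim());
    }

    /** CDC side: handles every linked segment newer than the committed offset; returns their indexes. */
    List<Long> processPending() throws IOException {
        long committed = loadOffset();
        List<Path> links;
        try (Stream<Path> s = Files.list(cdcDir)) {
            links = s.sorted(Comparator.comparingLong(CdcBookkeepingSketch::segmentIndex))
                     .collect(Collectors.toList());
        }
        List<Long> processed = new ArrayList<>();
        for (Path link : links) {
            long idx = segmentIndex(link);
            if (idx > committed) {
                // Placeholder: read WAL records from `link` and pass them to the consumer here.
                saveOffset(idx);
                committed = idx;
                processed.add(idx);
            }
            // The offset is already durable, so the link can go. Disk space is reclaimed
            // only when Ignite has also removed its own link to the same segment.
            Files.delete(link);
        }
        return processed;
    }

    /** Parses the assumed "<index>.wal" file name into the segment index. */
    static long segmentIndex(Path p) {
        String name = p.getFileName().toString();
        int dot = name.indexOf('.');
        return Long.parseLong(dot < 0 ? name : name.substring(0, dot));
    }
}
```

Links to segments at or below the committed offset are deleted without reprocessing, which covers a crash between saving the offset and removing the link.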


Public API interface:



/** Consumer of data change events. */
@IgniteExperimental
public interface ChangeDataCaptureConsumer {
    /**
     * Starts the consumer.
     */
    public void start();

    /**
     * Handles entry change events.
     * If this method returns {@code true}, the current offset is stored,
     * and after a CDC application failure or restart notifications resume from it.
     *
     * @param events Entry change events.
     * @return {@code True} if the current offset should be saved to disk
     * so that processing can continue from it after any failure or restart.
     */
    public boolean onEvents(Iterator<ChangeDataCaptureEvent> events);

    /**
     * Stops the consumer.
     * This method can be invoked only after {@link #start()}.
     */
    public void stop();
}

/**
 * Event of single entry change.
 * An instance represents the new value of a modified entry.
 *
 * @see IgniteCDC
 * @see ChangeDataCaptureConsumer
 */
@IgniteExperimental
public interface ChangeDataCaptureEvent extends Serializable {
    /**
     * @return Key for the changed entry.
     */
    public Object key();

    /**
     * @return Value for the changed entry or {@code null} in case of entry removal.
     */
    @Nullable public Object value();

    /**
     * @return {@code True} if the event was fired on the primary node for the partition containing this entry.
     * @see <a href="https://ignite.apache.org/docs/latest/configuring-caches/configuring-backups#configuring-partition-backups">
     * Configuring partition backups.</a>
     */
    public boolean primary();

    /**
     * Ignite splits a dataset into smaller chunks (partitions) to distribute them across the cluster.
     * {@link ChangeDataCaptureConsumer} implementations can use {@link #partition()} to split change processing
     * in the same way as is done for the cache.
     *
     * @return Partition number.
     * @see Affinity#partition(Object)
     * @see Affinity#partitions()
     * @see <a href="https://ignite.apache.org/docs/latest/data-modeling/data-partitioning">Data partitioning</a>
     * @see <a href="https://ignite.apache.org/docs/latest/data-modeling/affinity-collocation">Affinity collocation</a>
     */
    public int partition();

    /**
     * @return Version of the entry.
     */
    public CacheEntryVersion version();

    /**
     * @return Cache ID.
     * @see org.apache.ignite.internal.util.typedef.internal.CU#cacheId(String)
     * @see CacheView#cacheId()
     */
    public int cacheId();
}

/**
 * Entry event order.
 * Two concurrent updates of the same entry can be ordered by comparing their {@link CacheEntryVersion}s.
 * A greater value means that the event occurred later.
 */
@IgniteExperimental
public interface CacheEntryVersion extends Comparable<CacheEntryVersion>, Serializable {
    /**
     * Order of the update. The value is an incremental counter whose scope is the node.
     * @return Version order.
     */
    public long order();

    /** @return Node order on which this version was assigned. */
    public int nodeOrder();

    /**
     * Cluster ID distinguishes updates when a user wants to aggregate and sort updates from several
     * Ignite clusters. {@code clusterId} can be set for the node using
     * {@link GridCacheVersionManager#dataCenterId(byte)}.
     *
     * @return Cluster id.
     */
    public byte clusterId();

    /** @return Topology version plus number of seconds from the start time of the first grid node. */
    public int topologyVersion();

    /**
     * If the source of the update is the "local" cluster, {@code null} is returned.
     * If the update comes from another cluster via {@link IgniteInternalCache#putAllConflict(Map)},
     * the entry version from that other cluster is returned.
     * @return Replication version.
     * @see IgniteInternalCache#putAllConflict(Map)
     * @see IgniteInternalCache#removeAllConflict(Map)
     */
    public CacheEntryVersion otherClusterVersion();
}
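A minimal sketch of a consumer written against the interfaces above. To stay self-contained, it declares simplified local copies of ChangeDataCaptureConsumer and ChangeDataCaptureEvent (no Ignite annotations; version() and cacheId() omitted). The class names PartitionedForwardingConsumer and SimpleEvent and the per-worker queue design are hypothetical. It assumes CDC runs on every node, so the same change is seen on the primary and on backups, and it filters on primary() and routes by partition() as the Javadoc suggests.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Simplified local copy of ChangeDataCaptureEvent (version() and cacheId() omitted). */
interface ChangeDataCaptureEvent {
    Object key();
    Object value();      // null means the entry was removed
    boolean primary();
    int partition();
}

/** Same shape as ChangeDataCaptureConsumer, without Ignite annotations. */
interface ChangeDataCaptureConsumer {
    void start();
    boolean onEvents(Iterator<ChangeDataCaptureEvent> events);
    void stop();
}

/** Hypothetical event holder used to feed the consumer. */
final class SimpleEvent implements ChangeDataCaptureEvent {
    private final Object key;
    private final Object value;
    private final boolean primary;
    private final int partition;

    SimpleEvent(Object key, Object value, boolean primary, int partition) {
        this.key = key;
        this.value = value;
        this.primary = primary;
        this.partition = partition;
    }

    @Override public Object key() { return key; }
    @Override public Object value() { return value; }
    @Override public boolean primary() { return primary; }
    @Override public int partition() { return partition; }
}

/**
 * Hypothetical consumer: keeps only events captured on primary nodes and routes each one to a
 * worker queue chosen by partition, so one partition's changes always reach the same worker
 * in the order they were received.
 */
class PartitionedForwardingConsumer implements ChangeDataCaptureConsumer {
    private final int workers;
    private List<List<ChangeDataCaptureEvent>> queues;

    PartitionedForwardingConsumer(int workers) {
        this.workers = workers;
    }

    @Override public void start() {
        queues = new ArrayList<>();
        for (int i = 0; i < workers; i++)
            queues.add(new ArrayList<>());
    }

    @Override public boolean onEvents(Iterator<ChangeDataCaptureEvent> events) {
        while (events.hasNext()) {
            ChangeDataCaptureEvent evt = events.next();
            // Backup copies of the same change are skipped to avoid handling it twice.
            if (!evt.primary())
                continue;
            queues.get(evt.partition() % workers).add(evt);
        }
        // This sketch always asks CDC to store the offset once the batch is queued.
        return true;
    }

    @Override public void stop() {
        queues = null;
    }

    /** Events routed to the given worker so far. */
    List<ChangeDataCaptureEvent> queue(int worker) {
        return queues.get(worker);
    }
}
```

Returning true from onEvents asks CDC to store the current offset, so a consumer that forwards to an external system should return true only after that system has accepted the batch. Otherwise a crash could lose events that were committed but never delivered.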

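Because CacheEntryVersion is Comparable, a consumer that sees updates of the same key out of order can resolve them with a last-writer-wins rule. The sketch below is hypothetical: LastWriterWins is not part of the proposed API, and the type parameter V stands in for CacheEntryVersion so the example runs without Ignite on the classpath (any Comparable version type works).

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical last-writer-wins store: for each key, keep the update with the greatest version.
 * V stands in for CacheEntryVersion.
 */
class LastWriterWins<K, V extends Comparable<V>> {
    /** A value together with the version that produced it; a null value marks a removal. */
    static final class Versioned<T> {
        final Object value;
        final T version;

        Versioned(Object value, T version) {
            this.value = value;
            this.version = version;
        }
    }

    private final Map<K, Versioned<V>> state = new HashMap<>();

    /** Applies an update; returns true if it was newer than the stored version and therefore won. */
    boolean apply(K key, Object value, V version) {
        Versioned<V> cur = state.get(key);
        // Stale or duplicate update: the stored version is newer or equal.
        if (cur != null && cur.version.compareTo(version) >= 0)
            return false;
        // Removals are stored as tombstones so that a late, older put cannot resurrect the key.
        state.put(key, new Versioned<>(value, version));
        return true;
    }

    /** Current value for the key, or null if the key is absent or was removed. */
    Object get(K key) {
        Versioned<V> cur = state.get(key);
        return cur == null ? null : cur.value;
    }
}
```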
...