...

CDCConsumer.java
/** Consumer of data change events. */
@IgniteExperimental
public interface CaptureDataChangeConsumer {
    /**
     * Starts the consumer.
     *
     * @param configuration Ignite configuration.
     * @param log Logger.
     */
    void start(IgniteConfiguration configuration, IgniteLogger log);

    /**
     * Handles entry change events.
     * If this method returns {@code true}, the current offset is stored, and notifications after a CDC application
     * failure or restart resume from it.
     *
     * @param events Entry change events.
     * @return {@code True} if the current offset should be saved on the disk to continue from it in case of any failure or restart.
     */
    boolean onChange(Iterator<ChangeEvent> events);

    /**
     * Stops the consumer.
     * This method can be invoked only after {@link #start(IgniteConfiguration, IgniteLogger)}.
     */
    void stop();
}

/**
 * Event of a single entry change.
 * An instance represents the new value of the modified entry.
 *
 * @see IgniteCDC
 * @see CaptureDataChangeConsumer
 */
@IgniteExperimental
public class ChangeEvent implements Serializable {
    /**
     * @return Key for the changed entry.
     */
    public Object key() { /* ... */ }

    /**
     * @return Value for the changed entry.
     */
    public Object value() { /* ... */ }

    /**
     * @return {@code True} if event fired on primary node for partition containing this entry.
     * @see <a href="https://ignite.apache.org/docs/latest/configuring-caches/configuring-backups#configuring-partition-backups">Configuring partition backups.</a>
     */
    public boolean primary() { /* ... */ }

    /**
     * @return Partition number.
     */
    public int partition() { /* ... */ }

    /**
     * @return Order of the update operation.
     */
    public ChangeEventOrder order() { /* ... */ }

    /**
     * @return Operation type.
     */
    public ChangeEventType operation() { /* ... */ }

    /**
     * @return Cache ID.
     * @see org.apache.ignite.internal.util.typedef.internal.CU#cacheId(String)
     */
    public int cacheId() { /* ... */ }
}

@IgniteExperimental
public class ChangeEventOrder implements Comparable<ChangeEventOrder>, Serializable {
    /** @return Topology version plus number of seconds from the start time of the first grid node. */
    public int topVer() { /* ... */ }

    /** @return Node order and DR ID. */
    public int nodeOrderDrId() { /* ... */ }

    /** @return DR ID. */
    public byte dataCenterId() { /* ... */ }

    /** @return Version order. */
    public long order() { /* ... */ }

    /** @param replicaVer Replication version. */
    public void otherDcOrder(ChangeEventOrder replicaVer) { /* ... */ }

    /** @return Replication version. */
    public ChangeEventOrder otherDcOrder() { /* ... */ }
}
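
The offset contract of onChange can be illustrated with a small, self-contained sketch. It does not use the Ignite classes: Event and Consumer below are hypothetical stand-ins for ChangeEvent and for the onChange part of CaptureDataChangeConsumer, and BatchingConsumer, its batchSize parameter, and the in-memory sink are assumptions made for illustration only. The sketch buffers events fired on primary nodes and returns true only after the buffer has been flushed, so an offset is requested only for data that has actually been handed over.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class CdcConsumerSketch {
    /** Hypothetical stand-in for ChangeEvent, reduced to the accessors used below. */
    static final class Event {
        private final Object key;
        private final Object val;
        private final boolean primary;

        Event(Object key, Object val, boolean primary) {
            this.key = key;
            this.val = val;
            this.primary = primary;
        }

        Object key() { return key; }
        Object value() { return val; }
        boolean primary() { return primary; }
    }

    /** Hypothetical stand-in for the onChange part of CaptureDataChangeConsumer. */
    interface Consumer {
        boolean onChange(Iterator<Event> events);
    }

    /** Buffers primary events and asks for an offset commit only after a flush. */
    static final class BatchingConsumer implements Consumer {
        private final int batchSize;
        private final List<Event> buf = new ArrayList<>();
        final List<Event> sink = new ArrayList<>(); // stands in for a downstream system

        BatchingConsumer(int batchSize) {
            this.batchSize = batchSize;
        }

        @Override public boolean onChange(Iterator<Event> events) {
            while (events.hasNext()) {
                Event evt = events.next();

                // Assumption: backup copies are skipped so each change is handled once.
                if (evt.primary())
                    buf.add(evt);
            }

            // Not flushed yet: returning false means the offset is not saved,
            // so these events would be delivered again after a restart.
            if (buf.size() < batchSize)
                return false;

            sink.addAll(buf);
            buf.clear();

            return true; // flushed: the current offset may be saved
        }
    }

    public static void main(String[] args) {
        BatchingConsumer c = new BatchingConsumer(2);

        boolean first = c.onChange(Arrays.asList(
            new Event("k1", "v1", true), new Event("k1", "v1", false)).iterator());
        boolean second = c.onChange(Arrays.asList(
            new Event("k2", "v2", true)).iterator());

        System.out.println(first + " " + second + " " + c.sink.size()); // prints "false true 2"
    }
}
```

Returning false for a partially filled buffer trades some redelivery after a restart for not losing buffered events; a consumer that writes each event through immediately could simply return true on every call.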

Risks and Assumptions

  • The CDC utility is expected to be started, and automatically restarted on failure, by the OS or some external tool to provide stable change event processing.
  • The CDC feature may be used for deployments that have WAL only.
  • When CDC starts, the first event consumed will be the first event available in the WAL archive.
  • The lag between a record change and the CDC consumer notification depends on the segment archiving timeout and requires additional configuration from the user.
  • CDC failover depends on the WAL archive segment count. If the CDC application is down for a relatively long time, Ignite may delete some archive segments;
    the consumer then can't continue to receive the changed records and must restart from the existing segments.
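
The start and failover assumptions above can be sketched as a resume rule. This is only an illustration, not the CDC implementation: the class CdcResumeSketch, the methods resumeSegment and eventsLost, and the convention that -1 means "no saved offset" are all hypothetical. Segments are modelled as increasing indexes, and oldestArchived is the lowest index still present in the WAL archive.

```java
class CdcResumeSketch {
    /**
     * Picks the segment to resume from.
     *
     * @param committedSegment Segment index of the last saved offset, or -1 if nothing was saved (assumed convention).
     * @param oldestArchived Lowest segment index still present in the WAL archive.
     * @return Segment index to start reading from.
     */
    static long resumeSegment(long committedSegment, long oldestArchived) {
        // No saved offset, or the saved segment was already deleted:
        // start from the first segment that still exists.
        return Math.max(committedSegment, oldestArchived);
    }

    /**
     * @return true if a saved offset points at a segment that is no longer in the archive,
     *         so some changed records can no longer be delivered.
     */
    static boolean eventsLost(long committedSegment, long oldestArchived) {
        return committedSegment >= 0 && committedSegment < oldestArchived;
    }

    public static void main(String[] args) {
        System.out.println(resumeSegment(-1, 5)); // first start: prints 5
        System.out.println(resumeSegment(7, 5));  // saved segment still archived: prints 7
        System.out.println(resumeSegment(3, 5));  // saved segment deleted: prints 5
        System.out.println(eventsLost(3, 5));     // prints true
    }
}
```

Under this model, keeping more segments in the WAL archive widens the window during which the CDC application can be down without losing events.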

...