...

Apache Ignite can support cache rebalancing by transferring partition files using the zero-copy algorithm [1], based on an extension of the communication SPI and the Java NIO API. Once the partition file has been transferred to the demander node, there are a few possible approaches that can be implemented to preload entries from a particular partition file.

Hot swap cache data storage

The demander node will initialize a preloaded partition file as a new PageStore, make this storage up-to-date by applying previously saved async cache operations and then hot swap it under the checkpoint write lock.

Under the checkpoint write lock, the demander node must first swap the cache data storage with a temporary one, so that recovery operations can be performed on the original cache data storage. After the partition file has been received from the supplier node, there are two possible ways to make this partition file up-to-date.

Disadvantages:

  • A long and complex index rebuild procedure that requires the development of additional crash recovery guarantees.
    The index rebuild procedure starts immediately when the partition file is fully received from the supplier node. If the node crashes in the middle of the index rebuild, it will have an inconsistent index state at the next node startup. To avoid this, a new index-undo WAL record must be logged during the rebuild and used on node start to remove previously added index records.

Preload entries from loaded partition file

The demander node will use a preloaded partition file as a new source of cache data entries to load.

Disadvantages:

  • The approach will require a new temporary FilePageStore to be initialized. It must be created as part of a temporary cache group or in a separate temporary data region, so that the machinery for iterating over the full partition file can be reused.

Proposed Changes (Hot swap)

Process Overview

Two node roles take part in the data balancing process:

  • demander (receiver of partition files)
  • supplier (sender of partition files)

The whole process is described in terms of rebalancing a single cache group; partition files are rebalanced one by one:

  1. The demander node prepares the set of IgniteDhtDemandedPartitionsMap#full cache partitions to fetch;
  2. The demander node checks the compatibility version (for example, 2.8) and starts recording all incoming cache updates to a new special storage, the temporary WAL storage;
  3. The demander node sends the GridDhtPartitionDemandMessage to the supplier node as usual;
  4. The supplier node receives the GridDhtPartitionDemandMessage, starts a new checkpoint process and fixes the cache partition file sizes;
  5. The supplier node creates an empty temporary file with the .delta postfix (e.g. part-0.bin.delta) for each cache partition file (in the same cache working directory or another configured one);
  6. The supplier node starts tracking each pageId write attempt to these partition files:
    1. When a write attempt happens, the thread that caused it reads the original copy of the page from the partition file and flushes it to the corresponding .delta file;
    2. After that, the thread writes the changed page data to the partition file;
  7. The supplier waits for the checkpoint process to end;
  8. On the supplier, for each cache partition file:
    1. The process opens the partition file in read-only mode;
    2. Starts sending the partition file (as is, with any concurrent writes) in chunks of a predefined constant size (a multiple of the PageMemory page size);
    3. After the partition file has been sent, it starts sending the corresponding .delta file;
  9. The demander node starts listening for a new type of incoming connection (a socket channel created event) from the supplier node;
  10. When the appropriate connection is established, the demander node, for each cache partition file:
    1. Receives the file metadata (the corresponding cache group identifier, cache partition file name, file size);
    2. Writes data from the socket to the particular cache partition file, starting from the beginning of the file;
    3. After the original cache partition file has been received, the node starts receiving the corresponding .delta file;
    4. The node reads data from the socket in chunks of the PageMemory page size and applies each received page to the partition file by its pageId;
  11. When the demander node has received the whole cache partition file:
    1. The node begins the secondary index rebuild procedure over the received partition file;
    2. After that, the thread begins to apply data entries from the beginning of the temporary WAL storage;
    3. All async operations corresponding to this partition file are still written to the end of the temporary WAL;
    4. When the temporary WAL storage is about to become empty:
      1. Start the first checkpoint;
      2. Wait for the first checkpoint to end and own the cache partition;
      3. All operations are now switched to the partition file instead of writing to the temporary WAL;
      4. Schedule the temporary WAL storage deletion;
  12. The supplier node deletes all temporary files.
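
The way the demander applies a received .delta file to the partition file can be sketched as follows. This is only an illustration under stated assumptions: the DeltaApplier class is hypothetical, and the record layout (an 8-byte page offset followed by one page of data) is assumed here because the proposal does not define the .delta file format.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;

/** Hypothetical helper: applies page records from a .delta stream onto a received partition file. */
final class DeltaApplier {
    /**
     * Assumed record layout (not defined by the proposal): 8-byte page offset, then pageSize bytes.
     *
     * @return Number of pages applied.
     */
    static int apply(ReadableByteChannel delta, FileChannel part, int pageSize) throws IOException {
        ByteBuffer rec = ByteBuffer.allocate(Long.BYTES + pageSize);
        int applied = 0;

        while (readFully(delta, rec)) {
            rec.flip();

            long off = rec.getLong();

            // Positional write: overwrite the page in place, the channel position is not used.
            while (rec.hasRemaining())
                off += part.write(rec, off);

            rec.clear();
            applied++;
        }

        return applied;
    }

    /** Fills the buffer completely; returns false on a clean end of stream (blocking channel assumed). */
    private static boolean readFully(ReadableByteChannel ch, ByteBuffer buf) throws IOException {
        while (buf.hasRemaining()) {
            if (ch.read(buf) < 0) {
                if (buf.position() == 0)
                    return false;

                throw new IOException("Truncated delta record");
            }
        }

        return true;
    }
}
```

Because every record carries its own offset, the records can be applied in arrival order without buffering the whole .delta file.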

Components

Historical rebalance

After the partition has been received, historical rebalance must be initiated to load the remaining cache updates.

Catch-up temporary WAL

The swapped temporary storage will log all cache updates to the temporary WAL storage (one per partition) so that they can later be applied to the corresponding partition file. While the Demander is receiving partition files, it must sequentially save all cache entries corresponding to the MOVING partition into the new temporary storage. These entries will be applied later, one by one, to the newly received cache partition file. While the storage is being read, all asynchronous operations are appended to its end until it has been fully read. A file-based FIFO approach is assumed to be used for this process.

The temporary storage is chosen to be WAL-based. The storage must support:

  • An unlimited number of WAL files to store temporary data records;
  • Iterating over stored data records while an asynchronous writer thread inserts new records;
  • A WAL-per-partition approach;
  • Write operations to the storage having higher priority than read operations;
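
The catch-up behaviour described above (writers append to the end while a reader drains from the beginning, then operations switch to the partition) can be sketched as follows. This is an in-memory stand-in, not the WAL-based storage itself; the CatchUpLog class and its method names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

/** Hypothetical in-memory stand-in for the per-partition temporary storage. */
final class CatchUpLog<T> {
    /** Logged updates in FIFO order. */
    private final Queue<T> pending = new ArrayDeque<>();

    /** Applies a single update to the partition storage. */
    private final Consumer<T> partition;

    /** Set once the log has been fully drained. */
    private boolean switched;

    CatchUpLog(Consumer<T> partition) {
        this.partition = partition;
    }

    /** Writer side: logs the update while catching up, applies it directly afterwards. */
    synchronized void update(T upd) {
        if (switched)
            partition.accept(upd);
        else
            pending.add(upd);
    }

    /** Reader side: applies logged updates in FIFO order, then switches to direct writes. */
    void catchUp() {
        while (true) {
            T next;

            synchronized (this) {
                next = pending.poll();

                if (next == null) {
                    // The log is empty and nothing is in flight: switch atomically.
                    switched = true;

                    return;
                }
            }

            // Applied outside the lock so that writers are never blocked by a slow apply.
            partition.accept(next);
        }
    }
}
```

Writers hold the lock only long enough to enqueue, which is one simple way to give writes priority over reads; the switch happens under the same lock, so no update can be logged after the reader has seen an empty log.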

Expected problems to be solved

  • We must stop updating indexes on the demander when the data is ready to be transferred from the supplier node. Async cache updates on the demander must not cause index updates;
  • The previous partition metadata page and all stored meta information must be destroyed in PageMemory and restored from the new partition file;

Proposed Changes (Hot swap with historical rebalance)

Process Overview

Two node roles take part in the data balancing process:

  • Demander (receiver of partition files)
  • Supplier (sender of partition files)

The whole process is described in terms of rebalancing a single partition file of a cache group. All the other partitions are rebalanced one by one.

  1. A NODE_JOIN event occurs and the blocking PME starts;
    1. The Demander decides which partitions must be loaded. All the desired partitions have the MOVING state;
    2. The Demander initiates a new checkpoint process;
      1. Under the checkpoint write lock it swaps the cache data storage with the temporary one for each partition of the given set;
      2. The temporary cache data storage tracks the partition counter as usual (on each cache operation);
      3. Waits for the checkpoint begin future to complete;
  2. The Demander sends a request to the Supplier with the previously prepared set of cache groups and partition files;
  3. The Supplier receives the request and starts a new local checkpoint process;
    1. Creates a temporary file with the .delta postfix for each partition file (e.g. part-0.bin.delta);
    2. Under the checkpoint write lock, fixes the expected partition file size (as of the moment the checkpoint ends);
    3. Waits for the checkpoint begin future to complete;
    4. Starts copying the partition file to the Demander;
      1. Opens the partition file in read-only mode;
      2. Starts sending the partition file (with any concurrent writes) in chunks of a predefined size;
    5. Asynchronously writes each page to the partition file and the same page to the corresponding .delta file;
    6. When the partition file has been sent, it starts sending the corresponding .delta file;
  4. The Demander listens for new file sending attempts from the Supplier;
  5. The Demander receives the partition file (for each partition file, one by one);
  6. The Demander reads the corresponding partition .delta file in chunks and applies them to the received partition file;
  7. When the Demander has received the whole cache partition file:
    1. Swaps the temporary cache data storage with the original one on the next checkpoint (under the write lock);
    2. When the partition has been swapped, it starts the index rebuild procedure over the given partition files;
    3. Starts historical rebalance for the given partition file;
  8. The Supplier deletes all temporary files.
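
The storage swap in steps 1.2.1 and 7.1 can be illustrated with a read-write lock playing the role of the checkpoint lock. This is a minimal sketch, not Ignite's actual checkpoint locking: the SwappableStore class and its methods are hypothetical names introduced for illustration.

```java
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;

/** Hypothetical holder of a cache data storage that can be swapped atomically. */
final class SwappableStore<S> {
    /** Stands in for the checkpoint read/write lock. */
    private final ReadWriteLock checkpointLock = new ReentrantReadWriteLock();

    /** Currently active storage, guarded by the lock. */
    private S store;

    SwappableStore(S initial) {
        store = initial;
    }

    /** Cache operations run under the read lock, so they never observe a half-done swap. */
    <R> R withStore(Function<S, R> op) {
        checkpointLock.readLock().lock();

        try {
            return op.apply(store);
        }
        finally {
            checkpointLock.readLock().unlock();
        }
    }

    /** Swaps the storage under the write lock and returns the previous one. */
    S swap(S next) {
        checkpointLock.writeLock().lock();

        try {
            S prev = store;

            store = next;

            return prev;
        }
        finally {
            checkpointLock.writeLock().unlock();
        }
    }
}
```

Returning the previous storage from swap lets the caller keep the original storage aside during the transfer and put it back with a second swap once the partition file is ready.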

Components

In terms of a high-level abstraction, Apache Ignite must support the features described below.

File transfer between nodes

The partition preloader must support relocation of cache partition files from one cluster node to another (the zero-copy algorithm [1] is assumed to be used by default). To achieve this, the file transfer machinery must be implemented in Apache Ignite over the Communication SPI.

CommunicationSpi

The Communication SPI must support:

  • opening channel connections to a remote node on an arbitrary topic (GridTopic is used) with initial meta information;
  • listening for incoming connections and handling them with registered handlers;
  • an arbitrary set of channel parameters on connection handshake (an initial Message is assumed to be used);
API
Code Block
languagejava
themeConfluence
titleCommunicationListenerEx.java
collapsetrue
public interface CommunicationListenerEx<T extends Serializable> extends EventListener {
    /**
     * @param nodeId Remote node id.
     * @param initMsg Init channel message.
     * @param channel Locally created channel endpoint.
     */
    public void onChannelOpened(UUID nodeId, Message initMsg, Channel channel);
}

GridIoManager

The IO manager must support:

  • different approaches to incoming data handling: CHUNK (read the channel into a ByteBuffer) and FILE (the zero-copy approach);
  • sending and receiving data in chunks of a predefined size, storing intermediate results;
  • re-establishing the connection between nodes if an error occurs and continuing the file sending/receiving;
  • limiting connection bandwidth at runtime;
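
Sending a file in chunks with the zero-copy approach can be sketched with the standard FileChannel#transferTo method. The ChunkedFileSender class below is a hypothetical illustration, not part of the proposed API; bandwidth limiting and reconnection are left out.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;

/** Hypothetical helper: sends a file region in chunks using FileChannel#transferTo. */
final class ChunkedFileSender {
    /**
     * @param src Source file channel.
     * @param dst Target channel (a socket channel in the real use case).
     * @param offset Position to start the transfer at.
     * @param cnt Number of bytes to transfer.
     * @param chunkSize Maximum number of bytes per transferTo call.
     * @return Position right after the last byte sent; a caller may resume from it after a reconnect.
     */
    static long send(FileChannel src, WritableByteChannel dst, long offset, long cnt, int chunkSize)
        throws IOException {
        long pos = offset;
        long end = offset + cnt;

        while (pos < end) {
            long sent = src.transferTo(pos, Math.min(chunkSize, end - pos), dst);

            // Nothing was transferred (e.g. the position is past the end of the file): stop.
            if (sent <= 0)
                break;

            pos += sent;
        }

        return pos;
    }
}
```

Keeping the position as the only transfer state is what makes "storing intermediate results" cheap: after a connection failure the sender only needs the last returned position to continue.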
API
Code Block
languagejava
themeConfluence
titleTransmissionHandler.java
collapsetrue
public interface TransmissionHandler {
    /**
     * @param err The err of fail handling process.
     */
    public void onException(UUID nodeId, Throwable err);

    /**
     * @param nodeId Remote node id from which request has been received.
     * @param fileMeta File meta info.
     * @return Absolute pathname denoting a file.
     */
    public String filePath(UUID nodeId, TransmissionMeta fileMeta);

    /**
     * <em>Chunk handler</em> represents by itself the way of input data stream processing.
     * It accepts within each chunk a {@link ByteBuffer} with data from input for further processing.
     *
     * @param nodeId Remote node id from which request has been received.
     * @param initMeta Initial handler meta info.
     * @return Instance of chunk handler to process incoming data by chunks.
     */
    public Consumer<ByteBuffer> chunkHandler(UUID nodeId, TransmissionMeta initMeta);

    /**
     * <em>File handler</em> represents by itself the way of input data stream processing. All the data will
     * be processed under the hood using zero-copy transferring algorithm and only start file processing and
     * the end of processing will be provided.
     *
     * @param nodeId Remote node id from which request has been received.
     * @param initMeta Initial handler meta info.
     * @return Instance of read handler to process incoming data like the {@link FileChannel} manner.
     */
    public Consumer<File> fileHandler(UUID nodeId, TransmissionMeta initMeta);
}


Code Block
languagejava
titleGridIoManager.TransmissionSender
public class TransmissionSender implements Closeable {
    /**
     * @param file Source file to send to remote.
     * @param params Additional transfer file description keys.
     * @param plc The policy of handling data on remote.
     * @throws IgniteCheckedException If fails.
     */
    public void send(
        File file,
        Map<String, Serializable> params,
        TransmissionPolicy plc
    ) throws IgniteCheckedException, InterruptedException, IOException {
        send(file, 0, file.length(), params, plc);
    }

    /**
     * @param file Source file to send to remote.
     * @param plc The policy of handling data on remote.
     * @throws IgniteCheckedException If fails.
     */
    public void send(
        File file,
        TransmissionPolicy plc
    ) throws IgniteCheckedException, InterruptedException, IOException {
        send(file, 0, file.length(), new HashMap<>(), plc);
    }

    /**
     * @param file Source file to send to remote.
     * @param offset Position to start transfer at.
     * @param cnt Number of bytes to transfer.
     * @param params Additional transfer file description keys.
     * @param plc The policy of handling data on remote.
     * @throws IgniteCheckedException If fails.
     */
    public void send(
        File file,
        long offset,
        long cnt,
        Map<String, Serializable> params,
        TransmissionPolicy plc
    ) throws IgniteCheckedException, InterruptedException, IOException {
        // Impl.
    }
}

FileTransmitProcessor

The file transmission manager must support:

  • different approaches to incoming data handling: buffered and direct (the zero-copy approach of FileChannel#transferTo);
  • transferring data in chunks of a predefined size, saving intermediate results;
  • re-establishing the connection if an error occurs and continuing the file upload/download;
  • limiting connection bandwidth (upload and download) at runtime;
API
Code Block
languagejava
themeConfluence
titleTransmitSession.java
collapsetrue
/**
 *
 */
public interface TransmitSession {
    /**
     * @param nodeId The remote node id connected from.
     * @param sessionId The unique session id.
     */
    public void begin(UUID nodeId, String sessionId);

    /**
     * @return The instance of read handler to process incoming data by chunks.
     */
    public ChunkHandler chunkHandler();

    /**
     * @return The instance of read handler to process incoming data like the {@link FileChannel} manner.
     */
    public FileHandler fileHandler();

    /**
     * The end of session transmission process.
     */
    public void end();

    /**
     * @param cause The cause of fail handling process.
     */
    public void onException(Throwable cause);
}
Code Block
languagejava
themeConfluence
titleFileHandler.java
collapsetrue
/**
 *
 */
public interface FileHandler {
    /**
     * @param name The file name transfer from.
     * @param position The start position pointer of download file in original source.
     * @param count Total count of bytes read from the original source.
     * @param params The additional transfer file description params.
     * @return The absolute pathname string denoting the file or {@code null} if there is no sense.
     * @throws IOException If fails.
     */
    public String begin(String name, long position, long count, Map<String, Serializable> params) throws IOException;

    /**
     * @param file The file with fully downloaded data into.
     * @param params The additional transfer file description params.
     */
    public void end(File file, Map<String, Serializable> params);
}
Code Block
languagejava
themeConfluence
titleChunkHandler.java
collapsetrue
/**
 *
 */
public interface ChunkHandler {
    /**
     * @param name The file name on remote.
     * @param position The start position pointer of downloading file in original source.
     * @param count Total count of bytes to read from the original source.
     * @param params The additional transfer file description params.
     * @return The size of the {@link ByteBuffer} to read the input channel into.
     * @throws IOException If fails.
     */
    public int begin(String name, long position, long count, Map<String, Serializable> params) throws IOException;

    /**
     * @param buff The data filled buffer.
     * @return {@code true} if the chunk of data has been successfully accepted.
     * @throws IOException If fails.
     */
    public boolean chunk(ByteBuffer buff) throws IOException;

    /**
     * @param params The additional handling channel description params.
     */
    public void end(Map<String, Serializable> params);
}

Copy partition on the fly

Checkpointer

When the supplier node receives the cache partition file demand request, it will send the file over the CommunicationSpi. The cache partition file can be concurrently updated by the checkpoint thread during its transmission. To guarantee file consistency, the Checkpointer must use the Copy-on-Write [3] technique and save a copy of each updated chunk into the temporary file.
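
A minimal sketch of the copy-on-write step is given below. It assumes that the original (pre-update) content of a page is what gets saved, that only pages within the file size fixed at the checkpoint need saving, and that each saved record is an 8-byte page offset followed by the page bytes. None of these details are fixed by the proposal, and the CopyOnWritePageWriter class is a hypothetical name.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical page writer that saves the original page to a delta file before overwriting it. */
final class CopyOnWritePageWriter {
    private final FileChannel part;
    private final FileChannel delta;
    private final int pageSize;

    /** Partition file size fixed at the checkpoint; pages beyond it are not being sent. */
    private final long fixedSize;

    /** Offsets of pages whose original content has already been saved. */
    private final Set<Long> saved = new HashSet<>();

    CopyOnWritePageWriter(FileChannel part, FileChannel delta, int pageSize, long fixedSize) {
        this.part = part;
        this.delta = delta;
        this.pageSize = pageSize;
        this.fixedSize = fixedSize;
    }

    /** Writes the page at the given offset, saving its original content on the first write. */
    synchronized void writePage(long off, ByteBuffer newData) throws IOException {
        if (off + pageSize <= fixedSize && saved.add(off)) {
            // Assumed record layout: 8-byte page offset, then the original page bytes.
            ByteBuffer rec = ByteBuffer.allocate(Long.BYTES + pageSize);

            rec.putLong(off);

            while (rec.hasRemaining()) {
                if (part.read(rec, off + rec.position() - Long.BYTES) < 0)
                    throw new IOException("Unexpected end of partition file");
            }

            rec.flip();

            while (rec.hasRemaining())
                delta.write(rec);
        }

        long pos = off;

        while (newData.hasRemaining())
            pos += part.write(newData, pos);
    }
}
```

Only the first write to a page produces a delta record, so the temporary file grows with the number of distinct pages touched during the transfer rather than with the number of writes.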

Apply partition on the fly

Rebuild indexes

A node is ready to become a partition owner when the partition data has been rebalanced and the cache indexes are ready. With the message-based cluster rebalancing approach, indexes are rebuilt simultaneously with cache data loading. With the file-based rebalancing approach, the index rebuild procedure must finish before the partition state is set to OWNING.

Public API changes

The following changes need to be made:

Code Block
languagejava
titleCommunicationSpi.java
collapsetrue
/**
 * @return {@code True} if new type of direct connections supported.
 */
public default boolean channelConnectionSupported() {
    return false;
}
 
/**
 * @param remote Destination cluster node to communicate with.
 * @param msg Configuration channel message.
 * @throws IgniteSpiException If fails.
 */
public default IgniteSocketChannel channel(ClusterNode remote, T msg) throws IgniteSpiException {
    throw new UnsupportedOperationException();
}

Failover and Recovery

Ignite doesn't provide any recovery guarantees for the partitions with the MOVING state. The cache partitions will be fully loaded when the next rebalance procedure occurs.

FAIL\LEFT during rebalancing

The node which is being rebalanced has left the cluster. For such nodes WAL is always disabled (all partitions have the MOVING state, because this node is new to the cluster and has no cache data).
Since WAL is disabled, we can guarantee that all operations with loaded partition files (renaming partition files, applying async updates) are safe, because the cache directory will be fully dropped on recovery.

Topology change

Each topology change event (JOIN/LEFT/FAILED) may or may not change the cache affinity assignments of the caches currently being rebalanced. If the assignments have not changed and the node still needs the partitions being rebalanced, we can continue the current rebalance process (see IGNITE-7165 for details).

...

To provide basic recovery guarantees we must:

...

  • Wait for the first checkpoint to end and set the partition status to OWNING;

Recovery from different stages:

  • The Supplier crashes when sending partition;
  • The Demander crashes when receiving partition;
  • The Demander crashes when applying temp WAL;

Phase-2

SSL must be disabled to take advantage of Java NIO zero-copy file transmission using the FileChannel#transferTo method. If we need to use SSL, the file must be split into chunks in the same way, and the chunks sent over the socket channel with a ByteBuffer. As the SSL engine generally needs a direct ByteBuffer to do encryption, we can't avoid copying the buffer payload from the kernel level to the application level.
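
The buffered fallback can be sketched as a plain read-then-write loop over a direct ByteBuffer. The BufferedFileSender class is hypothetical; in the SSL case the target channel is assumed to be a wrapper that encrypts each buffer before it reaches the socket, which is not shown here.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;

/** Hypothetical helper: sends a file through an application-level buffer instead of transferTo. */
final class BufferedFileSender {
    /**
     * @param src Source file channel, read from its current position.
     * @param dst Target channel (assumed to perform encryption in the SSL case).
     * @param chunkSize Size of the intermediate direct buffer, must be positive.
     * @return Total number of bytes written to the target.
     */
    static long send(FileChannel src, WritableByteChannel dst, int chunkSize) throws IOException {
        ByteBuffer buf = ByteBuffer.allocateDirect(chunkSize);
        long total = 0;

        // Each iteration copies one chunk from the file into the buffer, then drains the buffer.
        while (src.read(buf) > 0) {
            buf.flip();

            while (buf.hasRemaining())
                total += dst.write(buf);

            buf.clear();
        }

        return total;
    }
}
```

Compared with transferTo, every byte passes through the application-level buffer once, which is the extra copy the paragraph above refers to.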

...