Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
public interface TransmissionHandler {
/**
* @param err The err of fail handling process.
*/
public void onException(UUID nodeId, Throwable err);

/**
* @param nodeId Remote node id from which request has been received.
* @param fileMeta File meta info.
* @return Absolute pathname denoting a file.
*/
public String filePath(UUID nodeId, TransmissionMeta fileMeta);

/**
* <em>Chunk handler</em> represents by itself the way of input data stream processing.
* It accepts within each chunk a {@link ByteBuffer} with data from input for further processing.
*
* @param nodeId Remote node id from which request has been received.
* @param initMeta Initial handler meta info.
* @return Instance of chunk handler to process incoming data by chunks.
*/
public Consumer<ByteBuffer> chunkHandler(UUID nodeId, TransmissionMeta initMeta);

/**
* <em>File handler</em> represents by itself the way of input data stream processing. All the data will
* be processed under the hood using zero-copy transferring algorithm and only start file processing and
* the end of processing will be provided.
*
* @param nodeId Remote node id from which request has been received.
* @param initMeta Initial handler meta info.
* @return Intance of read handler to process incoming data like the {@link FileChannel} manner.
*/
public Consumer<File> fileHandler(UUID nodeId, TransmissionMeta initMeta);
}
IDIEP-28
Author
Sponsor
Created

31-Oct-2018

Status

Status
colourYellow
titleACTIVE


Table of Contents

Motivation

The Apache Ignite cluster balance procedure with enabled persitence currently doesn't utilize network and storage device throughout to its full extent. The balance procedure processes cache data entries one by one which is not efficient enough for the cluster with enabled persistence.

Competitive Analysis

Profiling current balancing procedure

Rebalance procedure optimizations

Possible partition file sending approaches

The Apache Ignite can support cache rebalancing as transferring partition files using zero copy algorithm [1] based on an extension of communication SPI and Java NIO API. When the partition file has been transferred to the demander node there are two possible approaches can be implemented to preload entries from particular partition file.

Hot swap cache data storage

The demander node will initialize a preloaded partition file as a new PageStore, make this storage up-to-date by applying previously saved async cache operations and then hot swap it under the checkpoint write lock.

Disadvantages:

  • A long and complex index reduild procedure that requires the development of additional crash recovery guarantees.
    The index rebuild procedure starts immediately when the partition file is fully received from the supplier node. If the node crashes in the middle of the rebuilding index process it will have an inconsistent index state at the further node startup. To avoid this a new index-undo WAL record must be logged within rebuilding and used on node start to remove previously added index records.

Preload entries from loaded partition file

The demander node will use a preloaded patition file as a new source of cache data entries to load.

Disadvantages:

  • The approach will require a new temporary FilePageStore to be initialized. It must be created as a part of the temporary cache group or in the separate temporary data region to provide reusing machinery of iteration over the full partition file.

Proposed Changes (Hot swap)

Process Overview

In the process of balancing data:

  • demaner (receiver of partition files)
  • supplier (sender of partition files).

The whole process is described in terms of rebalance single cache group and partition files would be rebalanced one-by-one:

  1. The demander node prepares the set of IgniteDhtDemandedPartitionsMap#full cache partitions to fetch;
  2. The demander node checks compatibility version (for example, 2.8) and starts recording all incoming cache updates to the new special storage – the temporary WAL storage;
  3. The demander node sends the GridDhtPartitionDemandMessage to the supplier node as usual;
  4. The supplier node receives GridDhtPartitionDemandMessage and starts the new checkpoint process and fixes cache partition file sizes;
  5. The supplier node creates an empty temporary file with .delta (e.g. part-0.bin.delta file) postfix for each cache partition file (in the same cache working directory or another configured);
  6. The supplier node starts tracking each pageId write attempt to these partition files
    1. When the write attempt happens, the thread that caused it reads the original copy of this page from the partition and flushes it to the corresponding .delta file;
    2. After it the thread writes the changed page data to the partition file;
  7. The supplier waits the checkpoint process ends;
  8. On the supplier for each cache partition file
    1. The process opens the partition file in read-only mode;
    2. Starts sending partition file (as it is with any concurrent writes) by chunks of predefined constant size (multiple of PageMemory size);
    3. After the partition file sent it starts sending corresponding .delta file;
  9. The demander node starts to listen to new type of incoming connections (a socket channel created event) from the supplier node;
  10. When the appropriate connection established the demander node for each cache partition file
    1. Receives file metadata information (corresponding cache group identifier, cache partition file name, file size)
    2. Writes data from the socket to the particular cache partition file from the beginning of the file
    3. After the original cache partition file received the node starts receiving corresponding .delta file
    4. The node reads data from the socket by chunks of PageMemory size and applies each received pageId to the partition file
  11. When the demander node receives the whole cache partition file
    1. The node begins the rebuild secondary indexes procedure over received partition file
    2. After it the thread begins to apply for data entries from the beginning of WAL-temporary storage;
    3. All async operations corresponding to this partition file still write to the end of temporary WAL;
    4. At the moment of WAL-temporary storage is ready to be empty
      1. Start the first checkpoint;
      2. Wait for the first checkpoint ends and own the cache partition;
      3. All operations now are switched to the partition file instead of writing to the temporary WAL;
      4. Schedule the temporary WAL storage deletion;
  12. The supplier node deletes all temporary files;

Components

In terms of a high-level abstraction, Apache Ignite must support the features described below.

File transfer between nodes

Partition preloader must support cache partition file relocation from one cluster node to another (the zero copy algorithm [1] assume to be used by default). To achieve this, the file transfer machinery must be implemented at Apache Ignite over Communication SPI.

CommunicationSpi

Ignite's Comminication SPI must support to:

  • establishing channel connections to the remote node to an arbitrary topic (GridTopic) with predefined processing policy;
  • listening incoming channel creation events and registering connection handlers on the particular node;
  • an arbitrary set of channel parameters on connection handshake;
API

...

...

...

31-Oct-2018

...

Status
colourYellow
titleACTIVE

Table of Contents

Motivation

The Apache Ignite cluster balance procedure with enabled persitence currently doesn't utilize network and storage device throughout to its full extent. The balance procedure processes cache data entries one by one which is not efficient enough for the cluster with enabled persistence.

Competitive Analysis

Profiling current balancing procedure

Rebalance procedure optimizations

Possible partition file sending approaches

The Apache Ignite can support cache rebalancing as transferring partition files using zero copy algorithm [1] based on an extension of communication SPI and Java NIO API. When the partition file has been transferred to the demander node there are two possible approaches can be implemented to preload entries from particular partition file.

Hot swap cache data storage

The demander node will initialize a preloaded partition file as a new PageStore, make this storage up-to-date by applying previously saved async cache operations and then hot swap it under the checkpoint write lock.

Disadvantages:

  • A long and complex index reduild procedure that requires the development of additional crash recovery guarantees.
    The index rebuild procedure starts immediately when the partition file is fully received from the supplier node. If the node crashes in the middle of the rebuilding index process it will have an inconsistent index state at the further node startup. To avoid this a new index-undo WAL record must be logged within rebuilding and used on node start to remove previously added index records.

Preload entries from loaded partition file

The demander node will use a preloaded patition file as a new source of cache data entries to load.

Disadvantages:

  • The approach will require a new temporary FilePageStore to be initialized. It must be created as a part of the temporary cache group or in the separate temporary data region to provide reusing machinery of iteration over the full partition file.

Proposed Changes (Hot swap)

Process Overview

In the process of balancing data:

  • demaner (receiver of partition files)
  • supplier (sender of partition files).

The whole process is described in terms of rebalance single cache group and partition files would be rebalanced one-by-one:

  1. The demander node prepares the set of IgniteDhtDemandedPartitionsMap#full cache partitions to fetch;
  2. The demander node checks compatibility version (for example, 2.8) and starts recording all incoming cache updates to the new special storage – the temporary WAL storage;
  3. The demander node sends the GridDhtPartitionDemandMessage to the supplier node as usual;
  4. The supplier node receives GridDhtPartitionDemandMessage and starts the new checkpoint process and fixes cache partition file sizes;
  5. The supplier node creates an empty temporary file with .delta (e.g. part-0.bin.delta file) postfix for each cache partition file (in the same cache working directory or another configured);
  6. The supplier node starts tracking each pageId write attempt to these partition files
    1. When the write attempt happens, the thread that caused it reads the original copy of this page from the partition and flushes it to the corresponding .delta file;
    2. After it the thread writes the changed page data to the partition file;
  7. The supplier waits the checkpoint process ends;
  8. On the supplier for each cache partition file
    1. The process opens the partition file in read-only mode;
    2. Starts sending partition file (as it is with any concurrent writes) by chunks of predefined constant size (multiple of PageMemory size);
    3. After the partition file sent it starts sending corresponding .delta file;
  9. The demander node starts to listen to new type of incoming connections (a socket channel created event) from the supplier node;
  10. When the appropriate connection established the demander node for each cache partition file
    1. Receives file metadata information (corresponding cache group identifier, cache partition file name, file size)
    2. Writes data from the socket to the particular cache partition file from the beginning of the file
    3. After the original cache partition file received the node starts receiving corresponding .delta file
    4. The node reads data from the socket by chunks of PageMemory size and applies each received pageId to the partition file
  11. When the demander node receives the whole cache partition file
    1. The node begins the rebuild secondary indexes procedure over received partition file
    2. After it the thread begins to apply for data entries from the beginning of WAL-temporary storage;
    3. All async operations corresponding to this partition file still write to the end of temporary WAL;
    4. At the moment of WAL-temporary storage is ready to be empty
      1. Start the first checkpoint;
      2. Wait for the first checkpoint ends and own the cache partition;
      3. All operations now are switched to the partition file instead of writing to the temporary WAL;
      4. Schedule the temporary WAL storage deletion;
  12. The supplier node deletes all temporary files;

Components

In terms of a high-level abstraction, Apache Ignite must support the features described below.

File transfer between nodes

Partition preloader must support cache partition file relocation from one cluster node to another (the zero copy algorithm [1] assume to be used by default). To achieve this, the file transfer machinery must be implemented at Apache Ignite over Communication SPI.

CommunicationSpi

Ignite's Comminication SPI must support to:

  • establishing channel connections to the remote node to an arbitrary topic (GridTopic) with predefined processing policy;
  • listening incoming channel creation events and registering connection handlers on the particular node;
  • an arbitrary set of channel parameters on connection handshake;
API
Code Block
languagejava
themeConfluence
titleCommunicationListenerEx.java
collapsetrue
public interface CommunicationListenerEx<T extends Serializable> extends EventListener {
    /**
     * @param nodeId Remote node id.
     * @param initMsg Init channel message.
     * @param channel Locally created channel endpoint.
     */
    public void onChannelOpened(UUID nodeId, Message initMsg, Channel channel);
}

FileTransmitProcessor

The file transmission manager must support to:

  • using different approaches of incoming data handling – buffered and direct (zero-copy approach of FileChannel#transferTo);
  • transferring data by chunks of predefined size with saving intermediate results;
  • re-establishing connection if an error occurs and continue file upload\download;
  • limiting connection bandwidth (upload and download) at runtime;
API

...

languagejava
themeConfluence
titleFileWriter.java
collapsetrue

...

Code Block
languagejava
themeConfluence
titleTransmitSessionHandlerCommunicationListenerEx.java
collapsetrue
/**
 *
 */
public interface TransmitSessionHandler CommunicationListenerEx<T extends Serializable> extends EventListener {
    /**
     * @param nodeId The remoteRemote node id connected from.
     * @param sessionIdinitMsg TheInit uniquechannel session idmessage.
     */
 @param channel Locally publiccreated void begin(UUID nodeId, String sessionId);

    /**channel endpoint.
     */
   @return Thepublic instancevoid of read handler to process incoming data by chunks.
     */
    public ChunkHandler chunkHandler();

onChannelOpened(UUID nodeId, Message initMsg, Channel channel);
}

GridIoManager

The file transmission manager must support to:

  • using different approaches of incoming data handling – buffered and direct (zero-copy approach of FileChannel#transferTo);
  • transferring data by chunks of predefined size with saving intermediate results;
  • re-establishing connection if an error occurs and continue file upload\download;
  • limiting connection bandwidth (upload and download) at runtime;
API
Code Block
languagejava
themeConfluence
titleTransmissionHandler.java
collapsetrue
public interface TransmissionHandler {
    /**
     * @param @returnerr The intanceerr of readfail handler tohandling process incoming data like the {@link FileChannel} manner.
     */
    public FileHandler fileHandler( public void onException(UUID nodeId, Throwable err);

    /**
     * The@param endnodeId ofRemote sessionnode transmissionid process.
from which request has been */received.
    public void end();

    /*** @param fileMeta File meta info.
     * @param@return causeAbsolute Thepathname cause of fail handling processdenoting a file.
     */
    public voidString onException(Throwable cause);
}
Code Block
languagejava
themeConfluence
titleFileHandler.java
collapsetrue
filePath(UUID nodeId, TransmissionMeta fileMeta);

    /**
     *
 <em>Chunk */
public interface FileHandler {
    /**handler</em> represents by itself the way of input data stream processing.
     * @paramIt nameaccepts Thewithin fileeach namechunk transfer from.
     * @param position The start position pointer of download file in original source.a {@link ByteBuffer} with data from input for further processing.
     *
     * @param countnodeId TotalRemote countnode ofid bytesfrom readedwhich fromrequest thehas originalbeen sourcereceived.
     * @param paramsinitMeta TheInitial additionalhandler transfer file description paramsmeta info.
     * @return The absolute pathname string denoting the file or {@code null} if there is no sense.
     * @throws IOException If fails. Instance of chunk handler to process incoming data by chunks.
     */
    public Consumer<ByteBuffer> chunkHandler(UUID nodeId, TransmissionMeta initMeta);

    /**
     */
 <em>File handler</em> represents publicby Stringitself begin(String name, long position, long count, Map<String, Serializable> params) throws IOException;

the way of input data stream processing. All the data will
     /**
 be processed under the * @param file The file with fully downloaded data into.hood using zero-copy transferring algorithm and only start file processing and
     * @param params The additional transfer file description params. the end of processing will be provided.
     *
     */
 @param nodeId Remote publicnode id voidfrom end(File file, Map<String, Serializable> params);
}which request has been received.
     * @param initMeta Initial handler meta info.
     * @return Intance of read handler to process incoming data like the {@link FileChannel} manner.
     */
    public Consumer<File> fileHandler(UUID nodeId, TransmissionMeta initMeta);
}


Code Block
languagejava
titleGridIoManager.TransmissionSender.java
collapsetrue

public class TransmissionSender implements Closeable {
/**
* @param file Source file to send to remote.
* @param params Additional
Code Block
languagejava
themeConfluence
titleChunkHandler.java
collapsetrue
/**
 *
 */
public interface ChunkHandler {
    /**
     * @param name The file name on remote.
     * @param position The start position pointer of downloading file in original source.
     * @param count Total count of bytes to read from the original source.
     * @param params The additional transfer file description paramskeys.
* @param plc   * @return The sizepolicy of ofhandling {@linkdata ByteBuffer} to read the input channel into.
     on remote.
* @throws IOExceptionIgniteCheckedException If fails.
     */
public void send(
File  public int begin(String name, long position, long count, Map<String, Serializable> params) throws IOException;

    /**
     * @param buff The data filled buffer.
     * @return {@code true} if the chunk of data have been successfully accepted.
     * @throws IOException If fails.
     */
    public boolean chunk(ByteBuffer buff) throws IOException;

    /**
     * @param params The additional handling channel description params.
     */
    public void end(Map<String, Serializable> params);
}file,
Map<String, Serializable> params,
TransmissionPolicy plc
) throws IgniteCheckedException, InterruptedException, IOException {
send(file, 0, file.length(), params, plc);
}

/**
* @param file Source file to send to remote.
* @param plc The policy of handling data on remote.
* @throws IgniteCheckedException If fails.
*/
public void send(
File file,
TransmissionPolicy plc
) throws IgniteCheckedException, InterruptedException, IOException {
send(file, 0, file.length(), new HashMap<>(), plc);
}

/**
* @param file Source file to send to remote.
* @param offset Position to start trasfer at.
* @param cnt Number of bytes to transfer.
* @param params Additional transfer file description keys.
* @param plc The policy of handling data on remote.
* @throws IgniteCheckedException If fails.
*/
public void send(
File file,
long offset,
long cnt,
Map<String, Serializable> params,
TransmissionPolicy plc
) throws IgniteCheckedException, InterruptedException, IOException {
...
}
}


Copy partition on the fly

...