...

ID: IEP-28
Author:
Sponsor:
Created: 31-Oct-2018
Status: ACTIVE

...

  1. The demander node prepares the set of IgniteDhtDemandedPartitionsMap#full cache partitions to fetch;
  2. The demander node checks the compatibility version (for example, 2.8) and starts recording all incoming cache updates to a new special storage – the temporary WAL storage;
  3. The demander node sends the GridDhtPartitionDemandMessage to the supplier node as usual;
  4. The supplier node receives the GridDhtPartitionDemandMessage, starts a new checkpoint process and records the current cache partition file sizes;
  5. The supplier node creates an empty temporary file with the .delta postfix (e.g. part-0.bin.delta) for each cache partition file, either in the same cache working directory or in another configured one;
  6. The supplier node starts tracking each pageId write attempt to these partition files:
    1. When a write attempt happens, the thread that caused it reads the original copy of this page from the partition and flushes it to the corresponding .delta file;
    2. After that, the thread writes the changed page data to the partition file;
  7. The supplier waits for the checkpoint process to end;
  8. On the supplier, for each cache partition file:
    1. The process opens the partition file in read-only mode;
    2. It starts sending the partition file (as it is, with any concurrent writes) in chunks of a predefined constant size (a multiple of the PageMemory size);
    3. After the partition file has been sent, it starts sending the corresponding .delta file;
  9. The demander node starts to listen for a new type of incoming connection (a socket channel created event) from the supplier node;
  10. When the appropriate connection is established, the demander node, for each cache partition file:
    1. Receives the file metadata (the corresponding cache group identifier, the cache partition file name, the file size);
    2. Writes data from the socket to the particular cache partition file, starting from the beginning of the file;
    3. After the original cache partition file has been received, starts receiving the corresponding .delta file;
    4. Reads data from the socket in chunks of the PageMemory size and applies each received pageId to the partition file;
  11. When the demander node has received the whole cache partition file:
    1. The node begins rebuilding secondary indexes over the received partition file;
    2. After that, the thread begins to apply data entries from the beginning of the temporary WAL storage;
    3. All async operations corresponding to this partition file are still written to the end of the temporary WAL;
    4. At the moment the temporary WAL storage is about to become empty:
      1. Start the first checkpoint;
      2. Wait for the first checkpoint to end and own the cache partition;
      3. All operations are now switched to the partition file instead of being written to the temporary WAL;
      4. Schedule the temporary WAL storage deletion;
  12. The supplier node deletes all temporary files;
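
Steps 10.3 and 10.4 above (applying the received .delta pages on top of the received partition file) can be sketched as follows. This is only an illustration under stated assumptions: the class name DeltaApplier is hypothetical, and the record layout of the .delta stream (an 8-byte page index followed by exactly one page of data) is assumed here, since the proposal does not define it.

```java
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;

/** Hypothetical helper, not part of Apache Ignite. */
final class DeltaApplier {
    private DeltaApplier() {
    }

    /**
     * Reads delta records until the end of the stream and writes each page
     * to its place in the partition file.
     * Assumed record layout: 8-byte page index, then pageSize bytes of page data.
     *
     * @return Number of pages applied.
     */
    static int apply(ReadableByteChannel in, FileChannel part, int pageSize) throws IOException {
        ByteBuffer rec = ByteBuffer.allocate(Long.BYTES + pageSize);
        int applied = 0;

        while (readRecord(in, rec)) {
            rec.flip();

            long pos = rec.getLong() * pageSize;

            // The buffer is now positioned at the page payload.
            while (rec.hasRemaining())
                pos += part.write(rec, pos);

            applied++;
            rec.clear();
        }

        return applied;
    }

    /** @return false on a clean end of stream at a record boundary. */
    private static boolean readRecord(ReadableByteChannel in, ByteBuffer buf) throws IOException {
        while (buf.hasRemaining()) {
            if (in.read(buf) < 0) {
                if (buf.position() == 0)
                    return false;

                throw new EOFException("Truncated delta record");
            }
        }

        return true;
    }
}
```

In this sketch the pages saved on the supplier overwrite the pages that may have been changed during the transfer, so the received file ends up in the state fixed by the checkpoint.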

...

Components

At a high level of abstraction, Apache Ignite must support the features described below.

CommunicationSpi

To benefit from zero copy, the file transfer must be delegated to FileChannel#transferTo(long, long, java.nio.channels.WritableByteChannel) [2], because the fast path of the transferTo method is only executed if the destination channel inherits from an internal JDK class.

  • The CommunicationSpi needs to support pipe connections between two nodes;
    • The WritableByteChannel needs to be accessible on the supplier side;
    • The ReadableByteChannel needs to be read on the demander side;
  • The CommunicationListener must be extended to respond to new incoming pipe connections;
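
A minimal sketch of the supplier-side send loop built on FileChannel#transferTo is shown below. The class ZeroCopySender and its parameters are hypothetical and are not part of Ignite; the sketch also assumes a blocking destination channel.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical helper, not part of Apache Ignite. */
final class ZeroCopySender {
    private ZeroCopySender() {
    }

    /**
     * Sends up to cnt bytes of the file, starting at off, to the given channel
     * in chunks of at most chunkSize bytes. Assumes a blocking destination channel.
     *
     * @return Number of bytes actually transferred.
     */
    static long send(Path file, long off, long cnt, int chunkSize, WritableByteChannel out) throws IOException {
        try (FileChannel src = FileChannel.open(file, StandardOpenOption.READ)) {
            // Never read past the current end of the file.
            long end = Math.min(off + cnt, src.size());
            long pos = off;

            while (pos < end) {
                // transferTo may move fewer bytes than requested, so advance by its result.
                pos += src.transferTo(pos, Math.min(chunkSize, end - pos), out);
            }

            return pos - off;
        }
    }
}
```

The same loop works with any WritableByteChannel, but the copy-free path is expected only when the destination is a channel type the JDK recognises internally, such as a socket channel; other destinations fall back to an ordinary buffered copy.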

Preloader

A new implementation of the cache entries preloader is assumed to be done. The new implementation must send and receive cache partition files over the CommunicationSpi channels in chunks of data, validating the received items.

...

File transfer between nodes

The partition preloader must support cache partition file relocation from one cluster node to another (the zero copy algorithm [1] is assumed to be used by default). To achieve this, the file transfer machinery must be implemented in Apache Ignite on top of the Communication SPI.

CommunicationSpi

Ignite's Communication SPI must support:

  • establishing channel connections to a remote node on an arbitrary topic (GridTopic) with a predefined processing policy;
  • listening for incoming channel creation events and registering connection handlers on a particular node;
  • passing an arbitrary set of channel parameters during the connection handshake;

API

Channel.java
/**
 *
 */
public interface Channel extends Closeable {
    /**
     * @return Connection id to remote node.
     */
    public ChannelId id();


    /**
     * @return The channel's configuration.
     */
    public ChannelConfig config();

    /**
     * @return A map of all channel attributes.
     */
    public Map<String, Serializable> attrs();

    /**
     * @param name The name to get attribute.
     * @param <T> The attribute type.
     * @return The corresponding channel attribute instance.
     */
    public <T extends Serializable> T attr(String name);

    /**
     * @param name The name to get attribute.
     * @param obj The channel attribute instance.
     * @param <T> The attribute type.
     */
    public <T extends Serializable> void attr(String name, T obj);

    /**
     * @return <tt>true</tt> if the channel is configured and ready to use.
     */
    public boolean active();

    /**
     * Make the channel ready for use.
     */
    public void activate();
}


ChannelConfig.java
/**
 *
 */
public interface ChannelConfig {
    /**
     * Gets the channel's mode.
     */
    public boolean blocking();

    /**
     * @param blocking The channel's mode to set.
     * @return The config instance.
     */
    public ChannelConfig blocking(boolean blocking);

    /**
     * Gets the timeout option.
     */
    public int timeout();

    /**
     * @param millis The timeout in milliseconds.
     * @return The config instance.
     */
    public ChannelConfig timeout(int millis);
}


CommunicationSpi.java
public interface CommunicationSpi<T extends Serializable> extends IgniteSpi {
    /**
     * @param remote Destination cluster node to communicate with.
     * @param attrs Configuration channel attributes.
     * @throws IgniteSpiException If fails.
     */
    public default Channel channel(ClusterNode remote, Map<String, Serializable> attrs) throws IgniteSpiException {
        throw new UnsupportedOperationException();
    }
}


CommunicationListener.java
public interface CommunicationListener<T extends Serializable> extends EventListener {
    /**
     * @param ch A channel instance to process configure request.
     * @return Additional attributes to send to remote node.
     */
    public default Map<String, Serializable> onChannelConfigure(Channel ch) {
        return null;
    }

    /**
     * @param nodeId Remote node id.
     * @param ch Locally created channel endpoint.
     */
    public default void onChannelCreated(UUID nodeId, Channel ch) {
        // No-op.
    }
}

FileTransmitProcessor

The file transmission manager must support:

  • using different approaches to handling incoming data – buffered and direct (the zero-copy approach of FileChannel#transferTo);
  • transferring data in chunks of a predefined size, saving intermediate results;
  • re-establishing the connection if an error occurs and continuing the file upload/download;
  • limiting connection bandwidth (upload and download) at runtime;

API

FileWriter.java
/**
 *
 */
public interface FileWriter extends Closeable {
    /**
     * @param file The source file to send.
     * @param offset The position to start at.
     * @param count The number of bytes to transfer.
     * @param params The additional transfer file description keys.
     * @param plc The policy of handling data on remote.
     * @throws IgniteCheckedException If fails.
     */
    public void write(
        File file,
        long offset,
        long count,
        Map<String, Serializable> params,
        ReadPolicy plc
    ) throws IgniteCheckedException;
}


TransmitSession.java
/**
 *
 */
public interface TransmitSession {
    /**
     * @param nodeId The remote node id connected from.
     * @param sessionId The unique session id.
     */
    public void begin(UUID nodeId, String sessionId);

    /**
     * @return The instance of read handler to process incoming data by chunks.
     */
    public ChunkHandler chunkHandler();

    /**
     * @return The instance of read handler to process incoming data in the {@link FileChannel} manner.
     */
    public FileHandler fileHandler();

    /**
     * The end of session transmission process.
     */
    public void end();

    /**
     * @param cause The cause of the handling failure.
     */
    public void onException(Throwable cause);
}


FileHandler.java
/**
 *
 */
public interface FileHandler {
    /**
     * @param name The name of the file being transferred.
     * @param position The start position pointer of the downloaded file in the original source.
     * @param count Total count of bytes read from the original source.
     * @param params The additional transfer file description params.
     * @return The absolute pathname string denoting the file or {@code null} if it is not applicable.
     * @throws IOException If fails.
     */
    public String begin(String name, long position, long count, Map<String, Serializable> params) throws IOException;

    /**
     * @param file The file into which the data has been fully downloaded.
     * @param params The additional transfer file description params.
     */
    public void end(File file, Map<String, Serializable> params);
}


ChunkHandler.java
/**
 *
 */
public interface ChunkHandler {
    /**
     * @param name The file name on remote.
     * @param position The start position pointer of downloading file in original source.
     * @param count Total count of bytes to read from the original source.
     * @param params The additional transfer file description params.
     * @return The size of the {@link ByteBuffer} to read the input channel into.
     * @throws IOException If fails.
     */
    public int begin(String name, long position, long count, Map<String, Serializable> params) throws IOException;

    /**
     * @param buff The data filled buffer.
     * @return {@code true} if the chunk of data has been successfully accepted.
     * @throws IOException If fails.
     */
    public boolean chunk(ByteBuffer buff) throws IOException;

    /**
     * @param params The additional handling channel description params.
     */
    public void end(Map<String, Serializable> params);
}
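
To illustrate how the proposed ChunkHandler contract might be used on the receiving side, here is a minimal sketch of a handler that stores incoming chunks in a local file. The interface is repeated from the proposal only so that the sketch compiles on its own. The class FileStoringChunkHandler, its constructor parameters and the remaining() accessor are hypothetical. The sketch also assumes that the buffer passed to chunk(ByteBuffer) is already flipped for reading, which the proposal does not state.

```java
import java.io.IOException;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Map;

/** Repeated from the proposal so that the sketch is self-contained. */
interface ChunkHandler {
    int begin(String name, long position, long count, Map<String, Serializable> params) throws IOException;

    boolean chunk(ByteBuffer buff) throws IOException;

    void end(Map<String, Serializable> params);
}

/** Hypothetical handler that writes received chunks into a file under the given directory. */
final class FileStoringChunkHandler implements ChunkHandler {
    private final Path dir;
    private final int chunkSize;
    private FileChannel out;
    private long pos;
    private long remaining;

    FileStoringChunkHandler(Path dir, int chunkSize) {
        this.dir = dir;
        this.chunkSize = chunkSize;
    }

    @Override public int begin(String name, long position, long count, Map<String, Serializable> params)
        throws IOException {
        out = FileChannel.open(dir.resolve(name), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        pos = position;    // A non-zero position allows continuing an interrupted transfer.
        remaining = count;

        return chunkSize;
    }

    @Override public boolean chunk(ByteBuffer buff) throws IOException {
        // Reject a chunk that carries more data than was announced in begin().
        if (buff.remaining() > remaining)
            return false;

        while (buff.hasRemaining()) {
            int n = out.write(buff, pos);

            pos += n;
            remaining -= n;
        }

        return true;
    }

    @Override public void end(Map<String, Serializable> params) {
        try {
            if (out != null)
                out.close();
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** @return Number of announced bytes that have not been received yet. */
    long remaining() {
        return remaining;
    }
}
```

Tracking the write position inside the handler is one possible way to satisfy the "saving intermediate results" requirement: after a reconnect, the sender could call begin() again with the last confirmed position.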

Copy partition on the fly

...

Checkpointer

When the supplier node receives the cache partition file demand request, it sends the file over the CommunicationSpi. The cache partition file can be concurrently updated by the checkpoint thread during its transmission. To guarantee file consistency, the Checkpointer must use the Copy-on-Write [3] technique and save a copy of each updated chunk into the temporary file.
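
The copy-on-write step can be sketched roughly as follows. Everything in this example is an assumption made for illustration: the class CopyOnWritePageStore is hypothetical, the .delta record layout (an 8-byte page index followed by the original page) is not defined by the proposal, and saving only the first overwrite of each page is a choice made here so that the delta always holds the page as it was when the file size was fixed.

```java
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.BitSet;

/** Hypothetical page writer that preserves original pages in a delta file before overwriting them. */
final class CopyOnWritePageStore {
    private final FileChannel part;
    private final FileChannel delta;
    private final int pageSize;
    /** Number of pages in the partition file at the moment tracking started. */
    private final int fixedPages;
    /** Pages whose original content has already been saved (assumption: save once per page). */
    private final BitSet saved = new BitSet();

    CopyOnWritePageStore(FileChannel part, FileChannel delta, int pageSize) throws IOException {
        this.part = part;
        this.delta = delta;
        this.pageSize = pageSize;
        this.fixedPages = (int)(part.size() / pageSize);
    }

    /** Saves the original page to the delta file if needed, then writes the new page data. */
    synchronized void writePage(int pageIdx, ByteBuffer newPage) throws IOException {
        long off = (long)pageIdx * pageSize;

        // Pages appended after the size was fixed are not part of the transferred range.
        if (pageIdx < fixedPages && !saved.get(pageIdx)) {
            ByteBuffer rec = ByteBuffer.allocate(Long.BYTES + pageSize);

            rec.putLong(pageIdx);

            // Read the original page right after the 8-byte index.
            while (rec.hasRemaining()) {
                if (part.read(rec, off + rec.position() - Long.BYTES) < 0)
                    throw new EOFException("Partition file is shorter than expected");
            }

            rec.flip();

            // Append the record at the current position of the delta channel.
            while (rec.hasRemaining())
                delta.write(rec);

            saved.set(pageIdx);
        }

        ByteBuffer src = newPage.duplicate();

        for (long pos = off; src.hasRemaining(); )
            pos += part.write(src, pos);
    }
}
```

The single lock keeps the sketch short. A real implementation would presumably need finer-grained synchronisation so that page writes do not serialise on one monitor.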

Apply partition on the fly

Catch-up temporary WAL

While the demander node is in the partition file transmission state, it must sequentially save all cache entries corresponding to the MOVING partition into a new temporary storage. These entries will be applied later, one by one, to the newly received cache partition file. While the storage is being read, all asynchronous operations are appended to the end of the temporary storage until it has been fully read. A file-based FIFO approach is assumed to be used for this process.
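
A file-based FIFO of this kind could look roughly like the sketch below. The class TempWalQueue and its length-prefixed entry format are assumptions made for illustration only; the proposal does not specify the storage layout.

```java
import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical file-backed FIFO: writers append to the tail while a reader drains from the head. */
final class TempWalQueue implements Closeable {
    private final FileChannel ch;
    private long readPos;
    private long writePos;

    TempWalQueue(Path file) throws IOException {
        ch = FileChannel.open(file, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING,
            StandardOpenOption.READ, StandardOpenOption.WRITE);
    }

    /** Appends an entry to the end of the storage (assumed format: 4-byte length, then the payload). */
    synchronized void append(byte[] entry) throws IOException {
        ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + entry.length);

        buf.putInt(entry.length);
        buf.put(entry);
        buf.flip();

        while (buf.hasRemaining())
            writePos += ch.write(buf, writePos);
    }

    /** @return The oldest entry that has not been read yet, or null if the storage is fully read. */
    synchronized byte[] poll() throws IOException {
        if (readPos == writePos)
            return null;

        ByteBuffer len = ByteBuffer.allocate(Integer.BYTES);

        readFully(len, readPos);
        len.flip();

        byte[] entry = new byte[len.getInt()];

        readFully(ByteBuffer.wrap(entry), readPos + Integer.BYTES);
        readPos += Integer.BYTES + entry.length;

        return entry;
    }

    /** @return true if every appended entry has been read. */
    synchronized boolean drained() {
        return readPos == writePos;
    }

    private void readFully(ByteBuffer buf, long pos) throws IOException {
        while (buf.hasRemaining()) {
            int n = ch.read(buf, pos);

            if (n < 0)
                throw new EOFException("Unexpected end of temporary storage");

            pos += n;
        }
    }

    @Override public void close() throws IOException {
        ch.close();
    }
}
```

The point at which drained() becomes true corresponds to the moment when operations can be switched from the temporary storage to the partition file. The sketch does not show how that switch is made atomic with respect to concurrent appends.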

...