You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 18 Next »


IDIEP-28
Author
Sponsor
Created

31-Oct-2018

Status

DRAFT


Motivation

The Apache Ignite cluster balance procedure with enabled persitence currently doesn't utilize network and storage device throughout to its full extent. The balance procedure processes cache data entries one by one which is not efficient enough for the cluster with enabled persistence.

Description

The Apache Ignite needs to support cache rebalancing as transferring partition files using zero copy algorithm [1] based on an extension of communication SPI and Java NIO API.

Process Overview

There are two participants in the process of balancing data – demaner (receiver of partition files), supplier (sender of partition files).
The process of ordering cache groups for rebalance remains the same. The whole process is described in terms of rebalance single cache group:

  1. The demander node prepares the set of IgniteDhtDemandedPartitionsMap#full cache partitions to fetch;
  2. The demander node checks compatibility version (for example, 2.8) and starts recording all incoming cache updates to the new special storage – the temporary WAL;
  3. The demander node sends the GridDhtPartitionDemandMessage to the supplier node;
  4. When the supplier node receives GridDhtPartitionDemandMessage and starts the new checkpoint process;
  5. The supplier node creates empty the temporary cache partition file with .tmp postfix in the same cache persistence directory;
  6. The supplier node splits the whole cache partition file into virtual chunks of predefined size (multiply to the PageMemory size);
    1. If the concurrent checkpoint thread determines the appropriate cache partition file chunk and tries to flush dirty page to the cache partition file
      1. If rebalance chunk already transferred
        1. Flush the dirty page to the file;
      2. If rebalance chunk not transferred
        1. Write this chunk to the temporary cache partition file;
        2. Flush the dirty page to the file;
    2. The node starts sending to the demander node each cache partition file chunk one by one using FileChannel#transferTo
      1. If the current chunk was modified by checkpoint thread – read it from the temporary cache partition file;
      2. If the current chunk is not touched – read it from the original cache partition file;
  7. The demander node starts to listen to new pipe incoming connections from the supplier node on TcpCommunicationSpi;
  8. The demander node creates the temporary cache partition file with .tmp postfix in the same cache persistence directory;
  9. The demander node receives each cache partition file chunk one by one
    1. The node checks CRC for each PageMemory in the downloaded chunk;
    2. The node flushes the downloaded chunk at the appropriate cache partition file position;
  10. When the demander node receives the whole cache partition file
    1. The node swaps the original partition file with the .tmp partition file;
    2. The node starts applying for data entries from temporary WAL storage on .tmp partition file;
    3. All concurrent operations corresponding to cache partition file still write to the end of temporary WAL;
    4. At the moment of temporary WAL store is ready to be empty
      1. Suspend applying async operations to the temporary WAL;
      2. Wait on last operations are applied from the temporary WAL store to the partition file;
      3. The node owning the new cache partition;
      4. Resume applying  async operations to the new owning partition file;
      5. Schedule the temporary WAL storage deletion;
  11. The supplier node deletes the temporary cache partition file;

CommunicationSpi

To benefit from zero file copy we must delegate the file transferring to FileChannel#transferTo(long, long, java.nio.channels.WritableByteChannel) [2] because the fast path of transferTo method is only executed if the destination buffer inherits from an internal JDK class.

  • The CommunicationSpi needs to support pipe connections between two nodes;
    • The WritableByteChannel needs to be accesses on the supplier side;
    • The ReadableByteChannel needs to be read on the demander side;
  • The CommunicationListener must be extended to respond on new incoming pipe connections;

Partition transmission

The cache partition file transfer over the network must be done using chunks with validation of received piece of data on the demander side.

  • The new layer over the cache partition file must support direct using of FileChannel#transferTo method over the CommunicationSpi pipe connection;
  • The process manager must support transferring the cache partition file by chunks of predefined size (multiply to the page size) one by one;
  • The connection bandwidth of the cache partition file transfer must have an ability to be limited at runtime;

Checkpointing on supplier

When the supplier node receives the cache partition file demand request it must prepare and provide the cache partition file to transfer over network. The Copy-on-Write [3] tehniques assume to be used to guarantee the data consistency during chunk transfer.  

The checkpointing process description on the supplier node – items 4, 5, 6 of the Process Overview.

Catch-up WAL

During the cache partition file transmitting, the demander node must hold all corresponding data entries on the new temporary WAL storage to apply them later. The file-based FIFO technique assumes to be used.

  • The new write-ahead-log manager for writing temporary records must support
    • Unlimited number of wal-files to store temporary data records;
    • Iterating over stored data records during an asynchronous writer thread inserts new records;
    • WAL-per-partiton approach need to be used;

The process description on the demander node – items 2, 10 of the Process Overview.

Public API changes

The following changes needs to be made:

CommunicationSpi.java
/**
 * @return {@code True} if new type of direct connections supported.
 */
public default boolean pipeConnectionSupported() {
    return false;
}
 
/**
 * @param src Source cluster node to initiate connection with.
 * @return Channel to listen.
 * @throws IgniteSpiException If fails.
 */
public default ReadableByteChannel getRemotePipe(ClusterNode src) throws IgniteSpiException {
    throw new UnsupportedOperationException();
}
 
/**
 * @param dest Destination cluster node to communicate with.
 * @param out Channel to write data.
 * @throws IgniteSpiException If fails.
 */
public default void sendOnPipe(ClusterNode dest, WritableByteChannel out) throws IgniteSpiException {
    throw new UnsupportedOperationException();
}


Risks and Assumptions

A few notes can be mentioned:

  • If operating system does not support zero copy, sending a file with FileChannel#transferTo might fail or yield worse performance.
    For example, sending a large file doesn't work well enough on Windows;
  • SSL must be disabled to take an advantage of Java NIO zero copy file transmission using of FileChannel#transferTo. We can consider to use OpenSSL's non-copying interface to avoid allocating new buffers for each read and write operation at Phase-2;

Discussion Links

// Links to discussions on the devlist, if applicable.

Reference Links

  1. Zero Copy I: User-Mode Perspective – https://www.linuxjournal.com/article/6345
  2. Example: Efficient data transfer through zero copy – https://www.ibm.com/developerworks/library/j-zerocopy/index.html
  3. Copy-on-write – https://en.wikipedia.org/wiki/Copy-on-write

Tickets

// Links or report with relevant JIRA tickets.

  • No labels