ID: IEP-28
Author:
Sponsor:
Created: 31-Oct-2018
Status: DRAFT


Motivation

The Apache Ignite cluster rebalance procedure with persistence enabled currently doesn't use the network and storage device throughput to its full extent. The procedure processes cache data entries one by one, which is not efficient enough for a cluster with persistence enabled.

Description

Apache Ignite needs to support cache rebalancing by transferring partition files with a zero-copy algorithm [1], based on an extension of the communication SPI and the Java NIO API.

Process Overview

Two node roles take part in the process of balancing data:

  • demander (receiver of partition files);
  • supplier (sender of partition files).

The whole process is described in terms of rebalancing a single cache group; partition files are rebalanced one by one:

  1. The demander node prepares the set of IgniteDhtDemandedPartitionsMap#full cache partitions to fetch;
  2. The demander node checks the compatibility version (for example, 2.8) and starts recording all incoming cache updates to a new special storage – the temporary WAL storage;
  3. The demander node sends the GridDhtPartitionDemandMessage to the supplier node as usual;
  4. The supplier node receives the GridDhtPartitionDemandMessage, starts a new checkpoint process and fixes the cache partition file sizes;
  5. The supplier node creates an empty temporary file with the .delta postfix (e.g. part-0.bin.delta) for each cache partition file (in the same cache working directory or in another configured one);
  6. The supplier node starts tracking each pageId write attempt to these partition files:
    1. When a write attempt happens, the thread that caused it reads the original copy of the page from the partition file and flushes it to the corresponding .delta file;
    2. After that, the thread writes the changed page data to the partition file;
  7. The supplier node waits for the checkpoint process to end;
  8. On the supplier node, for each cache partition file:
    1. The process opens the partition file in read-only mode;
    2. It starts sending the partition file (as it is) in chunks of a predefined constant size (a multiple of the PageMemory page size);
    3. After the partition file is sent, it starts sending the corresponding .delta file;
  9. The demander node starts listening for a new type of incoming connection (a socket channel created event) from the supplier node;
  10. When the appropriate connection is established, the demander node, for each cache partition file:
    1. Receives the file metadata (the corresponding cache group identifier, the cache partition file name, the file size);
    2. Writes data from the socket to the particular cache partition file, starting from the beginning of the file;
    3. After the original cache partition file is received, starts receiving the corresponding .delta file;
    4. Reads data from the socket in chunks of the PageMemory page size and applies each received pageId to the partition file;
  11. When the demander node has received the whole cache partition file:
    1. The node begins rebuilding secondary indexes over the received partition file;
    2. After that, the thread begins applying data entries from the beginning of the temporary WAL storage;
    3. All async operations corresponding to this partition file are still written to the end of the temporary WAL;
    4. When the temporary WAL storage is about to become empty:
      1. Start the first checkpoint;
      2. Wait for the first checkpoint to end and own the cache partition;
      3. All operations are now switched to the partition file instead of the temporary WAL;
      4. Schedule the temporary WAL storage deletion;
  12. The supplier node deletes all temporary files;
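
Step 10.4 above (applying the received .delta pages on the demander) could look roughly like the sketch below. The record layout is an assumption made for illustration only: each delta record is taken to be an 8-byte page index followed by one page of data. The class name DeltaApplier and its methods are hypothetical and are not part of Ignite.

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;

/** Hypothetical sketch; assumes delta records of [8-byte page index][page bytes]. */
final class DeltaApplier {
    /** Reads delta records from {@code in} and overwrites the matching pages; returns the record count. */
    static int apply(FileChannel part, ReadableByteChannel in, int pageSize) throws IOException {
        ByteBuffer rec = ByteBuffer.allocate(Long.BYTES + pageSize);
        int applied = 0;

        while (readFully(in, rec)) {
            rec.flip();
            long pos = rec.getLong() * pageSize; // page index -> file offset

            while (rec.hasRemaining())
                pos += part.write(rec, pos);

            rec.clear();
            applied++;
        }

        return applied;
    }

    /** Fills the buffer; returns false on a clean end of stream before a new record. */
    private static boolean readFully(ReadableByteChannel in, ByteBuffer buf) throws IOException {
        while (buf.hasRemaining()) {
            if (in.read(buf) < 0) {
                if (buf.position() == 0)
                    return false;

                throw new EOFException("Truncated delta record");
            }
        }

        return true;
    }

    /** Demo: page 1 was overwritten with 9s during the transfer; the delta restores the 2s. */
    static byte[] demo() {
        int pageSize = 16;

        try {
            Path partFile = Files.createTempFile("part-0", ".bin");

            try {
                byte[] cur = new byte[2 * pageSize];
                Arrays.fill(cur, 0, pageSize, (byte)1);
                Arrays.fill(cur, pageSize, 2 * pageSize, (byte)9);
                Files.write(partFile, cur);

                ByteBuffer rec = ByteBuffer.allocate(Long.BYTES + pageSize);
                rec.putLong(1L);
                while (rec.hasRemaining())
                    rec.put((byte)2);

                // An in-memory stream stands in for the socket channel here.
                ReadableByteChannel in = Channels.newChannel(new ByteArrayInputStream(rec.array()));

                try (FileChannel part = FileChannel.open(partFile, StandardOpenOption.WRITE)) {
                    apply(part, in, pageSize);
                }

                return Files.readAllBytes(partFile);
            }
            finally {
                Files.deleteIfExists(partFile);
            }
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

After the delta is applied, every page that changed during the transfer holds its content as of the checkpoint, so the received file matches the state the supplier fixed in step 4.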

Components to change

A high-level overview of the components to be changed is provided below.

CommunicationSpi

To benefit from zero copy, the file transfer must be delegated to FileChannel#transferTo(long, long, java.nio.channels.WritableByteChannel) [2], because the fast path of the transferTo method is executed only if the destination channel is an instance of an internal JDK class.

  • The CommunicationSpi needs to support pipe connections between two nodes;
    • The WritableByteChannel needs to be accessible on the supplier side;
    • The ReadableByteChannel needs to be read on the demander side;
  • The CommunicationListener must be extended to respond to new incoming pipe connections;
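
As a rough illustration of the delegation described above, the sketch below sends a file through FileChannel#transferTo to an arbitrary WritableByteChannel. The class ZeroCopySender and its methods are hypothetical names, not Ignite API, and a blocking destination channel is assumed.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical helper for illustration; not part of the Ignite code base. */
final class ZeroCopySender {
    /** Sends the whole file to {@code out} and returns the number of bytes sent. */
    static long send(Path file, WritableByteChannel out) throws IOException {
        try (FileChannel src = FileChannel.open(file, StandardOpenOption.READ)) {
            long size = src.size();
            long pos = 0;

            // transferTo may move fewer bytes than requested, so loop until done.
            // Assumes a blocking destination channel.
            while (pos < size)
                pos += src.transferTo(pos, size - pos, out);

            return pos;
        }
    }

    /** Demo: writes data to a temp file and sends it to an in-memory channel. */
    static byte[] roundTrip(byte[] data) {
        try {
            Path file = Files.createTempFile("part-0", ".bin");

            try {
                Files.write(file, data);

                ByteArrayOutputStream sink = new ByteArrayOutputStream();

                // An in-memory sink makes the JDK fall back to a copying path;
                // the zero-copy path needs a JDK-internal channel such as a socket channel.
                try (WritableByteChannel out = Channels.newChannel(sink)) {
                    send(file, out);
                }

                return sink.toByteArray();
            }
            finally {
                Files.deleteIfExists(file);
            }
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In the proposed design the destination would be the channel exposed by the pipe connection, so the same loop could run on the supplier without copying data through user space.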

Preloader

A new implementation of the cache entries preloader is to be developed. The new implementation must send and receive cache partition files over the CommunicationSpi channels in chunks of data and validate the received items.

  • The new layer over the cache partition file must support direct use of the FileChannel#transferTo method over the CommunicationSpi pipe connection;
  • The process manager must support transferring the cache partition file in chunks of a predefined size (a multiple of the page size), one by one;
  • It must be possible to limit the bandwidth of the cache partition file transfer at runtime;
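
The chunking and bandwidth requirements above can be sketched as plain arithmetic. The names ChunkPlan, chunkSize, chunks and pauseMillis are hypothetical, and the throttling rule (pause after each chunk so that it takes at least chunkBytes / limit seconds) is only one possible approach.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of chunk planning and throttling; not Ignite API. */
final class ChunkPlan {
    /** Chunk size as a whole number of pages. */
    static long chunkSize(int pageSize, int pagesPerChunk) {
        return Math.multiplyExact((long)pageSize, (long)pagesPerChunk);
    }

    /** Splits [0, fileSize) into {offset, length} pairs; the last chunk may be shorter. */
    static List<long[]> chunks(long fileSize, long chunkSize) {
        if (chunkSize <= 0)
            throw new IllegalArgumentException("chunkSize must be positive");

        List<long[]> res = new ArrayList<>();

        for (long off = 0; off < fileSize; off += chunkSize)
            res.add(new long[] {off, Math.min(chunkSize, fileSize - off)});

        return res;
    }

    /** Pause needed after a chunk so the average rate stays at or below the limit. */
    static long pauseMillis(long chunkBytes, long elapsedMillis, long bytesPerSec) {
        if (bytesPerSec <= 0)
            return 0; // Assumption: a non-positive value means "no limit".

        long minMillis = chunkBytes * 1000 / bytesPerSec;

        return Math.max(0, minMillis - elapsedMillis);
    }
}
```

Because the limit is passed in for every chunk, a caller could re-read it from a volatile field before each call, which is one way to make it adjustable at runtime.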

Checkpointer

When the supplier node receives the cache partition file demand request, it will send the file over the CommunicationSpi. The cache partition file can be updated concurrently by the checkpoint thread during its transmission. To guarantee file consistency, the Checkpointer must use the Copy-on-Write [3] technique and save a copy of each updated chunk into a temporary file.

The checkpoint process description on the supplier node – items 4, 5, 6 of the Process Overview.
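
A minimal sketch of this copy-on-write step is shown below. Several details are assumptions rather than part of the proposal: the class name CowPartitionFile, the delta record layout (8-byte page index followed by the original page), saving only the first original copy of each page, and skipping pages that lie beyond the file size fixed at the start.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical copy-on-write wrapper over a partition file; not an Ignite class. */
final class CowPartitionFile implements AutoCloseable {
    private final FileChannel part;
    private final FileChannel delta;
    private final int pageSize;
    private final long snapshotSize;               // File size fixed when tracking starts.
    private final Set<Long> saved = new HashSet<>(); // Pages already copied to the delta.

    CowPartitionFile(Path partFile, Path deltaFile, int pageSize) throws IOException {
        part = FileChannel.open(partFile, StandardOpenOption.READ, StandardOpenOption.WRITE);
        delta = FileChannel.open(deltaFile, StandardOpenOption.CREATE, StandardOpenOption.WRITE,
            StandardOpenOption.APPEND);
        this.pageSize = pageSize;
        snapshotSize = part.size();
    }

    /** Saves the original page to the delta (first write only), then writes the new page. */
    synchronized void writePage(long pageIdx, ByteBuffer newData) throws IOException {
        long off = pageIdx * pageSize;

        if (off < snapshotSize && saved.add(pageIdx)) {
            ByteBuffer rec = ByteBuffer.allocate(Long.BYTES + pageSize);
            rec.putLong(pageIdx);

            while (rec.hasRemaining()) {
                if (part.read(rec, off + rec.position() - Long.BYTES) < 0)
                    throw new EOFException("Partition file is shorter than expected");
            }

            rec.flip();
            while (rec.hasRemaining())
                delta.write(rec);
        }

        ByteBuffer src = newData.duplicate();
        long pos = off;
        while (src.hasRemaining())
            pos += part.write(src, pos);
    }

    @Override public void close() throws IOException {
        try {
            delta.close();
        }
        finally {
            part.close();
        }
    }

    /** Demo: overwrites page 1 (filled with 2s) with 9s and returns the delta file content. */
    static byte[] demo() {
        int pageSize = 16;

        try {
            Path partFile = Files.createTempFile("part-0", ".bin");
            Path deltaFile = Files.createTempFile("part-0.bin", ".delta");

            byte[] init = new byte[2 * pageSize];
            Arrays.fill(init, 0, pageSize, (byte)1);
            Arrays.fill(init, pageSize, 2 * pageSize, (byte)2);
            Files.write(partFile, init);

            try (CowPartitionFile cow = new CowPartitionFile(partFile, deltaFile, pageSize)) {
                byte[] upd = new byte[pageSize];
                Arrays.fill(upd, (byte)9);
                cow.writePage(1, ByteBuffer.wrap(upd));
            }

            return Files.readAllBytes(deltaFile);
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The sketch serializes writers with a single lock for brevity; a real checkpointer would likely need finer-grained coordination.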

(new) Catch-up temporary WAL

While the demander node is in the partition file transmission state, it must save all cache entries corresponding to the moving partition into a new temporary WAL storage. These entries will later be applied one by one to the received cache partition file. While the storage is being read, all asynchronous operations are appended to the end of the temporary WAL storage until it is fully read. A file-based FIFO approach is assumed for this process.

The new write-ahead log manager for writing temporary records must support:

  • An unlimited number of WAL files to store temporary data records;
  • Iterating over stored data records while an asynchronous writer thread inserts new records;
  • A WAL-per-partition approach;
  • A higher priority for write operations to the temporary WAL storage than for read operations;

The process description on the demander node – items 2, 10 of the Process Overview.
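
The file-based FIFO idea can be illustrated with the simplified sketch below. It is not the proposed WAL manager: the class name TempWalQueue is hypothetical, a single file is used instead of an unlimited number of WAL files, records are length-prefixed byte arrays (an assumed layout), and writers get no priority over readers.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical single-file FIFO of length-prefixed records; not an Ignite class. */
final class TempWalQueue implements AutoCloseable {
    private final FileChannel ch;
    private long readPos;  // Offset of the next record to apply.
    private long writePos; // Offset where the next record is appended.

    TempWalQueue(Path file) throws IOException {
        ch = FileChannel.open(file, StandardOpenOption.CREATE, StandardOpenOption.READ,
            StandardOpenOption.WRITE);
        writePos = ch.size();
    }

    /** Appends a record to the end of the storage. */
    synchronized void append(byte[] rec) throws IOException {
        ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + rec.length);
        buf.putInt(rec.length).put(rec).flip();

        while (buf.hasRemaining())
            writePos += ch.write(buf, writePos);
    }

    /** Returns the oldest unread record, or null when the storage is fully read. */
    synchronized byte[] poll() throws IOException {
        if (readPos == writePos)
            return null;

        ByteBuffer len = ByteBuffer.allocate(Integer.BYTES);
        readFully(len, readPos);
        len.flip();

        byte[] rec = new byte[len.getInt()];
        readFully(ByteBuffer.wrap(rec), readPos + Integer.BYTES);
        readPos += Integer.BYTES + rec.length;

        return rec;
    }

    private void readFully(ByteBuffer buf, long pos) throws IOException {
        while (buf.hasRemaining()) {
            int n = ch.read(buf, pos);

            if (n < 0)
                throw new EOFException("Truncated record");

            pos += n;
        }
    }

    @Override public void close() throws IOException {
        ch.close();
    }

    /** Demo: a record appended while reading is in progress is still read in order. */
    static String demo() {
        try {
            Path file = Files.createTempFile("temp-wal", ".bin");

            try (TempWalQueue q = new TempWalQueue(file)) {
                StringBuilder sb = new StringBuilder();

                q.append("a".getBytes(StandardCharsets.UTF_8));
                q.append("b".getBytes(StandardCharsets.UTF_8));
                sb.append(new String(q.poll(), StandardCharsets.UTF_8));
                q.append("c".getBytes(StandardCharsets.UTF_8));

                byte[] rec;
                while ((rec = q.poll()) != null)
                    sb.append(new String(rec, StandardCharsets.UTF_8));

                return sb.toString();
            }
            finally {
                Files.deleteIfExists(file);
            }
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The moment poll() returns null corresponds to the point where the temporary WAL becomes empty and the first checkpoint can be started.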

Public API changes

The following changes need to be made:

CommunicationSpi.java
/**
 * @return {@code True} if the new type of direct connections is supported.
 */
default boolean pipeConnectionSupported() {
    return false;
}

/**
 * @param src Source cluster node to initiate connection with.
 * @return Channel to listen.
 * @throws IgniteSpiException If failed.
 */
default ReadableByteChannel getRemotePipe(ClusterNode src) throws IgniteSpiException {
    throw new UnsupportedOperationException();
}

/**
 * @param dest Destination cluster node to communicate with.
 * @param out Channel to write data.
 * @throws IgniteSpiException If failed.
 */
default void sendOnPipe(ClusterNode dest, WritableByteChannel out) throws IgniteSpiException {
    throw new UnsupportedOperationException();
}

Recovery

In case of crash recovery, no additional actions are needed to keep the cache partition file consistent. Partitions in the moving state are not recovered, so only the single partition file being transferred will be lost. That only one file is affected is guaranteed by the single-file-transmission process. The cache partition file will be fully loaded by the next rebalance procedure.

To provide the default cluster recovery guarantee we must:

  • Start the checkpoint process when the temporary WAL storage becomes empty;
  • Wait for the first checkpoint to end and set the owning status on the cache partition;

Risks and Assumptions

The following risks and assumptions are worth noting:

  • Zero-copy limitations – If the operating system does not support zero copy, sending a file with FileChannel#transferTo might fail or yield worse performance. For example, sending a large file doesn't work well enough on Windows;
  • WAL write I/O wait time – Under the heavy load of partition file transmission, writing to the temporary WAL storage may slow down. Since losing the data of the temporary WAL storage carries no risk, we can consider keeping the whole storage in memory.

Phase-2

SSL must be disabled to take advantage of Java NIO zero-copy file transmission using the FileChannel#transferTo method. If SSL is required, the file must be split into chunks in the same way and sent over the socket channel using a ByteBuffer. As the SSL engine generally needs a direct ByteBuffer to do encryption, we can't avoid copying the buffer payload from the kernel level to the application level.
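
The copying path described above could look like the sketch below: each chunk is read into a direct ByteBuffer in user space and then written to the channel. No SSLEngine is involved here; the sketch only shows the buffer-based transfer that encryption would hook into. The class name BufferedSender and its methods are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical buffer-based sender for illustration; not part of Ignite. */
final class BufferedSender {
    /** Sends the file through a user-space buffer, chunk by chunk; returns bytes sent. */
    static long send(Path file, WritableByteChannel out, int chunkSize) throws IOException {
        try (FileChannel src = FileChannel.open(file, StandardOpenOption.READ)) {
            ByteBuffer buf = ByteBuffer.allocateDirect(chunkSize);
            long sent = 0;

            // Each iteration copies one chunk from the file into the buffer,
            // which is the place where an SSL engine could encrypt it.
            while (src.read(buf) != -1) {
                buf.flip();

                while (buf.hasRemaining())
                    sent += out.write(buf);

                buf.clear();
            }

            return sent;
        }
    }

    /** Demo: sends a temp file to an in-memory channel and returns what arrived. */
    static byte[] roundTrip(byte[] data, int chunkSize) {
        try {
            Path file = Files.createTempFile("part-0", ".bin");

            try {
                Files.write(file, data);

                ByteArrayOutputStream sink = new ByteArrayOutputStream();

                try (WritableByteChannel out = Channels.newChannel(sink)) {
                    send(file, out, chunkSize);
                }

                return sink.toByteArray();
            }
            finally {
                Files.deleteIfExists(file);
            }
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Compared with the transferTo path, every byte here crosses into user space once, which is the cost the paragraph above refers to.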

Discussion Links

http://apache-ignite-developers.2346864.n4.nabble.com/DISCUSSION-Design-document-Rebalance-caches-by-transferring-partition-files-td38388.html

Reference Links

  1. Zero Copy I: User-Mode Perspective – https://www.linuxjournal.com/article/6345
  2. Example: Efficient data transfer through zero copy – https://www.ibm.com/developerworks/library/j-zerocopy/index.html
  3. Copy-on-write – https://en.wikipedia.org/wiki/Copy-on-write

Tickets

// Links or report with relevant JIRA tickets.
