...
ID: IEP-28
Author:
Sponsor:
Created: 31-Oct-2018
Status:
...
Apache Ignite can support cache rebalancing by transferring partition files using the zero copy algorithm [1], based on an extension of the communication SPI and the Java NIO API.
In the process of rebalancing data:
The whole process is described in terms of rebalancing a single cache group; partition files are rebalanced one-by-one:
As a high-level overview, the list of components that will be changed is provided below.
To benefit from zero copy, we must delegate file transfer to FileChannel#transferTo(long, long, java.nio.channels.WritableByteChannel) [2], because the fast path of the transferTo method is executed only if the destination channel is an instance of an internal JDK class.
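A minimal self-contained sketch of the zero-copy call is shown below. It transfers file-to-file for testability; in the rebalancing scenario the destination would be a SocketChannel, and the kernel fast path (sendfile) applies only to such internal JDK channel types. Class and method names are illustrative, not Ignite code:

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class TransferToDemo {
    /** Copies {@code src} to {@code dst} with FileChannel#transferTo, looping until all bytes are sent. */
    public static long copy(Path src, Path dst) throws IOException {
        try (FileChannel in = FileChannel.open(src, StandardOpenOption.READ);
             FileChannel out = FileChannel.open(dst, StandardOpenOption.WRITE, StandardOpenOption.CREATE)) {
            long pos = 0, size = in.size();

            // transferTo may transfer fewer bytes than requested, so loop until done.
            while (pos < size)
                pos += in.transferTo(pos, size - pos, out);

            return pos;
        }
    }

    public static void main(String[] args) throws IOException {
        Path src = Files.createTempFile("part", ".bin");
        Path dst = Files.createTempFile("part", ".copy");
        Files.write(src, "partition-data".getBytes());

        System.out.println(copy(src, dst) == Files.size(dst)); // prints "true"
    }
}
```

Note that the loop is required in any case: transferTo is free to transfer fewer bytes than requested in a single call.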
A new implementation of the cache entries preloader is assumed to be done. The new implementation must send and receive cache partition files over the CommunicationSpi channels in chunks, validating the received items.
When the supplier node receives the cache partition file demand request, it will send the file over the CommunicationSpi. The cache partition file can be concurrently updated by the checkpoint thread during its transmission. To guarantee file consistency, the Checkpointer must use the Copy-on-Write [3] technique and save a copy of the updated chunk into a temporary file.
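The Copy-on-Write idea can be modeled with a toy sketch: before the checkpointer overwrites a chunk that has not been transmitted yet, the original chunk is saved aside, and the sender prefers the saved copy. The class name, chunk granularity, and in-memory map (standing in for the temporary file) are illustrative assumptions, not Ignite's actual page-level implementation:

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of Copy-on-Write during partition file transmission. */
public class CowPartition {
    private final byte[][] chunks;                             // "partition file" split into chunks
    private final Map<Integer, byte[]> cow = new HashMap<>();  // saved originals (the "temporary file")
    private int sentUpTo;                                      // chunks [0, sentUpTo) already transmitted

    public CowPartition(byte[][] chunks) {
        this.chunks = chunks;
    }

    /** Checkpointer-side write: save the original before mutating a not-yet-sent chunk. */
    public void checkpointWrite(int idx, byte[] newData) {
        if (idx >= sentUpTo && !cow.containsKey(idx))
            cow.put(idx, chunks[idx].clone());

        chunks[idx] = newData;
    }

    /** Sender-side read: a consistent snapshot, preferring the saved copy if the chunk was updated. */
    public byte[] sendNextChunk() {
        int idx = sentUpTo++;
        byte[] saved = cow.remove(idx);

        return saved != null ? saved : chunks[idx].clone();
    }
}
```

The invariant this models: the transmitted file is byte-for-byte the state of the partition at the moment transmission started, even though checkpoints keep running.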
When the partition file has been transferred to the demander node, there are a few possible approaches that can be implemented to preload entries from the particular partition file.
The demander node must first, under checkpoint write lock, swap the cache data storage with the temporary one to perform recovery operations on the original cache data storage. After the partition file has been received from the supplier node, there are two possible ways to make this partition file up-to-date.
Disadvantages:
After the partition is received, the historical rebalance must be initiated to load the remaining cache updates.
The swapped temporary storage will log all cache updates to the temporary WAL storage (one per partition) for further applying them to the corresponding partition file. While the demander node is in the partition file transmission state, it must save sequentially all cache entries corresponding to the MOVING partition into a new temporary storage. These entries will be applied later, one by one, to the newly received cache partition file. All asynchronous operations will be enrolled to the end of the temporary storage during storage reads until it becomes fully read. A file-based FIFO approach is assumed to be used by this process.
The temporary storage is chosen to be WAL-based. The storage must support:
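The file-based FIFO approach described above might look like the following self-contained sketch: entries captured while the partition file is in transit are appended to a temporary file and later replayed in arrival order. The length-prefixed record format and class name are assumptions for illustration, not Ignite's actual WAL format:

```java
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.function.Consumer;

/** Sketch of a file-based FIFO for deferred cache updates. */
public class FifoUpdateLog implements Closeable {
    private final DataOutputStream out;
    private final Path file;

    public FifoUpdateLog(Path file) throws IOException {
        this.file = file;
        this.out = new DataOutputStream(new BufferedOutputStream(Files.newOutputStream(file)));
    }

    /** Appends a serialized cache update to the end of the log (length-prefixed record). */
    public void append(byte[] update) throws IOException {
        out.writeInt(update.length);
        out.write(update);
    }

    /** Replays all records in arrival order, e.g. to apply them to the received partition file. */
    public void replay(Consumer<byte[]> applier) throws IOException {
        out.flush();

        try (DataInputStream in = new DataInputStream(new BufferedInputStream(Files.newInputStream(file)))) {
            while (true) {
                int len;

                try {
                    len = in.readInt();
                }
                catch (EOFException ignored) {
                    break; // End of log reached.
                }

                byte[] rec = new byte[len];
                in.readFully(rec);
                applier.accept(rec);
            }
        }
    }

    @Override public void close() throws IOException {
        out.close();
    }
}
```

Appends and replay are strictly sequential, which matches the one-by-one application of deferred entries to the received partition file.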
...
...
The node is ready to become a partition owner when the partition data is rebalanced and the cache indexes are ready. With the message-based cluster rebalancing approach, indexes are rebuilt simultaneously with cache data loading. With the file-based rebalancing approach, the index rebuild procedure must be run before the partition state is set to OWNING.
The following changes need to be made:
/**
* @return {@code True} if new type of direct connections supported.
*/
public default boolean channelConnectionSupported() {
return false;
}
/**
* @param remote Destination cluster node to communicate with.
* @param msg Configuration channel message.
 * @return Channel instance to communicate with the remote node.
 * @throws IgniteSpiException If fails.
 */
public default IgniteSocketChannel channel(ClusterNode remote, T msg) throws IgniteSpiException {
    throw new UnsupportedOperationException();
}
The demander node will use a preloaded partition file as a new source of cache data entries to load.
Disadvantages:
In the process of rebalancing data:
The whole process is described in terms of rebalancing a single partition file of a cache group. All the other partitions would be rebalanced one-by-one.
In terms of a high-level abstraction, Apache Ignite must support the features described below.
The node partition preloader machinery downloads cache partition files from the cluster nodes which own the desired partitions (the zero copy algorithm [1] is assumed to be used by default). To achieve this, the file transmission process must be implemented in Apache Ignite over the Communication SPI.
The Communication SPI must support:
public interface CommunicationListenerEx<T extends Serializable> extends EventListener {
/**
* @param nodeId Remote node id.
* @param initMsg Init channel message.
* @param channel Locally created channel endpoint.
*/
public void onChannelOpened(UUID nodeId, Message initMsg, Channel channel);
}
The IO manager must support:
public interface TransmissionHandler {
/**
 * @param nodeId Remote node id on which the exception occurred.
 * @param err The cause of the transmission failure.
 */
public void onException(UUID nodeId, Throwable err);
/**
* @param nodeId Remote node id from which request has been received.
* @param fileMeta File meta info.
* @return Absolute pathname denoting a file.
*/
public String filePath(UUID nodeId, TransmissionMeta fileMeta);
/**
 * <em>Chunk handler</em> represents the way the input data stream is processed.
 * For each chunk it accepts a {@link ByteBuffer} with data from the input for further processing.
*
* @param nodeId Remote node id from which request has been received.
* @param initMeta Initial handler meta info.
* @return Instance of chunk handler to process incoming data by chunks.
*/
public Consumer<ByteBuffer> chunkHandler(UUID nodeId, TransmissionMeta initMeta);
/**
 * <em>File handler</em> represents the way the input data stream is processed. All the data will
 * be processed under the hood using the zero-copy transferring algorithm, and only the start and
 * the end of the file processing will be provided to the handler.
*
* @param nodeId Remote node id from which request has been received.
* @param initMeta Initial handler meta info.
 * @return Instance of read handler to process incoming data in the {@link FileChannel} manner.
*/
public Consumer<File> fileHandler(UUID nodeId, TransmissionMeta initMeta);
}
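As an illustration of how a chunk handler returned by chunkHandler(...) might validate incoming data, the following self-contained sketch accumulates a CRC32 over received chunks. Checksum-based validation and the class name are assumptions for illustration; the Ignite wiring (node ids, transmission metadata) is omitted:

```java
import java.nio.ByteBuffer;
import java.util.function.Consumer;
import java.util.zip.CRC32;

/** Sketch of a chunk handler that folds each incoming ByteBuffer chunk into a running
 *  CRC32, so the receiver can validate the transferred file against a known checksum. */
public class CrcChunkHandler implements Consumer<ByteBuffer> {
    private final CRC32 crc = new CRC32();

    @Override public void accept(ByteBuffer chunk) {
        // Drain the remaining bytes of this chunk into the running checksum.
        byte[] arr = new byte[chunk.remaining()];
        chunk.get(arr);
        crc.update(arr);
    }

    /** @return Checksum accumulated over all chunks received so far. */
    public long value() {
        return crc.getValue();
    }
}
```

Because CRC32 is computed incrementally, the checksum over the concatenated chunks equals the checksum over the whole file, regardless of how the transmission was split.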
public class TransmissionSender implements Closeable {
/**
* @param file Source file to send to remote.
* @param params Additional transfer file description keys.
* @param plc The policy of handling data on remote.
* @throws IgniteCheckedException If fails.
*/
public void send(
File file,
Map<String, Serializable> params,
TransmissionPolicy plc
) throws IgniteCheckedException, InterruptedException, IOException {
send(file, 0, file.length(), params, plc);
}
/**
* @param file Source file to send to remote.
* @param plc The policy of handling data on remote.
* @throws IgniteCheckedException If fails.
*/
public void send(
File file,
TransmissionPolicy plc
) throws IgniteCheckedException, InterruptedException, IOException {
send(file, 0, file.length(), new HashMap<>(), plc);
}
/**
* @param file Source file to send to remote.
 * @param offset Position to start the transfer at.
* @param cnt Number of bytes to transfer.
* @param params Additional transfer file description keys.
* @param plc The policy of handling data on remote.
* @throws IgniteCheckedException If fails.
*/
public void send(
File file,
long offset,
long cnt,
Map<String, Serializable> params,
TransmissionPolicy plc
) throws IgniteCheckedException, InterruptedException, IOException {
// Impl.
}
}
When the supplier node receives the cache partition file demand request, it will send the file over the CommunicationSpi. The cache partition file can be concurrently updated by the checkpoint thread during its transmission. To guarantee file consistency, the Checkpointer must use the Copy-on-Write [3] technique and save a copy of the updated chunk into a temporary file.
The node is ready to become a partition owner when the partition data is rebalanced and the cache indexes are ready. With the message-based cluster rebalancing approach, indexes are rebuilt simultaneously with cache data loading. With the file-based rebalancing approach, the index rebuild procedure must be finished before the partition state is set to OWNING.
Apache Ignite doesn't provide any recovery guarantees for partitions in the MOVING state. Such cache partitions will be fully loaded when the next rebalance procedure occurs.
The node which is being rebalanced leaves the cluster. For such nodes WAL is always disabled (all partitions have the MOVING state because the node is new to the cluster and has no cache data).
Since WAL is disabled, we can guarantee that all operations with loaded partition files (renaming partition files, applying async updates) are safe to perform, because the cache directory will be fully dropped on recovery.
Each topology change event (JOIN/LEFT/FAILED) may or may not change the cache affinity assignments of the caches currently being rebalanced. If the assignments are not changed and the node still needs the partitions being rebalanced, we can continue the current rebalance process (see IGNITE-7165 for details).
...
To provide basic recovery guarantees we must:
...
Recovery from different stages:
SSL must be disabled to take advantage of Java NIO zero-copy file transmission using the FileChannel#transferTo method. If we need to use SSL, the file must be split into chunks and sent over the socket channel with a ByteBuffer in the same way. As the SSL engine generally needs a direct ByteBuffer to do encryption, we can't avoid copying the buffer payload from the kernel level to the application level.
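The chunked fallback path can be sketched as follows (a minimal sketch: the class name, buffer size, and file-to-channel shape are illustrative, and the actual SSLEngine wrap/unwrap step is omitted; only the user-space buffering that makes SSL possible is shown):

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class ChunkedSender {
    /** Copies a file into the destination channel through a user-space buffer,
     *  as required when the payload must pass through an SSL engine. */
    public static long sendChunked(Path src, WritableByteChannel dst, int chunkSize) throws IOException {
        try (FileChannel in = FileChannel.open(src, StandardOpenOption.READ)) {
            // SSL engines generally need a direct ByteBuffer for encryption.
            ByteBuffer buf = ByteBuffer.allocateDirect(chunkSize);
            long total = 0;

            while (in.read(buf) > 0) {
                buf.flip();

                // A channel write may be partial, so drain the buffer completely.
                while (buf.hasRemaining())
                    total += dst.write(buf);

                buf.clear();
            }

            return total;
        }
    }
}
```

Compared to transferTo, every byte here crosses the kernel/user-space boundary twice, which is exactly the copy that zero copy avoids.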
...