ID | IEP-40 |
Author | |
Sponsor | |
Created | 2020-02-14 |
Status | DRAFT |
Currently, when a persistent node joins the cluster, it does nothing to load data from disk into memory. So when the new node finishes joining and is ready to work, it usually has in memory only the entities that were received during rebalance. All other data is loaded only on demand, under load, which slows down the whole cluster.
To resolve the issues described above, it makes sense to join the cluster only after warming up the caches (loading some pages from disk into memory). But this is not easy to implement right now due to restrictions of the current node life cycle implementation, namely:
Considering these problems, the new flow should be changed as follows:
Explicitly split the existing code into phases: localInit (e.g. recovery), pre-fetch (e.g. collect, receive), globalInit (e.g. warm-up), join (e.g. collect, receive), postInit, exchange.
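To make the proposed order concrete, the phases can be modeled as a hypothetical enum. `NodeStartPhase` and `next()` are illustrative names, not existing Ignite code; the sketch only captures that pre-fetch and globalInit (warm-up) run before the regular join.

```java
import java.util.Optional;

/** Hypothetical ordering of the proposed node start phases (illustrative only). */
enum NodeStartPhase {
    LOCAL_INIT,  // e.g. recovery; no cluster interaction yet
    PRE_FETCH,   // lightweight collect/receive round-trip with the cluster
    GLOBAL_INIT, // e.g. cache warm-up, using the pre-fetched cluster data
    JOIN,        // regular discovery join (collect/receive)
    POST_INIT,
    EXCHANGE;    // partition map exchange

    /** Returns the phase that follows this one, or empty for the last phase. */
    Optional<NodeStartPhase> next() {
        NodeStartPhase[] all = values();
        int idx = ordinal() + 1;

        return idx < all.length ? Optional.of(all[idx]) : Optional.empty();
    }
}
```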
Proposed API ...
Some attribute should indicate whether cache warm-up is expected. Based on this attribute, we can decide whether the pre-fetch phase makes sense, and likewise whether the warm-up actions do.
Maybe we should always do the pre-fetch phase?
Should it be a warm-up attribute or a pre-fetch attribute?
The new phase is a lightweight analog of the join phase. It also has several sub-phases, such as collectJoiningData?, collectGridData?, receivedJoiningData?, receivedGridData?. Unlike the join phase, which has a pair of messages like NodeAdd and NodeAddFinish, pre-fetch has only one message, which just collects the data and disappears from the cluster (or should we keep the connection? I don't think so).
The current join process is described here.
At a high level of abstraction, PreFetch looks like the join process, but I suggest several changes:
Instead of GridComponent#collectJoiningNodeData and similar methods, I suggest using the following interface:
```java
import java.util.Map;
import org.apache.ignite.cluster.ClusterNode;

public interface ConfigurationCollector<ClusterDataT, SpecificNodeDataT, JoiningDataT> {
    /**
     * Invoked on the joining node to collect information that will be sent to the cluster.
     *
     * @return Data to send to the cluster.
     */
    JoiningDataT collectJoiningNodeData();

    /**
     * Invoked on each cluster node to collect information specific to that node only.
     *
     * @param joiningNode Joining node info.
     * @param joiningNodeData Data from the joining node which was collected by {@link
     * ConfigurationCollector#collectJoiningNodeData()}.
     * @return Node-specific data.
     */
    SpecificNodeDataT collectNodeSpecificData(ClusterNode joiningNode, JoiningDataT joiningNodeData);

    /**
     * Invoked on only one node in the cluster to collect common cluster data.
     *
     * @param joiningNode Joining node info.
     * @param joiningNodeData Data from the joining node which was collected by {@link
     * ConfigurationCollector#collectJoiningNodeData()}.
     * @return Common cluster data.
     */
    ClusterDataT collectClusterData(ClusterNode joiningNode, JoiningDataT joiningNodeData);

    /**
     * Invoked on the joining node after the data has been collected from the cluster (after
     * {@link ConfigurationCollector#collectNodeSpecificData} and {@link ConfigurationCollector#collectClusterData}).
     *
     * @param clusterNodeData Data collected from the cluster by {@link ConfigurationCollector#collectClusterData}.
     * @param nodeSpecificData Data collected from the cluster by {@link ConfigurationCollector#collectNodeSpecificData},
     * per node.
     */
    void handleClusterData(ClusterDataT clusterNodeData, Map<ClusterNode, SpecificNodeDataT> nodeSpecificData);
}
```
Also, GridDiscoveryManager can be extended with the following methods:
void registerPreFetchCollector(ConfigurationCollector collector) - for registering pre-fetch collectors.
void registerJoinCollector(ConfigurationCollector collector) - for registering join collectors, in case the current join approach is replaced.
So, after collectors are registered inside each processor that needs them, it will look like this:
```java
...
discovery.registerPreFetchCollector(
    new ConfigurationCollector<PreFetchClusterData, PreFetchSpecificNodeData, PreFetchJoiningData>() {
        @Override public PreFetchJoiningData collectJoiningNodeData() { ... }

        @Override public PreFetchSpecificNodeData collectNodeSpecificData(ClusterNode joiningNode,
            PreFetchJoiningData joiningNodeData) { ... }

        @Override public PreFetchClusterData collectClusterData(ClusterNode joiningNode,
            PreFetchJoiningData joiningNodeData) { ... }

        @Override public void handleClusterData(PreFetchClusterData clusterNodeData,
            Map<ClusterNode, PreFetchSpecificNodeData> nodeSpecificData) { ... }
    });

discovery.registerJoinCollector(
    new ConfigurationCollector<JoinClusterData, JoinSpecificNodeData, JoinJoiningData>() {
        @Override public JoinJoiningData collectJoiningNodeData() { ... }

        @Override public JoinSpecificNodeData collectNodeSpecificData(ClusterNode joiningNode,
            JoinJoiningData joiningNodeData) { ... }

        @Override public JoinClusterData collectClusterData(ClusterNode joiningNode,
            JoinJoiningData joiningNodeData) { ... }

        @Override public void handleClusterData(JoinClusterData clusterNodeData,
            Map<ClusterNode, JoinSpecificNodeData> nodeSpecificData) { ... }
    });
...
```
PreFetch process:
The joining node calls collectJoiningNodeData, sends TcpDiscoveryPreFetchRequestMessage (a new message analogous to TcpDiscoveryJoinRequestMessage) to the cluster, and immediately disconnects from it.
When this message reaches the coordinator, the coordinator validates the node and checks the security.
After all validations, the coordinator sends a new discovery message (e.g. TcpDiscoveryCollectMessage), which collects data via collectNodeSpecificData.
After the node-specific data is collected, the coordinator collects common data locally via collectClusterData.
After all data is collected, the coordinator opens a connection to the joining node, sends all the data, and closes the connection.
When the joining node receives the cluster data, it calls handleClusterData on each processor and continues the node start process.
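The pre-fetch steps above can be simulated in a single JVM to show the order in which the `ConfigurationCollector` callbacks fire. This is a minimal sketch under several assumptions: nodes are identified by plain strings instead of `ClusterNode`, the first map entry plays the coordinator, networking, disconnection and security validation are omitted, and `PreFetchSimulation`, `runPreFetch` and `RecordingCollector` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified copy of the proposed interface; String stands in for ClusterNode. */
interface ConfigurationCollector<ClusterDataT, SpecificNodeDataT, JoiningDataT> {
    JoiningDataT collectJoiningNodeData();
    SpecificNodeDataT collectNodeSpecificData(String joiningNode, JoiningDataT joiningNodeData);
    ClusterDataT collectClusterData(String joiningNode, JoiningDataT joiningNodeData);
    void handleClusterData(ClusterDataT clusterNodeData, Map<String, SpecificNodeDataT> nodeSpecificData);
}

/** Hypothetical in-process simulation of one pre-fetch round for a single collector. */
final class PreFetchSimulation {
    static <C, S, J> void runPreFetch(String joiningNode, ConfigurationCollector<C, S, J> joiningSide,
        Map<String, ConfigurationCollector<C, S, J>> clusterSide) {
        // Step 1: the joining node collects its data and "sends" it (the real node then disconnects).
        J joiningData = joiningSide.collectJoiningNodeData();

        // Step 2: coordinator validation and security checks would happen here (omitted).

        // Step 3: the collect message passes every cluster node, gathering node-specific data.
        Map<String, S> specific = new LinkedHashMap<>();
        for (Map.Entry<String, ConfigurationCollector<C, S, J>> e : clusterSide.entrySet())
            specific.put(e.getKey(), e.getValue().collectNodeSpecificData(joiningNode, joiningData));

        // Step 4: the coordinator (first entry in this sketch) collects common cluster data locally.
        ConfigurationCollector<C, S, J> crd = clusterSide.values().iterator().next();
        C clusterData = crd.collectClusterData(joiningNode, joiningData);

        // Steps 5-6: everything is delivered to the joining node, which handles it.
        joiningSide.handleClusterData(clusterData, specific);
    }
}

/** Toy collector that records the order of callbacks (for illustration only). */
final class RecordingCollector implements ConfigurationCollector<String, Integer, String> {
    final String node;
    final List<String> log;
    String handledCluster;
    Map<String, Integer> handledSpecific;

    RecordingCollector(String node, List<String> log) {
        this.node = node;
        this.log = log;
    }

    @Override public String collectJoiningNodeData() {
        log.add(node + ":joining");
        return "hello";
    }

    @Override public Integer collectNodeSpecificData(String joiningNode, String data) {
        log.add(node + ":specific");
        return node.length();
    }

    @Override public String collectClusterData(String joiningNode, String data) {
        log.add(node + ":cluster");
        return "cluster-of-" + data;
    }

    @Override public void handleClusterData(String cluster, Map<String, Integer> specific) {
        log.add(node + ":handle");
        handledCluster = cluster;
        handledSpecific = specific;
    }
}
```

In the real proposal each step is a discovery message or a direct connection; the simulation keeps only the call order: collectJoiningNodeData once on the joining node, collectNodeSpecificData on every cluster node, collectClusterData once on the coordinator, then handleClusterData on the joining node.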
PreFetch data collection per processor:
CacheObjectBinaryProcessorImpl - the same as for the join, but both can be optimized to send only the difference.
DistributedMetaStorageImpl - the same as for the join, because it already handles diffs correctly.
GridCacheProcessor - it makes sense to do the same as for the join, but perhaps the join should be rewritten to detect changes made after pre-fetch.
IgniteServiceProcessor - does not affect the current task, but it is possible to do the same as for the join, with future optimization.
GridMarshallerMappingProcessor - the same as above
GridContinuousProcessor - the same as above
Also, a new collection point (WriteAheadLog?) is required, which sends details about the available WAL history so that the possibility of historical rebalance can be determined.
The same as the current one, but the handling of duplicate data received from the cluster still needs to be improved; alternatively, we should send the joining node only the difference between the current state and the pre-fetched one, as the distributed metastorage already does.
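As an illustration of what the WAL history data could be used for, the sketch below decides per partition whether historical rebalance is possible. It rests on a hypothetical data model: the joining node reports its persisted update counter per partition, and a supplier reports, per partition, the update counter at its oldest reserved checkpoint, after which updates can be replayed from WAL. `WalHistoryCheck` is not an Ignite class.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper: which partitions cannot be rebalanced historically (from WAL). */
final class WalHistoryCheck {
    /**
     * @param joiningCounters Partition -> update counter persisted on the joining node.
     * @param supplierEarliest Partition -> update counter at the supplier's oldest reserved checkpoint;
     *      updates after that counter are assumed to be replayable from the supplier's WAL.
     * @return Partitions that would need a full rebalance instead.
     */
    static Set<Integer> partitionsNeedingFullRebalance(Map<Integer, Long> joiningCounters,
        Map<Integer, Long> supplierEarliest) {
        Set<Integer> full = new TreeSet<>();

        for (Map.Entry<Integer, Long> e : joiningCounters.entrySet()) {
            Long earliest = supplierEarliest.get(e.getKey());

            // No history for the partition, or the history starts after the joining node's counter.
            if (earliest == null || earliest > e.getValue())
                full.add(e.getKey());
        }

        return full;
    }
}
```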
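The diff-based alternative can be sketched as follows: the cluster computes the difference between the state sent during pre-fetch and its current state, and the joining node applies it. This naive sketch compares full key-value snapshots; `StateDiff` is a hypothetical helper, and a real implementation would more likely rely on versioning than keep a full copy of the pre-fetched state.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

/** Hypothetical diff between the pre-fetched and current key-value state. */
final class StateDiff<K, V> {
    final Map<K, V> upserts = new HashMap<>();
    final Set<K> removals = new HashSet<>();

    /** Computed on the cluster side: what changed since the pre-fetch snapshot. */
    static <K, V> StateDiff<K, V> between(Map<K, V> preFetched, Map<K, V> current) {
        StateDiff<K, V> diff = new StateDiff<>();

        // New or changed entries.
        for (Map.Entry<K, V> e : current.entrySet()) {
            if (!preFetched.containsKey(e.getKey()) || !Objects.equals(preFetched.get(e.getKey()), e.getValue()))
                diff.upserts.put(e.getKey(), e.getValue());
        }

        // Entries removed since the pre-fetch.
        for (K key : preFetched.keySet()) {
            if (!current.containsKey(key))
                diff.removals.add(key);
        }

        return diff;
    }

    /** Applied on the joining node to turn the pre-fetched state into the current one. */
    Map<K, V> applyTo(Map<K, V> preFetched) {
        Map<K, V> res = new HashMap<>(preFetched);
        res.keySet().removeAll(removals);
        res.putAll(upserts);
        return res;
    }
}
```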
A strategy is a new entity that contains cache warm-up logic, such as random pages (the easy one), the last memory snapshot, a specific cache, and so on.
Strategies are to be launched after the recovery phase for each persistent data region; configurations can be set either for specific data regions or as a common one for all regions.
The proposed internal interface:
```java
package org.apache.ignite.internal.processors.cache.warmup;

import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.configuration.WarmUpConfiguration;
import org.apache.ignite.internal.processors.cache.persistence.DataRegion;

/**
 * Interface for warming up.
 */
public interface WarmUpStrategy<T extends WarmUpConfiguration> {
    /**
     * Returns the configuration class used to map a configuration to this strategy.
     *
     * @return Configuration class.
     */
    Class<T> configClass();

    /**
     * Warm up.
     *
     * @param cfg Warm-up configuration.
     * @param region Data region.
     * @throws IgniteCheckedException If failed.
     */
    void warmUp(T cfg, DataRegion region) throws IgniteCheckedException;

    /**
     * Stop warming up.
     *
     * @throws IgniteCheckedException If failed.
     */
    void stop() throws IgniteCheckedException;
}
```
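A "random pages" strategy could look roughly like the sketch below. It mirrors the `warmUp`/`stop` shape of `WarmUpStrategy` but deliberately does not implement it: `RegionPages` is a hypothetical stand-in for `DataRegion` page access, and `RandomPagesWarmUp` is an illustrative name. The volatile flag shows one way `stop()` could interrupt a running warm-up between pages after an external stop request.

```java
import java.util.Random;

/** Hypothetical stand-in for a persistent data region's pages (not Ignite's DataRegion API). */
interface RegionPages {
    /** @return Number of pages stored on disk for the region. */
    long pageCount();

    /** Brings the page with the given index into memory. */
    void load(long pageIdx);
}

/** Hypothetical "random pages" warm-up, shaped like WarmUpStrategy#warmUp and #stop. */
final class RandomPagesWarmUp {
    /** Set by stop() from another thread; checked by the warm-up loop before each page. */
    private volatile boolean stopped;

    private final Random rnd;

    RandomPagesWarmUp(long seed) {
        rnd = new Random(seed);
    }

    /**
     * Loads up to {@code maxPages} randomly chosen pages. Pages are sampled with replacement,
     * so the same page may be loaded more than once.
     *
     * @return Number of load calls performed.
     */
    long warmUp(RegionPages region, long maxPages) {
        long total = region.pageCount();
        long budget = Math.min(maxPages, total);
        long loaded = 0;

        while (loaded < budget && !stopped) {
            region.load(Math.floorMod(rnd.nextLong(), total)); // Index in [0, total).
            loaded++;
        }

        return loaded;
    }

    /** Requests the running warm-up to stop before it loads the next page. */
    void stop() {
        stopped = true;
    }
}
```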
To run warm-up strategies, they need to be configured.
A marker interface is proposed for the configuration:
```java
package org.apache.ignite.configuration;

import java.io.Serializable;

/**
 * Warm-up configuration marker interface.
 */
public interface WarmUpConfiguration extends Serializable {
    // No-op.
}
```
To allow strategies to be added dynamically, an extension interface is proposed, which can be provided by any custom plugin:
```java
package org.apache.ignite.internal.processors.cache.warmup;

import java.util.Collection;
import org.apache.ignite.plugin.Extension;

/**
 * Interface for getting warm-up strategies from plugins.
 */
public interface WarmUpStrategySupplier extends Extension {
    /**
     * Getting warm-up strategies.
     *
     * @return Warm-up strategies.
     */
    Collection<WarmUpStrategy<?>> strategies();
}
```
An MXBean is proposed for external interaction with warm-up strategies:
```java
package org.apache.ignite.mxbean;

/**
 * Warm-up MXBean interface.
 */
@MXBeanDescription("MBean that provides access to warm-up.")
public interface WarmUpMXBean {
    /**
     * Stop warm-up.
     */
    @MXBeanDescription("Stop warm-up.")
    void stopWarmUp();
}
```
When a node starts, it checks that each warm-up configuration corresponds to one of the available strategies (the default ones plus those from plugins). Then the data regions are processed sequentially, starting from the default data region: each region is warmed up by the strategy for its own warm-up configuration if one is set, or for the default configuration otherwise.
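The start-up resolution described above can be sketched as follows, under stated assumptions: `WarmUpConfiguration` and `WarmUpStrategy` are simplified copies (a region is identified by its name, no checked exceptions), and `WarmUpLauncher`, `RandomPagesConfig`, `NoOpConfig` and `RecordingStrategy` are hypothetical. It follows one reading of the text: all configurations are validated first, then regions are processed sequentially starting with the default one, each using its own configuration if set and the common one otherwise.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified copy of the proposed marker interface (Serializable omitted). */
interface WarmUpConfiguration {}

/** Simplified copy of the proposed strategy interface; the region is identified by name. */
interface WarmUpStrategy<T extends WarmUpConfiguration> {
    Class<T> configClass();

    void warmUp(T cfg, String region);
}

/** Hypothetical launcher illustrating the start-up resolution. */
final class WarmUpLauncher {
    /** Configuration class -> strategy (default ones plus those from WarmUpStrategySupplier plugins). */
    private final Map<Class<?>, WarmUpStrategy<?>> strategies = new LinkedHashMap<>();

    void register(WarmUpStrategy<?> strategy) {
        strategies.put(strategy.configClass(), strategy);
    }

    /**
     * @param dfltRegion Name of the default data region, processed first.
     * @param regionCfgs Persistent region name -> its own warm-up config, or null if none is set.
     * @param dfltCfg Common config for regions without their own one; may be null.
     * @return Applied "region=ConfigClass" pairs, in order.
     */
    List<String> start(String dfltRegion, Map<String, WarmUpConfiguration> regionCfgs, WarmUpConfiguration dfltCfg) {
        // Fail fast if any configuration has no matching strategy.
        List<WarmUpConfiguration> allCfgs = new ArrayList<>(regionCfgs.values());
        allCfgs.add(dfltCfg);

        for (WarmUpConfiguration cfg : allCfgs) {
            if (cfg != null && !strategies.containsKey(cfg.getClass()))
                throw new IllegalArgumentException("Unknown warm-up configuration: " + cfg.getClass().getName());
        }

        // Default region first, then the remaining regions in map order.
        List<String> order = new ArrayList<>();
        order.add(dfltRegion);

        for (String region : regionCfgs.keySet()) {
            if (!region.equals(dfltRegion))
                order.add(region);
        }

        List<String> applied = new ArrayList<>();

        for (String region : order) {
            WarmUpConfiguration own = regionCfgs.get(region);
            WarmUpConfiguration cfg = own != null ? own : dfltCfg;

            if (cfg == null)
                continue; // No warm-up configured for this region.

            run(strategies.get(cfg.getClass()), cfg, region);
            applied.add(region + "=" + cfg.getClass().getSimpleName());
        }

        return applied;
    }

    /** Captures the wildcard type so the config can be cast to the strategy's own config class. */
    private static <T extends WarmUpConfiguration> void run(WarmUpStrategy<T> strategy, WarmUpConfiguration cfg, String region) {
        strategy.warmUp(strategy.configClass().cast(cfg), region);
    }
}

/** Hypothetical configurations and a toy strategy, for demonstration only. */
final class RandomPagesConfig implements WarmUpConfiguration {}

final class NoOpConfig implements WarmUpConfiguration {}

final class RecordingStrategy<T extends WarmUpConfiguration> implements WarmUpStrategy<T> {
    final List<String> regions = new ArrayList<>();
    private final Class<T> cls;

    RecordingStrategy(Class<T> cls) { this.cls = cls; }

    @Override public Class<T> configClass() { return cls; }

    @Override public void warmUp(T cfg, String region) { regions.add(region); }
}
```

Validating every configuration before running any strategy means a misconfigured region fails the node start up front, rather than after some regions have already been warmed up.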