IDIEP-40
Author
Sponsor
Created2020-02-14
Status
DRAFT


Motivation

Currently, when the persistent node joins to cluster its does nothing for loading data from disk to memory. So when the new node would finish the join and ready to work It usually has in memory only some entities which were received during rebalance. And it loads another data only upon request under the load. Which leads to a slow down of the whole cluster.

Description

To resolve the issues described above, It makes sense to join to cluster only after warming-up the caches(load some pages from disk to memory). But it's not easy to implement right now due to restriction of the current implementation of the node life cycle, namely:

  • Node doesn't have any information about clusters before the join phase. So it's impossible to understand which caches warm-up still make sense.

Considering these problems looks like our new flow has to be changed in the following way:

  • When the node is starting it should detect expectation of cache warm-up by some attribute(SystemProperites??) - or it doesn't matter?
  • After the recovery phase, we shouldn't shutdown the caches
  • Instead of join phase, one new phase(named it, for example, pre-fetch) should be added which would be collected the cluster details without blocking the cluster work.
  • According to collected information(ex. full/historical rebalance needed, cache destroyed and so on) and configured strategy, the node decides which of caches can be warmed-up.
  • After local actions were finished, the node is transited to join phase which, in the ideal case, should support the collecting only difference between current information and pre-fetch information. - this is already implemented in distributed metastorage, but it can lead to the problem in other processes.
  • Initial exchange and possible historical rebalance are finishing the node start.

Generall proposal

Explicitly split existed code to phase such - localInit(ex. recovery), pre-fetch(ex. collect, received), globalInit(ex. warm-up), join(ex. collect, received), postInit, exchange.

Proposed API ...

Warm-up attribute

Some attribute should indicate expected cache warm-up or not. According to this attribute, we can understand the pre-fetch phase makes sense or not, also as warm-up actions. 

Maybe we should always do the pre-fetch phase?

Should be it a warm-up attribute or pre-fetch attribute?


Part 1

Pre-fetch phase

The new phase is the lightweight analog of the join phase. It also has several sub-phase like collectJoiningData?, collectGridData?, receivedJoiningData?, receivedGridData?. Unlike the join phase which has a pair of messages like NodeAdd and NodeAddFinish, pre-fetch has only one message which just collects the data and disappears from the cluster(or we should keep the connection? I don't think so).

Current join process described here

PreFetch abstractions

PreFetch is on high-level abstraction looks like a join process. But I suggest several changes:

Instead of GridComponent#collectJoiningNodeData and so on, I suggest to use follows interface:

public interface ConfigurationCollector<ClusterDataT, SpecificNodeDataT, JoiningDataT> {
        /**
         * Invoked on the joining node for collection of information that will be sent to the cluster.
         *
         * @return Data to send to cluster.
         */
        JoiningDataT collectJoiningNodeData();

        /**
         * Invoked on each cluster nodes for collection of information specific for only this node.
         *
         * @param joiningNode Joining node info.
         * @param joiningNodeData Data from joining node which was collected at {@link
         * ConfigurationCollector#collectJoiningNodeData()}
         * @return Specific node data.
         */
        SpecificNodeDataT collectNodeSpecificData(ClusterNode joiningNode, JoiningDataT joiningNodeData);

        /**
         * Invoked on only one node on the cluster to collect common cluster data.
         *
         * @param joiningNode Joining node info.
         * @param joiningNodeData Data from joining node which was collected at {@link
         * ConfigurationCollector#collectJoiningNodeData()}
         * @return Common cluster data.
         */
        ClusterDataT collectClusterData(ClusterNode joiningNode, JoiningDataT joiningNodeData);

        /**
         * Invoked on the joining node after collection data from cluster(after {@link ConfigurationCollector#collectNodeSpecificData}
         * and {@link ConfigurationCollector#collectClusterData})
         *
         * @param clusterNodeData Collected data from cluster on {@link ConfigurationCollector#collectClusterData}.
         * @param nodeSpecificData Collected data from cluster on {@link ConfigurationCollector#collectNodeSpecificData}
         * per node.
         */
        void handleClusterData(ClusterDataT clusterNodeData, Map<ClusterNode, SpecificNodeDataT> nodeSpecificData);
    }


Also, GridDiscoveryManager can be expanded next methods:

  • void registerPreFetchCollector(ConfigurationCollector collector) - for registration of pre-fetch collectors

  • void registerJoinCollector(ConfigurationCollector collector) - for registration of the join collectors in case of replacing of current join approach.

So after this registration of collectors inside of each processor which demands it, it will look like:

...
discovery.registerPreFetchCollector(new ConfigurationCollector() {
  PreFetchJoiningData collectJoiningNodeData(){}
  PreFetchSpecificNodeData collectNodeSpecificData(ClusterNode joiningNode, PreFetchJoiningData joiningNodeData){}
  PreFetchClusterData collectClusterData(ClusterNode joiningNode, PreFetchJoiningData joiningNodeData){}
  void handleClusterData(PreFetchClusterData clusterNodeData, Map<ClusterNode, PreFetchSpecificNodeData> nodeSpecificData){}
})

discovery.registerJoinCollector(new ConfigurationCollector() {
  JoinJoiningData collectJoiningNodeData(){}
  JoinSpecificNodeData collectNodeSpecificData(ClusterNode joiningNode, JoinJoiningData joiningNodeData){}
  JoinClusterData collectClusterData(ClusterNode joiningNode, JoinJoiningData joiningNodeData){}
  void handleClusterData(JoinClusterData clusterNodeData, Map<ClusterNode, JoinSpecificNodeData> nodeSpecificData){}
})
...


PreFetch process

PreFetch process:

  • The joining node call collectJoiningNodeData and send TcpDiscoveryPreFetchRequestMessage(new message which is analog of TcpDiscoveryJoinRequestMessage) to the cluster and immediately disconnect from it.

  • When this message reaches the coordinator, the coordinator validates the node and checks the security.

  • After all validations, the coordinator sends a new discovery message(ex. TcpDiscoveryCollectMessage) which collect data via collectNodeSpecificData

  • After specific data was collected, the coordinator collect common data locally via collectClusterData

  • After all data were collected coordinator opens a connection to the joining node, sends all data and closes the connection.

  • When joining node received the cluster data it calls handleClusterData on each processor and continues the start node process.

PreFetch collection data per each processor:

  • CacheObjectBinaryProcessorImpl - the same as the join but both can be optimized to send only difference.

  • DistributedMetaStorageImpl - the same as the join because it already works with diff correctly.

  • GridCacheProcessor - it makes sense to do the same as the join but perhaps the join should be rewritten to detect changes after preFetch

  • IgniteServiceProcessor - doesn’t have any influence on the current task but possible to do the same as the join with future optimization

  • GridMarshallerMappingProcessor - the same as above

  • GridContinuousProcessor - the same as above

  • Also, it requires adding a new place for collection(WriteAheadLog?) which sends details about available WAL history for an understanding of the possibility of historical rebalance.

Join phase

The same as the current one, but it still needs to improve the handling of received duplicated data from the cluster or we should send to joining node only difference of information between the current state and pre-fetched one like distributed metastore already do.

Part 2

Cache warm-up strategy

The strategy is a new entity that contains the logic of cache warm-up such as random pages(easy one), last memory snapshot, specific cache and so on.
Strategies are suggested to be launched after phase recovery for each persistent data region, and configurations can be set either for specific data regions or common for all.
Internal interface offered:

package org.apache.ignite.internal.processors.cache.warmup;

import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.configuration.WarmUpConfiguration;
import org.apache.ignite.internal.processors.cache.persistence.DataRegion;

/**
 * Interface for warming up.
 */
public interface WarmUpStrategy<T extends WarmUpConfiguration> {
    /**
     * Returns configuration class for mapping to strategy.
     *
     * @return Configuration class.
     */
    Class<T> configClass();

    /**
     * Warm up.
     *
     * @param cfg       Warm-up configuration.
     * @param region    Data region.
     * @throws IgniteCheckedException if faild.
     */
    void warmUp(T cfg, DataRegion region) throws IgniteCheckedException;

    /**
     * Stop warming up.
     *
     * @throws IgniteCheckedException if faild.
     */
    void stop() throws IgniteCheckedException;
}

To run warm-up strategies, they need to be configured through configurations:

  • org.apache.ignite.configuration.DataRegionConfiguration#setWarmUpConfiguration - to warm up a specific data region;
  • org.apache.ignite.configuration.DataStorageConfiguration#setDefaultWarmUpConfiguration - to warm up all data regions.

The marker interface is offered as a configuration:

package org.apache.ignite.configuration;

import java.io.Serializable;

/**
 * Warm-up configuration marker interface.
 */
public interface WarmUpConfiguration extends Serializable {
    // No-op.
}

For possibility of dynamically adding strategies, an extension interface is offered, which can be added to any custom plugin:

package org.apache.ignite.internal.processors.cache.warmup;

import java.util.Collection;
import org.apache.ignite.plugin.Extension;

/**
 * Interface for getting warm-up strategies from plugins.
 */
public interface WarmUpStrategySupplier extends Extension {
    /**
     * Getting warm-up strategies.
     *
     * @return Warm-up strategies.
     */
    Collection<WarmUpStrategy<?>> strategies();
}

MXBean is offered for external interaction with warm-up strategies:

package org.apache.ignite.mxbean;

/**
 * Warm-up MXBean interface.
 */
@MXBeanDescription("MBean that provides access to warm-up.")
public interface WarmUpMXBean {
    /**
     * Stop warm-up.
     */
    @MXBeanDescription("Stop warm-up.")
    void stopWarmUp();
}

When starting a node, it is checked that the warm-up configuration corresponds to one of strategies (by default + from plugins) and then sequentially starting from default data region strategy is applied to it, either a special strategy for data region or default is used.

Strategies.

  • No-op - does nothing, allows you to disable execution of strategies for a specific data region, corresponding classes:
    • org.apache.ignite.configuration.NoOpWarmUpConfiguration;
    • org.apache.ignite.internal.processors.cache.warmup.NoOpWarmUpStrategy.
  • Load all - sequential loading of pages for each cache group starting from index partition as long as there is space in data region, corresponding classes:
    • org.apache.ignite.configuration.LoadAllWarmUpConfiguration;
    • org.apache.ignite.internal.processors.cache.warmup.LoadAllWarmUpStrategy.

Open questions

  • Should the pre-fetch depend on any attribute, or should it be added on an ongoing basis?
  • Do we need any attribute for pre-fetch or it should be another way for interaction with it(configuration, something new)?
  • Should subphases of pre-fetch be different from join subphases or they should be the same?

Usability considerations

Risks and Assumptions

Discussion Links

http://apache-ignite-developers.2346864.n4.nabble.com/Cluster-auto-activation-design-proposal-td20295.html

Reference Links

// Links to various reference documents, if applicable.

Open Tickets 

key summary type created updated due assignee reporter priority status resolution

JQL and issue key arguments for this macro require at least one Jira application link to be configured

Closed Tickets 

key summary type created updated due assignee reporter priority status resolution

JQL and issue key arguments for this macro require at least one Jira application link to be configured



  • No labels