...

  • The approach requires a new temporary FilePageStore to be initialized. It must be created as part of a temporary cache group, or in a separate temporary data region, so that the existing machinery for iterating over the full partition file can be reused.
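The iteration machinery mentioned above boils down to reading a partition file sequentially in page-sized chunks. Below is a minimal, self-contained sketch of that idea using plain java.nio; the class and method names are hypothetical (real Ignite code goes through FilePageStore, and the page size comes from DataStorageConfiguration#getPageSize, 4 KB by default):

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical sketch: sequential iteration over a partition file in page-sized chunks. */
public class PartitionFileIterator {
    /** Assumed page size; Ignite's default is 4 KB. */
    static final int PAGE_SIZE = 4096;

    /** Counts full pages in the given partition file by reading it page by page. */
    static int iteratePages(Path partFile) throws IOException {
        int pages = 0;
        ByteBuffer buf = ByteBuffer.allocate(PAGE_SIZE);
        try (FileChannel ch = FileChannel.open(partFile, StandardOpenOption.READ)) {
            while (ch.read(buf) == PAGE_SIZE) {
                buf.flip();
                // A real iterator would parse the page header here (pageId, CRC, type).
                pages++;
                buf.clear();
            }
        }
        return pages;
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("part-0", ".bin");
        Files.write(tmp, new byte[PAGE_SIZE * 3]); // fake partition: three zeroed pages
        System.out.println(iteratePages(tmp));     // prints 3
        Files.delete(tmp);
    }
}
```

Running the same loop against a temporary page store is what makes the "full partition file" transfer below possible without touching PageMemory.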

Proposed Changes (Hot swap with historical rebalance)

Process Overview

Two nodes take part in the process of rebalancing data:

  • Demander (receiver of partition files);
  • Supplier (sender of partition files).

The whole process is described in terms of rebalancing a single partition file of a single cache group; all the other partitions are rebalanced one-by-one:

  1. The NODE_JOIN event occurs and the blocking PME starts;
    1. The Demander decides which partitions must be loaded. All the desired partitions have MOVING state;
    2. The Demander initiates a new checkpoint process;
      1. Under the checkpoint write-lock it swaps cache data storage with the temporary one for each partition of the given set;
      2. The temporary cache data storage tracks the partition update counter as usual (on each cache operation);
      3. Waits until the checkpoint-begin future completes;
  2. The Demander sends a request to the Supplier with the previously prepared set of cache groups and partition files;
  3. The Supplier receives a request and starts a new local checkpoint process;
    1. Creates a temporary file with the .delta postfix for each partition file (e.g. part-0.bin.delta);
    2. Under the checkpoint write lock, fixes the expected partition file size (at the moment of the checkpoint end);
    3. Waits until the checkpoint-begin future completes;
    4. Starts the copy process of the partition file to the Demander;
      1. Opens the partition file in read-only mode;
      2. Starts sending the partition file (with any concurrent writes) in chunks of a predefined size;
    5. Asynchronously writes each page to the partition file and the same page to the corresponding .delta file;
    6. When the partition file has been sent, it starts sending the corresponding .delta file;
  4. The Demander listens for new file sending attempts from the Supplier;
  5. The Demander receives the partition files (one by one);
  6. The Demander reads the corresponding .delta file in chunks and applies them to the received partition file;
  7. When the Demander has received the whole cache partition file:
    1. Swaps the temporary cache data storage with the original one on the next checkpoint (under write lock);
    2. When the partition has been swapped, it starts the index rebuild procedure over the given partition file;
    3. Starts historical rebalance for the given partition file;
  8. The Supplier deletes all temporary .delta files;
  9. The demander node prepares the set of IgniteDhtDemandedPartitionsMap#full cache partitions to fetch;
  10. The demander node checks compatibility version (for example, 2.8) and starts recording all incoming cache updates to the new special storage – the temporary WAL storage;
  11. The demander node sends the GridDhtPartitionDemandMessage to the supplier node as usual;
  12. The supplier node receives GridDhtPartitionDemandMessage and starts the new checkpoint process and fixes cache partition file sizes;
  13. The supplier node creates an empty temporary file with the .delta postfix (e.g. part-0.bin.delta) for each cache partition file (in the same cache working directory or another configured one);
  14. The supplier node starts tracking each pageId write attempt to these partition files:
    1. When a write attempt happens, the thread that caused it reads the original copy of this page from the partition and flushes it to the corresponding .delta file;
    2. After that, the thread writes the changed page data to the partition file;
  15. The supplier node waits until the checkpoint process ends;
  16. On the supplier node, for each cache partition file:
    1. The process opens the partition file in read-only mode;
    2. Starts sending the partition file (as is, with any concurrent writes) in chunks of a predefined constant size (a multiple of the PageMemory page size);
    3. After the partition file is sent, it starts sending the corresponding .delta file;
  17. The demander node starts listening for a new type of incoming connections (a socket channel creation event) from the supplier node;
  18. When the appropriate connection is established, the demander node, for each cache partition file:
    1. Receives the file metadata (corresponding cache group identifier, cache partition file name, file size);
    2. Writes data from the socket to the particular cache partition file, starting from the beginning of the file;
    3. After the original cache partition file is received, the node starts receiving the corresponding .delta file;
    4. Reads data from the socket in chunks of the PageMemory page size and applies each received page to the partition file;
  19. When the demander node has received the whole cache partition file:
    1. The node begins the secondary index rebuild procedure over the received partition file;
    2. After that, the node begins to apply data entries from the beginning of the temporary WAL storage;
    3. All async operations corresponding to this partition file still write to the end of the temporary WAL;
    4. When the temporary WAL storage is about to become empty:
      1. Starts the first checkpoint;
      2. Waits until the first checkpoint ends and owns the cache partition;
      3. All operations now are switched to the partition file instead of writing to the temporary WAL;
      4. Schedule the temporary WAL storage deletion;
  20. The supplier node deletes all temporary files;
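The two mechanisms that do the actual work in the steps above are the supplier-side copy-on-write tracking (a page written during the transfer is also appended to the .delta file, as in step 5) and the demander-side replay of that .delta file over the received, possibly stale partition copy (step 6). The sketch below illustrates both with plain java.nio; the class and method names and the delta record layout ([8-byte page index][page bytes]) are assumptions for illustration, not Ignite's actual on-disk format:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import static java.nio.file.StandardOpenOption.READ;
import static java.nio.file.StandardOpenOption.WRITE;

/** Hypothetical sketch of .delta tracking (supplier) and replay (demander). */
public class DeltaPageTracker {
    /** Assumed page size; Ignite's default is 4 KB. */
    static final int PAGE_SIZE = 4096;

    /** Supplier side: a page changed during the transfer is appended to the .delta file. */
    static void recordPageWrite(FileChannel delta, long pageIdx, ByteBuffer page) throws IOException {
        ByteBuffer hdr = ByteBuffer.allocate(Long.BYTES);
        hdr.putLong(pageIdx).flip();
        delta.write(hdr);   // assumed record layout: [page index][page bytes]
        page.rewind();
        delta.write(page);
    }

    /** Demander side: replays every delta record onto the received partition file copy. */
    static void applyDelta(Path deltaFile, Path partFile) throws IOException {
        try (FileChannel delta = FileChannel.open(deltaFile, READ);
             FileChannel part = FileChannel.open(partFile, WRITE)) {
            ByteBuffer hdr = ByteBuffer.allocate(Long.BYTES);
            ByteBuffer page = ByteBuffer.allocate(PAGE_SIZE);
            while (delta.read(hdr) == Long.BYTES) {
                hdr.flip();
                long pageIdx = hdr.getLong();
                if (delta.read(page) != PAGE_SIZE)
                    throw new IOException("Truncated delta record");
                page.flip();
                part.write(page, pageIdx * PAGE_SIZE); // overwrite the stale page in place
                hdr.clear();
                page.clear();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Path part = Files.createTempFile("part-0", ".bin");
        Path delta = Files.createTempFile("part-0.bin", ".delta");
        Files.write(part, new byte[PAGE_SIZE * 2]); // received stale copy: two zeroed pages

        // Supplier: page 1 was modified while the partition file was being sent.
        byte[] changed = new byte[PAGE_SIZE];
        java.util.Arrays.fill(changed, (byte) 0x42);
        try (FileChannel d = FileChannel.open(delta, WRITE)) {
            recordPageWrite(d, 1, ByteBuffer.wrap(changed));
        }

        // Demander: replay the delta over the received copy.
        applyDelta(delta, part);
        byte[] all = Files.readAllBytes(part);
        System.out.println(all[0] == 0 && all[PAGE_SIZE] == 0x42); // prints true
    }
}
```

Because each delta record carries its page index, replay is idempotent and order-independent for distinct pages, which is why the file can be applied in a single sequential pass after the main partition file has arrived.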

Components

...