HTTP API

  • POST:  /admin/rebalance
    parameters:
      – dataverseName (optional): the name of the dataverse to be rebalanced;
      – datasetName (optional): the name of the dataset to be rebalanced;
      – nodes: a comma-separated list of the names of the nodes that the dataset is rebalanced to.

    If neither dataverseName nor datasetName is provided, we rebalance all datasets except Metadata datasets.
    If dataverseName is provided but datasetName is not, we rebalance all datasets in the given dataverse.
     
    Example: curl -X POST "http://localhost:19002/admin/rebalance?dataverseName=tpch&datasetName=LineItem&nodes=asterix_nc1"

  • DELETE: /admin/rebalance
    It cancels all running or pending rebalance requests.
    Example: curl -X DELETE http://localhost:19002/admin/rebalance
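
The two requests above can also be built programmatically. The following is a minimal Python sketch using only the standard library; the helper names rebalance_request and cancel_request are hypothetical, and the host and port are simply the ones used in the curl examples. It only constructs the requests and does not send them.

```python
from urllib.parse import urlencode
from urllib.request import Request

# Endpoint taken from the curl examples; adjust host/port for a real cluster.
BASE_URL = "http://localhost:19002/admin/rebalance"


def rebalance_request(nodes, dataverse_name=None, dataset_name=None):
    """Build (but do not send) a POST /admin/rebalance request.

    Hypothetical helper: optional parameters are omitted from the query
    string when they are None, and node names are joined with commas.
    """
    params = {}
    if dataverse_name is not None:
        params["dataverseName"] = dataverse_name
    if dataset_name is not None:
        params["datasetName"] = dataset_name
    params["nodes"] = ",".join(nodes)
    return Request(f"{BASE_URL}?{urlencode(params)}", method="POST")


def cancel_request():
    """Build (but do not send) a DELETE /admin/rebalance request."""
    return Request(BASE_URL, method="DELETE")
```

Passing either returned object to urllib.request.urlopen would issue the call against a running cluster. Note that urlencode percent-encodes the comma between node names, which is equivalent to a literal comma once the query string is decoded.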

Internal Implementation:

  • Service thread:
    • rebalance requests are processed one-at-a-time by a singleton thread executor;
    • each time a request arrives, we submit the rebalance task to the thread executor and add it to the task queue;
    • once a rebalance task completes, we remove it from the task queue;
    • a cancellation request cancels all running or pending requests in the rebalance task queue.
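
The service-thread behavior described above can be sketched as follows. This is an illustrative Python model, not the actual (Java) implementation: the class RebalanceService and its methods are hypothetical, and cancelling a running task is modeled cooperatively with an event that the task is assumed to check.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class RebalanceService:
    """Hypothetical model: one worker thread plus a queue of tracked tasks."""

    def __init__(self):
        # A single worker means tasks are processed one at a time.
        self._executor = ThreadPoolExecutor(max_workers=1)
        self._lock = threading.Lock()
        self._tasks = []  # entries are (future, cancel_event)

    def submit(self, task):
        """Submit task(cancel_event) and add it to the task queue."""
        cancel_event = threading.Event()
        with self._lock:
            future = self._executor.submit(task, cancel_event)
            entry = (future, cancel_event)
            self._tasks.append(entry)
        # Remove the task from the queue once it completes or is cancelled.
        future.add_done_callback(lambda _f: self._remove(entry))
        return future

    def _remove(self, entry):
        with self._lock:
            if entry in self._tasks:
                self._tasks.remove(entry)

    def cancel_all(self):
        """Cancel every running or pending task in the queue."""
        with self._lock:
            snapshot = list(self._tasks)
        # Cancel outside the lock: cancel() runs done-callbacks synchronously.
        for future, cancel_event in snapshot:
            cancel_event.set()  # asks a running task to stop
            future.cancel()     # prevents a pending task from starting

    def pending(self):
        with self._lock:
            return len(self._tasks)

    def shutdown(self):
        self._executor.shutdown(wait=True)
```

A task passed to submit receives the cancel event as its only argument and is expected to poll or wait on it; a task that ignores the event simply runs to completion.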

  • Metadata entity:
    • Dataset: 
      for each dataset record, there is one more open field called "rebalanceCount".  If the dataset has never been rebalanced, it is Missing.
    • Nodegroup:
      when a dataset foo is created, we internally create a nodegroup named foo_i (or just foo if i is Missing), where i = foo.rebalanceCount, based on the currently available nodes. Then, we let the nodegroup of foo be foo_i.
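
As a small illustration of the naming rule above, the sketch below maps a dataset name and its rebalanceCount to the nodegroup name. The function name is hypothetical, and Python's None stands in for a Missing rebalanceCount.

```python
def node_group_name(dataset, rebalance_count=None):
    """Return the nodegroup name for a dataset.

    rebalance_count=None models a Missing "rebalanceCount" field, i.e. a
    dataset that has never been rebalanced.
    """
    if rebalance_count is None:
        return dataset
    return f"{dataset}_{rebalance_count}"
```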

  • Primary/secondary index file directory layout:
    • If the rebalanceCount of the dataset is Missing, the file directory layout of indexes is the same as before: primary/secondary index directories are directly under the dataset's directory.
    • If the rebalanceCount of the dataset is larger than 0, primary/secondary index directories are under a nested directory in the dataset's directory; the nested directory is named after the rebalanceCount value.
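
The layout rule can be sketched as a path computation. The function name and the example paths are hypothetical; only the rule itself (a nested directory named after rebalanceCount) comes from the description above.

```python
from pathlib import PurePosixPath


def index_directory(dataset_dir, index_name, rebalance_count=None):
    """Return the directory of an index under the dataset's directory.

    rebalance_count=None models a Missing rebalanceCount (legacy layout).
    Otherwise the count is assumed to be larger than 0, as in the text.
    """
    base = PurePosixPath(dataset_dir)
    if rebalance_count is None:
        # Never rebalanced: index directory sits directly under the dataset.
        return base / index_name
    # Rebalanced: index directory is nested under <rebalanceCount>/.
    return base / str(rebalance_count) / index_name
```

For example, with a hypothetical dataset directory tpch/LineItem, an index named LineItem resolves to tpch/LineItem/LineItem before any rebalance and to tpch/LineItem/2/LineItem when rebalanceCount is 2.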

  • For each shadow dataset foo, repeat the following process:
    1. create a new node group foo_i, where i is 1 if foo.rebalanceCount is Missing and foo.rebalanceCount + 1 otherwise, that contains the currently available nodes; if the name foo_i is already taken, name the new node group foo_i_<uuid> instead;

    2. create an uncommitted dataset (i.e., an in-memory variable) with the same name foo, using node group foo_i (or foo_i_<uuid>) and with rebalanceCount set to i (in the following description, we call this dataset the "rebalance target" and the original dataset foo the "rebalance source");

    3. drop any leftover files for rebalance target;

    4. upsert all records from rebalance source to rebalance target on all partitions;

    5. check the existence of foo: if foo no longer exists in metadata, drop the files of the rebalance target; otherwise, switch the persistent metadata entity of dataset foo to the rebalance target (i.e., the in-memory variable created in step 2);

    6. drop files of the rebalance source and drop node group foo_<i-1> (or foo_<i-1>_<uuid>).
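
Steps 1 to 6 can be illustrated with a single-threaded, in-memory simulation. Everything below is a hypothetical model: plain dictionaries stand in for the metadata, the node groups, and the index files, None stands in for a Missing rebalanceCount, and locking and transactions are left out entirely.

```python
import uuid


def next_rebalance_count(current):
    """Step 1: i is 1 if rebalanceCount is Missing (None), else count + 1."""
    return 1 if current is None else current + 1


def new_node_group_name(dataset, i, existing_groups):
    """Step 1: use foo_i, or foo_i_<uuid> if foo_i is already taken."""
    name = f"{dataset}_{i}"
    if name in existing_groups:
        name = f"{name}_{uuid.uuid4()}"
    return name


def rebalance(metadata, node_groups, files, dataset, target_nodes):
    """Simulate one rebalance; returns True if the metadata was switched.

    metadata:    dataset name -> {"rebalanceCount": int or None, "nodeGroup": str}
    node_groups: node group name -> list of node names
    files:       (dataset name, rebalanceCount) -> list of records
    """
    source = metadata[dataset]
    i = next_rebalance_count(source["rebalanceCount"])
    # Step 1: create the new node group (a persistent metadata write).
    group = new_node_group_name(dataset, i, node_groups)
    node_groups[group] = list(target_nodes)
    # Step 2: the rebalance target exists only in memory at this point.
    target = {"rebalanceCount": i, "nodeGroup": group}
    # Step 3: drop leftover target files from an earlier failed attempt.
    files.pop((dataset, i), None)
    # Step 4: copy all records from the source to the target.
    source_key = (dataset, source["rebalanceCount"])
    files[(dataset, i)] = list(files.get(source_key, []))
    # Step 5: in the real system a concurrent drop may have removed foo.
    if dataset not in metadata:
        files.pop((dataset, i), None)
        return False
    metadata[dataset] = target
    # Step 6: drop the source files and the old node group.
    files.pop(source_key, None)
    node_groups.pop(source["nodeGroup"], None)
    return True
```

In this model, an early return in step 5 leaves the node group created in step 1 behind, which corresponds to the leaked node group name discussed under the idempotent property.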


    • There are three metadata transactions for steps 1 to 6:

      a. steps 1-4: read lock on foo and read lock on node group foo.nodegroup;

      b. step 5: write lock on foo and conditional read lock on node group foo.nodegroup;

      c. step 6: read lock on foo and write lock on node group foo_<i-1> (the same as foo.nodegroup in metadata transaction a).


    • The locks in metadata transactions a to c ensure that read-only queries are allowed most of the time; they are blocked only for the duration of metadata transaction b.

    • We make sure that steps 5 and 6 can tolerate InterruptedException: if either step is interrupted, we keep retrying its metadata transaction (b or c) until it succeeds.
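
The retry-until-success behavior for steps 5 and 6 can be sketched as a small loop. Python has no direct equivalent of Java's InterruptedException, so the Interrupted class below is a stand-in, and run_uninterruptibly is a hypothetical helper name.

```python
class Interrupted(Exception):
    """Stand-in for Java's InterruptedException in this sketch."""


def run_uninterruptibly(action):
    """Call action() until it completes without raising Interrupted.

    Returns (result, was_interrupted) so that the caller can still react
    to the interruption (e.g. report the cancellation) after the
    transaction has completed.
    """
    was_interrupted = False
    while True:
        try:
            return action(), was_interrupted
        except Interrupted:
            # Remember the interruption and retry the whole transaction.
            was_interrupted = True
```

Any other exception propagates unchanged; only the interruption stand-in triggers a retry.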

  • Concurrency:
    Since we split the rebalance process into three metadata transactions, other metadata write operations could potentially interleave with the rebalance process.
    • CASE 1: foo is dropped between metadata transactions a and b. At the beginning of step 5, we check the existence of foo and, if it has been dropped, drop the target files.
    • CASE 2: foo is dropped between metadata transactions b and c. This time, it is the rebalance target that gets dropped. Therefore, step 6 is independent of the drop operation.

  • Idempotent property:
    • The whole rebalance process is idempotent in case it fails or the node crashes in the middle:
      • In metadata transaction a, since (1) the new node group creation is the only persistent metadata write, (2) we use a uuid to name the node group if the desired name already exists, and (3) step 3 drops leftover files for the rebalance target, the transaction is idempotent if a failure or crash happens at any point.
      • Since metadata transactions b and c tolerate InterruptedException, they will never be interrupted in the middle as long as the node doesn't crash. In this way, a user cancellation request will not waste the bulk of the rebalance work, i.e., metadata transaction a.
      • In metadata transaction b and c, if the node or JVM crashes, 
        • if the metadata entity was not switched (depending on what's there in the metadata node), the system uses the rebalance source as foo;
        • if the metadata entity was switched, the system uses the rebalance target as foo;
      • In the event of failures, there could be leaked source files (from metadata transaction c), which will be reclaimed by the next rebalance operation; leaked target files (from metadata transaction a), which will not be reclaimed; or a leaked node group name (from metadata transaction a), which does not prevent the next rebalance operation from succeeding. (ASTERIXDB-1948)