Problem Description And Test Case

In the Ignite 1.x implementation, general reads performed outside of a transaction (such as getAll() or SQL SELECT) do not respect transaction boundaries. The problem is two-fold. First, on a local node, transaction visibility is not atomic with respect to a multi-entry read: each committed entry version becomes visible immediately after that entry is updated, so a read may observe some entries of a transaction but not others. Second, there is no coordination of visible versions when a read involves multiple nodes, so even if local transaction visibility were made atomic, the issue would remain.

The problem is easily illustrated with a test case. Suppose we have a banking system with a fixed number of accounts, and we continuously run random money transfers between random pairs of accounts. The sum of all account balances is then a system invariant and must be the same for any getAll() or SQL query.
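To make the failure concrete, here is a minimal single-threaded Java sketch that applies one transfer entry by entry, as a non-atomic commit would, and lets a read compute the total between the two updates. The class `BankInvariantDemo` and its methods are hypothetical illustrations, not Ignite code.

```java
/** Hypothetical, single-threaded illustration of the broken bank invariant. */
public class BankInvariantDemo {
    /** Sum of all balances: what a getAll() or SQL SUM query should always return. */
    static long total(long[] balances) {
        long sum = 0;
        for (long b : balances)
            sum += b;
        return sum;
    }

    /**
     * Applies a transfer one entry at a time (as a non-atomic commit would) and returns
     * the total observed by a read that runs between the two entry updates.
     */
    static long totalSeenMidTransfer(long[] balances, int from, int to, long amount) {
        balances[from] -= amount;       // first entry of the transaction becomes visible
        long seen = total(balances);    // an out-of-transaction read runs here
        balances[to] += amount;         // second entry becomes visible
        return seen;
    }

    public static void main(String[] args) {
        long[] balances = {100, 100, 100, 100};
        long seen = totalSeenMidTransfer(balances, 0, 1, 30);
        System.out.println("expected 400, read saw " + seen + ", after commit " + total(balances));
    }
}
```

Running `main` prints `expected 400, read saw 370, after commit 400`: the read observed the debit but not the matching credit.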

General Approach Overview

The main idea is that every node should store not only the current (latest) value of each entry, but also some number of previous values, in order to allow consistent distributed reads. To do this, we need to introduce a separate node role, the transaction version coordinator, which is responsible for assigning monotonically growing transaction versions and for tracking the versions of in-progress transactions and in-progress reads. The last committed transaction ID together with the IDs of pending transactions defines which versions are visible to any subsequent read. The IDs of pending reads define which value versions are still needed; versions that no pending read can observe are no longer needed and can be discarded.
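A minimal sketch of the per-key history a node might keep, assuming versions are plain `long` values and that a read carries the last committed version and the set of pending versions. `VersionChain` and its methods are hypothetical names used only for illustration.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical per-key history kept by a node: value per transaction version. */
public class VersionChain<V> {
    private final NavigableMap<Long, V> history = new TreeMap<>();

    /** Records the value written by the transaction with the given version. */
    public void put(long ver, V val) {
        history.put(ver, val);
    }

    /**
     * Returns the newest value visible to a read whose snapshot is (lastCommitted, pending),
     * or null if no version of this key is visible yet.
     */
    public V read(long lastCommitted, Set<Long> pending) {
        // Walk from the newest version not above lastCommitted down to the oldest one.
        for (Map.Entry<Long, V> e : history.headMap(lastCommitted, true).descendingMap().entrySet()) {
            if (!pending.contains(e.getKey()))
                return e.getValue();
        }
        return null;
    }

    public static void main(String[] args) {
        VersionChain<String> chain = new VersionChain<>();
        chain.put(1L, "v1");
        chain.put(3L, "v3");
        chain.put(5L, "v5");
        System.out.println(chain.read(4L, Set.of(3L)));
    }
}
```

With versions 1, 3 and 5 written for one key, a read whose last committed version is 4 and whose pending set is {3} must still get the value of version 1 (the `main` above prints `v1`), which is why older values cannot simply be overwritten in place.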

Version Coordinator(s)

In the initial version of distributed MVCC we will use a single transaction coordinator that defines the global order of all transactions in the cluster. The coordinator may be a dedicated node in the cluster. Upon coordinator failure, a new coordinator should be elected in such a way that every version it assigns is guaranteed to be greater than all previously assigned transaction versions. This can be easily implemented by defining a transaction version as a tuple (TV, LV), where TV is the cluster topology version, which is set to 1 on the first node and incremented on each topology change, and LV is the coordinator local version, which is set to 1 when a coordinator is elected and incremented on each transaction version request. Compared lexicographically, these tuples satisfy the requirement above: the failure of a coordinator is itself a topology change, so the new coordinator's TV is greater than any TV used before, and its versions sort after all previous ones even though LV restarts at 1.
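The (TV, LV) ordering can be sketched as follows, assuming a coordinator stamps every version with the topology version at which it was elected. `TxVersion` and its nested `Coordinator` are hypothetical names. The sketch shows that even after the old coordinator has issued many versions, the first version from a coordinator elected at a later topology version still sorts after all of them.

```java
/** Hypothetical transaction version: the tuple (TV, LV), compared lexicographically. */
public final class TxVersion implements Comparable<TxVersion> {
    final long topVer; // TV: cluster topology version
    final long locVer; // LV: coordinator local version

    TxVersion(long topVer, long locVer) {
        this.topVer = topVer;
        this.locVer = locVer;
    }

    @Override
    public int compareTo(TxVersion o) {
        int cmp = Long.compare(topVer, o.topVer); // TV decides first
        return cmp != 0 ? cmp : Long.compare(locVer, o.locVer);
    }

    @Override
    public String toString() {
        return "(" + topVer + ", " + locVer + ")";
    }

    /** Version source of one elected coordinator (assumption: TV fixed at election time). */
    static final class Coordinator {
        private final long topVer;
        private long locVer; // starts at 0, so the first issued LV is 1

        Coordinator(long electionTopVer) {
            this.topVer = electionTopVer;
        }

        /** Handles one transaction version request. */
        TxVersion next() {
            return new TxVersion(topVer, ++locVer);
        }
    }

    public static void main(String[] args) {
        Coordinator first = new Coordinator(1);
        TxVersion last = null;
        for (int i = 0; i < 1000; i++)
            last = first.next();

        // The first coordinator fails; the topology change bumps TV to 2.
        Coordinator second = new Coordinator(2);
        TxVersion fresh = second.next();
        System.out.println(last + " < " + fresh + ": " + (last.compareTo(fresh) < 0));
    }
}
```

Here `main` prints `(1, 1000) < (2, 1): true`.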

When a transaction requests a write version, the version coordinator generates a new version and adds it to the set of pending transactions. When the transaction is committed or rolled back, the coordinator removes its version from the pending set. When a transactional read is requested, the coordinator captures the current set of pending transactions; this snapshot determines which versions the read may see and, therefore, which versions must not be evicted until the read finishes. When the read finishes, an acknowledgement is sent to the coordinator. The coordinator periodically broadcasts to the whole cluster the oldest version that must still be kept for in-progress reads, so that nodes can discard temporarily saved versions that no in-progress read can still observe.
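The coordinator bookkeeping described above can be sketched in Java, assuming plain `long` versions instead of (TV, LV) tuples and ignoring messaging and failover. All names (`VersionCoordinator`, `Snapshot`, `requestTxVersion`, `startRead`, `finishRead`, `cleanupBound`) are hypothetical. The value returned by `cleanupBound()` is an assumed interpretation of the broadcast version: every version below it is visible to all active and future reads, so a node only needs the newest such version of each key.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

/** Hypothetical single version coordinator; versions are plain longs in this sketch. */
public class VersionCoordinator {
    /** Read snapshot: last committed version plus the versions pending when the read started. */
    public static final class Snapshot {
        final long maxCommitted;
        final TreeSet<Long> pending;

        Snapshot(long maxCommitted, TreeSet<Long> pending) {
            this.maxCommitted = maxCommitted;
            this.pending = pending;
        }

        /** Visible if not newer than the last commit and not pending at snapshot time. */
        public boolean isVisible(long ver) {
            return ver <= maxCommitted && !pending.contains(ver);
        }

        /** Every version strictly below this value is visible to the snapshot. */
        long lowestInvisible() {
            return pending.isEmpty() ? maxCommitted + 1 : Math.min(pending.first(), maxCommitted + 1);
        }
    }

    private long lastAssigned;
    private long maxCommitted;
    private final TreeSet<Long> pendingTxs = new TreeSet<>();
    private final Map<Long, Snapshot> activeReads = new HashMap<>();
    private long nextReadId = 1;

    /** Write version request: assign a new version and mark it pending. */
    public synchronized long requestTxVersion() {
        long ver = ++lastAssigned;
        pendingTxs.add(ver);
        return ver;
    }

    /** Commit: the version leaves the pending set and becomes visible to new reads. */
    public synchronized void commit(long ver) {
        pendingTxs.remove(ver);
        maxCommitted = Math.max(maxCommitted, ver);
    }

    /** Rollback: the version leaves the pending set; it never produced visible data. */
    public synchronized void rollback(long ver) {
        pendingTxs.remove(ver);
    }

    /** Read request: capture the pending set and register the read until it is acknowledged. */
    public synchronized long startRead() {
        long readId = nextReadId++;
        activeReads.put(readId, currentSnapshot());
        return readId;
    }

    public synchronized Snapshot snapshotOf(long readId) {
        return activeReads.get(readId);
    }

    /** Acknowledgement that a read has finished. */
    public synchronized void finishRead(long readId) {
        activeReads.remove(readId);
    }

    /** Assumed broadcast value: versions below it are visible to all active and future reads. */
    public synchronized long cleanupBound() {
        long bound = currentSnapshot().lowestInvisible();
        for (Snapshot s : activeReads.values())
            bound = Math.min(bound, s.lowestInvisible());
        return bound;
    }

    private Snapshot currentSnapshot() {
        return new Snapshot(maxCommitted, new TreeSet<>(pendingTxs));
    }

    public static void main(String[] args) {
        VersionCoordinator crd = new VersionCoordinator();
        long t1 = crd.requestTxVersion();
        long t2 = crd.requestTxVersion();
        crd.commit(t2);
        Snapshot snap = crd.snapshotOf(crd.startRead());
        System.out.println("t1 visible: " + snap.isVisible(t1) + ", t2 visible: " + snap.isVisible(t2));
    }
}
```

Future snapshots can only have a larger `lowestInvisible()` (new versions are always greater than the last committed one), which is why the current state plus the active reads is enough to compute the bound.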

Internal Data Structure Changes

Besides the existing key-value storage, each node should introduce an additional versioned key-value storage used to filter out versions that are not yet ready to be read. Each versioned read should merge the results obtained from the versioned store and the general storage. Below is one possible data structure design for SQL indexes (the main PK index should be the same, except that it returns a single value instead of a range cursor).

MVCC LSM
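```java
import java.util.List;
import java.util.NavigableMap;

// Key, VersionedRow, SearchRow, VersionInfo, Cursor and Index are placeholder types of this design.
class VersionedIndex {
    // Recent (not yet safe to read) row versions per key, including explicit entries for deleted keys.
    private NavigableMap<Key, List<VersionedRow>> versionedVals;

    public Cursor find(SearchRow lower, SearchRow upper, VersionInfo readVersion) {
        // Here we must first get the range of keys matching the search boundaries and then keep only the versions visible to readVersion.
        // Note that versionedVals must explicitly contain deleted keys in order to be able to correctly merge results.
        throw new UnsupportedOperationException("Design sketch");
    }
}

class MergeIndex {
    private VersionedIndex versionedIdx; // Versioned store, filtered by read version.
    private Index committedIdx;          // General store holding only committed values.

    public Cursor find(SearchRow lower, SearchRow upper, VersionInfo readVersion) {
        Cursor versioned = versionedIdx.find(lower, upper, readVersion);

        // Committed index does not need readVersion because it contains values that are 100% safe to read.
        Cursor committed = committedIdx.find(lower, upper);

        // Here we must return a merge cursor in which keys from the versioned cursor override keys from the committed cursor.
        throw new UnsupportedOperationException("Design sketch");
    }
}
```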
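The merge-cursor rule in `MergeIndex.find` (versioned keys override committed keys, deleted keys are hidden) can be illustrated with a two-way merge of key-sorted iterators. This sketch assumes both inputs are already restricted to the search range, that the versioned input is already filtered by read version, and that a `null` value marks a deleted key. `MergeSketch.merge` is a hypothetical helper, not an Ignite API, and it materializes the result into a map instead of returning a lazy cursor.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

public class MergeSketch {
    /** Returns the next entry of the iterator, or null when it is exhausted. */
    private static <K, V> Map.Entry<K, V> advance(Iterator<Map.Entry<K, V>> it) {
        return it.hasNext() ? it.next() : null;
    }

    /**
     * Merges two key-sorted inputs. On equal keys the versioned row wins and the committed row
     * is skipped; a versioned row with a null value (tombstone) hides the key from the result.
     */
    static <K extends Comparable<K>, V> Map<K, V> merge(Iterator<Map.Entry<K, V>> versioned,
                                                        Iterator<Map.Entry<K, V>> committed) {
        Map<K, V> out = new LinkedHashMap<>(); // preserves ascending key order
        Map.Entry<K, V> v = advance(versioned);
        Map.Entry<K, V> c = advance(committed);

        while (v != null || c != null) {
            // An exhausted side compares as "greater", so the other side is consumed.
            int cmp = v == null ? 1 : c == null ? -1 : v.getKey().compareTo(c.getKey());

            if (cmp <= 0) {
                if (v.getValue() != null)
                    out.put(v.getKey(), v.getValue());
                if (cmp == 0)
                    c = advance(committed); // overridden committed row is dropped
                v = advance(versioned);
            }
            else {
                out.put(c.getKey(), c.getValue());
                c = advance(committed);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        TreeMap<String, Integer> committed = new TreeMap<>(Map.of("a", 1, "b", 2, "c", 3, "d", 4));
        TreeMap<String, Integer> versioned = new TreeMap<>();
        versioned.put("b", 20);   // newer visible value
        versioned.put("c", null); // key deleted by a visible transaction
        versioned.put("e", 5);    // key inserted by a visible transaction
        System.out.println(merge(versioned.entrySet().iterator(), committed.entrySet().iterator()));
    }
}
```

Here `main` prints `{a=1, b=20, d=4, e=5}`. Only the versioned side needs the read version, which matches the note in `MergeIndex.find` that the committed index holds values that are safe for any read.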


Transactional Protocol Changes

Read (getAll and SQL changes)