...

  1. Public class IndexQuery for building an index query:
    1. it inherits the public class Query<R> with R = Cache.Entry<K, V>;
    2. it accepts an index description in its constructor;
    3. it accepts multiple index conditions (joined with AND) through its methods.
  2. Ideally, the user provides index conditions that match the index fields and their order (ts desc, price asc) to take advantage of the index structure.
  3. For queries that don't match an index, Ignite will run the index scan find(null, null) and apply the user conditions to its result. The cases are:
    1. index conditions joined with OR;
    2. a different sort order: idx(ts desc, price asc) but query(ts desc, price desc);
    3. the cache is wide, and an IndexScan on a few fields is much cheaper than a ScanQuery;
    4. a predicate operation with a custom user condition.
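
The matching rule above can be illustrated with a small self-contained sketch. It assumes that "matching" means the query fields form a left-to-right prefix of the index keys; the class IndexMatchSketch and the method matchesIndexPrefix are hypothetical names used only for illustration and are not part of Ignite. Sort order checks are left out for brevity.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper, not an Ignite class: decides whether query fields match an index. */
class IndexMatchSketch {
    /**
     * Returns true if the query fields form a left-to-right prefix of the index keys,
     * e.g. index (ts, price) matches queries on (ts) and (ts, price) but not on (price).
     */
    static boolean matchesIndexPrefix(List<String> idxFields, List<String> qryFields) {
        if (qryFields.isEmpty() || qryFields.size() > idxFields.size())
            return false;

        for (int i = 0; i < qryFields.size(); i++) {
            if (!idxFields.get(i).equals(qryFields.get(i)))
                return false;
        }

        return true;
    }

    public static void main(String[] args) {
        List<String> idx = Arrays.asList("ts", "price");

        System.out.println(matchesIndexPrefix(idx, Arrays.asList("ts", "price"))); // true
        System.out.println(matchesIndexPrefix(idx, Arrays.asList("price")));       // false
    }
}
```

A query for which this check fails would be a candidate for the index scan fallback described in item 3.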

Internal changes for the query are:

  1. Internal class IndexQueryDesc for transferring the query between nodes.
  2. Internal class QueryIndexDesc, which supports different ways of describing an index.
  3. Internal classes IndexCondition and IndexConditions for building index queries.
  4. New query type INDEX in the enum InternalGridCacheQueryType.

...

Code Block
languagejava
// Creates an index query for an index created with the Java API.
// 1. The index description is specified at construction.
// 2. The index name is an optional parameter. If it is omitted, Ignite does its best to find the right index based on the Value.class passed at construction and the fields used in the conditions.
// 3. Index conditions (joined with AND) are added with methods.
QueryCursor<Cache.Entry<Long, Good>> cursor = ignite.cache("GOOD").query(
	IndexQuery.forType(Good.class, idxName?)  // idxName is optional.
		.gt("ts", lastMidnightTs)
		.lt("price", 123.0)
);

// Creates an index query for an index created with the SQL statement: "CREATE INDEX GOOD_TS_DESC_PRICE_ASC_IDX on GOOD_TBL (ts DESC, price ASC)"
// 1. The table name must be specified because different tables can have indexes with the same name (e.g., __key_PK).
// 2. The index name is optional here too (Ignite does its best to find the right index to run the query).
QueryCursor<Cache.Entry<Long, Good>> cursor = ignite.cache("GOOD").query(
	IndexQuery.forTable("GOOD_TBL", "GOOD_TS_DESC_PRICE_ASC_IDX"?)  // idxName is optional.
		.gt("ts", lastMidnightTs)
		.lt("price", 123.0)
);

// The "predicate" operation can't find the best index for a user query, so the user must provide an index name for this operation.
QueryCursor<Cache.Entry<Long, Good>> cursor = ignite.cache("GOOD").query(
	new IndexQuery<>("GOOD_TBL", "GOOD_TS_DESC_PRICE_ASC_IDX")
		.predicate((good) -> good.ts > lastMidnightTs || good.price > 100)
);

Classes to implement the API:


Code Block
languagejava
// Public packages.

/** Index query. Its result is a cursor over cache entries. */
public class IndexQuery<K, V> extends Query<Cache.Entry<K, V>> {

	private final List<IndexCondition> idxConds = new ArrayList<>();

	private final QueryIndexDesc idxDesc;

	private IndexQuery(QueryIndexDesc idxDesc) {
		this.idxDesc = idxDesc;
	}

	public static IndexQuery forType(Class<?> valClass, @Nullable String idxName) {
		QueryIndexDesc idxDesc = new QueryIndexDesc(valClass, idxName);

		return new IndexQuery(idxDesc);
	}

	public static IndexQuery forTable(String table, @Nullable String schema, @Nullable String idxName) {
		QueryIndexDesc idxDesc = new QueryIndexDesc(table, schema, idxName);

		return new IndexQuery(idxDesc);
	}

	public IndexQuery lt(String field, Object val) {
		IndexCondition cond = IndexConditions.lt(field, val);
		idxConds.add(cond);

		return this;
	}

	// Other methods are:
	// eq, notEq, gt, gte, lt, lte, between, in, notIn, min, max, predicate
}

// Internal packages.

/** Describes an index to query. */
class QueryIndexDesc implements Serializable {
	private @Nullable String idxName;
	private @Nullable String valClass;
	private @Nullable String tblName;
	private @Nullable String schemaName;
}

class IndexConditions {
	/** Find index keys that are less than a specified key. */
	public static IndexCondition lt(String field, Object val) {
		return new RangeIndexCondition(field, null, val, false, false);
	} 	

	// eq, notEq, gt, gte, lt, lte, between, in, notIn, min, max, predicate
}


abstract class IndexCondition implements Serializable {
	private final String field;
}

// min, max
class MinMaxIndexCondition extends IndexCondition {
	private final boolean max;
}

// gt, gte, lt, lte, between
class RangeIndexCondition extends IndexCondition {
	private final @Nullable Object lower;
	private final @Nullable Object upper;

	private final boolean lowerInclusive;
	private final boolean upperInclusive;
}

// in, notIn, eq, notEq
class InIndexCondition extends IndexCondition {
	private final Object[] vals;

	// Flag for not-in condition.
	private final boolean inverse;
}

// predicate
class PredicateIndexCondition extends IndexCondition {
	private IgnitePredicate<?> predicate;
}
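
To make the role of the bounds and inclusiveness flags in RangeIndexCondition concrete, here is a minimal runnable sketch. RangeConditionSketch is a hypothetical, simplified stand-in (single comparable key, no field name, no serialization) and its matches method is an assumption added for illustration; the proposal itself only lists the fields.

```java
/** Hypothetical, simplified stand-in for RangeIndexCondition over a single comparable key. */
final class RangeConditionSketch<T extends Comparable<T>> {
    private final T lower;                 // null means "no lower bound"
    private final T upper;                 // null means "no upper bound"
    private final boolean lowerInclusive;
    private final boolean upperInclusive;

    private RangeConditionSketch(T lower, T upper, boolean lowerInclusive, boolean upperInclusive) {
        this.lower = lower;
        this.upper = upper;
        this.lowerInclusive = lowerInclusive;
        this.upperInclusive = upperInclusive;
    }

    /** lt(val): no lower bound, exclusive upper bound. */
    static <T extends Comparable<T>> RangeConditionSketch<T> lt(T val) {
        return new RangeConditionSketch<>(null, val, false, false);
    }

    /** gte(val): inclusive lower bound, no upper bound. */
    static <T extends Comparable<T>> RangeConditionSketch<T> gte(T val) {
        return new RangeConditionSketch<>(val, null, true, false);
    }

    /** between(lower, upper): both bounds inclusive. */
    static <T extends Comparable<T>> RangeConditionSketch<T> between(T lower, T upper) {
        return new RangeConditionSketch<>(lower, upper, true, true);
    }

    /** Checks whether a key falls inside the range described by this condition. */
    boolean matches(T key) {
        if (lower != null) {
            int c = key.compareTo(lower);

            if (c < 0 || (c == 0 && !lowerInclusive))
                return false;
        }

        if (upper != null) {
            int c = key.compareTo(upper);

            if (c > 0 || (c == 0 && !upperInclusive))
                return false;
        }

        return true;
    }

    public static void main(String[] args) {
        System.out.println(RangeConditionSketch.<Integer>lt(10).matches(9));        // true
        System.out.println(RangeConditionSketch.<Integer>lt(10).matches(10));       // false
        System.out.println(RangeConditionSketch.<Integer>between(1, 5).matches(5)); // true
    }
}
```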

2) Query Processing

Currently Ignite has infrastructure for processing index queries: GridIndexingManager, IndexingSpi, SpiQuery. So it is possible to reuse this code to implement the processing of index queries: provide a default implementation of IndexingSpi and make IndexQuery a wrapper over SpiQuery. But there are some issues:

  1. GridIndexingManager (and IndexingSpi) supports 3 methods: store, remove, query.
    1. The store/remove methods work with cache (K, V) pairs. It looks like they were implemented for storing data in user-defined structures that are independent of Ignite storages, so SpiQuery is aimed at querying those user-defined storages.
    2. Ignite has the IndexProcessor, which is responsible for storing and removing index entries, and it works directly with CacheDataRow.
    3. Our goal is to run queries on indexes that are handled by Ignite storages (including already existing indexes).
  2. Some users may already provide a custom implementation of IndexingSpi for their own reasons. Such a user would have to choose between their own implementation and the new one.
  3. The name "SpiQuery" isn't obvious for the goal of querying secondary indexes.
  4. The IndexingSpi.query function has the "cache" param. But the cache is only one of the attributes that describe an index. Also, SQL indexes are not described with a cache at all, but with a schema/table instead. Such a method doesn't fit us well.

So the proposal is:

  1. Introduce a new type of query, INDEX;
  2. Final query processing is performed within IndexQueryProcessor;
  3. The entry point for distributed index queries is the IndexQueryProcessor.queryDistributed method, which executes a MapReduce query:
    1. the Map phase is node-local querying of the index; it returns sorted data by definition;
    2. the Reduce phase is responsible for the merge sort of data from multiple nodes;
    3. to implement it, it is suggested to extend the h2.twostep package: move the basic functionality to the ignite-core module, so that different query processors (h2, index query processor) can extend it.
  4. The entry point for a local query is the IndexQueryProcessor.queryLocal method. It accepts the user's IndexQuery and an IndexingQueryFilter for filtering the resulting cache entries (primary partition, MVCC, etc.);
  5. The predicate method accepts a function, which should be deployed on other nodes with GridCacheDeploymentManager.
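
The Reduce phase described above amounts to a k-way merge of streams that are already sorted. The following is a minimal sketch of that idea using only the JDK; MergeSortReduceSketch and its merge method are hypothetical names, plain iterators stand in for per-node results, and nothing here reflects the actual h2.twostep classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical sketch of the Reduce phase: a k-way merge of sorted per-node results. */
class MergeSortReduceSketch {
    /** Pairs the current head of a node's result stream with the rest of that stream. */
    private static final class Head<T> {
        final T val;
        final Iterator<T> rest;

        Head(T val, Iterator<T> rest) {
            this.val = val;
            this.rest = rest;
        }
    }

    /** Merges already sorted iterators (one per node) into a single sorted list. */
    static <T> List<T> merge(List<Iterator<T>> nodeResults, Comparator<? super T> cmp) {
        PriorityQueue<Head<T>> heap = new PriorityQueue<>((a, b) -> cmp.compare(a.val, b.val));

        // Seed the heap with the first row of every non-empty node result.
        for (Iterator<T> it : nodeResults) {
            if (it.hasNext())
                heap.add(new Head<>(it.next(), it));
        }

        List<T> res = new ArrayList<>();

        // Repeatedly take the smallest head and refill from the same node.
        while (!heap.isEmpty()) {
            Head<T> head = heap.poll();

            res.add(head.val);

            if (head.rest.hasNext())
                heap.add(new Head<>(head.rest.next(), head.rest));
        }

        return res;
    }

    public static void main(String[] args) {
        List<Iterator<Integer>> nodes = Arrays.asList(
            Arrays.asList(1, 4, 9).iterator(),
            Arrays.asList(2, 3, 10).iterator());

        System.out.println(merge(nodes, Comparator.<Integer>naturalOrder())); // [1, 2, 3, 4, 9, 10]
    }
}
```

Because each node returns its rows in index order, the heap never holds more than one row per node, so the merge can stream results instead of collecting everything first; the sketch collects into a list only to keep the example short.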


Code Block
languagejava
// Internal package.

public class IndexQueryProcessor extends GridProcessorAdapter {
	// Provides access to indexes.
	private final IndexProcessor processor;

	// Listens for messages from other nodes (index queries, data fetches).
	private final GridMessageListener idxQryLsnr;

	// Executor of map queries (index.find).
	private final IndexQueryMapExecutor mapExec;

	// Executor of the reduce query (merge sort).
	private final IndexQueryReduceExecutor rdcExec;

	// Entry point for a distributed query.
	public Iterator<Cache.Entry<?,?>> queryDistributed(IndexQuery idxQry) throws IgniteException {
		if (containsPredicate(idxQry))
			deployPredicate(idxQry);

		IndexQueryTwoStepQuery twoStepQry = prepareTwoStepQuery();

		return rdcExec.execute(twoStepQry);
	}

	// Entry point for a local query.
	public Iterator<Cache.Entry<?,?>> queryLocal(IndexQuery idxQry, @Nullable IndexingQueryFilter filters) throws IgniteException {
		// 1. If the user specified an index name, check the query:
		// - sort the query fields in index key order;
		// - check that it is a valid index query: the fields cover the index keys (from left to right);
		// - fail otherwise.
		// 2. If the user didn't specify an index name:
		// - get all indexes for the specified cache and Value.class;
		// - find an index by filtering on the query fields (index keys must be in the same order as the specified query fields; try sorting the fields to match any index);
		// - validate the index query (see the validation steps in 1).
		Index idx = index(idxQry.desc());

		// 1. Parse the index conditions.
		// 2. Validate the index conditions and the index type.
		// 3. Map the index conditions to index query methods.
		// 4. Perform the index operations and get a cursor.
		GridCursor<IndexRow> cursor = query(idx, idxQry.conditions());

		// 1. Map each IndexRow to a cache entry.
		// 2. Apply the specified cache filters (primary partitions, MVCC versions, etc.).
		// 3. Wrap the cursor in an iterator and return it.
		return mapAndFilter(cursor, filters);
	}

	private GridCursor<IndexRow> query(Index idx, List<IndexCondition> conditions) {
		// eq(key) -> idx.findOne(key)
		// notEq(key) -> idx.find(null, null, current -> current != key)
		// gt(key) -> idx.find(key, null, current -> current != key)
		// gte(key) -> idx.find(key, null)
		// lt(key) -> idx.find(null, key, current -> current != key)
		// lte(key) -> idx.find(null, key)
		// between(left, right) -> idx.find(left, right), inclusive only
		// in(keys...) -> idx.findOne(keys[0]) && idx.findOne(keys[1]) ...
		// notIn(keys...) -> idx.find(null, null, current -> !keys.contains(current))
		// min() -> idx.findFirst()
		// max() -> idx.findLast()
		// predicate(p) -> idx.find(null, null, p)
	}
}
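
The mapping of conditions to index operations listed in the query method can be tried out on a plain sorted map. In the sketch below a java.util.TreeMap stands in for the index; ConditionMappingSketch and its methods are hypothetical and only mirror a few of the mappings (gt, lte, between, min). It is an illustration under that assumption, not the Ignite index API.

```java
import java.util.Collection;
import java.util.TreeMap;

/** Hypothetical sketch: a TreeMap stands in for a sorted index. */
class ConditionMappingSketch {
    /** gt(key): range lookup from key (exclusive) to the end of the index. */
    static <K, V> Collection<V> gt(TreeMap<K, V> idx, K key) {
        return idx.tailMap(key, false).values();
    }

    /** lte(key): range lookup from the start of the index to key (inclusive). */
    static <K, V> Collection<V> lte(TreeMap<K, V> idx, K key) {
        return idx.headMap(key, true).values();
    }

    /** between(left, right): range lookup with both bounds inclusive. */
    static <K, V> Collection<V> between(TreeMap<K, V> idx, K left, K right) {
        return idx.subMap(left, true, right, true).values();
    }

    /** min(): the first entry of the index, or null if the index is empty. */
    static <K, V> V min(TreeMap<K, V> idx) {
        return idx.isEmpty() ? null : idx.firstEntry().getValue();
    }

    public static void main(String[] args) {
        TreeMap<Long, String> idx = new TreeMap<>();

        idx.put(1L, "a");
        idx.put(2L, "b");
        idx.put(3L, "c");

        System.out.println(gt(idx, 1L));          // [b, c]
        System.out.println(lte(idx, 2L));         // [a, b]
        System.out.println(between(idx, 2L, 3L)); // [b, c]
        System.out.println(min(idx));             // a
    }
}
```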

...

Phase 2. IndexScan.

For queries that don't match an index, Ignite will run the index scan find(null, null) and apply the user conditions to its result. The cases are:

  1. index conditions joined with OR;
  2. a different sort order: idx(ts desc, price asc) but query(ts desc, price desc);
  3. the cache is wide, and an IndexScan on a few fields is much cheaper than a ScanQuery;
  4. a predicate operation with a custom user condition.


Code Block
languagejava
// The "predicate" operation can't find the best index for a user query, so the user must provide an index name for this operation.
QueryCursor<Cache.Entry<Long, Good>> cursor = ignite.cache("GOOD").query(
	new IndexQuery<>("GOOD_TBL", "GOOD_TS_DESC_PRICE_ASC_IDX")
		.predicate((good) -> good.ts > lastMidnightTs || good.price > 100)
);
);
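
As a rough illustration of the index scan fallback, the sketch below walks every entry of a sorted structure in order and applies a user predicate, which is what find(null, null) followed by filtering amounts to. IndexScanSketch is a hypothetical class and a java.util.TreeMap again stands in for the index; the OR condition in the example is the kind of filter that cannot be expressed as a single range lookup.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import java.util.function.Predicate;

/** Hypothetical sketch of an index scan: a full ordered pass plus a user filter. */
class IndexScanSketch {
    /** Visits every index entry in index order and keeps the values accepted by the filter. */
    static <K, V> List<V> scan(TreeMap<K, V> idx, Predicate<V> filter) {
        List<V> res = new ArrayList<>();

        for (V val : idx.values()) {
            if (filter.test(val))
                res.add(val);
        }

        return res;
    }

    public static void main(String[] args) {
        // Index key is a timestamp, the value is a price.
        TreeMap<Long, Double> idx = new TreeMap<>();

        idx.put(1L, 5.0);
        idx.put(2L, 50.0);
        idx.put(3L, 500.0);

        // OR condition: no single range covers it, so every entry is checked.
        System.out.println(scan(idx, price -> price < 10 || price > 100)); // [5.0, 500.0]
    }
}
```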



Phase 3. Index creation API.

Currently an index can be declared statically (with CacheConfiguration) or created dynamically, but only with an SQL DDL query. Neither way supports functional indexes. The implementation will therefore take the following steps:

...

Code Block
languagejava
// Dynamic index creation.

QueryIndex qryIdx = new QueryIndex()
	.setName("GOOD_DISCOUNT_PRICE_IDX")
	.setFieldNames(Collections.singleton("discountPrice"))
	.setIndexType(QueryIndexType.FUNCTIONAL_SORTED);

cache.createIndex(qryIdx);

...

Phase 4. Flexibility of return.

The goal is to let the user retrieve only part of the result and skip unneeded hops to the cache:

...

  1. KeyOnly: QueryCursor<Cache.Entry<K, V>> where value is always NULL;
  2. ValueOnly: QueryCursor<Cache.Entry<K, V>> where key is always NULL;
  3. IndexFieldsOnly: QueryCursor<Cache.Entry<K, V>> where key is always NULL, value is an object with NULL in non-indexed fields.
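
The KeyOnly and ValueOnly options can be pictured as projections over an entry. The sketch below is only an illustration under stated assumptions: java.util.Map.Entry stands in for Cache.Entry, and ReturnProjectionSketch, its Projection enum and the project method are hypothetical names that do not come from the proposal.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;

/** Hypothetical sketch of partial results; Map.Entry stands in for Cache.Entry. */
class ReturnProjectionSketch {
    /** Which part of the entry the caller wants back. */
    enum Projection { KEY_ONLY, VALUE_ONLY }

    /** Returns a copy of the entry with the unneeded part replaced by null. */
    static <K, V> Map.Entry<K, V> project(Map.Entry<K, V> e, Projection p) {
        switch (p) {
            case KEY_ONLY:
                return new SimpleImmutableEntry<K, V>(e.getKey(), null);

            case VALUE_ONLY:
                return new SimpleImmutableEntry<K, V>(null, e.getValue());

            default:
                throw new IllegalArgumentException("Unknown projection: " + p);
        }
    }

    public static void main(String[] args) {
        Map.Entry<Long, String> e = new SimpleImmutableEntry<>(1L, "good");

        System.out.println(project(e, Projection.KEY_ONLY).getValue());   // null
        System.out.println(project(e, Projection.VALUE_ONLY).getValue()); // good
    }
}
```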

...

Phase 5. Support of SELECT FOR UPDATE for Index queries.

The goal is to provide a "select for update" query with transactional support for index queries:

...