...

ID: IEP-71
Author:
Sponsor: Maksim Timonin
Created: 06.04.2021
Status: DRAFT

Status: ACTIVE



Motivation

Proposal

Ignite will provide an API for querying and creating secondary (non-SQL) indexes with the following features:

  1. Public Ignite Java API (IgniteCache, ClientCache) for querying secondary indexes directly:
    1. The query result is Iterator<Cache.Entry<K, V>>, and the result data is sorted according to the index conditions;
    2. Less overhead from SQL machinery such as query parsing and query planning;
    3. A guarantee that queries run on an index (note that Ignite SQL does support index hints for this purpose: https://ignite.apache.org/docs/latest/perf-and-troubleshooting/sql-tuning#index-hints);
    4. Clear Java syntax instead of the sometimes non-intuitive Ignite SQL;
    5. Flexibility in the returned fields (cache key, cache value, both, or only indexed fields), which improves performance even further in some cases.
  2. Improved public Ignite Java API for creating secondary indexes:
    1. Functional indexes with Java functional interfaces;
    2. New Java API for dynamic creation of secondary indexes;
    3. Removal of the SQL dependency (the QuerySqlField annotation) when only indexing is required and SQL is not needed.
  3. Transactional support for "select for update" index queries:
    1. A user will be able to update the cache entries returned by an index query within the same transaction.

Current state

Ignite has limitations in search by non-key fields. The proposal's goal is to overcome the issues described below.

...

These limitations can be shown with an example entity: Good(long uid, @Index long ts). The task is to get all cache entries created since the last midnight.

1) Using SqlFieldsQuery.

Code Block
languagejava
FieldsQueryCursor<List<?>> result = cache.query(
	new SqlFieldsQuery("select _val from Good where ts > ?").setArgs(lastMidnightTs));

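// Each row holds a single column (_val), so unwrap column 0 of every row.
List<Good> goods = result.getAll().stream()
	.map(row -> (Good) row.get(0))
	.collect(Collectors.toList());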

...

  1. SQL query syntax:
    1. A user may not use SQL anywhere else in a project, so this query can be the only SQL query in the project, which leads to bad UX.
    2. SQL may be unclear to a user because it contains "_val" as the column to return. It also provides no type check, due to Java generics type erasure at runtime.
    3. The result of a query is not a list of Cache.Entry but a list of rows, where the cache value is just one of the columns.
  2. SQL does not guarantee that an index is used for a query (no hints; also, H2 does not automatically analyze the selectivity of data for a GridH2Table).
  3. SQL does not support functional indexes.
  4. SQL does not support transactions, so there is no way to implement a "select for update" query.

2) Using ScanQuery.

Code Block
languagejava
QueryCursor<Cache.Entry<Long, Good>> cursor = cache.query(new ScanQuery<>((k, v) -> v.ts > lastMidnightTs));

List<Cache.Entry<Long, Good>> result = cursor.getAll();

...

  1. A scan query may be slow because it traverses all data in a cache, even when the condition is highly selective;
  2. Scan queries do not support transactional updates, so there is no way to implement a "select for update" query;
  3. No flexibility in the returned fields (it returns only cache key-value pairs).
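
The cost difference between a full scan and an index range lookup can be illustrated without Ignite at all. The sketch below is a minimal, self-contained model, not Ignite code: a HashMap stands in for the cache, a TreeMap keyed by ts stands in for a sorted secondary index, and every class and method name is hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model (not Ignite API): full scan vs. sorted-index range lookup. */
class ScanVsIndexSketch {
    /** Stand-in for the cache: uid -> ts. */
    final Map<Long, Long> cache = new HashMap<>();

    /** Stand-in for a sorted secondary index on ts: ts -> uids with that ts. */
    final TreeMap<Long, List<Long>> tsIdx = new TreeMap<>();

    /** Stores an entry and indexes it. Assumes each uid is put only once. */
    void put(long uid, long ts) {
        cache.put(uid, ts);
        tsIdx.computeIfAbsent(ts, k -> new ArrayList<>()).add(uid);
    }

    /** ScanQuery-like: visits every entry, however few of them match. */
    List<Long> scan(long lastMidnightTs) {
        List<Long> res = new ArrayList<>();

        for (Map.Entry<Long, Long> e : cache.entrySet()) {
            if (e.getValue() > lastMidnightTs)
                res.add(e.getKey());
        }

        return res;
    }

    /** Index-like: visits only keys with ts > lastMidnightTs; result is sorted by ts. */
    List<Long> indexRange(long lastMidnightTs) {
        List<Long> res = new ArrayList<>();

        for (List<Long> uids : tsIdx.tailMap(lastMidnightTs, false).values())
            res.addAll(uids);

        return res;
    }
}
```

Both methods return the same set of uids, but scan touches the whole map while indexRange touches only the matching tail of the sorted structure and returns it already ordered by ts.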

3) Leveraging partitioning.

Ignite provides an opportunity to define an AffinityKey for a cache and, optionally, a custom AffinityFunction.

...

  1. A cache can have only one affinity function.
  2. An affinity function maps cache entries to partitions, so range queries gain nothing from it.
  3. A custom affinity function can distribute data unevenly between nodes. In our example, a custom affinity function would distribute Goods data by date, but some days may have less data than others. Such a function should therefore be tested well before use.
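
Why partitioning does not help range queries can be sketched with a toy key-to-partition mapping. The code below assumes a simple hash-modulo mapping, which is not Ignite's actual affinity function; the class and method names are hypothetical. A lookup by a single key touches one partition, while a range of adjacent keys spreads over all of them.

```java
import java.util.Set;
import java.util.TreeSet;

/** Toy model (not Ignite's affinity function): hash-modulo key-to-partition mapping. */
class PartitionScatterSketch {
    /** Maps a key to one of {@code parts} partitions by its hash code. */
    static int partition(long key, int parts) {
        return Math.floorMod(Long.hashCode(key), parts);
    }

    /** Partitions that a range query over [from, to] would have to visit. */
    static Set<Integer> partitionsForRange(long from, long to, int parts) {
        Set<Integer> res = new TreeSet<>();

        for (long k = from; k <= to; k++)
            res.add(partition(k, parts));

        return res;
    }
}
```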

Description

It is suggested to implement the proposal step by step, in 4 sequential phases.

...

  • Transactional support for "select for update" queries

Phase 1. Index Queries API.

1) Queries API

In this step we will provide Index Query API for existing indexes. Currently Ignite supports 3 ways to declare indexes:

...

Code Block
languagejava
// Public packages.

// IndexQuery extends the existing Query class.
public class IndexQuery<K, V> extends Query<Cache.Entry<K, V>> {

	private List<IndexCondition> idxConds = new ArrayList<>();

	private QueryIndexDesc idxDesc;

	private IndexQuery(QueryIndexDesc idxDesc) {
		this.idxDesc = idxDesc;
	}

	public static IndexQuery forType(Class<?> valClass, @Nullable String idxName) {
		QueryIndexDesc idxDesc = new QueryIndexDesc(valClass, idxName);

		return new IndexQuery(idxDesc);		
	}

	public static IndexQuery forTable(String table, @Nullable String schema, @Nullable String idxName) {
		QueryIndexDesc idxDesc = new QueryIndexDesc(table, schema, idxName);

		return new IndexQuery(idxDesc);
	}

	public IndexQuery lt(String field, Object val) {
		IndexCondition cond = IndexConditions.lt(field, val);
		idxConds.add(cond);

		return this;
	}

	// Other methods are:
	// eq, notEq, gt, gte, lt, lte, between, in, notIn, min, max, predicate
}

// Internal packages.

/** Describes an index to query. */
class QueryIndexDesc implements Serializable {
	private @Nullable String idxName;
	private @Nullable String valClass;
	private @Nullable String tblName;
	private @Nullable String schemaName;
}

class IndexConditions {
	/** Find index keys that are less than a specified key. */
	public static IndexCondition lt(String field, Object val) {
		return new RangeIndexCondition(field, null, val, false, false);
	} 	

	// eq, notEq, gt, gte, lt, lte, between, in, notIn, min, max, predicate
}


abstract class IndexCondition implements Serializable {
	private final String field;
}

// min, max
class MinMaxIndexCondition extends IndexCondition {
	private final boolean max;
}

// gt, gte, lt, lte, between
class RangeIndexCondition extends IndexCondition {
	private final @Nullable Object lower;
	private final @Nullable Object upper;

	private final boolean lowerInclusive;
	private final boolean upperInclusive;
}

// in, notIn, eq, notEq
class InIndexCondition extends IndexCondition {
	private final Object[] vals;

	// Flag for not-in condition.
	private final boolean inverse;
}

// predicate
class PredicateIndexCondition extends IndexCondition {
	private IgnitePredicate<?> predicate;
}
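
How a range condition with nullable bounds and inclusive flags could map onto a sorted index can be sketched with a NavigableMap. This is a self-contained illustration under the assumption that the index behaves like a sorted map; RangeConditionSketch and its methods are hypothetical names, not part of the proposed API.

```java
import java.util.NavigableMap;

/** Toy model of a range condition over a sorted index (hypothetical, not Ignite API). */
class RangeConditionSketch<K extends Comparable<K>> {
    final K lower;            // null means unbounded below
    final K upper;            // null means unbounded above
    final boolean lowerIncl;
    final boolean upperIncl;

    RangeConditionSketch(K lower, K upper, boolean lowerIncl, boolean upperIncl) {
        this.lower = lower;
        this.upper = upper;
        this.lowerIncl = lowerIncl;
        this.upperIncl = upperIncl;
    }

    /** Keys strictly less than {@code val}. */
    static <K extends Comparable<K>> RangeConditionSketch<K> lt(K val) {
        return new RangeConditionSketch<>(null, val, false, false);
    }

    /** Keys greater than or equal to {@code val}. */
    static <K extends Comparable<K>> RangeConditionSketch<K> gte(K val) {
        return new RangeConditionSketch<>(val, null, true, false);
    }

    /** Keys in [lo, hi], both bounds inclusive. */
    static <K extends Comparable<K>> RangeConditionSketch<K> between(K lo, K hi) {
        return new RangeConditionSketch<>(lo, hi, true, true);
    }

    /** Applies the condition to a sorted index and returns the matching slice as a view. */
    <V> NavigableMap<K, V> apply(NavigableMap<K, V> idx) {
        if (lower == null && upper == null)
            return idx;

        if (lower == null)
            return idx.headMap(upper, upperIncl);

        if (upper == null)
            return idx.tailMap(lower, lowerIncl);

        return idx.subMap(lower, lowerIncl, upper, upperIncl);
    }
}
```

In this model a single class with two nullable bounds covers gt, gte, lt, lte and between, which is the same idea as grouping those operations under one range condition type.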

2) Query Processing

Currently Ignite has infrastructure for processing index queries: GridIndexingManager, IndexingSpi, SpiQuery. This code could be used to implement index query processing: provide a default implementation of IndexingSpi and make IndexQuery a wrapper over SpiQuery. But there are some issues:

...

Code Block
languagejava
// Internal package.

public class IndexQueryProcessor extends GridProcessorAdapter {
	// Provides access to indexes.
	private final IndexProcessor processor;

	// Listen for messages from other nodes (index queries, fetch data)
	private final GridMessageListener idxQryLsnr;

	// Executor of map queries (index.find).
	private final IndexQueryMapExecutor mapExec;

	// Executor for reduce query (MergeSort).
	private final IndexQueryReduceExecutor rdcExec;

	// Entrypoint for distributed query.
    public Iterator<Cache.Entry<?,?>> queryDistributed(IndexQuery idxQuery) throws IgniteException {
		if (containsPredicate(idxQuery))
			deployPredicate(idxQuery);

		IndexQueryTwoStepQuery twoStepQry = prepareTwoStepQuery();

		return rdcExec.execute(twoStepQry);
	}

	// Entrypoint for local query.
    public Iterator<Cache.Entry<?,?>> queryLocal(IndexQuery idxQuery, @Nullable IndexingQueryFilter filters) throws IgniteException {
		// 1. If the user specified an index name, check the query:
		// - sort the query fields in index key order;
		// - check that it is a valid index query: the fields cover the index keys (from left to right);
		// - fail otherwise.
		// 2. If the user did not specify an index name:
		// - get all indexes for the specified cache and Value.class;
		// - find an index by matching the query fields (index keys must be in the same order as the specified query fields; try sorting the fields to match any index);
		// - validate the index query (see the validation steps from 1).
		Index idx = index(idxQuery.desc());

		// 1. Parse index conditions.
		// 2. Validate index conditions and index type.
		// 3. Map index conditions to index query methods.
		// 4. Perform index operations, get a cursor.
		GridCursor<IndexRow> cursor = query(idx, idxQuery.conditions());

		// 1. Map IndexRow to Cache entry.
		// 2. Apply specified cache filters (primary partitions, MVCC versions, etc.)
		// 3. Wrap cursor to iterator and return.
		return map_and_filter(cursor, filters);
	}

	private GridCursor<IndexRow> query(Index idx, List<IndexCondition> conditions) {
		// eq(key) -> idx.findOne(key)
		// notEq(key) -> idx.find(null, null, current -> current != key)
		// gt(key) -> idx.find(key, null, current -> current != key)
		// gte(key) -> idx.find(key, null)
		// lt(key) -> idx.find(null, key, current -> current != key)
		// lte(key) -> idx.find(null, key)
		// between(left, right) -> idx.find(left, right), inclusive only
		// in(keys...) -> union of idx.findOne(keys[0]), idx.findOne(keys[1]), ...
		// notIn(keys...) -> idx.find(null, null, current -> !keys.contains(current))
		// min() -> idx.findFirst()
		// max() -> idx.findLast()
		// predicate(p) -> idx.find(null, null, p)
	}
}
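
The reduce step (MergeSort) mentioned above can be sketched as a k-way merge of per-node results that are each already sorted by the index. The code below is a self-contained illustration with hypothetical names, not the proposed IndexQueryReduceExecutor; for brevity it collects the merged result into a List, whereas a real implementation would more likely return a lazy iterator.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

/** Toy reduce step (hypothetical, not Ignite API): k-way merge of sorted per-node results. */
class MergeReduceSketch {
    /** Head element of one node's stream plus the rest of that stream. */
    private static final class Head<T> {
        final T val;
        final Iterator<T> rest;

        Head(T val, Iterator<T> rest) {
            this.val = val;
            this.rest = rest;
        }
    }

    /** Merges iterators that are each already sorted by {@code cmp}. */
    static <T> List<T> merge(List<Iterator<T>> nodeResults, Comparator<? super T> cmp) {
        PriorityQueue<Head<T>> heap = new PriorityQueue<>((a, b) -> cmp.compare(a.val, b.val));

        // Seed the heap with the first element of every non-empty stream.
        for (Iterator<T> it : nodeResults) {
            if (it.hasNext())
                heap.add(new Head<>(it.next(), it));
        }

        List<T> res = new ArrayList<>();

        // Repeatedly take the smallest head and refill from the same stream.
        while (!heap.isEmpty()) {
            Head<T> h = heap.poll();

            res.add(h.val);

            if (h.rest.hasNext())
                heap.add(new Head<>(h.rest.next(), h.rest));
        }

        return res;
    }
}
```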

Phase 2. Index creation API.

Currently an index can be declared statically (with CacheConfiguration) or created dynamically, the latter only with an SQL DDL query. Neither way supports functional indexes. The implementation will consist of the following steps:

...

Code Block
languagejava
// Dynamic index creation.

QueryIndex qryIdx = new QueryIndex()
	.setName("GOOD_DISCOUNT_PRICE_IDX")
	.setFieldNames(Collections.singleton("discountPrice"))
	.setIndexType(QueryIndexType.FUNCTIONAL_SORTED);

cache.createIndex(qryIdx);
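
The idea of a functional index, where the index key is computed from the cache value by a Java function rather than read from a field, can be sketched as follows. This is a self-contained toy model; FunctionalIndexSketch, onPut and lt are hypothetical names and say nothing about how the proposed FUNCTIONAL_SORTED index type would actually be implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import java.util.function.Function;

/** Toy functional index (hypothetical, not Ignite API): sorted by a key computed from the value. */
class FunctionalIndexSketch<K, V, I extends Comparable<I>> {
    /** Function that computes the index key from a cache value. */
    private final Function<V, I> keyFn;

    /** Computed index key -> cache keys of the entries that produce it. */
    private final TreeMap<I, List<K>> idx = new TreeMap<>();

    FunctionalIndexSketch(Function<V, I> keyFn) {
        this.keyFn = keyFn;
    }

    /** Indexes a cache entry under its computed key. Assumes each cache key is added once. */
    void onPut(K cacheKey, V val) {
        idx.computeIfAbsent(keyFn.apply(val), k -> new ArrayList<>()).add(cacheKey);
    }

    /** Cache keys whose computed index key is strictly less than {@code bound}, in index order. */
    List<K> lt(I bound) {
        List<K> res = new ArrayList<>();

        idx.headMap(bound, false).values().forEach(res::addAll);

        return res;
    }
}
```

For example, with a value modelled as {price, discountPercent}, the function `g -> g[0] * (100 - g[1]) / 100` would index entries by discounted price, so a "cheaper than X after discount" query becomes a range lookup instead of a scan.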

Phase 3. Flexibility of return.

The goal is to let a user receive partial results and skip unneeded hops to the cache:

...

  1. KeyOnly: QueryCursor<Cache.Entry<K, V>> where value is always NULL;
  2. ValueOnly: QueryCursor<Cache.Entry<K, V>> where key is always NULL;
  3. IndexFieldsOnly: QueryCursor<Cache.Entry<K, V>> where key is always NULL, value is an object with NULL in non-indexed fields.
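
The first two modes can be sketched as a projection step that loads the cache value only when the mode requires it. The code below is a self-contained illustration, not the proposed API: the Mode enum and project method are hypothetical, and the Supplier stands in for the extra hop to the cache.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;
import java.util.function.Supplier;

/** Toy projection modes (hypothetical names, not Ignite API). */
class ProjectionSketch {
    enum Mode { FULL, KEY_ONLY, VALUE_ONLY }

    /**
     * Builds the entry returned to the user; the omitted side is null.
     * {@code valLoader} models the hop to the cache and is not called for KEY_ONLY.
     */
    static <K, V> Map.Entry<K, V> project(K key, Supplier<V> valLoader, Mode mode) {
        switch (mode) {
            case KEY_ONLY:
                return new SimpleImmutableEntry<>(key, null);

            case VALUE_ONLY:
                return new SimpleImmutableEntry<>(null, valLoader.get());

            default:
                return new SimpleImmutableEntry<>(key, valLoader.get());
        }
    }
}
```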

Phase 4. Support of SELECT FOR UPDATE for Index queries.

The goal is to provide a "select for update" query with transactional support for index queries:

...

The detailed design of the solution is TBD.


Risks and Assumptions

  1. SQL and non-SQL indexes on the same field must both be supported and must behave identically.
  2. Rebuild, clear, and defragmentation tasks, control.sh, system views, IndexRowCache, security checks, etc. (?): all of this tooling should work correctly with non-SQL indexes.
  3. Support is needed for tracking index queries (security checks, tracing, query views).
  4. Transactional support for "select for update" needs to be investigated.
  5. There can be performance issues with IndexFieldsOnly queries due to reflection or wrong inlining.


Tickets