
IDIEP-71
Author
Sponsor
Created: 06.04.2021
Status: DRAFT


Motivation

Proposal

Ignite will provide an API for querying and creating secondary indexes with the following features:

  1. Public Ignite Java API (IgniteCache, ClientCache) for directly querying secondary indexes:
    1. Query result is Iterator<Cache.Entry<K, V>>, with result data sorted according to the index conditions;
    2. Reduced overhead: no SQL machinery such as query parsing, query planning, etc.;
    3. Guaranteed query execution on an index (Ignite SQL does not support index hints to force IndexScan operations);
    4. Clear Java syntax instead of sometimes non-intuitive Ignite SQL;
    5. Flexibility in returned fields (cache key, cache value, both, or only indexed fields). It helps improve performance for some cases even more.
  2. Improved public Ignite Java API for creating secondary indexes:
    1. Functional indexes based on Java functional interfaces;
    2. New Java API for dynamic creation of secondary indexes;
    3. No SQL dependency (the QuerySqlField annotation) if only the index feature is required and SQL is not needed.
  3. Transactional support for "select for update" index queries:
    1. The user will be able to update cache entries for the result of an index query within the same transaction.

Current state

Ignite has limitations in searching by non-key fields. The proposal's goal is to overcome the issues described below.

  1. Syntax limitations of SqlFieldsQuery.
  2. ScanQuery may be slow because it performs a full cache scan even for highly selective data.
  3. Affinity keys do not provide good performance for arbitrary range queries.
  4. No way to define a functional index.
  5. No transactional guarantees (no "select for update" query).

Let's illustrate these limitations with an example entity: Good(long uid, @Index long ts). The task is to get all cache entries created since the last midnight.
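For reference, a minimal sketch of this entity with today's annotation-based index declaration (the price and title fields are added here only because later examples use them):

public class Good {
	@QuerySqlField
	private long uid;

	/** Indexed timestamp, so that range queries over it can use the B+Tree index. */
	@QuerySqlField(index = true)
	private long ts;

	@QuerySqlField
	private float price;

	private String title;
}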

1) Using SqlFieldsQuery.

FieldsQueryCursor<List<?>> result = cache.query(
	new SqlFieldsQuery("select _val from Good where ts > ?").setArgs(lastMidnightTs));

List<Good> goods = result.getAll().stream()
	.map(row -> (Good)row.get(0))
	.collect(Collectors.toList());

Pros are:

  1. The SQL query uses the embedded fast B+Tree index;
  2. Flexibility in returned fields (key only, val only, both, or only indexed fields).

Cons are:

  1. SQL query syntax:
    1. The user may not use SQL anywhere else in a project, so this query can be the only SQL query within the project, which leads to bad UX.
    2. The SQL may be unclear for a user as it contains "_val" as the column to return. It also doesn't provide a type check, due to Java generics type erasure.
    3. The result of the query is not a list of Cache.Entry, but a list of rows, where the cache value is just one of the columns.
  2. SQL does not guarantee that an index is used for a query (no hints, and H2 does not automatically analyze data selectivity for a GridH2Table).
  3. SQL does not support functional indexes.
  4. SQL does not support transactions, so there is no way to implement a "select for update" query.

2) Using ScanQuery.

QueryCursor<Cache.Entry<Long, Good>> cursor = cache.query(new ScanQuery<>((k, v) -> v.ts > lastMidnightTs));

List<Cache.Entry<Long, Good>> result = cursor.getAll();

Pros are:

  1. Clear Java syntax (generics, BiPredicate);
  2. Returns Cache.Entry cursor.

Cons are:

  1. The scan query may be slow as it traverses all data in a cache, even for highly selective data;
  2. Scan queries do not support transactional updates, so there is no way to implement a "select for update" query;
  3. No flexibility in returned fields (it returns only cache key-value pairs).

3) Leverage on partitioning.

Ignite provides an opportunity to define an AffinityKey for a cache and, optionally, a custom AffinityFunction.

Pros are:

  1. Speeds up queries whose conditions match the affinity (see the sketch after this list):
    1. ScanQuery has a partition number attribute.
    2. SqlFieldsQuery calculates the required partition itself.
  2. A custom affinity function can provide a functional condition for data distribution.

Cons are: 

  1. A cache can have only one affinity function.
  2. The affinity function maps cache entries to partitions, so range queries do not benefit from it.
  3. A custom affinity function can distribute data unevenly between nodes. For our example, a custom affinity would distribute the Good data by date, but some days may have much less data than others. So such a function should be tested well before usage.
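For illustration, a minimal sketch of a partition-pruned ScanQuery; the day-based affinity key and the dayKeyFor() helper are assumptions of this example:

// Scan only the partition that holds today's data under the assumed day-based affinity.
Affinity<Object> affinity = ignite.affinity("GOOD");

int part = affinity.partition(dayKeyFor(lastMidnightTs)); // dayKeyFor() is a hypothetical helper.

QueryCursor<Cache.Entry<Long, Good>> cursor = cache.query(
	new ScanQuery<Long, Good>((k, v) -> v.ts > lastMidnightTs)
		.setPartition(part)); // A single partition is scanned instead of the whole cache.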

Description

It is suggested to implement the proposal step by step in 4 sequential phases.

Phase 1 (Index Queries API). User will get:

  • Create fast index queries with the IndexQuery class in the public API (IgniteCache, ClientCache);
  • Possibility to query indexes created with QueryEntity.setIndexes, or with QuerySqlField.index and CacheConfiguration.setIndexedTypes.

Phase 2 (Index Creation API). User will get:

  • Improved public Java API for creating indexes (the SecondaryIndex annotation instead of QuerySqlField);
  • New Java API for dynamic index creation;
  • Opportunity to create functional indexes.

Phase 3 (Return Flexibility). User will get:

  • Improved IndexQuery API to fetch only the required cache parts (cache key, cache value, both, or indexed fields only). It will make queries even faster if only partial info is required.

Phase 4 (Select for update support):

  • Transactional support for "select for update" queries

Phase 1. Index Queries API.

1) Queries API

In this step we will provide an Index Query API for existing indexes. Currently Ignite supports 3 ways to declare indexes:

  1. Java API: CacheConfiguration.setQueryEntities + QueryEntity.setIndexes(QueryIndex idxs...);
  2. Java API: CacheConfiguration.setIndexedTypes + annotation QuerySqlField.index;
  3. SQL API: Create index with SQL query.

For the Java API (ways 1 and 2) an index is described by (IgniteCache, CacheValue.class, fields, indexName).

For the SQL API (way 3) an index is described by (Schema, Table, fields, indexName).
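A minimal sketch of the two Java API ways to declare the index on Good.ts (cache and field names are taken from the examples on this page):

// Way 1: QueryEntity with an explicit QueryIndex.
CacheConfiguration<Long, Good> ccfg1 = new CacheConfiguration<Long, Good>("GOOD")
	.setQueryEntities(Collections.singleton(
		new QueryEntity(Long.class, Good.class)
			.setIndexes(Collections.singleton(new QueryIndex("ts")))));

// Way 2: annotation-based; Good.ts is annotated with @QuerySqlField(index = true).
CacheConfiguration<Long, Good> ccfg2 = new CacheConfiguration<Long, Good>("GOOD")
	.setIndexedTypes(Long.class, Good.class);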

The Index Query API will support both kinds of index descriptions. The API provides:

  1. A public class IndexQuery for building an index query:
    1. it inherits the public class Query<R> with R = Cache.Entry<K, V>;
    2. it accepts the index description in a constructor;
    3. it accepts multiple index conditions (joined with AND) via object methods.
  2. Ideally the user provides index conditions that match the index fields and their order (e.g., ts desc, price asc) to leverage the index structure.
  3. For queries that don't match the index, Ignite will run a full index scan find(null, null) and apply the user conditions to it. Such cases are:
    1. index conditions joined with OR;
    2. a different sort order: idx(ts desc, price asc) but query(ts desc, price desc);
    3. a wide cache where an IndexScan over a few fields is much cheaper than a ScanQuery;
    4. a predicate operation with a custom user condition.

Internal changes for the query are:

  1. The internal class QueryIndexDesc supports different ways of index description.
  2. The internal classes IndexCondition and IndexConditions for building index queries.
  3. A new query type INDEX in the internal enum GridCacheQueryType.

So this is how the user will specify an index query:

// Creates an index query for an index created with Java API. 
// 1. Specify index description at constructor.
// 2. Index name is an optional param; Ignite will try its best to find the right index based on the Value.class specified in the constructor and on the fields in the conditions.
// 3. Index conditions (joined with the AND operation) are set with methods.
QueryCursor<Cache.Entry<Long, Good>> cursor = ignite.cache("GOOD").query(
	IndexQuery.forType(Good.class, idxName?)  // idxName is optional.
		.gt("ts", lastMidnightTs)
		.lt("price", 123.0)
);

// Create index with SQL query: "CREATE INDEX GOOD_TS_DESC_PRICE_ASC_IDX on GOOD_TBL (ts DESC, price ASC)"
// 1. Table name should be specified because it is possible to have the same index name for different tables (e.g., __key_PK).
// 2. Index name is optional too (Ignite will do its best to find the right index to run the query).
QueryCursor<Cache.Entry<Long, Good>> cursor = ignite.cache("GOOD").query(
	IndexQuery.forTable("GOOD_TBL", "GOOD_TS_DESC_PRICE_ASC_IDX"?)  // idxName is optional.
		.gt("ts", lastMidnightTs)
		.lt("price", 123.0)
);

// The "predicate" operation can't find best index for a user query. So user must provide an index name for this operation. 
QueryCursor<Cache.Entry<Long, Good>> cursor = ignite.cache("GOOD").query(
	new IndexQuery<>("GOOD_TBL", "GOOD_TS_DESC_PRICE_ASC_IDX")
		.predicate((good) -> good.ts > lastMidnightTs || good.price > 100)
);


Classes to implement the API:

// Public packages.

// IndexQuery extends the existing Query class.
public class IndexQuery<K, V> extends Query<Cache.Entry<K, V>> {

	private List<IndexCondition> idxConds = new ArrayList<>();

	private QueryIndexDesc idxDesc;

	private IndexQuery(QueryIndexDesc idxDesc) {
		this.idxDesc = idxDesc;
	}

	public static IndexQuery forType(Class<?> valClass, @Nullable String idxName) {
		QueryIndexDesc idxDesc = new QueryIndexDesc(valClass, idxName);

		return new IndexQuery(idxDesc);		
	}

	public static IndexQuery forTable(String table, @Nullable String schema, @Nullable String idxName) {
		QueryIndexDesc idxDesc = new QueryIndexDesc(table, schema, idxName);

		return new IndexQuery(idxDesc);
	}

	public IndexQuery lt(String field, Object val) {
		IndexCondition cond = IndexConditions.lt(field, val);
		idxConds.add(cond);

		return this;
	}

	// Other methods are:
	// eq, notEq, gt, gte, lt, lte, between, in, notIn, min, max, predicate
}

// Internal packages.

/** Describes an index to query. */
class QueryIndexDesc implements Serializable {
	private @Nullable String idxName;
	private @Nullable String valClass;
	private @Nullable String tblName;
	private @Nullable String schemaName;
}

class IndexConditions {
	/** Find index keys that are less than a specified key. */
	public static IndexCondition lt(String field, Object val) {
		return new RangeIndexCondition(field, null, val, false, false);
	} 	

	// eq, notEq, gt, gte, lt, lte, between, in, notIn, min, max, predicate
}


abstract class IndexCondition implements Serializable {
	private final String field;
}

// min, max
class MinMaxIndexCondition extends IndexCondition {
	private final boolean max;
}

// gt, gte, lt, lte, between
class RangeIndexCondition extends IndexCondition {
	private final @Nullable Object lower;
	private final @Nullable Object upper;

	private final boolean lowerInclusive;
	private final boolean upperInclusive;
}

// in, notIn, eq, notEq
class InIndexCondition extends IndexCondition {
	private final Object[] vals;

	// Flag for not-in condition.
	private final boolean inverse;
}

// predicate
class PredicateIndexCondition extends IndexCondition {
	private IgnitePredicate<?> predicate;
}

2) Query Processing

Currently Ignite has infrastructure for processing index queries: GridIndexingManager, IndexingSpi, SpiQuery. So it is possible to reuse this code to implement the processing of index queries: provide a default implementation of IndexingSpi and make IndexQuery a wrapper over SpiQuery. But there are some issues:

  1. GridIndexingManager (and IndexingSpi) supports 3 methods: store, remove, query.
    1. The store/remove methods work with cache (K, V). They appear to be designed for storing data in user-defined structures, independent of the Ignite storage. So SpiQuery is aimed at querying those user-defined storages.
    2. Ignite has the IndexProcessor that is responsible for storing / removing index entries, and it works directly with CacheDataRow.
    3. Our goal is to run queries on indexes that are handled by the Ignite storage (including already existing indexes).
  2. Some users may provide a custom implementation of IndexingSpi for their own reasons. Such users would have to choose between their own implementation and the new one.
  3. The name "SpiQuery" isn't obvious for the goal of querying secondary indexes.
  4. The IndexingSpi.query function has a "cache" parameter, but a cache is actually only one of the attributes that describe an index. Also, SQL indexes are not described by a cache at all, but by schema/table instead. Such a method doesn't fit well.

Then the proposal is:

  1. Introduce a new type of query - INDEX;
  2. Final query processing is performed within the IndexQueryProcessor;
  3. The entry point for distributed index queries is the IndexQueryProcessor.queryDistributed method; it executes a MapReduce query:
    1. the Map phase is node-local querying of the index; it returns data sorted by definition;
    2. the Reduce phase is responsible for the merge sort of data from multiple nodes;
    3. for the implementation it is suggested to generalize the h2.twostep package - move the basic functionality to the ignite-core module, so that different query processors can extend it (H2, index query processor).
  4. The entry point for a local query is the IndexQueryProcessor.queryLocal method. It accepts the user's IndexQuery and an IndexingQueryFilter for filtering result cache entries (primary partition, MVCC, etc.);
  5. The predicate method accepts a function that should be deployed on other nodes with GridCacheDeploymentManager.


// Internal package.

public class IndexQueryProcessor extends GridProcessorAdapter {
	// Provides access to indexes.
	private final IndexProcessor processor;

	// Listen for messages from other nodes (index queries, fetch data)
	private final GridMessageListener idxQryLsnr;

	// Executor of map queries (index.find).
	private final IndexQueryMapExecutor mapExec;

	// Executor for reduce query (MergeSort).
	private final IndexQueryReduceExecutor rdcExec;

	// Entrypoint for distributed query.
    public Iterator<Cache.Entry<?,?>> queryDistributed(IndexQuery idxQuery) throws IgniteException {
		if (containsPredicate(idxQuery))
			deployPredicate(idxQuery);

		IndexQueryTwoStepQuery twoStepQry = prepareTwoStepQuery(idxQuery);

		return rdcExec.execute(twoStepQry);
	}

	// Entrypoint for local query.
    public Iterator<Cache.Entry<?,?>> queryLocal(IndexQuery idxQuery, @Nullable IndexingQueryFilter filters) throws IgniteException {
		// 1. If the user specified an index name, then check the query:
		// - sort query fields in index key order;
		// - check that it's a valid index query: fields cover index keys (from left to right);
		// - fail otherwise.
		// 2. If the user didn't specify an index name:
		// - get all indexes for the specified cache and Value.class;
		// - find an index by filtering by the query fields (index keys must be in the same order as the specified query fields; try sorting fields to match any index);
		// - validate the index query (see validation steps from 1).
		Index idx = index(idxQuery.desc());

		// 1. Parse index conditions.
		// 2. Validate index conditions and the index type.
		// 3. Map index conditions to index query methods.
		// 4. Perform index operations, get a cursor.
		GridCursor<IndexRow> cursor = query(idx, idxQuery.conditions());

		// 1. Map each IndexRow to a Cache entry.
		// 2. Apply the specified cache filters (primary partitions, MVCC versions, etc.).
		// 3. Wrap the cursor into an iterator and return it.
		return mapAndFilter(cursor, filters);
	}

	private GridCursor<IndexRow> query(Index idx, List<IndexCondition> conditions) {
		// eq(key) -> idx.findOne(key)
		// notEq(key) -> idx.find(null, null, current -> current != key)
		// gt(key) -> idx.find(key, null, current -> current != key)
		// gte(key) -> idx.find(key, null)
		// lt(key) -> idx.find(null, key, current -> current != key)
		// lte(key) -> idx.find(null, key)
		// between(left, right) -> idx.find(left, right), inclusive only
		// in(keys...) -> idx.findOne(keys[0]) && idx.findOne(keys[1]) ...
		// notIn(keys...) -> idx.find(null, null, current -> !keys.contains(current))
		// min() -> idx.findFirst()
		// max() -> idx.findLast()
		// predicate(p) -> idx.find(null, null, p)
	}
}

Phase 2. Index creation API.

Currently an index can be declared statically (with CacheConfiguration) or created dynamically only with an SQL DDL query. Neither way supports functional indexes. The implementation will consist of the following steps:

  1. Provide a mechanism for creating an index on distributed nodes (from CacheConfiguration, or dynamically):
    1. There is the function IgniteH2Indexing.dynamicIndexCreate that handles the SchemaIndexCreateOperation operation;
    2. It looks like non-SQL indexes can be created the same way with slight refactoring, handling SQL and non-SQL indexes differently. The condition (QueryIndex.tableName == null) can be used to separate SQL and non-SQL indexes;
  2. Introduce functional indexes. Some assumptions are:
    1. Add a new QueryIndexType - FUNCTIONAL_SORTED;
    2. Then we can reuse the QueryIndex class, setting the function name as a field name in the QueryIndex definition;
    3. The function is distributed between nodes with the PeerClassLoading feature as an IgniteCallable;
  3. Provide a new IndexFactory and IndexDefinition for such indexes.

API for creating non-SQL indexes:

  1. On cache start: introduce a SecondaryIndex annotation (a possible shape is sketched below):
    1. It is effectively an alias for QuerySqlField that avoids the "Sql" prefix.
    2. But it can also be used to annotate a method in a user class to declare a functional index (the SQL annotation does not support that).
  2. Dynamically:
    1. Extend IgniteCache and ClientCache with a "createIndex" method that accepts a QueryIndex object.
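A possible shape of the annotation is sketched below; the attribute set simply mirrors QuerySqlField and is an assumption, not a final design:

// A sketch of the proposed @SecondaryIndex annotation; attribute names are assumptions.
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD, ElementType.METHOD}) // Unlike @QuerySqlField, methods are allowed (functional indexes).
public @interface SecondaryIndex {
	/** Optional explicit index name. */
	String name() default "";

	/** Sort order of the indexed field. */
	boolean descending() default false;
}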

The user will apply SecondaryIndex to a POJO field or method:

// Using the @SecondaryIndex annotation, which is actually an alias for QuerySqlField.

public class Good {
	private long uid;

	@SecondaryIndex(descending = true)
	private long ts;

	private float price;
	private String title;

	@SecondaryIndex(name = "GOOD_DISCOUNT_PRICE_IDX")
	public float discountPrice() {
		return price * discount();
	}

	/** Discount that depends on attributes like the count of goods, geography, etc. */
	protected float discount() {
		return 1.0f; // Placeholder; the real logic is application-specific.
	}
}


Dynamic index creation is a new method in IgniteCache, ClientCache:

// Dynamic index creation.

QueryIndex qryIdx = new QueryIndex()
	.setName("GOOD_DISCOUNT_PRICE_IDX")
	.setFieldNames(Collections.singleton("discountPrice"), true)
	.setIndexType(QueryIndexType.FUNCTIONAL_SORTED);

cache.createIndex(qryIdx);

Phase 3. Flexibility of return.

The goal is to give the user an opportunity to get partial results and skip unneeded hops to the cache:

  1. Cache key only:
    1. If inlining is enabled, the cache key is stored within the inlined fields, so there is no need to ask the cache for entries at all;
    2. if inlining is disabled, there will be only a CacheDataRow.key() call without triggering CacheDataRow.value().
  2. Cache value only: if only the value is required, then trigger only CacheDataRow.value().
  3. Indexed fields only:
    1. if inlining is enabled, just construct a new object (using reflection on the user class) and fill it with the inlined values.

The user will specify a flag on IndexQuery: KeyOnly, ValueOnly, or IndexFieldsOnly (a possible shape is sketched after this list). Results for queries are:

  1. KeyOnly: QueryCursor<Cache.Entry<K, V>> where the value is always NULL;
  2. ValueOnly: QueryCursor<Cache.Entry<K, V>> where the key is always NULL;
  3. IndexFieldsOnly: QueryCursor<Cache.Entry<K, V>> where the key is always NULL and the value is an object with NULL in non-indexed fields.
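A minimal sketch of how the flag might be passed; the setReturnMode method and the IndexQueryReturnMode enum are assumptions of this sketch, not a final API:

// Hypothetical return-flexibility flag on IndexQuery; names are assumptions.
QueryCursor<Cache.Entry<Long, Good>> keys = cache.query(
	IndexQuery.forType(Good.class, null)
		.gt("ts", lastMidnightTs)
		.setReturnMode(IndexQueryReturnMode.KEY_ONLY) // Value in every returned Cache.Entry will be NULL.
);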

Phase 4. Support of SELECT FOR UPDATE for Index queries.

Goal is to provide "select for update" query with transactional support for index queries:

  1. User provide:
    1. an index query
    2. an update function
    3. transaction parameters (concurrency, isolation);
  2. Ignite:
    1. start a transaction
    2. finds all cache entries matches for a query
    3. apply the update function for every cache row
    4. close transaction
    5. return to user cursor with updated cache rows.
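A hypothetical sketch of such an API; the queryForUpdate method, its signature, and the shape of the update function are assumptions of this sketch (the deep design is TBD):

// Hypothetical "select for update" over an index query; queryForUpdate is not an existing API.
QueryCursor<Cache.Entry<Long, Good>> updated = cache.queryForUpdate(
	IndexQuery.forType(Good.class, null)
		.gt("ts", lastMidnightTs),      // 1. Index query provided by the user.
	good -> good.price *= 0.9f,         // 2. Update function applied to every matching row.
	TransactionConcurrency.PESSIMISTIC, // 3. Transaction parameters.
	TransactionIsolation.REPEATABLE_READ
);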


A detailed design of the solution is TBD.


Risks and Assumptions

  1. Supporting SQL and non-SQL indexes on the same field - they must work identically.
  2. Rebuild, clear, defragmentation tasks, control.sh, system views, IndexRowCache, security checks, etc.(?) for non-SQL indexes - all of this tooling should work correctly with non-SQL indexes.
  3. Need support for tracking index queries (security checks, tracing, query views).
  4. Transactional support for "select for update" should be investigated.
  5. There can be performance issues with IndexFieldsOnly queries due to reflection or wrong inlining.


Tickets

