ID | IEP-71 |
Author | |
Sponsor | Maksim Timonin |
Created | 06.04.2021 |
Status | ACTIVE |
Ignite will provide an API for querying and creating secondary (non-SQL) indexes with the following features:
Ignite has limitations when searching by non-key fields. The goal of this proposal is to overcome the issues described below.
Consider these limitations using an example entity, Good(long uid, @Index long ts). The task is to get all cache entries created since the last midnight.
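All examples in this section compare ts with lastMidnightTs, which is not defined here. Below is a minimal sketch of how it could be computed using only java.time. It assumes that ts stores epoch milliseconds, which the entity definition does not say. The class and method names are hypothetical.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;

final class MidnightTs {
    /** Epoch millis of the most recent midnight in the clock's time zone (assumes ts is epoch millis). */
    static long lastMidnightTs(Clock clock) {
        return LocalDate.now(clock)              // Today's date in the clock's zone.
            .atStartOfDay(clock.getZone())       // 00:00 of that date, DST-aware.
            .toInstant()
            .toEpochMilli();
    }

    public static void main(String[] args) {
        Clock clock = Clock.fixed(Instant.parse("2021-04-06T15:30:00Z"), ZoneOffset.UTC);
        System.out.println(Instant.ofEpochMilli(lastMidnightTs(clock))); // 2021-04-06T00:00:00Z
    }
}
```

Passing a Clock keeps the computation testable. In production, Clock.systemDefaultZone() would give midnight in the node's default time zone.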
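```java
FieldsQueryCursor<List<?>> result = cache.query(
    new SqlFieldsQuery("select _val from Good where ts > ?").setArgs(lastMidnightTs));

// Each row contains a single column, _val, which holds the Good object.
List<Good> goods = result.getAll().stream()
    .map(row -> (Good) row.get(0))
    .collect(Collectors.toList());
```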
Pros are:
Cons are:
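```java
// Iterates over all cache entries and filters them with a predicate on the value.
QueryCursor<Cache.Entry<Long, Good>> cursor = cache.query(
    new ScanQuery<Long, Good>((k, v) -> v.ts > lastMidnightTs));

List<Cache.Entry<Long, Good>> result = cursor.getAll();
```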
Pros are:
Cons are:
Ignite allows defining an AffinityKey for a cache and, optionally, a custom AffinityFunction.
Pros are:
Cons are:
The proposal is to be implemented step by step in four sequential phases.
Phase 1 (Index Queries API). The user will get:
Phase 2 (Index Creation API). The user will get:
Phase 3 (Return Flexibility). The user will get:
Phase 4 (Select for update support):
In this phase we will provide the Index Query API for existing indexes. Currently Ignite supports three ways to declare indexes:
For the Java API (ways 1 and 2), an index is described by (IgniteCache, CacheValue.class, fields, indexName).
For the SQL API (way 3), an index is described by (Schema, Table, fields, indexName).
The Index Query API will support both kinds of index descriptions. The API provides:
Internal changes for the query are:
This is how a user will specify an index query:
```java
// 1. Index query for an index created with the Java API.
//    - The index description is passed to the factory method.
//    - The index name is optional (null): Ignite tries its best to find the right index
//      based on the specified value class and the fields used in the conditions.
//    - Index conditions are added with methods and joined with AND.
QueryCursor<Cache.Entry<Long, Good>> cursor = ignite.cache("GOOD").query(
    IndexQuery.<Long, Good>forType(Good.class, null) // Index name is optional.
        .gt("ts", lastMidnightTs)
        .lt("price", 123.0));

// 2. Index query for an index created with SQL:
//    CREATE INDEX GOOD_TS_DESC_PRICE_ASC_IDX ON GOOD_TBL (ts DESC, price ASC)
//    - The table name is required because different tables may have indexes
//      with the same name (e.g., __key_PK).
//    - The schema and the index name are optional (null); without an index name,
//      Ignite does its best to find the right index to run the query.
QueryCursor<Cache.Entry<Long, Good>> cursor2 = ignite.cache("GOOD").query(
    IndexQuery.<Long, Good>forTable("GOOD_TBL", null, "GOOD_TS_DESC_PRICE_ASC_IDX")
        .gt("ts", lastMidnightTs)
        .lt("price", 123.0));

// 3. A "predicate" condition can't be used to find the best index for a query,
//    so the user must specify the index name for this operation.
QueryCursor<Cache.Entry<Long, Good>> cursor3 = ignite.cache("GOOD").query(
    IndexQuery.<Long, Good>forTable("GOOD_TBL", null, "GOOD_TS_DESC_PRICE_ASC_IDX")
        .predicate(good -> good.ts > lastMidnightTs || good.price > 100));
```
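When the index name is omitted, IndexQuery has to pick an index itself. The standalone sketch below shows one possible selection rule, modeled on the checks listed for queryLocal in the IndexQueryProcessor design: the set of query fields must cover the index keys from left to right, regardless of the order in which the conditions were specified. Index metadata is modeled as a plain map from index name to key names. IndexSelection and findIndex are hypothetical names, not Ignite API.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

final class IndexSelection {
    /**
     * Returns the first index (in map iteration order) whose leading keys are exactly
     * the query fields, in any order; empty if no index can serve the query.
     */
    static Optional<String> findIndex(Map<String, List<String>> idxKeys, Set<String> qryFields) {
        for (Map.Entry<String, List<String>> e : idxKeys.entrySet()) {
            List<String> keys = e.getValue();

            // An index can't serve conditions on more fields than it has keys.
            if (qryFields.isEmpty() || qryFields.size() > keys.size())
                continue;

            // Query fields must cover the index keys from left to right (a key prefix),
            // so {price, ts} matches an index on (ts, price) but {price} alone does not.
            Set<String> prefix = new HashSet<>(keys.subList(0, qryFields.size()));

            if (prefix.equals(qryFields))
                return Optional.of(e.getKey());
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Map<String, List<String>> idxKeys = new LinkedHashMap<>();
        idxKeys.put("GOOD_TS_IDX", Arrays.asList("ts"));
        idxKeys.put("GOOD_TS_DESC_PRICE_ASC_IDX", Arrays.asList("ts", "price"));

        Set<String> qryFields = new HashSet<>(Arrays.asList("price", "ts"));
        System.out.println(findIndex(idxKeys, qryFields).orElse("no index")); // GOOD_TS_DESC_PRICE_ASC_IDX
    }
}
```

If several indexes qualify, the sketch simply takes the first one. A real implementation might instead prefer the index with the fewest extra keys or fail with an ambiguity error.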
Classes to implement the API:
```java
// Public package.
public class IndexQuery<K, V> extends Query<Cache.Entry<K, V>> {
    /** Index conditions, joined with AND. */
    private final List<IndexCondition> idxConds = new ArrayList<>();

    /** Description of the index to query. */
    private final QueryIndexDesc idxDesc;

    private IndexQuery(QueryIndexDesc idxDesc) {
        this.idxDesc = idxDesc;
    }

    /** Queries an index declared with the Java API (ways 1 and 2). */
    public static <K, V> IndexQuery<K, V> forType(Class<V> valClass, @Nullable String idxName) {
        return new IndexQuery<>(new QueryIndexDesc(valClass, idxName));
    }

    /** Queries an index created with SQL (way 3). */
    public static <K, V> IndexQuery<K, V> forTable(String table, @Nullable String schema, @Nullable String idxName) {
        return new IndexQuery<>(new QueryIndexDesc(table, schema, idxName));
    }

    public IndexQuery<K, V> lt(String field, Object val) {
        idxConds.add(IndexConditions.lt(field, val));

        return this;
    }

    // Other methods: eq, notEq, gt, gte, lte, between, in, notIn, min, max, predicate.
}

// Internal package. Constructors are omitted for brevity.

/** Describes an index to query. */
class QueryIndexDesc implements Serializable {
    private @Nullable String idxName;
    private @Nullable String valClass;
    private @Nullable String tblName;
    private @Nullable String schemaName;
}

class IndexConditions {
    /** Finds index keys that are less than the specified key. */
    public static IndexCondition lt(String field, Object val) {
        // lower = null (unbounded), upper = val, both bounds exclusive.
        return new RangeIndexCondition(field, null, val, false, false);
    }

    // eq, notEq, gt, gte, lte, between, in, notIn, min, max, predicate.
}

abstract class IndexCondition implements Serializable {
    private final String field;
}

// min, max
class MinMaxIndexCondition extends IndexCondition {
    private final boolean max;
}

// gt, gte, lt, lte, between
class RangeIndexCondition extends IndexCondition {
    private final @Nullable Object lower;
    private final @Nullable Object upper;
    private final boolean lowerInclusive;
    private final boolean upperInclusive;
}

// in, notIn, eq, notEq
class InIndexCondition extends IndexCondition {
    private final Object[] vals;

    /** Flag for the not-in condition. */
    private final boolean inverse;
}

// predicate
class PredicateIndexCondition extends IndexCondition {
    private IgnitePredicate<?> predicate;
}
```
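RangeIndexCondition encodes gt, gte, lt, lte and between as an optional lower and upper bound, each with an inclusive flag. Below is a minimal sketch of how such a condition could be applied to a sorted index. For illustration, the index is modeled as a java.util.TreeMap from index key to cache key; real Ignite indexes are B+ trees. The Range class and the factory methods are hypothetical stand-ins for RangeIndexCondition and IndexConditions.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

final class RangeScan {
    /** Hypothetical stand-in for RangeIndexCondition; a null bound means "unbounded". */
    static final class Range<T extends Comparable<T>> {
        final T lower;
        final T upper;
        final boolean lowerInclusive;
        final boolean upperInclusive;

        Range(T lower, T upper, boolean lowerInclusive, boolean upperInclusive) {
            this.lower = lower;
            this.upper = upper;
            this.lowerInclusive = lowerInclusive;
            this.upperInclusive = upperInclusive;
        }
    }

    // Factory methods mirroring IndexConditions.
    static <T extends Comparable<T>> Range<T> lt(T v) { return new Range<>(null, v, false, false); }
    static <T extends Comparable<T>> Range<T> gt(T v) { return new Range<>(v, null, false, false); }
    static <T extends Comparable<T>> Range<T> gte(T v) { return new Range<>(v, null, true, false); }
    static <T extends Comparable<T>> Range<T> between(T l, T u) { return new Range<>(l, u, true, true); }

    /** Returns a view of the index entries whose keys satisfy the range. */
    static <T extends Comparable<T>, K> NavigableMap<T, K> scan(NavigableMap<T, K> idx, Range<T> r) {
        // An inverted range (lower > upper) matches nothing; TreeMap would throw on such bounds.
        if (r.lower != null && r.upper != null && r.lower.compareTo(r.upper) > 0)
            return new TreeMap<>();

        NavigableMap<T, K> res = idx;
        if (r.lower != null)
            res = res.tailMap(r.lower, r.lowerInclusive);
        if (r.upper != null)
            res = res.headMap(r.upper, r.upperInclusive);
        return res;
    }
}
```

Because tailMap and headMap return views rather than copies, entries are read only when the result is iterated. This is similar in spirit to a cursor over an index range.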
Ignite already has infrastructure for processing index queries: GridIndexingManager, IndexingSpi and SpiQuery. This code could be reused to process index queries by providing a default implementation of IndexingSpi and making IndexQuery a wrapper around SpiQuery. However, this approach has some issues:
The proposal is therefore:
```java
// Internal package.
public class IndexQueryProcessor extends GridProcessorAdapter {
    /** Provides access to indexes. */
    private final IndexProcessor processor;

    /** Listens for messages from other nodes (index queries, data fetch requests). */
    private final GridMessageListener idxQryLsnr;

    /** Executor of map queries (index.find). */
    private final IndexQueryMapExecutor mapExec;

    /** Executor of the reduce query (merge sort). */
    private final IndexQueryReduceExecutor rdcExec;

    /** Entry point for a distributed query. */
    public Iterator<Cache.Entry<?, ?>> queryDistributed(IndexQuery<?, ?> idxQry) throws IgniteException {
        if (containsPredicate(idxQry))
            deployPredicate(idxQry);

        IndexQueryTwoStepQuery twoStepQry = prepareTwoStepQuery(idxQry);

        return rdcExec.execute(twoStepQry);
    }

    /** Entry point for a local query. */
    public Iterator<Cache.Entry<?, ?>> queryLocal(IndexQuery<?, ?> idxQry, @Nullable IndexingQueryFilter filters)
        throws IgniteException {
        // 1. If the user specified an index name, check the query:
        //    - sort the query fields in index key order;
        //    - check that it's a valid index query: the fields cover the index keys from left to right;
        //    - fail otherwise.
        // 2. If the user didn't specify an index name:
        //    - get all indexes for the specified cache and value class;
        //    - find an index by the query fields (index keys must be in the same order as
        //      the query fields; try sorting the fields to match any index);
        //    - validate the index query (see the validation steps in 1).
        Index idx = index(idxQry.desc());

        // 1. Parse the index conditions.
        // 2. Validate the index conditions and the index type.
        // 3. Map the index conditions to index query methods.
        // 4. Perform the index operations and get a cursor.
        GridCursor<IndexRow> cursor = query(idx, idxQry.conditions());

        // 1. Map each IndexRow to a cache entry.
        // 2. Apply the specified cache filters (primary partitions, MVCC versions, etc.).
        // 3. Wrap the cursor into an iterator and return it.
        return mapAndFilter(cursor, filters);
    }

    private GridCursor<IndexRow> query(Index idx, List<IndexCondition> conditions) {
        // eq(key)              -> idx.findOne(key)
        // notEq(key)           -> idx.find(null, null, current -> current != key)
        // gt(key)              -> idx.find(key, null, current -> current != key)
        // gte(key)             -> idx.find(key, null)
        // lt(key)              -> idx.find(null, key, current -> current != key)
        // lte(key)             -> idx.find(null, key)
        // between(left, right) -> idx.find(left, right), inclusive only
        // in(keys...)          -> union of idx.findOne(keys[0]), idx.findOne(keys[1]), ...
        // notIn(keys...)       -> idx.find(null, null, current -> !keys.contains(current))
        // min()                -> idx.findFirst()
        // max()                -> idx.findLast()
        // predicate(p)         -> idx.find(null, null, p)
        throw new UnsupportedOperationException("TBD: build a cursor according to the mapping above.");
    }
}
```
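The reduce step above is described only as "MergeSort". Every node returns rows already sorted by the index key, so the reducer can combine them with a k-way merge instead of re-sorting. Below is a minimal standalone sketch that uses a java.util.PriorityQueue holding the current head row of each node's result. Node results are modeled as plain iterators. MergeSortReduce.merge is a hypothetical name; it is not the actual IndexQueryReduceExecutor.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

final class MergeSortReduce {
    /** Merges per-node results, each already sorted by cmp, into one sorted list. */
    static <T> List<T> merge(List<Iterator<T>> nodeResults, Comparator<? super T> cmp) {
        // Heap entry: the current head row of a node plus that node's remaining rows.
        PriorityQueue<Map.Entry<T, Iterator<T>>> heap =
            new PriorityQueue<>((a, b) -> cmp.compare(a.getKey(), b.getKey()));

        for (Iterator<T> it : nodeResults) {
            if (it.hasNext())
                heap.add(new AbstractMap.SimpleEntry<>(it.next(), it));
        }

        List<T> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            Map.Entry<T, Iterator<T>> head = heap.poll(); // Smallest head among all nodes.
            out.add(head.getKey());

            Iterator<T> it = head.getValue();
            if (it.hasNext())
                heap.add(new AbstractMap.SimpleEntry<>(it.next(), it)); // Refill from the same node.
        }
        return out;
    }
}
```

With k nodes and n rows in total, the merge costs O(n log k). For a DESC index key, the reducer would pass a reversed comparator. The sketch collects all rows into a list for brevity. A real reducer would more likely stream rows to the user and fetch further pages from the nodes on demand.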
Currently an index can be declared statically (with CacheConfiguration) or created dynamically, but only with an SQL DDL query. Neither way supports functional indexes. The implementation will therefore take the following steps:
API for creating non-SQL indexes:
The user will apply @SecondaryIndex to a POJO field or method:
```java
// The @SecondaryIndex annotation is an alias for QuerySqlField.
public class Good {
    private long uid;

    @SecondaryIndex(descending = true)
    private long ts;

    private float price;

    private String title;

    @SecondaryIndex(name = "GOOD_DISCOUNT_PRICE_IDX")
    public float discountPrice() {
        return price * discount();
    }

    /** Function that depends on attributes like count of goods, geography, etc. */
    protected float discount() {
        return 1.0f; // Placeholder (no discount); the real logic is application-specific.
    }
}
```
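The proposal does not define @SecondaryIndex yet. Below is a minimal sketch of how such an annotation could be declared for both fields and methods, and how index definitions could be discovered from a class through reflection. The annotation attributes follow the Good example (name, descending). The default index name <MEMBER>_IDX, the SecondaryIndexScanner helper and the SampleGood class are hypothetical.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;

/** Hypothetical declaration of the proposed annotation (not an existing Ignite API). */
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD, ElementType.METHOD})
@interface SecondaryIndex {
    String name() default "";
    boolean descending() default false;
}

final class SecondaryIndexScanner {
    /** Returns "member:indexName:ASC|DESC" for each annotated field or method, sorted. */
    static List<String> scan(Class<?> cls) {
        List<String> res = new ArrayList<>();
        for (Field f : cls.getDeclaredFields())
            add(res, f.getName(), f.getAnnotation(SecondaryIndex.class));
        for (Method m : cls.getDeclaredMethods())
            add(res, m.getName(), m.getAnnotation(SecondaryIndex.class));
        Collections.sort(res); // Reflection does not guarantee member order.
        return res;
    }

    private static void add(List<String> res, String member, SecondaryIndex ann) {
        if (ann == null)
            return; // Member is not indexed.
        // Hypothetical default naming rule when no explicit name is given.
        String idxName = ann.name().isEmpty() ? member.toUpperCase(Locale.ROOT) + "_IDX" : ann.name();
        res.add(member + ":" + idxName + ":" + (ann.descending() ? "DESC" : "ASC"));
    }
}

/** Hypothetical sample entity mirroring Good. */
class SampleGood {
    @SecondaryIndex(descending = true)
    long ts;

    float price;

    @SecondaryIndex(name = "GOOD_DISCOUNT_PRICE_IDX")
    float discountPrice() {
        return price * 0.9f; // Hypothetical fixed 10% discount.
    }
}
```

Runtime retention is what makes the annotation visible to reflection, and the METHOD target is what allows functional indexes such as discountPrice().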
Dynamic index creation will be exposed as a new method in IgniteCache and ClientCache:
```java
// Dynamic index creation.
QueryIndex qryIdx = new QueryIndex()
    .setName("GOOD_DISCOUNT_PRICE_IDX")
    .setFieldNames(Collections.singleton("discountPrice"), true) // true = ascending order.
    .setIndexType(QueryIndexType.FUNCTIONAL_SORTED);             // New index type proposed here.

cache.createIndex(qryIdx); // New method proposed here.
```
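FUNCTIONAL_SORTED is a new index type proposed here, and its maintenance is not described yet. Below is a minimal in-memory sketch. It assumes the index stores the result of a function applied to each cache value (such as Good::discountPrice) and re-indexes an entry whenever the entry is updated. It uses a TreeMap instead of Ignite's B+ tree. FunctionalSortedIndex, onPut, onRemove and lt are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.Function;

/** Hypothetical in-memory model of a FUNCTIONAL_SORTED index. */
final class FunctionalSortedIndex<K, V, I extends Comparable<I>> {
    private final Function<V, I> fn;                         // Indexed function, e.g. Good::discountPrice.
    private final TreeMap<I, Set<K>> tree = new TreeMap<>(); // Function result -> cache keys.
    private final Map<K, I> current = new HashMap<>();       // Cache key -> its current index key.

    FunctionalSortedIndex(Function<V, I> fn) {
        this.fn = fn;
    }

    /** Called on cache put: drops the stale index key and indexes the new value. */
    void onPut(K key, V val) {
        onRemove(key);
        I idxKey = fn.apply(val);
        tree.computeIfAbsent(idxKey, unused -> new LinkedHashSet<>()).add(key);
        current.put(key, idxKey);
    }

    /** Called on cache remove. */
    void onRemove(K key) {
        I old = current.remove(key);
        if (old == null)
            return;
        Set<K> keys = tree.get(old);
        keys.remove(key);
        if (keys.isEmpty())
            tree.remove(old);
    }

    /** Cache keys whose function result is strictly less than the bound, in index order. */
    List<K> lt(I bound) {
        List<K> res = new ArrayList<>();
        for (Set<K> keys : tree.headMap(bound, false).values())
            res.addAll(keys);
        return res;
    }
}
```

The old index key has to be remembered (here in `current`) because the indexed value is computed, not stored, so it can't be recovered from the new value. This also suggests the indexed function should depend only on the entry itself. A result that changes without a cache update, such as a discount driven by external data, would leave the index stale.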
The goal is to let a user retrieve partial results and skip unneeded hops to the cache:
The user will specify one of the following flags in IndexQuery: KeyOnly, ValueOnly, or IndexFieldsOnly. The query results are:
The goal is to provide a "select for update" query with transactional support for index queries:
A detailed design of the solution is TBD.