...
ID | IEP-71 | ||||||
Author | |||||||
Sponsor | Maksim Timonin | ||||||
Created | 06.04.2021 | ||||||
StatusDRAFT |
|
Table of Contents |
---|
Ignite will provide an API for querying and creating secondary (non-SQL) indexes with next features:
Ignite has limitations in search by non-key fields. The proposal's goal is to overcome the issues described below.
...
Show this limitations with an example entity: Good(long uid, @Index long ts). Task is to get all cache entries that are created since the last midnight.
Code Block | ||
---|---|---|
| ||
FieldsQueryCursor<List<?>> result = cache.query( new SqlFieldsQuery("select _val from Good where ts > ?").setArgs(lastMidnightTs)); List<Good> goods = (List<Good>) result.getAll().get(0); |
...
Code Block | ||
---|---|---|
| ||
QueryCursor<Cache.Entry<Long, Good>> cursor = cache.query(new ScanQuery<>((k, v) -> v.ts > lastMidnightTs)); List<Cache.Entry<Long, Good>> result = cursor.getAll(); |
...
Ignite provides an opportunity to define AffinityKey for a cache and optionally custom AffinityFunction.
...
It is suggested to implement the proposal step by step in 4 sequential phases.
...
In this step we will provide Index Query API for existing indexes. Currently Ignite supports 3 ways to declare indexes:
...
Code Block | ||
---|---|---|
| ||
// Public packages. // IndexQuery extends public IndexQuery<K, V> extends Query<Cache.Entry<K, V>> { private List<IndexCondition> idxConds = new ArrayList<>(); private QueryIndexDesc idxDesc; private IndexQuery(QueryIndexDesc idxDesc) { this.idxDesc = idxDesc; } public static IndexQuery forType(Class<?> valClass, String? idxName) { QueryIndexDesc idxDesc = new QueryIndexDesc(valClass, idxName); return new IndexQuery(idxDesc); } public static IndexQuery forTable(String table, String? schema, String? idxName) { QueryIndexDesc idxDesc = new QueryIndexDesc(table, schema, idxName); return new IndexQuery(idxDesc); } public IndexQuery lt(String field, Object val) { IndexCondition cond = IndexConditions.lt(field, val)); idxConds.add(cond); return this; } // Other methods are: // eq, notEq, gt, gte, lt, lte, between, in, notIn, min, max, predicate } // Internal packages. /** Describes an index to query. */ class QueryIndexDesc implements Serializable { private @Nullable String idxName; private @Nullable String valClass; private @Nullable String tblName; private @Nullable String schemaName; } class IndexConditions { /** Find index keys that are less than a specified key. */ public static IndexCondition lt(String field, Object val) { return new RangeIndexCondition(field, null, val, false, false); } // eq, notEq, gt, gte, lt, lte, between, in, notIn, min, max, predicate } abstract class IndexCondition extends Serializable { private final String field; } // min, max class MinMaxIndexCondition extends IndexCondition { private final boolean max; } // gt, gte, lt, lte, between class RangeIndexCondition extends IndexCondition { private final @Nullable Object lower; private final @Nullable Object upper; private final boolean lowerInclusive; private final boolean upperInclusive; } // in, notIn, eq, notEq class InIndexCondition extends IndexCondition { private final Object[] vals; // Flag for not-in condition. private final boolean inverse; } // predicate class PredicateIndexCondition extends IndexCondition { private IgnitePredicate<?> predicate; } |
Currently Ignite have infrastructure for processing index queries: GridIndexingManager, IndexingSpi, SpiQuery. So it is possible to use this code to implement processing of index queries: provide a default implementation for IndexingSpi and make IndexQuery as a wrapper on SpiQuery. But there are some issues:
...
Code Block | ||
---|---|---|
| ||
// Internal package. public class IndexQueryProcessor extends GridProcessorAdapter { // Provides access to indexes. private final IndexProcessor processor; // Listen for messages from other nodes (index queries, fetch data) private final GridMessageListener idxQryLsnr; // Executor of map queries (index.find). private final IndexQueryMapExecutor mapExec; // Executor for reduce query (MergeSort). private final IndexQueryReduceExecutor rdcExec; // Entrypoint for distributed query. public Iterator<Cache.Entry<?,?>> queryDistributed(IndexQuery idxQuery) throws IgniteException { if (containsPredicate(idxQuery)) deployPredicate(idxQuery); IndexQueryTwoStepQuery twoStepQry = prepareTwoStepQuery(); return rdcExec.execute(twoStepQuery); } // Entrypoint for local query. public Iterator<Cache.Entry<?,?>> queryLocal(IndexQuery idxQuery, @Nullable IndexingQueryFilter filters) throws IgniteException { // 1. If user specified index name, then check a query: // - sort query fields in index key order; // - check that it's a valid index query: fields covers index keys (from left to right) // - fail otherwise. // 2. If user doesn't specified index name: // - get all indexes for specified cache and Value.class; // - find index by filtering by a query fields (index keys must be in the same order as specified query fields, try sort fields to match any index). // - validate index query (see validation steps from 1.) Index idx = index(idxQuery.desc()); // 1. Parse index conditions. // 2. Validate index condition, index type. // 3. Maps index conditions to an index query methods. // 4. Perform index operations, get Cursor. GridCursor<IndexRow> cursor = query(idx, idxQry.conditions()); // 1. Map IndexRow to Cache entry. // 2. Apply specified cache filters (primary partitions, MVCC versions, etc.) // 3. Wrap cursor to iterator and return. return map_and_filter(cursor, filters); } private GridCursor<IndexRow> query(Index idx, List<IndexCondition> conditions) { // eq(key) -> idx.findOne(key) // notEq(key) -> idx.find(null, null, current -> current != key) // gt(key) -> idx.find(key, null, current -> current != key) // gte(key) -> idx.find(key, null) // lt(key) -> idx.find(null, key, current -> current != key) // lte(key) -> idx.find(null, key) // between(left, right) -> idx.find(left, right), inclusive only // in(keys...) -> idx.findOne(keys[0]) && idx.findOne(keys[1]) ... // notIn(keys...) -> idx.find(null, null, current -> !keys.contains(current)) // min() -> idx.findFirst() // max() -> idx.findLast() // predicate(p) -> idx.find(null, null, p) } } |
Currently an index can be declared statically (with CacheConfiguration) or created dynamically only with SQL ddl query. Both ways doesn't support functional indexes. Then, implementation will have next steps:
...
Code Block | ||
---|---|---|
| ||
// Dynamic index creation. QueryIndex qryIdx = new QueryIndex() .setName("GOOD_DISCOUNT_PRICE_IDX") .setFieldNames(Collections.singleton("discountPrice")) .setIndexType(QueryIndexType.FUNCTIONAL_SORTED); cache.createIndex(qryIdx) |
Goal is to provide opportunity to a user to result partially and skip unneeded hops to a Cache:
...
Goal is to provide "select for update" query with transactional support for index queries:
...
Deep design of solution is TBD.