ID | IEP-71
---|---
Author |
Sponsor |
Created | 06.04.2021
Status |
Ignite will provide an API for querying and creating secondary (non-SQL) indexes with the following features:
Ignite has limitations when searching by non-key fields. The goal of this proposal is to overcome the issues described below.
Let us illustrate these limitations with an example entity: Good(long uid, @Index long ts). The task is to get all cache entries created since last midnight.
```java
FieldsQueryCursor<List<?>> result = cache.query(
    new SqlFieldsQuery("select _val from Good where ts > ?").setArgs(lastMidnightTs));

// Every row contains a single column: the _val object.
List<Good> goods = new ArrayList<>();

for (List<?> row : result.getAll())
    goods.add((Good) row.get(0));
```
Pros are:
Cons are:
```java
QueryCursor<Cache.Entry<Long, Good>> cursor = cache.query(new ScanQuery<>((k, v) -> v.ts > lastMidnightTs));

List<Cache.Entry<Long, Good>> result = cursor.getAll();
```
Pros are:
Cons are:
Ignite allows defining an AffinityKey for a cache and, optionally, a custom AffinityFunction.
Pros are:
Cons are:
We suggest implementing the proposal step by step, in 4 sequential phases.
Phase 1 (Index Queries API). User will get:
Phase 2 (Index Creation API). User will get:
Phase 3 (Return Flexibility). User will get:
Phase 4 (Select for update support):
In this phase we will provide the Index Query API for existing indexes. Currently Ignite supports 3 ways to declare indexes:
For the Java API (ways 1 and 2), an index is described by (IgniteCache, CacheValue.class, fields, indexName).
...
Every index created with these APIs can be described by: CacheContext, cache value class/type, index name, and fields.
The Index Query API will support these different index descriptions. The API provides:
Internal changes for the query are:
This is how a user will specify an index query:
```java
// Creates an index query for an index created with the Java API, specifying a value class or value type:
// 1. Specify the index description in the constructor.
// 2. The index name is an optional parameter: Ignite will do its best to find the right index based on
//    the Value.class specified in the constructor and the fields used in the criteria.
// 3. Specify the index criteria (joined with the AND operation). The order of fields in the criteria doesn't matter.
QueryCursor<Cache.Entry<Long, Good>> typeCursor = ignite.cache("GOOD").query(
    new IndexQuery<Long, Good>(Good.class, idxName) // idxName is optional.
        .setCriteria(gt("ts", lastMidnightTs), lt("price", 123.0))
);

// Index created with the SQL query: "CREATE INDEX GOOD_TS_DESC_PRICE_ASC_IDX on GOOD_TBL (ts DESC, price ASC)"
// 1. The table name must be specified, because different tables may have indexes with the same name (e.g., __key_PK).
// 2. The index name is optional too (Ignite will do its best to find the right index to run the query).
QueryCursor<Cache.Entry<Long, Good>> tableCursor = ignite.cache("GOOD").query(
    IndexQuery.<Long, Good>forTable("GOOD_TBL", null, "GOOD_TS_DESC_PRICE_ASC_IDX") // schema and idxName are optional.
        .gt("ts", lastMidnightTs)
        .lt("price", 123.0)
);

// The "predicate" operation can't find the best index for a user query, so the user must provide an index name.
QueryCursor<Cache.Entry<Long, Good>> predicateCursor = ignite.cache("GOOD").query(
    new IndexQuery<Long, Good>("GOOD_TBL", "GOOD_TS_DESC_PRICE_ASC_IDX")
        .predicate((good) -> good.ts > lastMidnightTs || good.price > 100)
);
```
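The AND semantics of the criteria can be modelled in plain Java. This is a minimal sketch and not Ignite code. `CriteriaAndSketch`, `Criterion`, `gt`, `lt`, and `matchesAll` are hypothetical names, and a row is assumed to be a map from field name to a numeric value. The sketch shows why the order of criteria doesn't matter: a row passes only if every criterion holds.

```java
import java.util.List;
import java.util.Map;
import java.util.function.DoublePredicate;

class CriteriaAndSketch {
    // Hypothetical criterion: a test on one numeric field of a row (field name -> value).
    static final class Criterion {
        final String field;
        final DoublePredicate test;

        Criterion(String field, DoublePredicate test) {
            this.field = field;
            this.test = test;
        }

        boolean matches(Map<String, ? extends Number> row) {
            Number val = row.get(field);
            // A missing field never matches.
            return val != null && test.test(val.doubleValue());
        }
    }

    // Hypothetical counterparts of the gt(...) / lt(...) criteria.
    static Criterion gt(String field, double bound) {
        return new Criterion(field, v -> v > bound);
    }

    static Criterion lt(String field, double bound) {
        return new Criterion(field, v -> v < bound);
    }

    // Criteria are joined with AND: one failing criterion rejects the row,
    // so the order in which criteria are listed cannot change the result.
    static boolean matchesAll(List<Criterion> criteria, Map<String, ? extends Number> row) {
        for (Criterion c : criteria)
            if (!c.matches(row))
                return false;

        return true;
    }
}
```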
Classes to implement the API:
```java
// Public packages.
// IndexQuery extends the public Query class, so it can be passed to IgniteCache.query().
public class IndexQuery<K, V> extends Query<Cache.Entry<K, V>> {
    private final List<IndexCondition> idxConds = new ArrayList<>();

    private final QueryIndexDesc idxDesc;

    private IndexQuery(QueryIndexDesc idxDesc) {
        this.idxDesc = idxDesc;
    }

    public static <K, V> IndexQuery<K, V> forType(Class<?> valClass, @Nullable String idxName) {
        QueryIndexDesc idxDesc = new QueryIndexDesc(valClass, idxName);

        return new IndexQuery<>(idxDesc);
    }

    public static <K, V> IndexQuery<K, V> forTable(String table, @Nullable String schema, @Nullable String idxName) {
        QueryIndexDesc idxDesc = new QueryIndexDesc(table, schema, idxName);

        return new IndexQuery<>(idxDesc);
    }

    public IndexQuery<K, V> lt(String field, Object val) {
        IndexCondition cond = IndexConditions.lt(field, val);

        idxConds.add(cond);

        return this;
    }

    // Other methods are:
    // eq, notEq, gt, gte, lte, between, in, notIn, min, max, predicate
}

// Internal packages.
// Constructors of the classes below are omitted for brevity.

/** Describes an index to query. */
class QueryIndexDesc implements Serializable {
    private @Nullable String idxName;
    private @Nullable String valClass;
    private @Nullable String tblName;
    private @Nullable String schemaName;
}

class IndexConditions {
    /** Find index keys that are less than a specified key. */
    public static IndexCondition lt(String field, Object val) {
        // lower = null (unbounded), upper = val, both bounds exclusive.
        return new RangeIndexCondition(field, null, val, false, false);
    }

    // Other methods: eq, notEq, gt, gte, lte, between, in, notIn, min, max, predicate
}

abstract class IndexCondition implements Serializable {
    private final String field;
}

// min, max
class MinMaxIndexCondition extends IndexCondition {
    private final boolean max;
}

// gt, gte, lt, lte, between
class RangeIndexCondition extends IndexCondition {
    private final @Nullable Object lower;
    private final @Nullable Object upper;
    private final boolean lowerInclusive;
    private final boolean upperInclusive;
}

// in, notIn, eq, notEq
class InIndexCondition extends IndexCondition {
    private final Object[] vals;

    // Flag for the not-in condition.
    private final boolean inverse;
}

// predicate
class PredicateIndexCondition extends IndexCondition {
    private IgnitePredicate<?> predicate;
}
```
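This self-contained model shows how the four bounds of `RangeIndexCondition` combine. It is a hypothetical sketch: `RangeConditionSketch` and `Range` are illustrative names, and the field name is left out. A `null` bound means unbounded, and the factories use the same argument order as `IndexConditions.lt(field, val)`, i.e. `(lower, upper, lowerInclusive, upperInclusive)`. `between` is inclusive on both ends, following the "inclusive only" note in the condition-to-index mapping of the processor design.

```java
class RangeConditionSketch {
    // Mirrors the fields of RangeIndexCondition; a null bound means "unbounded".
    static final class Range<T extends Comparable<T>> {
        final T lower;
        final T upper;
        final boolean lowerInclusive;
        final boolean upperInclusive;

        Range(T lower, T upper, boolean lowerInclusive, boolean upperInclusive) {
            this.lower = lower;
            this.upper = upper;
            this.lowerInclusive = lowerInclusive;
            this.upperInclusive = upperInclusive;
        }

        boolean contains(T key) {
            if (lower != null) {
                int cmp = key.compareTo(lower);

                if (cmp < 0 || (cmp == 0 && !lowerInclusive))
                    return false;
            }

            if (upper != null) {
                int cmp = key.compareTo(upper);

                if (cmp > 0 || (cmp == 0 && !upperInclusive))
                    return false;
            }

            return true;
        }
    }

    static <T extends Comparable<T>> Range<T> lt(T val) { return new Range<>(null, val, false, false); }

    static <T extends Comparable<T>> Range<T> lte(T val) { return new Range<>(null, val, false, true); }

    static <T extends Comparable<T>> Range<T> gt(T val) { return new Range<>(val, null, false, false); }

    static <T extends Comparable<T>> Range<T> gte(T val) { return new Range<>(val, null, true, false); }

    // Inclusive on both ends.
    static <T extends Comparable<T>> Range<T> between(T lo, T hi) { return new Range<>(lo, hi, true, true); }
}
```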
Currently Ignite has infrastructure for processing index queries: GridIndexingManager, IndexingSpi, and SpiQuery. It is therefore possible to reuse this code to process index queries: provide a default implementation of IndexingSpi and make IndexQuery a wrapper around SpiQuery. But there are some issues:
```java
QueryCursor<Cache.Entry<Long, Good>> cursor = ignite.cache("GOOD").query(
    new IndexQuery<Long, Good>("GOOD_TYPE", idxName) // idxName is optional.
        .setCriteria(gt("ts", lastMidnightTs), lt("price", 123.0))
);
```
Classes to implement the API:
```java
// Public packages.
// IndexQuery extends the public Query class, so it can be passed to IgniteCache.query().
public class IndexQuery<K, V> extends Query<Cache.Entry<K, V>> {
    private final List<IndexCriteria> criteria = new ArrayList<>();

    // Index description.
    private @Nullable String idxName;

    private String valType;

    public IndexQuery<K, V> lt(String field, Object val) {
        criteria.add(IndexCriteriaBuilder.lt(field, val));

        return this;
    }

    // Other methods are:
    // eq, notEq, gt, gte, lte, between, in, notIn, min, max, predicate
}

// Internal packages.
// Constructors of the classes below are omitted for brevity.
class IndexCriteriaBuilder {
    public static IndexCriteria lt(String field, Object val) {
        // lower = null (unbounded), upper = val, both bounds exclusive.
        return new RangeIndexCriteria(field, null, val, false, false);
    }
}

abstract class IndexCriteria implements Serializable {
    private final List<String> fields;
}

// min, max
class MinMaxIndexCriteria extends IndexCriteria {
    private final boolean max;
}

// gt, gte, lt, lte, between
class RangeIndexCriteria extends IndexCriteria {
    private final @Nullable Object lower;
    private final @Nullable Object upper;
    private final boolean lowerInclusive;
    private final boolean upperInclusive;
}

// in, notIn, eq, notEq
class InIndexCriteria extends IndexCriteria {
    private final Object[] vals;

    // Flag for the not-in condition.
    private final boolean inverse;
}

// predicate
class PredicateIndexCriteria extends IndexCriteria {
    private IgnitePredicate<?> predicate;
}
```
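`InIndexCriteria` covers four operations with a single `vals` array and the `inverse` flag. The hypothetical model below uses illustrative names that are not part of the proposal. In it, `eq`/`notEq` are the single-value cases of `in`/`notIn`, and `inverse` flips the result of the membership test.

```java
import java.util.Arrays;
import java.util.Objects;

class InCriteriaSketch {
    // Mirrors InIndexCriteria: a set of values plus an "inverse" flag for notIn / notEq.
    static final class InCriteria {
        final Object[] vals;
        final boolean inverse;

        InCriteria(Object[] vals, boolean inverse) {
            this.vals = vals;
            this.inverse = inverse;
        }

        boolean matches(Object key) {
            boolean found = Arrays.stream(vals).anyMatch(v -> Objects.equals(v, key));

            // XOR: inverse turns "in" into "not in".
            return found != inverse;
        }
    }

    static InCriteria in(Object... vals) { return new InCriteria(vals, false); }

    static InCriteria notIn(Object... vals) { return new InCriteria(vals, true); }

    static InCriteria eq(Object val) { return new InCriteria(new Object[] {val}, false); }

    static InCriteria notEq(Object val) { return new InCriteria(new Object[] {val}, true); }
}
```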
Currently Ignite has infrastructure for processing index queries: GridIndexingManager, IndexingSpi, and SpiQuery. It is therefore possible to reuse this code to process index queries: provide a default implementation of IndexingSpi and make IndexQuery a wrapper around SpiQuery. But there are some issues:
The proposal, then, is:
Rules for writing criteria:
Rules for choosing an index:
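The comments of `queryLocal()` in the processor design also describe how an index is chosen. The sketch below models one reading of those comments. It assumes that "fields cover index keys from left to right" means the criteria fields, in any order, must be exactly the first N keys of the index. `IndexChoiceSketch`, `covers`, and `choose` are hypothetical names. Indexes are modelled as a map from index name to its ordered key list.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

class IndexChoiceSketch {
    // Assumed rule: the criteria fields (order-insensitive) must equal the first N index keys,
    // where N is the number of criteria fields.
    static boolean covers(List<String> idxKeys, Set<String> criteriaFields) {
        if (criteriaFields.isEmpty() || criteriaFields.size() > idxKeys.size())
            return false;

        Set<String> prefix = new HashSet<>(idxKeys.subList(0, criteriaFields.size()));

        return prefix.equals(criteriaFields);
    }

    // Returns the first index (in map iteration order) whose key prefix is covered by the criteria.
    static Optional<String> choose(Map<String, List<String>> idxs, Set<String> criteriaFields) {
        for (Map.Entry<String, List<String>> e : idxs.entrySet())
            if (covers(e.getValue(), criteriaFields))
                return Optional.of(e.getKey());

        return Optional.empty();
    }
}
```

If no index is chosen, the query falls into the "doesn't match an index" case handled by a full index scan.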
```java
// Internal package.
public class IndexQueryProcessor extends GridProcessorAdapter {
    // Provides access to indexes.
    private final IndexProcessor processor;

    // Entry point for a local query.
    public Iterator<Cache.Entry<?, ?>> queryLocal(IndexQuery idxQuery, @Nullable IndexingQueryFilter filters)
        throws IgniteException {
        // 1. If the user specified an index name, then check the query:
        //    - sort query fields in index key order;
        //    - check that it's a valid index query: fields cover index keys (from left to right);
        //    - fail otherwise.
        // 2. If the user didn't specify an index name:
        //    - get all indexes for the specified cache and Value.class;
        //    - find an index by filtering by the query fields (index keys must be in the same order
        //      as the specified query fields; try sorting the fields to match any index);
        //    - validate the index query (see the validation steps in 1).
        Index idx = index(idxQuery.desc());

        // 1. Parse index conditions.
        // 2. Validate index conditions and index type.
        // 3. Map index conditions to index query methods.
        // 4. Perform index operations, get a cursor.
        GridCursor<IndexRow> cursor = query(idx, idxQuery.conditions());

        // 1. Map IndexRow to a cache entry.
        // 2. Apply the specified cache filters (primary partitions, MVCC versions, etc.).
        // 3. Wrap the cursor into an iterator and return it.
        return mapAndFilter(cursor, filters);
    }

    private GridCursor<IndexRow> query(Index idx, List<IndexCondition> conditions) {
        // Mapping of conditions to index operations:
        // eq(key)              -> idx.findOne(key)
        // notEq(key)           -> idx.find(null, null, current -> current != key)
        // gt(key)              -> idx.find(key, null, current -> current != key)
        // gte(key)             -> idx.find(key, null)
        // lt(key)              -> idx.find(null, key, current -> current != key)
        // lte(key)             -> idx.find(null, key)
        // between(left, right) -> idx.find(left, right), inclusive only
        // in(keys...)          -> idx.findOne(keys[0]) && idx.findOne(keys[1]) ...
        // notIn(keys...)       -> idx.find(null, null, current -> !keys.contains(current))
        // min()                -> idx.findFirst()
        // max()                -> idx.findLast()
        // predicate(p)         -> idx.find(null, null, p)
    }
}
```
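The condition-to-operation mapping in `query(...)` can be tried out on a stand-in for a sorted index. In this hypothetical sketch, a `NavigableSet<Long>` of index keys replaces `Index`. `find(lower, upper, filter)` is emulated with `tailSet`/`headSet` using inclusive bounds, plus an optional filter. Exclusive bounds work as in the mapping: the bound key itself is filtered out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;
import java.util.function.LongPredicate;

class IndexOpsSketch {
    // Hypothetical stand-in for a sorted index over long keys.
    private final NavigableSet<Long> keys;

    IndexOpsSketch(NavigableSet<Long> keys) {
        this.keys = keys;
    }

    // find(lower, upper, filter): inclusive bounds (null = unbounded) plus an optional filter.
    List<Long> find(Long lower, Long upper, LongPredicate filter) {
        List<Long> res = new ArrayList<>();

        // An empty range; also avoids an IllegalArgumentException from the set views.
        if (lower != null && upper != null && lower > upper)
            return res;

        NavigableSet<Long> range = keys;

        if (lower != null)
            range = range.tailSet(lower, true);

        if (upper != null)
            range = range.headSet(upper, true);

        for (long k : range)
            if (filter == null || filter.test(k))
                res.add(k);

        return res;
    }

    List<Long> eq(long key) {
        if (keys.contains(key))
            return List.of(key);

        return List.of();
    }

    List<Long> gt(long key) { return find(key, null, cur -> cur != key); }

    List<Long> gte(long key) { return find(key, null, null); }

    List<Long> lt(long key) { return find(null, key, cur -> cur != key); }

    List<Long> lte(long key) { return find(null, key, null); }

    List<Long> between(long left, long right) { return find(left, right, null); }

    Long min() { return keys.isEmpty() ? null : keys.first(); }

    Long max() { return keys.isEmpty() ? null : keys.last(); }
}
```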
For queries that don't match an index, Ignite will run a full index scan, find(null, null), and apply the user conditions to its results. Cases are:
The index name is optional. If it is not specified, the PK index of the specified table is used.
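A lookup keyed by (table, index name) models two things: the fallback to the PK index, and why the table name is required. This is a hypothetical sketch. `IndexLookupSketch` and its methods are illustrative. The PK index name `__key_PK` is taken from the example in this document, not from Ignite's actual naming.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class IndexLookupSketch {
    // PK index name as used in this document's example; an assumption, not Ignite's constant.
    static final String PK_IDX = "__key_PK";

    // Index names are unique only within a table, so indexes are grouped by table first.
    private final Map<String, Map<String, List<String>>> idxsByTable = new HashMap<>();

    void register(String table, String idxName, List<String> keys) {
        idxsByTable.computeIfAbsent(table, t -> new HashMap<>()).put(idxName, keys);
    }

    // Returns the index keys; a null index name falls back to the table's PK index.
    List<String> resolve(String table, String idxName) {
        Map<String, List<String>> idxs = idxsByTable.get(table);

        if (idxs == null)
            throw new IllegalArgumentException("Unknown table: " + table);

        String name = idxName == null ? PK_IDX : idxName;
        List<String> keys = idxs.get(name);

        if (keys == null)
            throw new IllegalArgumentException("No index " + name + " in table " + table);

        return keys;
    }
}
```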
```java
// The "predicate" operation can't find the best index for a user query, so the user must provide an index name.
QueryCursor<Cache.Entry<Long, Good>> cursor = ignite.cache("GOOD").query(
    new IndexQuery<Long, Good>("GOOD_TBL", "GOOD_TS_DESC_PRICE_ASC_IDX")
        .setFilter((good) -> good.ts > lastMidnightTs || good.price > 100)
);
```
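A full index scan with a user filter, `find(null, null)` followed by the filter, reduces to a sequential walk in index order. In this hypothetical sketch, the index is modelled as a `NavigableMap` from index key (here the PK, `uid`) to value. `Good` is a minimal stand-in for the entity from the examples. The filter is the one used above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.function.Predicate;

class FilterScanSketch {
    // Hypothetical minimal stand-in for the Good entity.
    static final class Good {
        final long ts;
        final double price;

        Good(long ts, double price) {
            this.ts = ts;
            this.price = price;
        }
    }

    // Full scan: visit every index entry in key order and keep values that pass the filter.
    static <K, V> List<V> scanWithFilter(NavigableMap<K, V> idx, Predicate<? super V> filter) {
        List<V> res = new ArrayList<>();

        for (Map.Entry<K, V> e : idx.entrySet())
            if (filter.test(e.getValue()))
                res.add(e.getValue());

        return res;
    }
}
```

Every entry is visited regardless of the filter, which is why such a query costs as much as a ScanQuery over the index.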
Currently an index can be declared statically (with CacheConfiguration) or created dynamically, but only with an SQL DDL query. Neither way supports functional indexes. The implementation will therefore have the following steps:
API for creating non-SQL indexes:
The user will apply @SecondaryIndex to a POJO field or method:
```java
// The @SecondaryIndex annotation is actually an alias for QuerySqlField.
public class Good {
    private long uid;

    @SecondaryIndex(descending = true)
    private long ts;

    private float price;

    private String title;

    @SecondaryIndex(name = "GOOD_DISCOUNT_PRICE_IDX")
    public float discountPrice() {
        return price * discount();
    }

    /** Function that depends on attributes like count of goods, geography, etc. */
    protected float discount() {
        return 1.0f; // Placeholder: the actual calculation is out of scope of this example.
    }
}
```
Dynamic index creation is a new method in IgniteCache and ClientCache:
```java
// Dynamic index creation.
QueryIndex qryIdx = new QueryIndex()
    .setName("GOOD_DISCOUNT_PRICE_IDX")
    .setFieldNames(Collections.singleton("discountPrice"), true) // true = ascending order.
    .setIndexType(QueryIndexType.FUNCTIONAL_SORTED);

cache.createIndex(qryIdx);
```
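A functional index differs from a field index in one respect: the index key is computed from the value on every update. This minimal sketch assumes a sorted map from the computed value to the matching cache keys. `FunctionalIndexSketch`, `onPut`, and `range` are hypothetical names. Updates and removals of existing keys are not handled, and `range` expects `lower <= upper`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import java.util.function.ToDoubleFunction;

class FunctionalIndexSketch<K, V> {
    // Sorted index: computed value -> cache keys with that value.
    private final TreeMap<Double, List<K>> idx = new TreeMap<>();

    // The indexed function, e.g. something like Good::discountPrice.
    private final ToDoubleFunction<V> fn;

    FunctionalIndexSketch(ToDoubleFunction<V> fn) {
        this.fn = fn;
    }

    // On every put, the function result (not a stored field) becomes the index key.
    void onPut(K key, V val) {
        idx.computeIfAbsent(fn.applyAsDouble(val), d -> new ArrayList<>()).add(key);
    }

    // Range lookup over computed values: lower <= fn(value) < upper.
    List<K> range(double lower, double upper) {
        List<K> res = new ArrayList<>();

        for (List<K> keys : idx.subMap(lower, true, upper, false).values())
            res.addAll(keys);

        return res;
    }
}
```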
...
The goal is to let a user retrieve partial results and skip unneeded hops to the cache:
The user will specify a flag in IndexQuery: KeyOnly, ValueOnly, or IndexFieldsOnly. The results for these queries are:
...
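The proposed result flags can be sketched as a projection that fetches the value lazily. The sketch assumes, as the goal statement suggests, that the key and the indexed fields are available from the index row. Under that assumption, only ValueOnly needs the extra hop to the cache. `ResultMode`, its constant names, and `project` are hypothetical.

```java
import java.util.Map;
import java.util.function.Supplier;

class ResultProjectionSketch {
    // Hypothetical names for the KeyOnly / ValueOnly / IndexFieldsOnly flags.
    enum ResultMode { KEY_ONLY, VALUE_ONLY, INDEX_FIELDS_ONLY }

    // valueLoader stands for the hop to the cache; it is invoked only when the value is requested.
    static Object project(ResultMode mode, Object key, Map<String, ?> idxFields, Supplier<?> valueLoader) {
        switch (mode) {
            case KEY_ONLY:
                return key;

            case INDEX_FIELDS_ONLY:
                return idxFields;

            case VALUE_ONLY:
                return valueLoader.get();

            default:
                throw new IllegalArgumentException("Unknown mode: " + mode);
        }
    }
}
```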
The goal is to provide a "select for update" query with transactional support for index queries:
The detailed design of the solution is TBD.