...
ID | IEP-71 | ||||||
Author | |||||||
Sponsor | |||||||
Created | 06.04.2021 | ||||||
Status |
|
...
...
For Java API (ways 1, 2) an index is described with (IgniteCache, CacheValue.class, fields, indexName).
For SQL API (way 3) an index is described with (Schema, Table, fields, indexName).Every index created with those API can be described with: CacheContext, cache value class / type, index name, fields
Index Query API will support those different index descriptions. API provides:
Internal changes for the query are:
Internal changes for the query are:
So this is how user So this is how user will specify an index query:
Code Block | ||
---|---|---|
| ||
// Creates an index query for an index, createdspecifying withvalue Java API.class or value type: // 1. Specify index description at constructor. // 2. Index name is optional param, can try do best to find right index basing on specified Value.class in constructor andcriteria fields in conditions. // 3. Index conditionscriteria (joint with AND operation) with methods. Order of fields in criteria doesn't matter QueryCursor<Cache.Entry<Long, Good>> cursor = ignite.cache("GOOD").query( IndexQuery.forTypenew IndexQuery<Long, Good>(Good.class, idxName?) // idxName is optional. .setCriteria(gt("ts", lastMidnightTs) ., lt("price", 123.0)) ); // Create index with SQL query: "CREATE INDEX GOOD_TS_DESC_PRICE_ASC_IDX on GOOD_TBL (ts DESC, price ASC)" // 1. Table name should be specified because it is possible to have the same index name for different tables (e.g., __key_PK). // 2. Index name is optional too (do our best to find right index to run the query). QueryCursor<Cache.Entry<Long, Good>> cursor = ignite.cache("GOOD").query( IndexQuery.forTable("GOOD_TBL", "GOOD_TS_DESC_PRICE_ASC_IDX"?) // idxName is optional. .gt("ts", lastMidnightTs) .lt("price", 123.0) ); // The "predicate" operation can't find best index for a user query. So user must provide an index name for this operation. QueryCursor<Cache.Entry<Long, Good>> cursor = ignite.cache("GOOD").query( new IndexQuery<>("GOOD_TBL", "GOOD_TS_DESC_PRICE_ASC_IDX") .predicate((good) -> good.ts > lastMidnightTs || price > 100) ); |
Classes to implement the API:
QueryCursor<Cache.Entry<Long, Good>> cursor = ignite.cache("GOOD").query(
new IndexQuery<Long, Good>("GOOD_TYPE", idxName?) // idxName is optional.
.setCriteria(gt("ts", lastMidnightTs), lt("price", 123.0))
);
|
Classes to implement the API:
Code Block | ||
---|---|---|
| ||
// Public packages.
// IndexQuery extends
public IndexQuery<K, V> extends Query<Cache.Entry<K, V>> {
private List<IndexCriteria> criteria = new ArrayList<>();
// Index description.
private @Nullable String idxName;
private String valType;
public IndexQuery lt(String field, Object val) {
criteria.add(IndexCriteriaBuilder.lt(field, val));
return this;
}
// Other methods are:
// eq, notEq, gt, gte, lt, lte, between, in, notIn, min, max, predicate
}
// Internal packages.
class IndexCriteriaBuilder {
public static IndexCriteria lt(String field, Object val);
}
abstract class IndexCriteria extends Serializable {
private final List<String> fields;
}
// min, max
class MinMaxIndexCriteria extends IndexCondition {
private final boolean max;
}
// gt, gte, lt, lte, between
class RangeIndexCriteria extends IndexCriteria {
private final @Nullable Object lower;
private final @Nullable Object upper;
private final boolean lowerInclusive;
private final boolean upperInclusive;
}
// in, notIn, eq, notEq
class InIndexCriteria extends IndexCriteria {
private final Object[] vals;
// Flag for not-in condition.
private final boolean inverse;
}
// predicate
class PredicateIndexCriteria extends IndexCriteria {
private IgnitePredicate<?> predicate;
}
| ||
Code Block | ||
| ||
// Public packages.
// IndexQuery extends
public IndexQuery<K, V> extends Query<Cache.Entry<K, V>> {
private List<IndexCondition> idxConds = new ArrayList<>();
private QueryIndexDesc idxDesc;
private IndexQuery(QueryIndexDesc idxDesc) {
this.idxDesc = idxDesc;
}
public static IndexQuery forType(Class<?> valClass, String? idxName) {
QueryIndexDesc idxDesc = new QueryIndexDesc(valClass, idxName);
return new IndexQuery(idxDesc);
}
public static IndexQuery forTable(String table, String? schema, String? idxName) {
QueryIndexDesc idxDesc = new QueryIndexDesc(table, schema, idxName);
return new IndexQuery(idxDesc);
}
public IndexQuery lt(String field, Object val) {
IndexCondition cond = IndexConditions.lt(field, val));
idxConds.add(cond);
return this;
}
// Other methods are:
// eq, notEq, gt, gte, lt, lte, between, in, notIn, min, max, predicate
}
// Internal packages.
/** Describes an index to query. */
class QueryIndexDesc implements Serializable {
private @Nullable String idxName;
private @Nullable String valClass;
private @Nullable String tblName;
private @Nullable String schemaName;
}
class IndexConditions {
/** Find index keys that are less than a specified key. */
public static IndexCondition lt(String field, Object val) {
return new RangeIndexCondition(field, null, val, false, false);
}
// eq, notEq, gt, gte, lt, lte, between, in, notIn, min, max, predicate
}
abstract class IndexCondition extends Serializable {
private final String field;
}
// min, max
class MinMaxIndexCondition extends IndexCondition {
private final boolean max;
}
// gt, gte, lt, lte, between
class RangeIndexCondition extends IndexCondition {
private final @Nullable Object lower;
private final @Nullable Object upper;
private final boolean lowerInclusive;
private final boolean upperInclusive;
}
// in, notIn, eq, notEq
class InIndexCondition extends IndexCondition {
private final Object[] vals;
// Flag for not-in condition.
private final boolean inverse;
}
// predicate
class PredicateIndexCondition extends IndexCondition {
private IgnitePredicate<?> predicate;
}
|
Currently Ignite have infrastructure for processing index queries: GridIndexingManager, IndexingSpi, SpiQuery. So it is possible to use this code to implement processing of index queries: provide a default implementation for IndexingSpi and make IndexQuery as a wrapper on SpiQuery. But there are some issues:
...
Rules to write criteria:
Rules to choose index:
Code Block | ||
---|---|---|
| ||
// Internal package. public class IndexQueryProcessor extends GridProcessorAdapter { // Provides access to indexes. private final IndexProcessor processor; // Listen for messages from other nodes (index queries, fetch data) private final GridMessageListener idxQryLsnr; // Executor of map queries (index.find). private final IndexQueryMapExecutor mapExec; // Executor for reduce query (MergeSort). private final IndexQueryReduceExecutor rdcExec; // Entrypoint for distributed query. public Iterator<Cache.Entry<?,?>> queryDistributed(IndexQuery idxQuery) throws IgniteException { if (containsPredicate(idxQuery)) deployPredicate(idxQuery); IndexQueryTwoStepQuery twoStepQry = prepareTwoStepQuery(); return rdcExec.execute(twoStepQuery); } // Entrypoint for local query. public Iterator<Cache.Entry<?,?>> queryLocal(IndexQuery idxQuery, @Nullable IndexingQueryFilter filters) throws IgniteException { // 1. If user specified index name, then check a query:// Entrypoint for local query. public Iterator<Cache.Entry<?,?>> queryLocal(IndexQuery idxQuery, @Nullable IndexingQueryFilter filters) throws IgniteException { // 1. If user specified index name, then check a query: // - sort query fields in index key order; // - check that it's a valid index query: fields covers index keys (from left to right) // - fail otherwise. // 2. If user doesn't specified index name: // - get all indexes for specified cache and Value.class; // - find index by filtering by a query fields (index keys must be in the same order as specified query fields, try sort fields to match any index). // - sortvalidate index query fields in index key order; (see validation steps from 1.) Index idx = index(idxQuery.desc()); // -1. check that it's a valid index query: fields covers index keys (from left to right)Parse index conditions. // 2. Validate index condition, index type. // - fail otherwise 3. Maps index conditions to an index query methods. // 2. If user doesn't specified index name:4. Perform index operations, get Cursor. GridCursor<IndexRow> cursor = query(idx, idxQry.conditions()); // -1. getMap allIndexRow indexesto for specified cache and Value.class;Cache entry. // -2. findApply indexspecified bycache filtering by a query fields (index keys must be in the same order as specified query fields, try sort fields to match any index). // - validate index query (see validation steps from 1.) Index idx = index(idxQuery.desc()); // 1. Parse index conditions. // 2. Validate index condition, index type. // 3. Maps index conditions to an index query methods. // 4. Perform index operations, get Cursor. GridCursor<IndexRow> cursor = query(idx, idxQry.conditions()); // 1. Map IndexRow to Cache entry. // 2. Apply specified cache filters (primary partitions, MVCC versions, etc.) // 3. Wrap cursor to iterator and return. return map_and_filter(cursor, filters); } private GridCursor<IndexRow> query(Index idx, List<IndexCondition> conditions) { // eq(key) -> idx.findOne(key) // notEq(key) -> idx.find(null, null, current -> current != key) // gt(key) -> idx.find(key, null, current -> current != key) // gte(key) -> idx.find(key, null) // lt(key) -> idx.find(null, key, current -> current != key) // lte(key) -> idx.find(null, key) // between(left, right) -> idx.find(left, right), inclusive only // in(keys...) -> idx.findOne(keys[0]) && idx.findOne(keys[1]) ... // notIn(keys...) -> idx.find(null, null, current -> !keys.contains(current)) // min() -> idx.findFirst() // max() -> idx.findLast() // predicate(p) -> idx.find(null, null, p) } } |
...
filters (primary partitions, MVCC versions, etc.)
// 3. Wrap cursor to iterator and return.
return map_and_filter(cursor, filters);
}
private GridCursor<IndexRow> query(Index idx, List<IndexCondition> conditions) {
// eq(key) -> idx.findOne(key)
// notEq(key) -> idx.find(null, null, current -> current != key)
// gt(key) -> idx.find(key, null, current -> current != key)
// gte(key) -> idx.find(key, null)
// lt(key) -> idx.find(null, key, current -> current != key)
// lte(key) -> idx.find(null, key)
// between(left, right) -> idx.find(left, right), inclusive only
// in(keys...) -> idx.findOne(keys[0]) && idx.findOne(keys[1]) ...
// notIn(keys...) -> idx.find(null, null, current -> !keys.contains(current))
// min() -> idx.findFirst()
// max() -> idx.findLast()
// predicate(p) -> idx.find(null, null, p)
}
} |
For queries that doesn't match index Ignite will run the index scan find(null, null) and apply user conditions on it. Cases are:
IndexName is optional. If it's not specified than take PK index of specified table.
Code Block | ||
---|---|---|
| ||
// The "predicate" operation can't find best index for a user query. So user must provide an index name for this operation.
QueryCursor<Cache.Entry<Long, Good>> cursor = ignite.cache("GOOD").query(
new IndexQuery<>("GOOD_TBL", "GOOD_TS_DESC_PRICE_ASC_IDX")
.setFilter((good) -> good.ts > lastMidnightTs || price > 100)
);
|
Currently an index can be declared statically (with CacheConfiguration) or created dynamically only with SQL ddl query. Both ways doesn't support functional indexes. Then, implementation will have next steps:
...
Code Block | ||
---|---|---|
| ||
// Dynamic index creation. QueryIndex qryIdx = new QueryIndex() .setName("GOOD_DISCOUNT_PRICE_IDX") .setFieldNames(Collections.singleton("discountPrice")) .setIndexType(QueryIndexType.FUNCTIONAL_SORTED); cache.createIndex(qryIdx) |
...
Goal is to provide opportunity to a user to result partially and skip unneeded hops to a Cache:
...
...
Goal is to provide "select for update" query with transactional support for index queries:
...