Note: this IEP is obsolete and is kept for historical purposes. It was superseded by IEP-37: New query execution engine.
ID | IEP-33 |
Author | |
Sponsor | |
Created | 13 Mar 2018 |
Status | DRAFT |
Motivation
...
- Partition reservation - all necessary partitions should be reserved and released properly for all stages
- Cancellation - it should be possible to cancel any query type
- Streaming - it should be possible to send many packets to destination nodes in advance because
- Reactivity - all query stages should operate in a non-blocking fashion; no waits should occur inside query pool threads
- Manageability - it should be possible to get a clear picture of the query plan and to influence some stages with query hints
- Pipeline-agnostic - the design should allow us to have both classical ("volcano") and code-generated pipelines in the future
- Result-agnostic - it should be possible to work with heap, off-heap, and out-of-process intermediate results
- Abstract out source nature - for future extensibility (e.g. "EXTERNAL TABLE", foreign data wrappers)
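The partition reservation requirement above can be illustrated with a minimal sketch. It assumes reference-counted reservations that either succeed for all requested partitions or for none, and that are released in a `finally` block so a failing stage cannot leak them. All class and method names here (`PartitionReservations`, `StageRunner`, `tryReserveAll`) are hypothetical and are not taken from the Ignite code base.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: tracks which partitions a node can serve and how many
// running stages currently hold a reservation on each of them.
final class PartitionReservations {
    private final Set<Integer> owned;                               // partitions this node can serve
    private final Map<Integer, Integer> refCnt = new HashMap<>();   // active reservations per partition

    PartitionReservations(Collection<Integer> owned) {
        this.owned = new HashSet<>(owned);
    }

    /** Reserves all requested partitions, or none if any of them is not owned. */
    synchronized boolean tryReserveAll(Collection<Integer> parts) {
        if (!owned.containsAll(parts))
            return false;

        for (Integer p : parts)
            refCnt.merge(p, 1, Integer::sum);

        return true;
    }

    /** Drops one reservation per partition; the entry is removed when the count reaches zero. */
    synchronized void releaseAll(Collection<Integer> parts) {
        for (Integer p : parts)
            refCnt.computeIfPresent(p, (k, v) -> v == 1 ? null : v - 1);
    }

    synchronized int reservations(int part) {
        return refCnt.getOrDefault(part, 0);
    }
}

final class StageRunner {
    /** Runs a stage under reservation and releases the partitions even if the stage throws. */
    static boolean runStage(PartitionReservations res, Collection<Integer> parts, Runnable stage) {
        if (!res.tryReserveAll(parts))
            return false; // caller may retry on another topology version

        try {
            stage.run();
            return true;
        }
        finally {
            res.releaseAll(parts);
        }
    }
}
```

Returning `false` instead of waiting keeps the caller free to retry or remap without blocking a query pool thread, which is in line with the reactivity requirement.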
"Source" is the fundamental concept. Source types:
- Local - data of the local node
- Ephemeral - any kind of temporary data retrieved from anywhere. This might be intermediate query results, results of a non-correlated subquery, or results of a REPLICATED LEFT JOIN PARTITIONED join
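One way to picture the two source types is a common interface that hides where the rows come from. The sketch below is only an illustration under that assumption: `Source`, `LocalSource`, `EphemeralSource`, and `Operators` are hypothetical names, and `Object[]` stands in for whatever row type the engine would use.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical abstraction: an operator reads rows without knowing their origin.
interface Source {
    Iterator<Object[]> rows();
}

/** Local source: reads data held by the local node (modeled here as an in-memory list). */
final class LocalSource implements Source {
    private final List<Object[]> localData;

    LocalSource(List<Object[]> localData) {
        this.localData = localData;
    }

    @Override public Iterator<Object[]> rows() {
        return localData.iterator();
    }
}

/** Ephemeral source: temporary rows produced elsewhere, e.g. an intermediate result. */
final class EphemeralSource implements Source {
    private final List<Object[]> buffered = new ArrayList<>();

    /** Appends a batch of rows received from a producer. */
    void addBatch(List<Object[]> batch) {
        buffered.addAll(batch);
    }

    @Override public Iterator<Object[]> rows() {
        return buffered.iterator();
    }
}

final class Operators {
    /** A trivial consumer that works with any source type. */
    static int countRows(Source src) {
        int cnt = 0;

        for (Iterator<Object[]> it = src.rows(); it.hasNext(); it.next())
            cnt++;

        return cnt;
    }
}
```

Because `countRows` depends only on `Source`, a new source kind (for example, an external table) could be added without touching the consumer.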
Message model (Java):
class QueryRequest {
QueryId queryId; // Global query ID
Collection<UUID> cacheDescriptorIds; // IDs of the caches being queried
Collection<UUID> nodeIds; // IDs of all nodes participating in the query
AffinityTopologyVersion topVer; // Topology version
SqlSchemaVersion schemaVer; // Expected schema version
MvccSnapshot mvccSnapshot; // MVCC snapshot
}
class QueryFragment {
QueryId queryId; // Global query ID
FragmentId fragmentId; // Unique fragment ID within a query
FragmentId sinkId; // Who will consume results of this fragment
String sql; // SQL to be executed
Map<UUID, QueryFragmentPartitions> part; // Involved partitions for every real cache participating in query
}
class QueryFragmentPartitions {
Map<UUID, Integer> partCnts; // Number of expected partitions to be reserved on nodes
Map<UUID, Collection<Integer>> parts; // Explicit partitions
}
class QueryBatch {
QueryId queryId; // Global query ID
FragmentId sourceId; // Source ID (producer)
FragmentId sinkId; // Sink ID (consumer)
UUID sourceNodeId; // Source node ID
int sourceParallelism; // Parallelism of a fragment on a given node
long batchId; // ID of the batch for the given source/sink pair (used for congestion control)
List<Row> rows; // Data rows
boolean last; // Whether this is the last batch
}
class QueryBatchAck {
QueryId queryId; // Global query ID
FragmentId sourceId; // Source ID
long lastBatchId; // ID of the last acked batch (same type as QueryBatch.batchId)
}
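The message model notes that `batchId` is used for congestion control but does not spell out the scheme. The following is a minimal sketch of one possible sender-side window, assuming that batch IDs start at 0 for each source/sink pair and that a `QueryBatchAck` is cumulative (it acknowledges every batch up to and including `lastBatchId`). The class `BatchWindow` and its methods are hypothetical.

```java
// Hypothetical sender-side window for a single source/sink pair.
final class BatchWindow {
    private final int maxInFlight;        // how many unacked batches may be sent in advance
    private long nextBatchId = 0;         // ID to assign to the next QueryBatch
    private long lastAckedBatchId = -1;   // highest ID received in a QueryBatchAck

    BatchWindow(int maxInFlight) {
        if (maxInFlight <= 0)
            throw new IllegalArgumentException("maxInFlight must be positive");

        this.maxInFlight = maxInFlight;
    }

    /** Returns the ID for the next batch, or -1 if the window is full. Never blocks. */
    synchronized long tryAcquire() {
        long inFlight = nextBatchId - lastAckedBatchId - 1;

        if (inFlight >= maxInFlight)
            return -1;

        return nextBatchId++;
    }

    /** Processes an ack; stale, reordered, or out-of-range acks are ignored. */
    synchronized void onAck(long ackedBatchId) {
        if (ackedBatchId > lastAckedBatchId && ackedBatchId < nextBatchId)
            lastAckedBatchId = ackedBatchId;
    }
}
```

With `maxInFlight = 2`, the producer can send batches 0 and 1 immediately, gets -1 for the third attempt, and can continue with batch 2 once an ack for batch 0 arrives. Returning -1 instead of waiting lets the producer yield its thread and resume when the ack handler fires.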
Risks and Assumptions
// TODO: Describe project risks, such as API or binary compatibility issues, major protocol changes, etc.
...
// TODO: Links to various reference documents, if applicable.
Tickets
Tickets caused by H2 limitations:
JIRA query (ASF JIRA): project = IGNITE AND labels = h2-limitation
// TODO: Links or report with relevant JIRA tickets.