...

  • Partition reservation - all necessary partitions should be reserved and properly released at every stage
  • Cancellation - it should be possible to cancel a query of any type
  • Streaming - it should be possible to send many packets to destination nodes in advance because 
  • Reactivity - all query stages should operate in a non-blocking fashion; no waits should occur inside query pool threads
  • Manageability - it should be possible to get a clear picture of the query plan and to influence some stages with query hints
  • Pipeline-agnostic - the design should allow both classical ("volcano") and code-generated pipelines in the future
  • Result-agnostic - it should be possible to work with heap, off-heap, and out-of-process intermediate results
  • Source abstraction - the nature of a source should be abstracted out for future extensibility (e.g. "EXTERNAL TABLE", foreign data wrappers)
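The partition reservation and cancellation requirements can be read together: whatever path a fragment takes (normal completion, failure, or cancellation), every reservation it took must be released exactly once. A minimal sketch, assuming a hypothetical per-fragment context (`FragmentContext`) and a stand-in `Partition` class that only counts reservations; neither is an Ignite API. It uses plain synchronization for brevity, so it does not itself satisfy the non-blocking requirement.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class ReservationSketch {
    /** Hypothetical stand-in for a local partition that only counts its reservations. */
    static final class Partition {
        final int id;
        final AtomicInteger reservations = new AtomicInteger();
        volatile boolean evicted;  // an evicted partition can no longer be reserved

        Partition(int id) { this.id = id; }

        boolean reserve() {
            if (evicted) return false;
            reservations.incrementAndGet();
            return true;
        }

        void release() { reservations.decrementAndGet(); }
    }

    /** Hypothetical per-fragment context that owns reservations and releases them exactly once. */
    static final class FragmentContext implements AutoCloseable {
        private final List<Partition> reserved = new ArrayList<>();
        private final AtomicBoolean closed = new AtomicBoolean();
        private volatile boolean cancelled;

        /** All-or-nothing: if any partition cannot be reserved, earlier reservations are rolled back. */
        synchronized boolean reserveAll(List<Partition> parts) {
            for (Partition p : parts) {
                if (closed.get() || cancelled || !p.reserve()) {
                    releaseAll();
                    return false;
                }
                reserved.add(p);
            }
            return true;
        }

        /** Cancellation reuses the normal cleanup path, so reservations cannot leak. */
        void cancel() {
            cancelled = true;
            close();
        }

        boolean isCancelled() { return cancelled; }

        @Override
        public void close() {
            if (closed.compareAndSet(false, true))
                releaseAll();
        }

        private synchronized void releaseAll() {
            for (Partition p : reserved)
                p.release();
            reserved.clear();
        }
    }

    public static void main(String[] args) {
        Partition p0 = new Partition(0);
        Partition p1 = new Partition(1);

        try (FragmentContext ctx = new FragmentContext()) {
            System.out.println("reserved: " + ctx.reserveAll(List.of(p0, p1)));
            System.out.println("p0 reservations while running: " + p0.reservations.get());
        }

        System.out.println("p0 reservations after close: " + p0.reservations.get());
    }
}
```

Routing cancellation through `close()` is the design choice that matters here: there is a single release path guarded by a compare-and-set, so a cancel racing with normal completion cannot release a partition twice.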

"Source" is a fundamental concept. Source types:

  • Local - data stored on the local node
  • Ephemeral - any kind of temporary data received from anywhere, e.g. intermediate query results, results of a non-correlated subquery, or results of REPLICATED LEFT JOIN PARTITIONED
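The point of the "Source" concept is that a fragment consumes rows through one abstraction regardless of where they come from. A minimal sketch, assuming a hypothetical `Source` interface with a pull-style (volcano) `rows()` iterator; `LocalSource`, `EphemeralSource`, and the `List<Object>` row representation are illustrative only, and the ephemeral source is simplified to be readable only after its last batch arrives.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class SourceSketch {
    /** A source yields rows; the consuming fragment does not know where the rows come from. */
    interface Source {
        Iterator<List<Object>> rows();
    }

    /** Local source: reads data held by the local node (an in-memory list stands in for partition data). */
    static final class LocalSource implements Source {
        private final List<List<Object>> localData;

        LocalSource(List<List<Object>> localData) { this.localData = localData; }

        @Override
        public Iterator<List<Object>> rows() { return localData.iterator(); }
    }

    /** Ephemeral source: rows arrive in batches from elsewhere, e.g. results of another fragment. */
    static final class EphemeralSource implements Source {
        private final List<List<Object>> buffered = new ArrayList<>();
        private boolean complete;

        /** Called for every incoming batch; 'last' mirrors the "last batch" flag of the message model. */
        void onBatch(List<List<Object>> batch, boolean last) {
            if (complete) throw new IllegalStateException("batch received after the last one");
            buffered.addAll(batch);
            complete = last;
        }

        boolean isComplete() { return complete; }

        /** Simplification: rows can be read only after the last batch has arrived. */
        @Override
        public Iterator<List<Object>> rows() {
            if (!complete) throw new IllegalStateException("not all batches received yet");
            return buffered.iterator();
        }
    }

    /** A consumer written once against Source works for any source type. */
    static int count(Source source) {
        int n = 0;
        for (Iterator<List<Object>> it = source.rows(); it.hasNext(); it.next())
            n++;
        return n;
    }

    public static void main(String[] args) {
        Source local = new LocalSource(List.of(List.of(1, "a"), List.of(2, "b")));

        EphemeralSource remote = new EphemeralSource();
        remote.onBatch(List.of(List.of(3, "c")), false);
        remote.onBatch(List.of(List.of(4, "d")), true);

        System.out.println("local rows: " + count(local));
        System.out.println("ephemeral rows: " + count(remote));
    }
}
```

A future "EXTERNAL TABLE" or foreign data wrapper would, under this sketch, be just another `Source` implementation, leaving `count`-style consumers unchanged.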


Message model:

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;

// QueryId, FragmentId, SqlSchemaVersion and Row are types introduced by this design (not shown).

class QueryRequest {
    QueryId queryId;                          // Global query ID
    Collection<UUID> cacheDescriptorIds;      // IDs of the caches being queried
    Collection<UUID> nodeIds;                 // IDs of all nodes participating in the query
    AffinityTopologyVersion topVer;           // Topology version
    SqlSchemaVersion schemaVer;               // Expected schema version
    MvccSnapshot mvccSnapshot;                // MVCC snapshot
}

class QueryFragment {
    QueryId queryId;                          // Global query ID
    FragmentId fragmentId;                    // Unique fragment ID within the query
    FragmentId sinkId;                        // Fragment that consumes the results of this fragment
    String sql;                               // SQL to be executed
    Map<UUID, QueryFragmentPartitions> part;  // Involved partitions for every real cache participating in the query
}

class QueryFragmentPartitions {
    Map<UUID, Integer> partCnts;              // Number of partitions expected to be reserved on each node
    Map<UUID, Collection<Integer>> parts;     // Explicit partitions
}

class QueryBatch {
    QueryId queryId;                          // Global query ID
    FragmentId sourceId;                      // Source ID (producer)
    FragmentId sinkId;                        // Sink ID (consumer)
    UUID sourceNodeId;                        // Source node ID
    int sourceParallelism;                    // Parallelism of the fragment on the given node
    long batchId;                             // ID of the batch for the given source/sink pair (used for congestion control)
    List<Row> rows;                           // Data rows
    boolean last;                             // Whether this is the last batch
}

class QueryBatchAck {
    QueryId queryId;                          // Global query ID
    FragmentId sourceId;                      // Source ID
    long lastBatchId;                         // ID of the last acked batch (same type as QueryBatch.batchId)
}
```
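The `batchId` / `QueryBatchAck.lastBatchId` pair is what lets a source stream several batches ahead (the Streaming requirement) without flooding the sink. A minimal sketch of sender-side flow control, assuming `lastBatchId` is a cumulative ack (every batch up to and including it has been consumed) and a fixed per source/sink window; the window size and the `Sender`/`Batch` names are hypothetical, and a list stands in for the network.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class BatchWindowSketch {
    /** Simplified batch carrying only the fields used for flow control (cf. QueryBatch). */
    static final class Batch {
        final long batchId;
        final List<Object> rows;
        final boolean last;

        Batch(long batchId, List<Object> rows, boolean last) {
            this.batchId = batchId;
            this.rows = rows;
            this.last = last;
        }
    }

    /** Sender for one source/sink pair: at most 'window' batches may be sent but not yet acked. */
    static final class Sender {
        private final int window;
        private final Deque<Batch> pending = new ArrayDeque<>();  // produced, waiting for window space
        private final List<Batch> sent = new ArrayList<>();       // stands in for the network
        private long nextBatchId;
        private long lastAckedId = -1;                             // -1: nothing acked yet

        Sender(int window) { this.window = window; }

        /** Never blocks: batches that do not fit into the window are queued. */
        void offer(List<Object> rows, boolean last) {
            pending.add(new Batch(nextBatchId++, rows, last));
            flush();
        }

        /** Handles QueryBatchAck, treating lastBatchId as a cumulative ack. */
        void onAck(long lastBatchId) {
            lastAckedId = Math.max(lastAckedId, lastBatchId);
            flush();
        }

        /** Batches are sent in ID order, so IDs (lastAckedId, sent.size() - 1] are in flight. */
        long inFlight() {
            return sent.size() - (lastAckedId + 1);
        }

        List<Batch> sent() { return sent; }

        private void flush() {
            while (!pending.isEmpty() && inFlight() < window)
                sent.add(pending.poll());
        }
    }

    public static void main(String[] args) {
        Sender sender = new Sender(2);
        for (int i = 0; i < 5; i++)
            sender.offer(List.of(i), i == 4);

        System.out.println("sent before any ack: " + sender.sent().size());
        sender.onAck(0);
        System.out.println("sent after ack of batch 0: " + sender.sent().size());
        sender.onAck(2);
        System.out.println("sent after ack of batch 2: " + sender.sent().size());
    }
}
```

Because `offer` only enqueues and `onAck` only drains, neither call waits, which keeps this scheme compatible with the Reactivity requirement; a cumulative ack also tolerates lost or reordered acks, since a later ack supersedes an earlier one.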


Risks and Assumptions

// TODO: Describe project risks, such as API or binary compatibility issues, major protocol changes, etc.

...