...
Tasks that are already assigned to an instance already use the in-memory offsets when calculating partition assignments, so no change is necessary here.
Interactive Query .position Offsets
Input partition "Position" offsets, introduced by KIP-796: Interactive Query v2, are currently stored in a .position file by the RocksDBStore implementation. To ensure consistency with the committed data and changelog offsets, these position offsets will be stored in RocksDB, in the same column family as the changelog offsets, instead of the .position file. When a StateStore that manages its own offsets is first initialized, if a .position file exists in the store directory, its offsets will be automatically migrated into the store, and the file will be deleted.
When writing data to a RocksDBStore (via put, delete, etc.), the input partition offsets will be read from the changelog record metadata (as before), and these offsets will be added to the current transaction's WriteBatch. When the StateStore is committed, the position offsets in the current WriteBatch will be written to RocksDB, alongside the records they correspond to. In addition, RocksDBStore will maintain two Position maps in memory: one containing the offsets pending in the current transaction's WriteBatch, and the other containing committed offsets. On commit(Map), the uncommitted Position map will be merged into the committed Position map such that both maps contain the same offsets. In this sense, the two Position maps will diverge during writes and re-converge on commit.
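The divergence and re-convergence of the two maps can be illustrated with a minimal sketch. This is plain Java, not Kafka Streams code: PositionTracker, recordWrite, and the nested-map layout are hypothetical names chosen for illustration, and the real Position class and RocksDBStore internals may differ.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the two in-memory Position maps (assumption, not the real implementation).
final class PositionTracker {
    // topic -> (partition -> offset); reflects only offsets that have been committed
    private final Map<String, Map<Integer, Long>> committed = new HashMap<>();
    // same shape, but also includes offsets pending in the current transaction
    private final Map<String, Map<Integer, Long>> uncommitted = new HashMap<>();

    // Called on each write: track the input offset of the pending record.
    void recordWrite(String topic, int partition, long offset) {
        uncommitted.computeIfAbsent(topic, t -> new HashMap<>())
                   .merge(partition, offset, Math::max);
    }

    // Called on commit: merge pending offsets into the committed map so both maps agree again.
    void commit() {
        uncommitted.forEach((topic, partitions) ->
            partitions.forEach((partition, offset) ->
                committed.computeIfAbsent(topic, t -> new HashMap<>())
                         .merge(partition, offset, Math::max)));
    }

    // Returns -1 when no offset is known for the given topic-partition.
    long committedOffset(String topic, int partition) {
        return committed.getOrDefault(topic, Collections.<Integer, Long>emptyMap())
                        .getOrDefault(partition, -1L);
    }

    long uncommittedOffset(String topic, int partition) {
        return uncommitted.getOrDefault(topic, Collections.<Integer, Long>emptyMap())
                          .getOrDefault(partition, -1L);
    }
}
```

In this sketch, a write at offset 5 is visible through uncommittedOffset immediately, but through committedOffset only after commit() has run.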
When an interactive query is made under the READ_COMMITTED IsolationLevel, the PositionBound will constrain the committed Position map, whereas under READ_UNCOMMITTED, the PositionBound will constrain the uncommitted Position map.
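As a rough sketch of how the isolation level could select the map that a bound is checked against: the BoundCheck class and its methods below are hypothetical, use a single topic for brevity, and assume that a bound is satisfied once the store has reached at least the bound's offset for every partition the bound names.

```java
import java.util.Map;

// Hypothetical helper (assumption); not part of the Kafka Streams API.
final class BoundCheck {
    enum IsolationLevel { READ_COMMITTED, READ_UNCOMMITTED }

    // Picks the Position map (partition -> offset) that the bound is checked against.
    static Map<Integer, Long> positionFor(IsolationLevel level,
                                          Map<Integer, Long> committed,
                                          Map<Integer, Long> uncommitted) {
        return level == IsolationLevel.READ_COMMITTED ? committed : uncommitted;
    }

    // True when the position has reached the bound's offset for every partition in the bound.
    static boolean satisfies(Map<Integer, Long> position, Map<Integer, Long> bound) {
        for (Map.Entry<Integer, Long> entry : bound.entrySet()) {
            Long seen = position.get(entry.getKey());
            if (seen == null || seen < entry.getValue()) {
                return false;
            }
        }
        return true;
    }
}
```

With a committed offset of 5 and an uncommitted offset of 8 for the same partition, a bound of 7 would be rejected under READ_COMMITTED and accepted under READ_UNCOMMITTED in this model.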
RocksDB Transactions
When the isolation level is READ_COMMITTED, we will use RocksDB's WriteBatchWithIndex as a means of accomplishing atomic writes when not using the RocksDB WAL. When reading records from the StreamThread, we will use the WriteBatchWithIndex#getFromBatchAndDB and WriteBatchWithIndex#newIteratorWithBase utilities to ensure that uncommitted writes are available to query. When reading records from Interactive Queries, we will use the regular RocksDB#get and RocksDB#newIterator methods, to ensure we see only records that have been flushed (see above). This approach is expected to perform better than the existing, non-batched write path. The main performance concern is that the buffer must reside completely in memory until it is committed, which is addressed by statestore.uncommitted.max.bytes (see above).
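The two read paths can be modelled without RocksDB to show the intended visibility semantics. The sketch below is a pure-Java analogy, not the RocksDB API: BufferedStore and its methods are hypothetical, a HashMap stands in for the database, and a second map stands in for the indexed write batch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical model of buffered writes over a committed store (assumption, not RocksDB code).
final class BufferedStore {
    // Stands in for data already written to RocksDB.
    private final Map<String, String> db = new HashMap<>();
    // Stands in for the indexed write batch; an empty Optional marks a pending delete.
    private final Map<String, Optional<String>> batch = new HashMap<>();

    void put(String key, String value) { batch.put(key, Optional.of(value)); }

    void delete(String key) { batch.put(key, Optional.empty()); }

    // StreamThread-style read: pending writes shadow committed data.
    String getFromBatchAndDb(String key) {
        Optional<String> pending = batch.get(key);
        return pending != null ? pending.orElse(null) : db.get(key);
    }

    // Interactive-query-style read: only committed data is visible.
    String getCommitted(String key) { return db.get(key); }

    // Applies every pending write to the store, then clears the buffer.
    void commit() {
        batch.forEach((key, value) -> {
            if (value.isPresent()) {
                db.put(key, value.get());
            } else {
                db.remove(key);
            }
        });
        batch.clear();
    }
}
```

The model also makes the memory concern visible: every pending write stays in the batch map until commit() runs, which is why the size of the uncommitted buffer needs an upper bound.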
...