...
```java
/**
 * The {@link IsolationLevel} that every transaction created by {@link StateStore#newTransaction()} should use.
 * <p>
 * In the context of a {@link org.apache.kafka.streams.processor.StateStore} transaction, these Isolation Levels
 * adhere to the <a href="https://en.wikipedia.org/wiki/Isolation_(database_systems)#Read_phenomena">ANSI SQL 92
 * definitions</a>.
 * <p>
 * All isolation levels guarantee "read-your-own-writes", i.e. that writes in the transaction will be seen by
 * subsequent reads <em>from within the same transaction</em>. Other guarantees vary by isolation level:
 * <p>
 * <table>
 *     <tr>
 *         <th>Isolation Level</th>
 *         <th>Description</th>
 *         <th>Permitted Read Phenomena</th>
 *     </tr>
 *     <tr>
 *         <td>{@link IsolationLevel#READ_UNCOMMITTED}</td> // todo: ALOS
 *         <td>Allows queries to read writes from all ongoing transactions that have not yet been committed.</td>
 *         <td>dirty reads, non-repeatable reads, phantom reads</td>
 *     </tr>
 *     <tr>
 *         <td>{@link IsolationLevel#READ_COMMITTED}</td> // todo: EOS
 *         <td>Allows queries to only read writes that have been committed to the StateStore. Writes by an ongoing
 *         transaction are not visible <em>until that transaction commits</em>.</td>
 *         <td>non-repeatable reads, phantom reads</td>
 *     </tr>
 * </table>
 * <p>
 * Under {@link IsolationLevel#READ_UNCOMMITTED}, there are no guarantees on when records from other transactions
 * become visible, therefore implementations <em>may</em> refrain from making uncommitted records visible to other
 * transactions until they're committed, if doing so would improve performance.
 * <p>
 * The default implementation of this method will use {@link IsolationLevel#READ_COMMITTED READ_COMMITTED} if the
 * app is {@link #appConfigs() configured} to use an {@link StreamsConfig#EXACTLY_ONCE_V2 exactly-once}
 * {@link StreamsConfig#PROCESSING_GUARANTEE_CONFIG processing guarantee}. Otherwise, it will be
 * {@link IsolationLevel#READ_UNCOMMITTED READ_UNCOMMITTED}.
 */
default IsolationLevel isolationLevel() {
    // Exactly-once processing gets READ_COMMITTED; any other (at-least-once) guarantee gets READ_UNCOMMITTED.
    return StreamsConfigUtils.eosEnabled(new StreamsConfig(appConfigs()))
        ? IsolationLevel.READ_COMMITTED
        : IsolationLevel.READ_UNCOMMITTED;
}
```
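To make the visibility rules in the isolation-level table concrete, here is a toy, in-memory model. It is a sketch under stated assumptions, not the Kafka Streams implementation: `IsolationDemo`, `ToyStore`, `Tx` and `Level` are hypothetical names. Each transaction buffers its own writes and publishes them on commit. The toy chooses to expose other transactions' uncommitted writes under READ_UNCOMMITTED, which the Javadoc permits but does not require.

```java
import java.util.HashMap;
import java.util.Map;

public class IsolationDemo {
    // Hypothetical stand-in for IsolationLevel; only the two levels in the table are modeled.
    enum Level { READ_UNCOMMITTED, READ_COMMITTED }

    static final class ToyStore {
        // Committed writes, visible to every transaction.
        final Map<String, String> committed = new HashMap<>();
        // Uncommitted writes of each open transaction, keyed by transaction id.
        final Map<Integer, Map<String, String>> pending = new HashMap<>();

        Tx begin(int id, Level level) {
            pending.put(id, new HashMap<>());
            return new Tx(id, level);
        }

        final class Tx {
            final int id;
            final Level level;

            Tx(int id, Level level) {
                this.id = id;
                this.level = level;
            }

            void put(String key, String value) {
                pending.get(id).put(key, value);
            }

            String get(String key) {
                // Read-your-own-writes: guaranteed at every isolation level.
                Map<String, String> own = pending.getOrDefault(id, Map.of());
                if (own.containsKey(key)) {
                    return own.get(key);
                }
                if (level == Level.READ_UNCOMMITTED) {
                    // Dirty read: this toy exposes other open transactions' uncommitted writes.
                    for (Map.Entry<Integer, Map<String, String>> e : pending.entrySet()) {
                        if (e.getKey() != id && e.getValue().containsKey(key)) {
                            return e.getValue().get(key);
                        }
                    }
                }
                return committed.get(key);
            }

            void commit() {
                // Publish this transaction's writes and close it.
                committed.putAll(pending.remove(id));
            }
        }
    }

    public static void main(String[] args) {
        ToyStore store = new ToyStore();
        ToyStore.Tx writer = store.begin(1, Level.READ_COMMITTED);
        ToyStore.Tx rc = store.begin(2, Level.READ_COMMITTED);
        ToyStore.Tx ru = store.begin(3, Level.READ_UNCOMMITTED);

        writer.put("k", "v1");
        System.out.println(writer.get("k")); // v1: read-your-own-writes
        System.out.println(rc.get("k"));     // null: no dirty read under READ_COMMITTED
        System.out.println(ru.get("k"));     // v1: dirty read under READ_UNCOMMITTED

        writer.commit();
        // rc now sees v1 after previously seeing null: a non-repeatable read, permitted under READ_COMMITTED.
        System.out.println(rc.get("k"));     // v1
    }
}
```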
...
Metrics
New
stream-state-metrics
- commit-rate: the number of calls to StateStore#commit(Map)
- commit-latency-avg: the average time taken to call StateStore#commit(Map)
- commit-latency-max: the maximum time taken to call StateStore#commit(Map)
Deprecated
stream-state-metrics
- flush-rate (superseded by commit-rate)
- flush-latency-avg (superseded by commit-latency-avg)
- flush-latency-max (superseded by commit-latency-max)
These changes are necessary to ensure that these metrics are not confused with orthogonal operations, such as RocksDB memtable flushes or cache flushes. They measure invocations of StateStore#commit, which replaces StateStore#flush.
Although the flush metrics are only deprecated, they will no longer record any data under normal use, because Kafka Streams will no longer call StateStore#flush().
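As a rough illustration of what the three new commit metrics record, the following sketch times each call to a commit function and derives the rate, the average latency and the maximum latency. `CommitMetricsSketch` and `CommitMetrics` are hypothetical classes, not Kafka's metrics API; in particular, the rate is computed over the tracker's whole lifetime rather than over sampled time windows as Kafka's metrics library does, and the clock is injected so the example is deterministic.

```java
import java.util.function.LongSupplier;

public class CommitMetricsSketch {
    // Hypothetical tracker for commit-rate, commit-latency-avg and commit-latency-max.
    static final class CommitMetrics {
        private final LongSupplier clockNanos;
        private final long startNanos;
        private long count;
        private long totalNanos;
        private long maxNanos;

        CommitMetrics(LongSupplier clockNanos) {
            this.clockNanos = clockNanos;
            this.startNanos = clockNanos.getAsLong();
        }

        // Times one commit call; the latency is recorded even if the commit throws.
        void timeCommit(Runnable commit) {
            long before = clockNanos.getAsLong();
            try {
                commit.run();
            } finally {
                record(clockNanos.getAsLong() - before);
            }
        }

        void record(long latencyNanos) {
            count++;
            totalNanos += latencyNanos;
            maxNanos = Math.max(maxNanos, latencyNanos);
        }

        double commitLatencyAvgNanos() {
            return count == 0 ? Double.NaN : (double) totalNanos / count;
        }

        long commitLatencyMaxNanos() {
            return maxNanos;
        }

        // Commits per second since the tracker was created (simplified, unwindowed rate).
        double commitRate() {
            double elapsedSec = (clockNanos.getAsLong() - startNanos) / 1e9;
            return elapsedSec <= 0 ? 0.0 : count / elapsedSec;
        }
    }

    public static void main(String[] args) {
        long[] fakeNow = {0};
        CommitMetrics metrics = new CommitMetrics(() -> fakeNow[0]);
        // Simulate two commits that take 2 ms and 4 ms respectively.
        metrics.timeCommit(() -> fakeNow[0] += 2_000_000);
        metrics.timeCommit(() -> fakeNow[0] += 4_000_000);
        fakeNow[0] += 994_000_000; // the rest of one second elapses
        System.out.println(metrics.commitLatencyAvgNanos()); // 3000000.0
        System.out.println(metrics.commitLatencyMaxNanos()); // 4000000
        System.out.println(metrics.commitRate());            // 2.0
    }
}
```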
Proposed Changes
To ensure that data is not written to a state store until it has been committed to the changelog, we need to isolate writes from the underlying database until the changelog commit. To achieve this, we introduce the concept of transaction Isolation Levels, which dictate the visibility of records written by different threads.
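A minimal sketch of the write isolation described above, assuming a single writer and ignoring the cross-thread visibility that the isolation levels govern: writes go to an in-memory buffer, and the buffer is applied to the underlying database only after the changelog commit reports success. `BufferedStoreSketch`, `BufferedStore` and the `BooleanSupplier` standing in for the changelog commit are hypothetical; a plain `HashMap` plays the role of the underlying database (e.g. RocksDB).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BooleanSupplier;

public class BufferedStoreSketch {
    static final class BufferedStore {
        // Stands in for the underlying database; it only ever receives changelog-committed data.
        final Map<String, String> database = new HashMap<>();
        // Uncommitted writes, isolated from the database until the changelog commit succeeds.
        private final Map<String, String> buffer = new HashMap<>();

        void put(String key, String value) {
            buffer.put(key, value);
        }

        // Read-your-own-writes: consult the buffer first, then the database.
        String get(String key) {
            return buffer.containsKey(key) ? buffer.get(key) : database.get(key);
        }

        // changelogCommit is a hypothetical hook returning true once the changelog records are committed.
        boolean commit(BooleanSupplier changelogCommit) {
            if (!changelogCommit.getAsBoolean()) {
                return false; // changelog not committed: database untouched, buffer retained
            }
            database.putAll(buffer);
            buffer.clear();
            return true;
        }

        // Discards uncommitted writes, e.g. when a task is closed after an error.
        void abort() {
            buffer.clear();
        }
    }

    public static void main(String[] args) {
        BufferedStore store = new BufferedStore();
        store.put("a", "1");
        System.out.println(store.get("a"));          // 1: own write is visible
        System.out.println(store.database.get("a")); // null: not yet in the database

        store.commit(() -> false);                   // changelog commit fails
        System.out.println(store.database.get("a")); // null: database still consistent with the changelog
        store.abort();
        System.out.println(store.get("a"));          // null

        store.put("a", "2");
        store.commit(() -> true);                    // changelog commit succeeds
        System.out.println(store.database.get("a")); // 2
    }
}
```

The ordering is the point of the design: because the database is only written after the changelog commit, a failure at any earlier step leaves the database matching what the changelog contains.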
...