Status
Current state: "Under Discussion"
Discussion thread: here
JIRA: Not available yet.
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
The idea of this KIP is to add prefix scan support to state stores. This is helpful when keys share common prefixes and users want to iterate over only the keys with a given prefix. With prefix scan, the overhead of several I/O requests can be reduced. Currently this is not possible: users who know the prefix patterns in their keys still need to make multiple calls to iterate over them.
Public Interfaces
The proposal is to add a new interface called PrefixSeekableStore.
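package org.apache.kafka.streams.state;

/**
 * Optional capability for state stores that can iterate over all keys sharing a common prefix.
 */
public interface PrefixSeekableStore<K, V> {

    /**
     * Returns an iterator over all entries whose keys start with the given prefix.
     * The returned iterator must be closed by the caller.
     */
    KeyValueIterator<K, V> prefixSeek(K prefix);
}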
The main idea is that only the stores that should offer prefix scan need to implement this interface; the rest stay untouched. This is similar to the existing BulkLoadingStore interface, which is implemented only by state stores that support bulk loading.
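A minimal, self-contained sketch of this optional-capability pattern follows. Every type in it (SimpleStore, PrefixSeekable, SortedMapStore, HashMapStore) is a hypothetical stand-in rather than Kafka's API: keys are Strings and results are plain Iterators instead of KeyValueIterator<Bytes, byte[]>. It shows a caller detecting the capability with instanceof and falling back to a filtered full scan for a store that does not implement it, mirroring how stores without the interface remain unaffected.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class CapabilityDemo {
    // Hypothetical stand-in for a basic key-value store (not Kafka's KeyValueStore).
    interface SimpleStore {
        void put(String key, String value);
        Iterator<Map.Entry<String, String>> all();
    }

    // Hypothetical stand-in for PrefixSeekableStore: an optional, additive capability.
    interface PrefixSeekable {
        Iterator<Map.Entry<String, String>> prefixSeek(String prefix);
    }

    // A sorted store can seek to the prefix and stop at the first non-matching key.
    static class SortedMapStore implements SimpleStore, PrefixSeekable {
        private final TreeMap<String, String> map = new TreeMap<>();
        public void put(String key, String value) { map.put(key, value); }
        public Iterator<Map.Entry<String, String>> all() { return map.entrySet().iterator(); }
        public Iterator<Map.Entry<String, String>> prefixSeek(String prefix) {
            List<Map.Entry<String, String>> out = new ArrayList<>();
            // tailMap(prefix, true) starts at the first key >= prefix.
            for (Map.Entry<String, String> e : map.tailMap(prefix, true).entrySet()) {
                if (!e.getKey().startsWith(prefix)) {
                    break; // sorted order: no later key can match
                }
                out.add(e);
            }
            return out.iterator();
        }
    }

    // A store that does not implement the capability is left untouched.
    static class HashMapStore implements SimpleStore {
        private final Map<String, String> map = new HashMap<>();
        public void put(String key, String value) { map.put(key, value); }
        public Iterator<Map.Entry<String, String>> all() { return map.entrySet().iterator(); }
    }

    // Caller-side dispatch: use the capability when present, otherwise scan everything and filter.
    static List<String> keysWithPrefix(SimpleStore store, String prefix) {
        Iterator<Map.Entry<String, String>> it = (store instanceof PrefixSeekable)
            ? ((PrefixSeekable) store).prefixSeek(prefix)
            : store.all();
        List<String> keys = new ArrayList<>();
        while (it.hasNext()) {
            String key = it.next().getKey();
            if (key.startsWith(prefix)) { // redundant for PrefixSeekable, required for the full scan
                keys.add(key);
            }
        }
        keys.sort(null); // HashMap iteration order is unspecified
        return keys;
    }

    public static void main(String[] args) {
        for (SimpleStore store : new SimpleStore[] {new SortedMapStore(), new HashMapStore()}) {
            store.put("user:1", "a");
            store.put("user:2", "b");
            store.put("order:1", "c");
            System.out.println(store.getClass().getSimpleName() + " " + keysWithPrefix(store, "user:"));
        }
    }
}
```

Both stores return the same keys; only the store implementing the capability stops at the first key outside the prefix instead of scanning everything.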
RocksDBStore (highlighting only the changes)
public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BulkLoadingStore, PrefixSeekableStore<Bytes, byte[]> {

    // Current code

    // Implement prefixSeek(): fail fast if the store is closed, then delegate to the accessor.
    @Override
    public KeyValueIterator<Bytes, byte[]> prefixSeek(final Bytes prefix) {
        validateStoreOpen();
        return dbAccessor.prefixSeek(prefix);
    }

    interface RocksDBAccessor {
        // Current RocksDBAccessor code

        KeyValueIterator<Bytes, byte[]> prefixSeek(final Bytes prefix);
    }

    class SingleColumnFamilyAccessor implements RocksDBAccessor {
        // Current Accessor code.

        @Override
        public KeyValueIterator<Bytes, byte[]> prefixSeek(final Bytes prefix) {
            // Wrap a fresh RocksDB iterator; it is registered in openIterators so it is closed with the store.
            return new RocksDBPrefixIterator(name, db.newIterator(columnFamily), openIterators, prefix);
        }
    }
}
Proposed Changes
As already mentioned, the new PrefixSeekableStore interface lets this support be added incrementally, one store at a time. RocksDB, which supports prefix scans over its sorted keys, is a natural fit. As part of the initial implementation of this feature, I have enabled prefix scanning for the RocksDB state store.
import java.nio.ByteBuffer;
import java.util.Set;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.rocksdb.RocksIterator;

class RocksDBPrefixIterator extends RocksDbIterator {
    private final byte[] rawPrefix;

    RocksDBPrefixIterator(final String name,
                          final RocksIterator newIterator,
                          final Set<KeyValueIterator<Bytes, byte[]>> openIterators,
                          final Bytes prefix) {
        super(name, newIterator, openIterators);
        this.rawPrefix = prefix.get();
        // Position the iterator at the first key >= prefix.
        newIterator.seek(rawPrefix);
    }

    // True if key starts with prefix; keys shorter than the prefix never match.
    private boolean prefixEquals(final byte[] prefix, final byte[] key) {
        if (key.length < prefix.length) {
            return false;
        }
        return ByteBuffer.wrap(key, 0, prefix.length).equals(ByteBuffer.wrap(prefix));
    }

    @Override
    public KeyValue<Bytes, byte[]> makeNext() {
        final KeyValue<Bytes, byte[]> next = super.makeNext();
        // Keys are sorted, so the first key without the prefix ends the scan.
        if (next == null || !prefixEquals(rawPrefix, next.key.get())) {
            return allDone();
        }
        return next;
    }
}
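This is a new iterator extending the existing RocksDbIterator. It calls seek() on the underlying RocksJava RocksIterator to jump to the first key greater than or equal to the prefix, and it ends iteration at the first key that no longer starts with the prefix. More information on prefix seek in RocksDB can be found here: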
https://github.com/facebook/rocksdb/wiki/Prefix-Seek
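The seek-then-stop logic of RocksDBPrefixIterator can be illustrated without RocksDB. In this sketch a java.util.TreeMap with an unsigned lexicographic byte[] comparator stands in for RocksDB's default bytewise-sorted keyspace, and tailMap(prefix, true) stands in for RocksIterator.seek(prefix); these substitutions are assumptions, and the names BytePrefixScan, newStore, prefixScan and asStrings are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class BytePrefixScan {
    // Unsigned lexicographic order, the same ordering as RocksDB's default bytewise comparator.
    static int compareUnsigned(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = (a[i] & 0xff) - (b[i] & 0xff);
            if (cmp != 0) {
                return cmp;
            }
        }
        return a.length - b.length;
    }

    // Hypothetical in-memory stand-in for a sorted RocksDB keyspace.
    static TreeMap<byte[], byte[]> newStore() {
        return new TreeMap<byte[], byte[]>(BytePrefixScan::compareUnsigned);
    }

    // True if key begins with prefix; keys shorter than the prefix never match.
    static boolean startsWith(byte[] key, byte[] prefix) {
        if (key.length < prefix.length) {
            return false;
        }
        for (int i = 0; i < prefix.length; i++) {
            if (key[i] != prefix[i]) {
                return false;
            }
        }
        return true;
    }

    // Seek to the first key >= prefix, then collect keys until the first one without the prefix.
    static List<byte[]> prefixScan(TreeMap<byte[], byte[]> store, byte[] prefix) {
        List<byte[]> keys = new ArrayList<>();
        for (Map.Entry<byte[], byte[]> entry : store.tailMap(prefix, true).entrySet()) {
            if (!startsWith(entry.getKey(), prefix)) {
                break; // keys sharing a prefix are contiguous in sorted order
            }
            keys.add(entry.getKey());
        }
        return keys;
    }

    static List<String> asStrings(List<byte[]> keys) {
        List<String> out = new ArrayList<>();
        for (byte[] key : keys) {
            out.add(new String(key, StandardCharsets.UTF_8));
        }
        return out;
    }

    public static void main(String[] args) {
        TreeMap<byte[], byte[]> store = newStore();
        for (String k : new String[] {"user", "user:1", "users", "order:1", "user:2"}) {
            store.put(k.getBytes(StandardCharsets.UTF_8), new byte[0]);
        }
        System.out.println(asStrings(prefixScan(store, "user:".getBytes(StandardCharsets.UTF_8))));
    }
}
```

For the prefix "user:", the key "user" is never visited because it sorts before the seek target, and the scan ends at "users", the first key after the matching range.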
For RocksDBStore to support prefix seek, it needs to implement PrefixSeekableStore; the code is shown above. I have implemented it only for SingleColumnFamilyAccessor, whose prefixSeek() creates a RocksDBPrefixIterator to perform the prefix scan.
Compatibility, Deprecation, and Migration Plan
Since prefix scan is abstracted out as a new, optional behaviour, adding it to one state store has no impact on the others. It also doesn't change any existing behaviour of a state store that implements it, so the change is backward compatible.
Rejected Alternatives
None
Current Implementation Details
The basic implementation described above can be found here:
https://github.com/confluentinc/kafka/pull/242. It includes test cases and JMH benchmarks.
Benchmark Results
running JMH with args [-f 2 RocksDbPrefixStateStoreBenchmark]
# JMH version: 1.23
# VM version: JDK 1.8.0_181, Java HotSpot(TM) 64-Bit Server VM, 25.181-b13
# VM invoker: /Library/Java/JavaVirtualMachines/jdk1.8.0_181.jdk/Contents/Home/jre/bin/java
# VM options: <none>
# Warmup: 5 iterations, 10 s each
# Measurement: 5 iterations, 10 s each
# Timeout: 10 min per iteration
# Threads: 1 thread, will synchronize iterations
# Benchmark mode: Throughput, ops/time
# Benchmark: org.apache.kafka.jmh.streams.processor.internals.RocksDbPrefixStateStoreBenchmark.testCachePerformance

# Run progress: 0.00% complete, ETA 00:03:20
# Fork: 1 of 2
# Warmup Iteration 1: 23.692 ops/ms
# Warmup Iteration 2: 25.236 ops/ms
# Warmup Iteration 3: 22.429 ops/ms
# Warmup Iteration 4: 18.085 ops/ms
# Warmup Iteration 5: 16.214 ops/ms
Iteration 1: 17.363 ops/ms
Iteration 2: 14.264 ops/ms
Iteration 3: 20.890 ops/ms
Iteration 4: 11.031 ops/ms
Iteration 5: 7.267 ops/ms

# Run progress: 50.00% complete, ETA 00:01:43
# Fork: 2 of 2
# Warmup Iteration 1: 7.332 ops/ms
# Warmup Iteration 2: 6.843 ops/ms
# Warmup Iteration 3: 7.026 ops/ms
# Warmup Iteration 4: 7.753 ops/ms
# Warmup Iteration 5: 8.071 ops/ms
Iteration 1: 7.819 ops/ms
Iteration 2: 13.593 ops/ms
Iteration 3: 14.494 ops/ms
Iteration 4: 12.701 ops/ms
Iteration 5: 10.605 ops/ms

Result "org.apache.kafka.jmh.streams.processor.internals.RocksDbPrefixStateStoreBenchmark.testCachePerformance":
  13.003 ±(99.9%) 6.272 ops/ms [Average]
  (min, avg, max) = (7.267, 13.003, 20.890), stdev = 4.148
  CI (99.9%): [6.731, 19.275] (assumes normal distribution)

# Run complete. Total time: 00:03:26

REMEMBER: The numbers below are just data. To gain reusable insights, you need to follow up on why the numbers are the way they are. Use profilers (see -prof, -lprof), design factorial experiments, perform baseline and negative tests that provide experimental control, make sure the benchmarking environment is safe on JVM/OS/HW level, ask for reviews from the domain experts. Do not assume the numbers tell you what you want them to tell.

Benchmark                                              Mode   Cnt   Score    Error   Units
RocksDbPrefixStateStoreBenchmark.testCachePerformance  thrpt  10    13.003 ± 6.272   ops/ms

JMH benchmarks done
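The summary line can be cross-checked from the ten measurement iterations (warmup iterations excluded). This is an illustration, not JMH code: it assumes the reported error is the half-width of a two-sided 99.9% Student's t interval, uses t ≈ 4.781 for 9 degrees of freedom taken from a standard t table, and the class JmhScoreCheck and its methods are hypothetical.

```java
import java.util.Locale;

public class JmhScoreCheck {
    // Measurement iterations from forks 1 and 2 (warmup excluded), in ops/ms.
    static final double[] SCORES = {
        17.363, 14.264, 20.890, 11.031, 7.267,
        7.819, 13.593, 14.494, 12.701, 10.605
    };

    // Two-sided 99.9% Student's t critical value for 9 degrees of freedom (from a t table; assumption).
    static final double T_999_DF9 = 4.781;

    static double mean(double[] xs) {
        double sum = 0;
        for (double x : xs) {
            sum += x;
        }
        return sum / xs.length;
    }

    // Sample standard deviation (n - 1 denominator).
    static double sampleStdDev(double[] xs) {
        double m = mean(xs);
        double sq = 0;
        for (double x : xs) {
            sq += (x - m) * (x - m);
        }
        return Math.sqrt(sq / (xs.length - 1));
    }

    // Half-width of the confidence interval: t * s / sqrt(n).
    static double errorHalfWidth(double[] xs, double t) {
        return t * sampleStdDev(xs) / Math.sqrt(xs.length);
    }

    public static void main(String[] args) {
        double m = mean(SCORES);
        double err = errorHalfWidth(SCORES, T_999_DF9);
        System.out.printf(Locale.ROOT, "%.3f +/- %.3f ops/ms, CI [%.3f, %.3f]%n", m, err, m - err, m + err);
    }
}
```

With these inputs it yields a mean of about 13.003, a sample standard deviation of about 4.148 and a half-width of about 6.272 ops/ms, matching the reported 13.003 ± 6.272 ops/ms and CI [6.731, 19.275]. The wide interval relative to the mean reflects the large variation between iterations.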
Other considerations
While discussing this with Kafka Streams core members on the Slack channel, Sophie Blee-Goldman raised the performance of the RocksDB Java prefix seek API. There was an active issue on the RocksDB side, which I pointed them to: https://github.com/facebook/rocksdb/issues/6004. It was closed recently. I created a Kafka JIRA to track this issue and to evaluate whether it makes sense to integrate those changes back. Here's the JIRA issue for reference: