Status

Current state: Accepted

Discussion thread: here

JIRA: KAFKA-10648

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

The proposal is to add a new method called prefixScan to ReadOnlyKeyValueStore. This method takes two arguments: 1) the prefix to search for and 2) a serializer for the prefix key type.


Code Block
languagejava
titleprefixScan
    /**
     * Get an iterator over keys which have the specified prefix. The type of the prefix can be different from that of
     * the key. That's why callers should also pass a serializer for the prefix to convert the prefix into the
     * format in which the keys are stored underneath in the stores.
     * @param prefix The prefix.
     * @param prefixKeySerializer Serializer for the prefix key type
     * @param <PS> Prefix serializer type
     * @param <P> Prefix type
     * @return The iterator for keys having the specified prefix.
     */
    <PS extends Serializer<P>, P> KeyValueIterator<K, V> prefixScan(P prefix, PS prefixKeySerializer);

This method would also have a default implementation that throws an UnsupportedOperationException for source compatibility with existing state stores.
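To illustrate the source-compatibility argument, here is a minimal, self-contained sketch of the default-method pattern. The ReadOnlyStore, ListStore, and NoScanStore types below are simplified stand-ins invented for this example, not the actual Kafka Streams interfaces:

```java
import java.util.Iterator;
import java.util.List;

public class DefaultMethodSketch {
    // Simplified stand-in for ReadOnlyKeyValueStore: existing implementors
    // keep compiling because prefixScan has a default body.
    interface ReadOnlyStore<K, V> {
        V get(K key);

        default Iterator<K> prefixScan(String prefix) {
            throw new UnsupportedOperationException("prefixScan not supported");
        }
    }

    // A store that opts in by overriding the default.
    static class ListStore implements ReadOnlyStore<String, String> {
        private final List<String> keys;

        ListStore(final List<String> keys) {
            this.keys = keys;
        }

        @Override
        public String get(final String key) {
            return keys.contains(key) ? key : null;
        }

        @Override
        public Iterator<String> prefixScan(final String prefix) {
            return keys.stream().filter(k -> k.startsWith(prefix)).iterator();
        }
    }

    // A store that does not override prefixScan still compiles unchanged,
    // but callers get UnsupportedOperationException at runtime.
    static class NoScanStore implements ReadOnlyStore<String, String> {
        @Override
        public String get(final String key) {
            return null;
        }
    }

    public static void main(String[] args) {
        ReadOnlyStore<String, String> scannable = new ListStore(List.of("apple", "apricot", "banana"));
        Iterator<String> it = scannable.prefixScan("ap");
        while (it.hasNext()) {
            System.out.println(it.next()); // apple, apricot
        }

        ReadOnlyStore<String, String> plain = new NoScanStore();
        try {
            plain.prefixScan("ap");
        } catch (UnsupportedOperationException e) {
            System.out.println("unsupported");
        }
    }
}
```

This mirrors the trade-off in the KIP: stores that support the operation override it, everything else inherits the throwing default without source changes.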

Proposed Changes


All the classes which implement ReadOnlyKeyValueStore and support prefix scan would implement this method. The rest of the classes would use the default implementation, which throws an UnsupportedOperationException.

As an example, please review the following code for the RocksDB store:

RocksDBStore (highlighting only the changes)

...

Code Block
languagejava
titleRocksDBStore
public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingStore {

    // Current code

    // implement prefixScan
    @Override
    public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix, final PS prefixKeySerializer) {
        Objects.requireNonNull(prefix, "prefix cannot be null");
        Objects.requireNonNull(prefixKeySerializer, "prefixKeySerializer cannot be null");

        validateStoreOpen();
        final Bytes prefixBytes = Bytes.wrap(prefixKeySerializer.serialize(null, prefix));

        final KeyValueIterator<Bytes, byte[]> rocksDbPrefixSeekIterator = dbAccessor.prefixScan(prefixBytes);
        openIterators.add(rocksDbPrefixSeekIterator);

        return rocksDbPrefixSeekIterator;
    }

    interface RocksDBAccessor {
        // Current RocksDBAccessor code

        KeyValueIterator<Bytes, byte[]> prefixScan(final Bytes prefix);
    }

    class SingleColumnFamilyAccessor implements RocksDBAccessor {
        // Current accessor code

        @Override
        public KeyValueIterator<Bytes, byte[]> prefixScan(final Bytes prefix) {
            return new RocksDBPrefixIterator(name, db.newIterator(columnFamily), openIterators, prefix);
        }
    }
}

...

As already mentioned, with the addition of the new prefixScan method and its default implementation, we can add this support incrementally to each of the stores. For example, RocksDB, which supports prefix scan over its keys, is a natural fit. As part of the initial implementation of this feature, I have made changes to enable prefix scanning for the RocksDB state store.



Code Block
languagejava
titleRocksDBPrefixIterator
class RocksDBPrefixIterator extends RocksDbIterator {
    private byte[] rawPrefix;

    RocksDBPrefixIterator(final String name,
                          final RocksIterator newIterator,
                          final Set<KeyValueIterator<Bytes, byte[]>> openIterators,
                          final Bytes prefix) {
        super(name, newIterator, openIterators);
        this.rawPrefix = prefix.get();
        newIterator.seek(rawPrefix);
    }

    private boolean prefixEquals(final byte[] x, final byte[] y) {
        final int min = Math.min(x.length, y.length);
        final ByteBuffer xSlice = ByteBuffer.wrap(x, 0, min);
        final ByteBuffer ySlice = ByteBuffer.wrap(y, 0, min);
        return xSlice.equals(ySlice);
    }

    @Override
    public KeyValue<Bytes, byte[]> makeNext() {
        final KeyValue<Bytes, byte[]> next = super.makeNext();
        if (next == null) return allDone();
        else {
            if (prefixEquals(this.rawPrefix, next.key.get())) return next;
            else return allDone();
        }
    }
}
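The iterator above relies on RocksDB keeping keys in sorted byte order: seek to the prefix, then read entries until the first key that no longer carries the prefix. A minimal JDK-only sketch of the same idea over a sorted map (a hypothetical helper for illustration, not part of the KIP):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

public class PrefixScanSketch {
    // Lexicographic comparison on unsigned bytes, matching how RocksDB
    // orders keys with its default byte-wise comparator.
    static int compareUnsigned(final byte[] a, final byte[] b) {
        final int min = Math.min(a.length, b.length);
        for (int i = 0; i < min; i++) {
            final int cmp = Integer.compare(a[i] & 0xff, b[i] & 0xff);
            if (cmp != 0) return cmp;
        }
        return Integer.compare(a.length, b.length);
    }

    static boolean startsWith(final byte[] key, final byte[] prefix) {
        if (key.length < prefix.length) return false;
        for (int i = 0; i < prefix.length; i++) {
            if (key[i] != prefix[i]) return false;
        }
        return true;
    }

    // "Seek" to the prefix via tailMap, then stop at the first key that
    // no longer starts with it -- the same early exit makeNext() performs.
    static List<byte[]> prefixScan(final TreeMap<byte[], byte[]> store, final byte[] prefix) {
        final List<byte[]> result = new ArrayList<>();
        final SortedMap<byte[], byte[]> tail = store.tailMap(prefix);
        for (final byte[] key : tail.keySet()) {
            if (!startsWith(key, prefix)) break;
            result.add(key);
        }
        return result;
    }

    public static void main(String[] args) {
        final TreeMap<byte[], byte[]> store = new TreeMap<>(PrefixScanSketch::compareUnsigned);
        store.put("apple".getBytes(), new byte[0]);
        store.put("apricot".getBytes(), new byte[0]);
        store.put("banana".getBytes(), new byte[0]);
        for (final byte[] k : prefixScan(store, "ap".getBytes())) {
            System.out.println(new String(k)); // apple, apricot
        }
    }
}
```

Because the keys are sorted, all matches are contiguous, which is why the iterator can terminate at the first non-matching key instead of scanning the whole store.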

...

https://github.com/facebook/rocksdb/wiki/Prefix-Seek

To make RocksDBStore support prefix scan, it needs to implement the prefixScan method; the code is shown above. I have implemented it only for SingleColumnFamilyAccessor, which invokes RocksDBPrefixIterator to perform the prefix scan.


Similar to the implementation for RocksDB, we would implement the prefix scan for InMemoryKeyValueStore as well.

Benchmark Results

Code Block
languagebash
themeMidnight
titlePrefix Iterator Benchmarking Results
running JMH with args [-f 2 RocksDbPrefixStateStoreBenchmark]
# JMH version: 1.23
# VM version: JDK 1.8.0_181, Java HotSpot(TM) 64-Bit Server VM, 25.181-b13
# VM invoker: /Library/Java/JavaVirtualMachines/jdk1.8.0_181.jdk/Contents/Home/jre/bin/java
# VM options: <none>
# Warmup: 5 iterations, 10 s each
# Measurement: 5 iterations, 10 s each
# Timeout: 10 min per iteration
# Threads: 1 thread, will synchronize iterations
# Benchmark mode: Throughput, ops/time
# Benchmark: org.apache.kafka.jmh.streams.processor.internals.RocksDbPrefixStateStoreBenchmark.testCachePerformance

# Run progress: 0.00% complete, ETA 00:03:20
# Fork: 1 of 2
# Warmup Iteration   1: 23.692 ops/ms
# Warmup Iteration   2: 25.236 ops/ms
# Warmup Iteration   3: 22.429 ops/ms
# Warmup Iteration   4: 18.085 ops/ms
# Warmup Iteration   5: 16.214 ops/ms
Iteration   1: 17.363 ops/ms
Iteration   2: 14.264 ops/ms
Iteration   3: 20.890 ops/ms
Iteration   4: 11.031 ops/ms
Iteration   5: 7.267 ops/ms

# Run progress: 50.00% complete, ETA 00:01:43
# Fork: 2 of 2
# Warmup Iteration   1: 7.332 ops/ms
# Warmup Iteration   2: 6.843 ops/ms
# Warmup Iteration   3: 7.026 ops/ms
# Warmup Iteration   4: 7.753 ops/ms
# Warmup Iteration   5: 8.071 ops/ms
Iteration   1: 7.819 ops/ms
Iteration   2: 13.593 ops/ms
Iteration   3: 14.494 ops/ms
Iteration   4: 12.701 ops/ms
Iteration   5: 10.605 ops/ms


Result "org.apache.kafka.jmh.streams.processor.internals.RocksDbPrefixStateStoreBenchmark.testCachePerformance":
  13.003 ±(99.9%) 6.272 ops/ms [Average]
  (min, avg, max) = (7.267, 13.003, 20.890), stdev = 4.148
  CI (99.9%): [6.731, 19.275] (assumes normal distribution)


# Run complete. Total time: 00:03:26

REMEMBER: The numbers below are just data. To gain reusable insights, you need to follow up on
why the numbers are the way they are. Use profilers (see -prof, -lprof), design factorial
experiments, perform baseline and negative tests that provide experimental control, make sure
the benchmarking environment is safe on JVM/OS/HW level, ask for reviews from the domain experts.
Do not assume the numbers tell you what you want them to tell.

Benchmark                                               Mode  Cnt   Score   Error  Units
RocksDbPrefixStateStoreBenchmark.testCachePerformance  thrpt   10  13.003 ± 6.272  ops/ms

JMH benchmarks done

Prefix Key Serializer

One thing which should be highlighted about the prefixScan method is the prefixKeySerializer argument. This is needed because the type of the prefix could be different from the type of the actual key. For example, the key in the store could be of type UUID, like 123e4567-e89b-12d3-a456-426614174000. The user wants to get all keys which have the prefix 123e4567, which could be represented as a String or a byte array but not as a UUID. However, since all the keys are serialized in the form of byte arrays, if we can serialize the prefix key as well, then it becomes possible to do a prefix scan over the byte-array key space. The argument prefixKeySerializer is provided precisely for this purpose.

Here is a test case which highlights the above point:


Code Block
languagejava
titlePrefixKeySerializer Usage Example

@Test
    public void shouldReturnUUIDsWithStringPrefix() {
        final List<KeyValue<Bytes, byte[]>> entries = new ArrayList<>();
        Serializer<UUID> uuidSerializer = Serdes.UUID().serializer();
        UUID uuid1 = UUID.randomUUID();
        UUID uuid2 = UUID.randomUUID();
        String prefix = uuid1.toString().substring(0, 4);
        entries.add(new KeyValue<>(
                new Bytes(uuidSerializer.serialize(null, uuid1)),
                stringSerializer.serialize(null, "a")));

        entries.add(new KeyValue<>(
                new Bytes(uuidSerializer.serialize(null, uuid2)),
                stringSerializer.serialize(null, "b")));


        rocksDBStore.init(context, rocksDBStore);
        rocksDBStore.putAll(entries);
        rocksDBStore.flush();

        final KeyValueIterator<Bytes, byte[]> keysWithPrefix = rocksDBStore.prefixScan(prefix, stringSerializer);
        String[] valuesWithPrefix = new String[1];
        int numberOfKeysReturned = 0;

        while (keysWithPrefix.hasNext()) {
            KeyValue<Bytes, byte[]> next = keysWithPrefix.next();
            valuesWithPrefix[numberOfKeysReturned++] = new String(next.value);
        }

        assertEquals(1, numberOfKeysReturned);
        assertEquals(valuesWithPrefix[0], "a");
    }


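The test works because Kafka's Serdes.UUID() serializer writes the string form of the UUID (uuid.toString()) as UTF-8 bytes, so a String prefix serialized as UTF-8 lines up byte-for-byte with the stored key. A JDK-only sketch of that alignment (the byte-wise check mirrors prefixEquals in RocksDBPrefixIterator; UuidPrefixSketch is a hypothetical name for this example):

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

public class UuidPrefixSketch {
    // Byte-wise prefix check, analogous to prefixEquals in RocksDBPrefixIterator.
    static boolean hasPrefix(final byte[] key, final byte[] prefix) {
        if (key.length < prefix.length) return false;
        for (int i = 0; i < prefix.length; i++) {
            if (key[i] != prefix[i]) return false;
        }
        return true;
    }

    public static void main(String[] args) {
        final UUID uuid = UUID.fromString("123e4567-e89b-12d3-a456-426614174000");

        // Kafka's UUID serde writes uuid.toString() as UTF-8 bytes,
        // so the serialized key is the textual form of the UUID.
        final byte[] keyBytes = uuid.toString().getBytes(StandardCharsets.UTF_8);

        // The prefix is a String, serialized with the String serde (also UTF-8).
        final byte[] prefixBytes = "123e4567".getBytes(StandardCharsets.UTF_8);

        System.out.println(hasPrefix(keyBytes, prefixBytes)); // true
    }
}
```

Note that this alignment is a property of the chosen serdes: if the key serde wrote UUIDs as raw 16-byte values instead, a String prefix would not match, which is exactly why the caller must supply a prefixKeySerializer that is compatible with the key serialization.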
Compatibility, Deprecation, and Migration Plan

Since the proposal adds the prefixScan method at the ReadOnlyKeyValueStore interface level, the method needs to be implemented only for those stores which support prefix scan. For source compatibility, the rest of the stores fall back to the default implementation, which throws an UnsupportedOperationException.

Rejected Alternatives

The first alternative, adding a separate interface called PrefixSeekableStore and having only the relevant stores implement it, was rejected because it was not compatible with the way users of Kafka Streams define state stores in their applications.

Current Implementation Details

Basic implementation described above can be found here:

https://github.com/confluentinc/kafka/pull/242

It has test cases and also includes JMH benchmarks.

Other considerations


While discussing this KIP with the Kafka Streams core members on the Slack channel, one of the points brought up by Sophie Blee-Goldman was the performance of the prefix seek API of the RocksDB Java bindings. There was an active issue on the Facebook RocksDB side, https://github.com/facebook/rocksdb/issues/6004, which got closed recently. I have created a JIRA on Kafka to track this issue and whether it makes sense to integrate those changes back. Here's the JIRA issue for reference:

KAFKA-9168

...