Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

The proposal is to add a new interface called PrefixSeekableStoremethod to ReadOnlyKeyValueStore called prefixScan. This method takes 2 arguments, 1) the prefix to search for and 2) A serializer for the Prefix Key Type


Code Block
languagejava
titlePrefixSeekableStoreprefixScan
	/**
     * Get an iterator over keys which have the specified prefix. The type of the prefix can be different from that of
     * the key. That's why, callers should also pass a serializer for the prefix to convert the prefix into the
     * format in which the keys are stored underneath in the stores
     * @param prefix The prefix.
     * @param prefixKeySerializer Serializer for the Prefix key type
     * @param <PS> Prefix Serializer type
     * @param <P> Prefix Type.
     * @return The iterator for keys having the specified prefix.
     */
    <PS extends Serializer<P>, P>package org.apache.kafka.streams.state;

public interface PrefixSeekableStore<K, V> {

    KeyValueIterator<K, V> prefixSeekprefixScan(KP prefix, PS prefixKeySerializer);
}


All the classes which implement ReadOnlyKeyValueStore and which support prefixScan would implement this method. Rest of the classes would throw UnsupportedOperationExceptionThe main idea is that only those stores for which we want to add the Prefix Scan feature, can implement this interface, leaving the rest untouched. This way, it's similar to the current BulkLoadingStore interface, which is implemented only by those state-stores which support bulk loading.

RocksDBStore(Highlighting only the changes)

...

Code Block
languagejava
titleRockDBStore
public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BulkLoadingStore, PrefixSeekableStore<Bytes, byte[]> BatchWritingStore {

	// Current code

	// Implementimplement prefixseek()prefixScan
	@Override
	public     public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixSeek(final Bytes prefix) {
		validateStoreOpen();
		return dbAccessor.prefixSeek(prefix);
	prefixScan(P prefix, PS prefixKeySerializer) {
        Objects.requireNonNull(prefix, "prefix cannot be null");
        Objects.requireNonNull(prefixKeySerializer, "prefixKeySerializer cannot be null");

        validateStoreOpen();
        Bytes prefixBytes = Bytes.wrap(prefixKeySerializer.serialize(null, prefix));

        final KeyValueIterator<Bytes, byte[]> rocksDbPrefixSeekIterator = dbAccessor.prefixSeek(prefixBytes);
        openIterators.add(rocksDbPrefixSeekIterator);

        return rocksDbPrefixSeekIterator;
    }

	interface RocksDBAccessor {
	// Current RocksDBAccessor code

		KeyValueIterator<Bytes, byte[]> prefixSeekprefixScan(final Bytes prefix);
	}

	class SingleColumnFamilyAccessor implements RocksDBAccessor {
		// Current Accessor code.

	 	@Override
		public KeyValueIterator<Bytes, byte[]> prefixSeekprefixScan(final Bytes prefix) {
    	  return new RocksDBPrefixIterator(name, db.newIterator(columnFamily), openIterators, prefix);
		}
	}
}

...