Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Status

Current state"Accepted"

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

From time to time, users ask how they can send a record to more than one partition in a sink topic. Currently, this is only possible by replicate the message N times before the sink and use a custom partitioner to write the N messages into the N different partitions.

It might be worth to make this easier and add a new feature for it. There are multiple options:

  • extend `to()` / `addSink()` with a "broadcast" option/config
  • add `toAllPartitions()` / `addBroadcastSink()` methods
  • allow StreamPartitioner to return `-1` for "all partitions"
  • extend `StreamPartitioner` to allow returning more than one partition (ie a list/collection of integers instead of a single int)

The first three options imply that a "full broadcast" is supported only, so it's less flexible. On the other hand, it's easier to use (especially the first two options are easy as they do not require to implement a custom partitioner).

The last option would be most flexible and also allow for a "partial broadcast" (aka multi-cast) pattern. It might also be possible to combine two options, or maye even a totally different idea.

Public Interfaces


https://github.com/apache/kafka/blob/cfe642edee80977173279f4a41e23aa822b9d19f/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java

...

This method would also have a default implementation that throws an UnsupportedOperationException for source compatibility with existing state stores.

Proposed Changes


All the classes which implement ReadOnlyKeyValueStore and which support prefixScan would implement this method. Rest of the classes would throw use the default implementation which throws an UnsupportedOperationException

...

Similar to the implementation for RocksDB, we would implement the prefix scan for InMemoryKeyValueStore as well.

Prefix Key Serializer

One thing which should be highlighted about the prefixScan method is the prefixKeySerializer argument. This is needed because the type of the prefix could be different from the type of the actual key. For example, the key in the store could be of type UUID like 123e4567-e89b-12d3-a456-426614174000. The user wants to get all keys which have the prefix 123e4567 which could be represented as a String/byte array but not a UUID. But, since all the keys are serialized in the form of byte arrays if we can serialize the prefix key, then it would be possible to do a prefix scan over the byte array key space. The argument prefixKeySerializer is provided precisely for this purpose. 

...

Code Block
languagejava
titlePrefixKeySerializer Usage Example
@Test
    public void shouldReturnUUIDsWithStringPrefix() {
        final List<KeyValue<Bytes, byte[]>> entries = new ArrayList<>();
        Serializer<UUID> uuidSerializer = Serdes.UUID().serializer();
        UUID uuid1 = UUID.randomUUID();
        UUID uuid2 = UUID.randomUUID();
        String prefix = uuid1.toString().substring(0, 4);
        entries.add(new KeyValue<>(
                new Bytes(uuidSerializer.serialize(null, uuid1)),
                stringSerializer.serialize(null, "a")));

        entries.add(new KeyValue<>(
                new Bytes(uuidSerializer.serialize(null, uuid2)),
                stringSerializer.serialize(null, "b")));


        rocksDBStore.init(context, rocksDBStore);
        rocksDBStore.putAll(entries);
        rocksDBStore.flush();

        final KeyValueIterator<Bytes, byte[]> keysWithPrefix = rocksDBStore.prefixScan(prefix, stringSerializer);
        String[] valuesWithPrefix = new String[1];
        int numberOfKeysReturned = 0;

        while (keysWithPrefix.hasNext()) {
            KeyValue<Bytes, byte[]> next = keysWithPrefix.next();
            valuesWithPrefix[numberOfKeysReturned++] = new String(next.value);
        }

        assertEquals(1, numberOfKeysReturned);
        assertEquals(valuesWithPrefix[0], "a");
    }


Compatibility, Deprecation, and Migration Plan

Since the proposal adds the prefixScan method at ReadOnlyKeyValueStore interface level, we will need to either implement the method for those stores which support prefixScan. For source compatibility purposes, rest of the stores would use the default implementation which throws UnsupportedOperationException

Rejected Alternatives

The first alternative to add a separate interface called PrefixSeekableStore and have only relevant stores implement it was rejected. This is because it was not compatible with the way users of Kafka Streams define state stores in the application. 

Current Implementation Details

Basic implementation described above can be found here:

https://github.com/confluentinc/kafka/pull/242

Other considerations


While having discussions with the Kafka Streams core members on the Slack channel, one of the points brought up by Sophie Blee-Goldman was the performance of Prefix Seek API of Rocks DB Java. There was an active issue on the Facebook RocksDB side which I had mentioned to them on this issue here: https://github.com/facebook/rocksdb/issues/6004  which got closed recently. I had created a JIRA on Kafka to track the activity of this issue and if at all it makes sense to integrate these changes back. Here's the JIRA issue for reference : 

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyKAFKA-9168

...