Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Status

Current state"AcceptedUnder Discussion"

Discussion threadhere

JIRA: KAFKA-1064813602

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

From time to timeLot of times, users ask how they can want to send a record to more than one partition in a on the sink topic. Currently, this is only possible by if a user wants to replicate a message into N partitions, the only way of doing that is to replicate the message N times before the sink and use and then plug-in a custom partitioner to write the message N messages times into the N different partitions.

It might be worth to make this easier and add a new feature for it. There are multiple options:

  • extend `to()` / `addSink()` with a "broadcast" option/config
  • add `toAllPartitions()` / `addBroadcastSink()` methods
  • allow StreamPartitioner to return `-1` for "all partitions"
  • extend `StreamPartitioner` to allow returning more than one partition (ie a list/collection of integers instead of a single int)

The first three options imply that a "full broadcast" is supported only, so it's less flexible. On the other hand, it's easier to use (especially the first two options are easy as they do not require to implement a custom partitioner).

The last option would be most flexible and also allow for a "partial broadcast" (aka multi-cast) pattern. It might also be possible to combine two options, or maye even a totally different idea.

Since this seems to be a common requirement for users, this KIP aims to make this process simpler.

Public Interfaces


https://github.com/apache/kafka/blob/cfe642edee80977173279f4a41e23aa822b9d19f/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java

The proposal is to add a new method to ReadOnlyKeyValueStore called prefixScan. This method takes 2 arguments, 1) the prefix to search for and 2) A serializer for the Prefix Key Type. A new abstract class called MultiCastStreamPartitioner would be added extending StreamPartitioner interface which would allow returning a List of integers indicating the partitions to which the given record should be multi-casted to. Note that partition method is marked final throwing an UnsupportedOperationException as otherwise the child classes can supply both which can lead to confusion on which one to choose. Also note that, if the implementor happens to return all the partitions for the topic, then this effectively becomes broadcasting the record to all the partitions

Code Block
languagejava
titleprefixScan
	import java.util.List;

public abstract class MultiCastStreamPartitioner<K, V> implements StreamPartitioner<K, V>{

    @Override
    public final Integer partition(String topic, K key, V value, int numPartitions) {
        throw new UnsupportedOperationException();
    }

    /**
     * Determine Getthe List anof iteratorpartition overnumbers keysto which a record, havewith the specifiedgiven prefix.key Theand typevalue ofand the prefix can current number
     * of partitions, should be different frommulti-casted to. Note that of
returning a single valued list *with thevalue key. That's why, callers should also pass a serializer for the prefix to convert the prefix into the-1 is a shorthand
     * for broadcasting the record to all the partitions of the topic.
     * format in which
     * @param topic the keystopic arename storedthis underneathrecord inis thesent storesto
     * @param prefix The prefix.key the key of the record
     * @param prefixKeySerializervalue the Serializervalue forof the Prefix key typerecord
     * @param <PS> Prefix Serializer typenumPartitions the total number of partitions
     * @param <P> Prefix Type.
     * @return The iterator for keys having the specified prefix @return a List of integers between 0 and {@code numPartitions-1}, or [-1] if the record should be pushed to all
     * the partitions of the topic.
     */
    public abstract <PS extends Serializer<P>List<Integer> par
titions(String topic, P>K KeyValueIterator<Kkey, V> prefixScan(P prefixV value, PSint prefixKeySerializernumPartitions);

This method would also have a default implementation that throws an UnsupportedOperationException for source compatibility with existing state stores.

Proposed Changes

All the classes which implement ReadOnlyKeyValueStore and which support prefixScan would implement this method. Rest of the classes would throw use the default implementation which throws an UnsupportedOperationException

As an example, plz review the following code for RocksDB store:

...


}

Proposed Changes

  • As part of this KIP, the users would need to supply a custom partitioner extending MultiCastStreamPartitioner and overriding the partitions method. The reason an abstract class is being added instead of adding an extra method to StreamPartitioner interface is to maintain backward compatibility as StreamPartitioner is being used as a functional interface at multiple places in the AK codebase.
  • RecordCollector and it's implementation RecordCollectorImpl has the method which accepts a StreamPartitioner object and pushes it to the partitions returned by the partition method. Since the users would be passing in an object which is a child of MultiCaseStreamPartitioner class, the send method would also check that and once the partitions are returned, send to all partitions one by one. Here's the code snipper for the same:


    Code Block
    languagejava

...

  • @Override
        public <K, V> void send(final String topic,
           

...

  •  

...

  •  

...

  •  

...

  •  

...

  •  

...

  •  

...

  •  

...

  •  

...

  •  

...

  •  

...

  •  

...

  •         

...

  •  

...

  •  

...

  • final 

...

  • K key,
           

...

  •  

...

  •  

...

  •  

...

  •  

...

  •         

...

  •         

...

  •  

...

  • final 

...

  • V value,
            

...

  •  

...

  •  

...

  •  

...

  •  

...

  •  

...

  •         

...

  •        final 

...

  • Headers 

...

  • headers,
            

...

  •  

...

  •  

...

  •  

...

  •  

...

  •  

...

  •  

...

  •  

...

  •  

...

  •  

...

  •  

...

  •  

...

  •  

...

  •  

...

  •  

...

  •  

...

  •  

...

  •  

...

  •  

...

  •  

...

  •  

...

  • final 

...

  • Long timestamp,
           

...

  •   

...

  •  

...

  •  

...

  •  

...

  •  

...

  •  

...

languagejava
titleRocksDBPrefixIterator

...

  •  

...

  •  

...

  •  

...

  •  

...

  •     

...

  •  

...

  •  

...

  •     

...

  • final 

...

  • Serializer<K> 

...

  • keySerializer,
                                final 

...

  • Serializer<V> 

...

  • valueSerializer,
                                final

...

  •  StreamPartitioner<? super K, ? super V> partitioner) {
            Integer partition = null;
            final List<Integer> multicastPartitions;
    
           

...

  •  if (partitioner != null) {
            

...

  •     final List<PartitionInfo> partitions;
               

...

  •  

...

  • try {
             

...

  •        partitions = streamsProducer.partitionsFor(topic);
        

...

  •      

...

  •  

...

  •  

...

  •  } catch (final TimeoutException timeoutException) {
             

...

  •     

...

  •  

...

  •  

...

  •  

...

  • log.

...

  • warn("Could not get partitions for topic {}, will retry", topic);
    
            

...

  •  

...

  •  

...

  •  

...

  •  

...

  •  

...

  •  

...

  •   // re-throw to trigger `task.timeout.ms`
      

...

  •  

...

  •  

...

  •  

...

  •  

...

  •  

...

  •  

...

  •         

...

  • throw timeoutException;
        

...

  •      

...

  •    } 

...

  • catch 

...

  • (final KafkaException fatal) {
            

...

  •  

...

  •  

...

  •  

...

  •  

...

  •  

...

  •    // here we cannot drop 

...

  • the 

...

  • message 

...

  • on 

...

  • the 

...

  • floor even if it is a transient timeout exception,
                  

...

  •  

...

  •  // so we treat everything the same as a fatal exception
        

...

  •  

...

  •  

...

  •         

...

  •   throw 

...

This is a new iterator extending the current RocksDbIterator. It invokes the built-in seek() method in Java Rocks-DB which is a wrapper over the Prefix Seek apis exposed. More information can be found here:

https://github.com/facebook/rocksdb/wiki/Prefix-Seek

Similar to the implementation for RocksDB, we would implement the prefix scan for InMemoryKeyValueStore as well.

Prefix Key Serializer

One thing which should be highlighted about the prefixScan method is the prefixKeySerializer argument. This is needed because the type of the prefix could be different from the type of the actual key. For example, the key in the store could be of type UUID like 123e4567-e89b-12d3-a456-426614174000. The user wants to get all keys which have the prefix 123e4567 which could be represented as a String/byte array but not a UUID. But, since all the keys are serialized in the form of byte arrays if we can serialize the prefix key, then it would be possible to do a prefix scan over the byte array key space. The argument prefixKeySerializer is provided precisely for this purpose. 

Here is a test case which highlights the above point:

...

languagejava
titlePrefixKeySerializer Usage Example

...

  • new StreamsException("Could not determine the number of partitions for topic '" + topic +
                       

...

  •  

...

  •  

...

  •  

...

  •  

...

  •  

...

  • "' for task " + taskId + " due to " + fatal.toString(),
            

...

  •    

...

  •  

...

  •  

...

  •         

...

  •  

...

  •  

...

  •  fatal
            

...

  •      

...

  •  

...

  •  

...

  •  

...

  • );
            

...

  •  

...

  •    }
                

...

  • if 

...

  • (

...

  • partitions.

...

  • size() > 0) {
                    

...

  • if (partitioner instanceof MultiCastStreamPartitioner) {
            

...

  •  

...

  •            multicastPartitions = ((MultiCastStreamPartitioner<? super 

...

  • K, ? super V>) partitioner).partitions(topic, key, value, partitions.size());
                     

...

  •    for (int p: multicastPartitions) {
                            

...

  • send(topic, key, value, headers, p, timestamp, keySerializer, valueSerializer);
                 

...

  •            return;
               

...

  •          }
                    } else {
                 

...

  •        partition = 

...

  • partitioner.

...

  • partition(topic, key, value, partitions.size());
               

...

  •   

...

  •  

...

  •  

...

  •  

...

  • }
            

...

  •     

...

  • } 

...

  • else 

...

  • {
                    

...

  • throw new 

...

  • StreamsException("Could not get partition information for topic " + topic + " for task " + taskId +
                

...

  •             ". This can happen if 

...

  • the topic does not exist.");
                }

...

  • 
            

...

  • }
    
            

...

  • send(topic, key, value, headers, partition, timestamp, keySerializer, valueSerializer);
    
        }

...


Compatibility, Deprecation, and Migration Plan

Since the proposal adds the prefixScan method at ReadOnlyKeyValueStore interface level, we will need to either implement the method for those stores which support prefixScan. For source compatibility purposes, rest of the stores would use the default implementation which throws UnsupportedOperationException

Rejected Alternatives

The first alternative to add a separate interface called PrefixSeekableStore and have only relevant stores implement it was rejected. This is because it was not compatible with the way users of Kafka Streams define state stores in the application. 

Current Implementation Details

Basic implementation described above can be found here:

https://github.com/confluentinc/kafka/pull/242

Other considerations

we are adding a new abstract class with no provision of overriding partition method in StreamPartitioner, this should keep the current usages of StreamPartitioner interface backward-compatible. No deprecation/migration plan needed as well.

Rejected Alternatives

  • extend to() / addSink() with a "broadcast" option/config in KafkaStreams.
  • add toAllPartitions() / addBroadcastSink() methods in KafkaStreams.
  • allow StreamPartitioner to return `-1` for "all partitions"

These alternatives were rejected as these support only a full broadcast,so while they are simpler to implement, they are less flexible. While having discussions with the Kafka Streams core members on the Slack channel, one of the points brought up by Sophie Blee-Goldman was the performance of Prefix Seek API of Rocks DB Java. There was an active issue on the Facebook RocksDB side which I had mentioned to them on this issue here: https://github.com/facebook/rocksdb/issues/6004  which got closed recently. I had created a JIRA on Kafka to track the activity of this issue and if at all it makes sense to integrate these changes back. Here's the JIRA issue for reference :  JiraserverASF JIRAserverId5aa69414-a9e9-3523-82ec-879b028fb15bkeyKAFKA-9168