Status

Current state: "Adopted"

Discussion thread: here

JIRA: KAFKA-13602

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Users of Kafka Streams often want to send a record to more than one partition of the sink topic. Currently, if a user wants to replicate a message into N partitions, the only way to do so is to replicate the message N times and then plug in a custom partitioner that writes the N copies into N different partitions.

To give a more concrete example, suppose there is a sink topic with 10 partitions and a user wants to send records only to the even-numbered partitions.

Today, the user can achieve this as follows:

Code Block
languagejava
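final int numPartitions = 10;

final KStream<String, String> inputStream = builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));

// One sink per even-numbered partition: the record is written once per loop iteration.
for (int i = 0; i < numPartitions; i += 2) {
    final int evenNumberedPartition = i;
    // The lambda parameter is named "n" because a lambda parameter may not shadow the local variable numPartitions.
    inputStream.to("output-topic", Produced.with(Serdes.String(), Serdes.String(), (topic, key, val, n) -> evenNumberedPartition));
}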


As can be seen, there is no built-in way of sending a record to multiple partitions. This KIP aims to make this process simpler in Kafka Streams. As a side note, this KIP also adds a provision to drop records using a custom partitioner.

Public Interfaces

The StreamPartitioner interface would gain a new method called partitions(), and the current partition() method would be marked as deprecated. The partitions() method would be a default method that invokes partition() and wraps the result in a singleton set, or returns an empty Optional when partition() returns null. The interface would look like this:

Code Block
languagejava
import java.util.Collections;
import java.util.Optional;
import java.util.Set;

public interface StreamPartitioner<K, V> {

    /**
     * Determine the partition number for a record with the given key and value and the current number of partitions.
     *
     * @param topic the topic name this record is sent to
     * @param key the key of the record
     * @param value the value of the record
     * @param numPartitions the total number of partitions
     * @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used
     */
    @Deprecated
    Integer partition(String topic, K key, V value, int numPartitions);

    /**
     * Determine the partition numbers to which a record, with the given key and value and the current number
     * of partitions, should be multicast.
     *
     * @param topic the topic name this record is sent to
     * @param key the key of the record
     * @param value the value of the record
     * @param numPartitions the total number of partitions
     * @return an Optional of a Set of integers between 0 and {@code numPartitions-1}.
     * An empty Optional means the default partitioner is used.
     * An Optional of an empty set means the record won't be sent to any partition, i.e. it is dropped.
     * An Optional of a non-empty Set of integers lists the partitions to which the record should be sent.
     */
    default Optional<Set<Integer>> partitions(String topic, K key, V value, int numPartitions) {
        // Delegate to the deprecated method once and wrap its result.
        final Integer partition = partition(topic, key, value, numPartitions);
        return partition == null ? Optional.empty() : Optional.of(Collections.singleton(partition));
    }

}

The return type wraps a Set so that, if a faulty implementation of the partitions() method returns the same partition number multiple times, the record is still sent to that partition only once.
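
As a rough illustration of the two points above, the following self-contained sketch uses a local stand-in interface. SketchPartitioner, LEGACY and DUPLICATING are hypothetical names chosen for this example; the interface only mirrors the proposed signatures and is not the real Kafka Streams type. It shows that a partitioner which implements only partition() keeps working through the default partitions() method, and that a Set collapses duplicate partition numbers.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

// Hypothetical stand-in that mirrors the signatures proposed in this KIP.
interface SketchPartitioner<K, V> {

    Integer partition(String topic, K key, V value, int numPartitions);

    default Optional<Set<Integer>> partitions(String topic, K key, V value, int numPartitions) {
        final Integer partition = partition(topic, key, value, numPartitions);
        return partition == null ? Optional.empty() : Optional.of(Collections.singleton(partition));
    }
}

final class DefaultMethodSketch {

    // A pre-existing style partitioner that only knows about partition().
    // It returns null for a null key, which means "use the default partitioner".
    static final SketchPartitioner<String, String> LEGACY =
        (topic, key, value, numPartitions) -> key == null ? null : key.length() % numPartitions;

    // A faulty partitioner that names partition 1 twice; the Set keeps it once.
    static final SketchPartitioner<String, String> DUPLICATING = new SketchPartitioner<String, String>() {
        @Override
        public Integer partition(String topic, String key, String value, int numPartitions) {
            return null;
        }

        @Override
        public Optional<Set<Integer>> partitions(String topic, String key, String value, int numPartitions) {
            return Optional.of(new HashSet<>(Arrays.asList(1, 1, 3)));
        }
    };

    public static void main(String[] args) {
        System.out.println(LEGACY.partitions("topic", "abc", "v", 4));
        System.out.println(LEGACY.partitions("topic", null, "v", 4));
        System.out.println(DUPLICATING.partitions("topic", "abc", "v", 4));
    }
}
```

In this sketch the legacy partitioner needs no code change to be usable by a caller that only invokes partitions().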

Proposed Changes

  • RecordCollector and its implementation RecordCollectorImpl have a send method which accepts a StreamPartitioner object and pushes the record to the partition returned by the partition() method. This is the core piece of logic which would allow multicasting/broadcasting records. The high-level changes are shown below. Note that when the partitions() method returns an empty set, meaning the record won't be sent to any partition, we would also update the droppedRecordsSensor.


    Code Block
    languagejava
    @Override
    public <K, V> void send(final String topic,
                            final K key,
                            final V value,
                            final Headers headers,
                            final Long timestamp,
                            final Serializer<K> keySerializer,
                            final Serializer<V> valueSerializer,
                            final StreamPartitioner<? super K, ? super V> partitioner) {

        if (partitioner != null) {

            // Existing logic to fetch partitions

            // Changes to be made due to the KIP
            if (partitions.size() > 0) {
                final Optional<Set<Integer>> maybeMulticastPartitions = partitioner.partitions(topic, key, value, partitions.size());
                if (!maybeMulticastPartitions.isPresent()) {
                    // New change. Use default partitioner
                    send(topic, key, value, headers, null, timestamp, keySerializer, valueSerializer, processorNodeId, context);
                } else {
                    final Set<Integer> multicastPartitions = maybeMulticastPartitions.get();
                    if (multicastPartitions.isEmpty()) {
                        // If a record is not to be sent to any partition, mark it as dropped.
                        log.info("Not sending the record to any partition");
                        droppedRecordsSensor.record();
                    } else {
                        for (final int multicastPartition : multicastPartitions) {
                            send(topic, key, value, headers, multicastPartition, timestamp, keySerializer, valueSerializer, processorNodeId, context);
                        }
                    }
                }
            } else {
                throw new StreamsException("Could not get partition information for topic " + topic + " for task " + taskId +
                        ". This can happen if the topic does not exist.");
            }
        } else {
            send(topic, key, value, headers, null, timestamp, keySerializer, valueSerializer, processorNodeId, context);
        }
    }


  • StreamPartitioner is also used in FK-join and Interactive queries. For FK-join and IQ, the invocation of partition()  would be replaced by partitions()  and a runtime check would be added to ensure that the returned set is singleton. 
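
The runtime check mentioned in the last bullet could look roughly like the sketch below. SingletonCheckSketch and requireSinglePartition are hypothetical names for this illustration, and both the choice of IllegalArgumentException and the decision to pass an empty Optional through unchanged are assumptions, not something this KIP specifies.

```java
import java.util.Optional;
import java.util.Set;

final class SingletonCheckSketch {

    // Hypothetical helper: callers such as FK-join or IQ need exactly one partition.
    // An empty Optional is passed through so the caller can fall back to default partitioning.
    static Optional<Integer> requireSinglePartition(final Optional<Set<Integer>> maybePartitions) {
        if (!maybePartitions.isPresent()) {
            return Optional.empty();
        }
        final Set<Integer> partitions = maybePartitions.get();
        if (partitions.size() != 1) {
            // Assumption: the exception type is illustrative only.
            throw new IllegalArgumentException(
                "Expected a singleton set of partitions but got " + partitions.size() + " entries");
        }
        return Optional.of(partitions.iterator().next());
    }
}
```

With such a helper, a multicast or dropping partitioner used in a context that cannot multicast fails fast instead of silently picking one partition.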


Example Usage

Continuing the example from the Motivation section, here is how users can send records to even-numbered partitions with the new interface:


Code Block
languagejava
// Define a partitioner class which sends to even numbered partitions
public static class EvenPartitioner<K, V> implements StreamPartitioner<K, V> {

    @Override
    public Integer partition(String topic, K key, V value, int numPartitions) {
        return null;
    }

    // Returns Optional<Set<Integer>> to match the interface defined in Public Interfaces.
    @Override
    public Optional<Set<Integer>> partitions(String topic, K key, V value, int numPartitions) {
        final Set<Integer> partitions = new HashSet<>();
        for (int i = 0; i < numPartitions; i += 2) {
            partitions.add(i);
        }
        return Optional.of(partitions);
    }
}

// Broadcasting
public static class BroadcastingPartitioner<K, V> implements StreamPartitioner<K, V> {

    @Override
    public Integer partition(String topic, K key, V value, int numPartitions) {
        return null;
    }

    @Override
    public Optional<Set<Integer>> partitions(String topic, K key, V value, int numPartitions) {
        return Optional.of(IntStream.range(0, numPartitions).boxed().collect(Collectors.toSet()));
    }
}

// Don't send to any partitions
public static class DroppingPartitioner<K, V> implements StreamPartitioner<K, V> {

    @Override
    public Integer partition(String topic, K key, V value, int numPartitions) {
        return null;
    }

    @Override
    public Optional<Set<Integer>> partitions(String topic, K key, V value, int numPartitions) {
        return Optional.of(Collections.emptySet());
    }
}

// Build Stream.
final KStream<String, String> inputStream = builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));

// Send to even numbered partitions
inputStream.to("output-topic", Produced.with(Serdes.String(), Serdes.String(), new EvenPartitioner<>()));
// Broadcast
inputStream.to("output-topic", Produced.with(Serdes.String(), Serdes.String(), new BroadcastingPartitioner<>()));
// Don't send to any partitions
inputStream.to("output-topic", Produced.with(Serdes.String(), Serdes.String(), new DroppingPartitioner<>()));


This way users only need to define the partitioner, and the internal routing mechanism takes care of sending each record to the relevant partitions, or to none.
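
To make the three routing outcomes concrete without a running Kafka Streams application, here is a small simulation. RoutingSketch and its methods are hypothetical names for this example, and the defaultPartition argument merely stands in for whatever the default partitioner would choose; the real routing is the RecordCollectorImpl logic described under Proposed Changes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class RoutingSketch {

    // Hypothetical helper: lists the partitions a record would be written to.
    // An empty Optional falls back to the default choice; an empty set yields no targets (dropped).
    static List<Integer> targets(final Optional<Set<Integer>> decision, final int defaultPartition) {
        if (!decision.isPresent()) {
            return Collections.singletonList(defaultPartition);
        }
        // Sorted only to make the result easy to read and compare.
        return new ArrayList<>(new TreeSet<>(decision.get()));
    }

    // Decision equivalent to the EvenPartitioner example.
    static Optional<Set<Integer>> even(final int numPartitions) {
        return Optional.of(IntStream.range(0, numPartitions)
            .filter(p -> p % 2 == 0)
            .boxed()
            .collect(Collectors.toSet()));
    }

    // Decision equivalent to the BroadcastingPartitioner example.
    static Optional<Set<Integer>> broadcast(final int numPartitions) {
        return Optional.of(IntStream.range(0, numPartitions).boxed().collect(Collectors.toSet()));
    }

    // Decision equivalent to the DroppingPartitioner example.
    static Optional<Set<Integer>> drop() {
        return Optional.of(Collections.emptySet());
    }

    public static void main(String[] args) {
        System.out.println(targets(even(10), 7));
        System.out.println(targets(broadcast(3), 7));
        System.out.println(targets(drop(), 7));
        System.out.println(targets(Optional.empty(), 7));
    }
}
```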

Compatibility, Deprecation, and Migration Plan

This KIP deprecates the partition() method in StreamPartitioner. As part of this deprecation:

  • StreamPartitioner is also used in FK-join and IQ. The invocation of partition()  would be replaced by partitions()  and a runtime check would be added to ensure that the returned set is singleton. 
  • Adding sufficient logging

Rejected Alternatives

The first three alternatives below were rejected because they focused only on broadcasting to all partitions, which seemed restrictive.

  • extend to() / addSink() with a "broadcast" option/config in KafkaStreams.
  • add toAllPartitions() / addBroadcastSink() methods in KafkaStreams.
  • allow StreamPartitioner to return `-1` for "all partitions".
  • Adding a separate class to handle multi casting. This was rejected in favour of enhancing the current StreamPartitioner  interface.

Test Plan

  • Add new tests (unit/integration), similar to the existing partition tests, for the newly introduced partitions method.
  • Add tests that verify a failure when a non-singleton set is returned for FK-joins and for IQ when querying.

...