...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Users of Kafka Streams often want to send a record to more than one partition of the sink topic. Currently, the only way to replicate a message into N partitions is to replicate the message N times and then plug in a custom partitioner that writes each copy to a different partition. This seems to be a common requirement for users. To give a more concrete example, let's say there's a sink topic with 10 partitions and a user wants to send records only to even-numbered partitions.


Today, this is how the user can achieve that:

Code Block
languagejava
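final int numPartitions = 10;

final KStream<String, String> inputStream = builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));

// One sink per even-numbered partition: every record is written once per iteration.
for (int i = 0; i < numPartitions; i += 2) {
    final int evenNumberedPartition = i;
    // The lambda parameter is named partitionCount because a lambda parameter cannot shadow the local numPartitions.
    inputStream.to("output-topic", Produced.with(Serdes.String(), Serdes.String(), (topic, key, val, partitionCount) -> evenNumberedPartition));
}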


This is a cumbersome way to broadcast. Also, there is no way to drop a record within the partitioner. This KIP aims to make this process simpler in Kafka Streams.

Public Interfaces

The StreamPartitioner interface would get a new method called partitions(), and the current partition() method would be marked as deprecated. The partitions() method would be a default method that invokes partition() and constructs a singleton list out of its result. Here's how the interface would look:

Code Block
languagejava
import java.util.Collections;
import java.util.List;

public interface StreamPartitioner<K, V> {

    /**
     * Determine the partition number for a record with the given key and value and the current number of partitions.
     *
     * @param topic the topic name this record is sent to
     * @param key the key of the record
     * @param value the value of the record
     * @param numPartitions the total number of partitions
     * @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used
     */
    @Deprecated
    Integer partition(String topic, K key, V value, int numPartitions);


    /**
     * Determine the list of partition numbers to which a record, with the given key and value and the current number
     * of partitions, should be multicast.
     *
     * @param topic the topic name this record is sent to
     * @param key the key of the record
     * @param value the value of the record
     * @param numPartitions the total number of partitions
     * @return a list of integers between 0 and {@code numPartitions-1}; {@code null} means the record is broadcast
     *         to all partitions of the topic, while an empty list means the record is not sent to any partition
     */
    default List<Integer> partitions(String topic, K key, V value, int numPartitions) {
        return Collections.singletonList(partition(topic, key, value, numPartitions));
    }
}
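
To illustrate the backward-compatibility intent of the default method, here is a minimal sketch. It assumes a local stand-in interface (SketchStreamPartitioner, a hypothetical name) that mirrors the proposed signatures so the example compiles without Kafka on the classpath; LegacyHashPartitioner is likewise hypothetical. A partitioner written before this KIP only implements partition(), yet callers can already use partitions() and receive a single-element list.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in that mirrors the proposed interface; not the real Kafka Streams type.
interface SketchStreamPartitioner<K, V> {
    @Deprecated
    Integer partition(String topic, K key, V value, int numPartitions);

    // Default bridge: wraps the single partition in a singleton list.
    default List<Integer> partitions(String topic, K key, V value, int numPartitions) {
        return Collections.singletonList(partition(topic, key, value, numPartitions));
    }
}

// A pre-KIP style partitioner (hypothetical) that only implements partition().
class LegacyHashPartitioner implements SketchStreamPartitioner<String, String> {
    @Override
    public Integer partition(String topic, String key, String value, int numPartitions) {
        // floorMod keeps the result in [0, numPartitions) even for negative hash codes.
        return Math.floorMod(key.hashCode(), numPartitions);
    }
}

class LegacyPartitionerDemo {
    public static void main(String[] args) {
        SketchStreamPartitioner<String, String> partitioner = new LegacyHashPartitioner();
        // No code change needed in the legacy class: partitions() comes from the default method.
        List<Integer> targets = partitioner.partitions("output-topic", "some-key", "some-value", 10);
        System.out.println("Target partitions: " + targets);
    }
}
```

Under these assumptions, existing partitioners keep working unchanged, and only partitioners that want multicast behaviour need to override partitions().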

...

  • RecordCollector and its implementation RecordCollectorImpl have a send() method which accepts a StreamPartitioner object and pushes the record to the partition returned by the partition() method. This is the core piece of logic which would allow multicasting/broadcasting records. Here are the high-level changes. Note that when the partitions() method returns an empty list, meaning we won't send the record to any partition, we would also update the droppedRecordsSensor.


    Code Block
    languagejava
    @Override
    public <K, V> void send(final String topic,
                            final K key,
                            final V value,
                            final Headers headers,
                            final Long timestamp,
                            final Serializer<K> keySerializer,
                            final Serializer<V> valueSerializer,
                            final StreamPartitioner<? super K, ? super V> partitioner) {
        List<Integer> multicastPartitions;

        if (partitioner != null) {

            // Existing logic to fetch partitions

            // Changes to be made due to the KIP
            if (partitions.size() > 0) {
                multicastPartitions = partitioner.partitions(topic, key, value, partitions.size());
                // The null check comes first so that isEmpty() is never called on a null list.
                if (multicastPartitions == null) {
                    // Broadcast to all partitions
                    multicastPartitions = partitions.stream().map(PartitionInfo::partition).collect(Collectors.toList());
                }
                if (multicastPartitions.isEmpty()) {
                    // New change. If a record is not to be sent to any partition, mark it
                    // as a dropped record.
                    log.info("Not sending the record to any partition");
                    droppedRecordsSensor.record();
                } else {
                    // Iterate over the subset of partitions and send to those.
                    for (final int p : multicastPartitions) {
                        send(topic, key, value, headers, p, timestamp, keySerializer, valueSerializer);
                    }
                }
            }

            // Existing logic.
        }
    }


  • StreamPartitioner is also used in FK-join and Interactive Queries. The invocation of partition() would be replaced by partitions(), and a runtime check would be added to ensure that the returned list is a singleton.
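
The runtime check mentioned for FK-join and Interactive Queries could look roughly like the sketch below. The class and method names (SinglePartitionCheck, requireSinglePartition) and the choice of IllegalArgumentException are assumptions made for illustration; the KIP text only states that a check would be added, not how it is implemented.

```java
import java.util.List;

// Hypothetical helper; the KIP does not name the check or the exception type.
final class SinglePartitionCheck {

    private SinglePartitionCheck() { }

    // Returns the only partition in the list, or fails if the list is null or not a singleton.
    static Integer requireSinglePartition(final List<Integer> partitions) {
        if (partitions == null || partitions.size() != 1) {
            throw new IllegalArgumentException(
                "Expected exactly one partition but the partitioner returned: " + partitions);
        }
        return partitions.get(0);
    }
}
```

A caller in those code paths would then wrap the new call, for example `requireSinglePartition(partitioner.partitions(topic, key, value, numPartitions))`, so that multicast or dropping partitioners fail fast where only one target partition makes sense.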


Example Usage

Continuing the example from the Motivation section, with the new interface, here's how users can send to even-numbered partitions:


Code Block
languagejava
// Define a partitioner class which sends to even-numbered partitions
public static class EvenPartitioner<K, V> implements StreamPartitioner<K, V> {

    @Override
    public Integer partition(String topic, K key, V value, int numPartitions) {
        return null;
    }

    @Override
    public List<Integer> partitions(String topic, K key, V value, int numPartitions) {
        final List<Integer> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i += 2) {
            partitions.add(i);
        }
        return partitions;
    }
}

// Broadcasting
public static class BroadcastingPartitioner<K, V> implements StreamPartitioner<K, V> {

    @Override
    public Integer partition(String topic, K key, V value, int numPartitions) {
        return null;
    }

    @Override
    public List<Integer> partitions(String topic, K key, V value, int numPartitions) {
        // null means: broadcast to all partitions
        return null;
    }
}

// Don't send to any partitions
public static class DroppingPartitioner<K, V> implements StreamPartitioner<K, V> {

    @Override
    public Integer partition(String topic, K key, V value, int numPartitions) {
        return null;
    }

    @Override
    public List<Integer> partitions(String topic, K key, V value, int numPartitions) {
        // An empty list means: drop the record
        return Collections.emptyList();
    }
}


// Build Stream.
final KStream<String, String> inputStream = builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));

// Send to even-numbered partitions
inputStream.to("output-topic", Produced.with(Serdes.String(), Serdes.String(), new EvenPartitioner<>()));
// Broadcast
inputStream.to("output-topic", Produced.with(Serdes.String(), Serdes.String(), new BroadcastingPartitioner<>()));
// Don't send to any partitions
inputStream.to("output-topic", Produced.with(Serdes.String(), Serdes.String(), new DroppingPartitioner<>()));


This way, users just need to define the partitioner, and the internal routing mechanism would take care of sending each record to the relevant partitions, or to none.
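
As a compact summary of the three return-value conventions used by the partitioners above, the following sketch resolves the result of partitions() into the concrete set of target partitions. MulticastRouting and resolveTargets are hypothetical names introduced for illustration and are not part of Kafka Streams; the real logic would live inside RecordCollectorImpl as outlined earlier.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that mirrors the routing rules described in this KIP.
final class MulticastRouting {

    private MulticastRouting() { }

    // null        -> broadcast to every partition 0..numPartitions-1
    // empty list  -> no target partitions (the record would be counted as dropped)
    // otherwise   -> exactly the partitions the partitioner asked for
    static List<Integer> resolveTargets(final List<Integer> requested, final int numPartitions) {
        if (requested == null) {
            final List<Integer> all = new ArrayList<>(numPartitions);
            for (int p = 0; p < numPartitions; p++) {
                all.add(p);
            }
            return all;
        }
        return requested;
    }
}
```

With a 10-partition sink topic, the even-numbered partitioner would resolve to partitions 0, 2, 4, 6 and 8, the broadcasting partitioner to all ten partitions, and the dropping partitioner to an empty list.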

Compatibility, Deprecation, and Migration Plan

...