Status

Current state: "Adopted"

Discussion thread

JIRA: KAFKA-13602

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Kafka Streams users often want to send a record to more than one partition of a sink topic. Currently, a user who wants to replicate a message into N partitions has to replicate the message N times and then plug in a custom partitioner that writes the N copies into N different partitions. To give a more concrete example, let's say there's a sink topic with 10 partitions and a user wants to send records only to the even-numbered partitions.


Today, this is how the user would do it:

Code Block
languagejava
final int numPartitions = 10;

final KStream<String, String> inputStream = builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));

for (int i = 0; i < numPartitions; i += 2) {
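    final int evenNumberedPartition = i;
    // The lambda parameter is named partitionCount because a lambda parameter cannot reuse the name of the local variable numPartitions.
    inputStream.to("output-topic", Produced.with(Serdes.String(), Serdes.String(), (topic, key, val, partitionCount) -> evenNumberedPartition));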
}


As can be seen, there is no built-in way of sending a record to multiple partitions. This KIP aims to make this process simpler in Kafka Streams. As a side note, this KIP also adds a provision to drop records using a custom partitioner.

Public Interfaces

https://github.com/apache/kafka/blob/cfe642edee80977173279f4a41e23aa822b9d19f/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java

The StreamPartitioner interface would have a new method added called partitions(), and the current partition() method would be marked as deprecated. The partitions() method would be a default method that invokes partition() and constructs a singleton set out of the result. Here's how the interface would look:

Code Block
languagejava
titleprefixScan
import java.util.Collections;
import java.util.Optional;
import java.util.Set;

public interface StreamPartitioner<K, V> {

    /**
     * Determine the partition number for a record with the given key and value and the current number of partitions.
     *
     * @param topic the topic name this record is sent to
     * @param key the key of the record
     * @param value the value of the record
     * @param numPartitions the total number of partitions
     * @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used
     */
    @Deprecated
    Integer partition(String topic, K key, V value, int numPartitions);


    /**
     * Determine the partition numbers to which a record, with the given key and value and the current number
     * of partitions, should be multi-casted to.
     * @param topic the topic name this record is sent to
     * @param key the key of the record
     * @param value the value of the record
     * @param numPartitions the total number of partitions
     * @return an Optional of Set of integers between 0 and {@code numPartitions-1}.
     * Empty optional means use default partitioner
     * Optional of an empty set means the record won't be sent to any partitions i.e dropped.
     * Optional of Set of integers means the partitions to which the record should be sent to.
     * */
    default Optional<Set<Integer>> partitions(String topic, K key, V value, int numPartitions) {
        // Call partition() once and reuse the result.
        final Integer partition = partition(topic, key, value, numPartitions);
        return partition == null ? Optional.empty() : Optional.of(Collections.singleton(partition));
    }
}


The return type is a Set so that even if a faulty implementation of the partitions() method returns the same partition number multiple times, that partition is still recorded only once.
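To illustrate how the default partitions() method would treat existing partitioners that only implement partition(), here is a minimal, self-contained sketch. It uses a stand-in interface named SketchPartitioner and a class named DefaultPartitionsSketch; both names are hypothetical and exist only so the example compiles without the Kafka Streams dependency.

Code Block
languagejava
```java
import java.util.Collections;
import java.util.Optional;
import java.util.Set;

// Hypothetical stand-in that mirrors the proposed partition()/partitions() pair; not the Kafka interface itself.
interface SketchPartitioner<K, V> {

    Integer partition(String topic, K key, V value, int numPartitions);

    // Same shape as the proposed default method: wrap the single partition in a singleton set.
    default Optional<Set<Integer>> partitions(String topic, K key, V value, int numPartitions) {
        final Integer partition = partition(topic, key, value, numPartitions);
        return partition == null ? Optional.empty() : Optional.of(Collections.singleton(partition));
    }
}

class DefaultPartitionsSketch {

    // Legacy-style partitioner that always picks partition 3 and only implements partition().
    static final SketchPartitioner<String, String> FIXED = (topic, key, value, numPartitions) -> 3;

    // Legacy-style partitioner that defers to the default partitioning logic by returning null.
    static final SketchPartitioner<String, String> DEFERRING = (topic, key, value, numPartitions) -> null;

    public static void main(String[] args) {
        System.out.println(FIXED.partitions("output-topic", "k", "v", 10));     // prints Optional[[3]]
        System.out.println(DEFERRING.partitions("output-topic", "k", "v", 10)); // prints Optional.empty
    }
}
```

Under these assumptions, a partitioner written as a lambda keeps working unchanged: a non-null partition becomes a singleton set, and null becomes an empty Optional, which signals that the default partitioner should be used.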

Proposed Changes

  • As part of this KIP, users would supply a custom partitioner that implements StreamPartitioner and overrides the partitions() method.
  • RecordCollector and its implementation RecordCollectorImpl have a send method which accepts a StreamPartitioner object and pushes the record to the partition returned by the partition() method. This is the core piece of logic which would allow multicasting/broadcasting records. Here are the high-level changes. Note that when the partitions() method returns an empty set, meaning the record won't be sent to any partition, the droppedRecordsSensor would also be updated.


    Code Block
    languagejava
    @Override
    public <K, V> void send(final String topic,
                            final K key,
                            final V value,
                            final Headers headers,
                            final Long timestamp,
                            final Serializer<K> keySerializer,
                            final Serializer<V> valueSerializer,
                            final StreamPartitioner<? super K, ? super V> partitioner) {

        if (partitioner != null) {

            // Existing logic to fetch partitions

            // Changes to be made due to the KIP
            if (partitions.size() > 0) {
                final Optional<Set<Integer>> maybeMulticastPartitions = partitioner.partitions(topic, key, value, partitions.size());
                if (!maybeMulticastPartitions.isPresent()) {
                    // New change. Use default partitioner
                    send(topic, key, value, headers, null, timestamp, keySerializer, valueSerializer, processorNodeId, context);
                } else {
                    final Set<Integer> multicastPartitions = maybeMulticastPartitions.get();
                    if (multicastPartitions.isEmpty()) {
                        // If a record is not to be sent to any partition, mark it as dropped.
                        log.info("Not sending the record to any partition");
                        droppedRecordsSensor.record();
                    } else {
                        for (final int multicastPartition : multicastPartitions) {
                            send(topic, key, value, headers, multicastPartition, timestamp, keySerializer, valueSerializer, processorNodeId, context);
                        }
                    }
                }
            } else {
                throw new StreamsException("Could not get partition information for topic " + topic + " for task " + taskId +
                        ". This can happen if the topic does not exist.");
            }
        } else {
            send(topic, key, value, headers, null, timestamp, keySerializer, valueSerializer, processorNodeId, context);
        }
    }


  • StreamPartitioner is also used in FK-join and Interactive queries. For FK-join and IQ, the invocation of partition()  would be replaced by partitions()  and a runtime check would be added to ensure that the returned set is singleton. 
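The runtime check mentioned for FK-join and IQ could look roughly like the sketch below. This is only an illustration: the class SingletonPartitionCheck and the helper requireSinglePartition are hypothetical names, and both the exception type and the choice to map an empty Optional to null (meaning "use default partitioning") are assumptions that the KIP does not specify.

Code Block
languagejava
```java
import java.util.Optional;
import java.util.Set;

class SingletonPartitionCheck {

    // Hypothetical helper: extracts the single partition from a partitions() result.
    // Assumption: an empty Optional maps to null, i.e. fall back to default partitioning.
    // Assumption: any set that does not contain exactly one element is rejected with IllegalArgumentException.
    static Integer requireSinglePartition(final Optional<Set<Integer>> maybePartitions) {
        if (!maybePartitions.isPresent()) {
            return null;
        }
        final Set<Integer> partitions = maybePartitions.get();
        if (partitions.size() != 1) {
            throw new IllegalArgumentException(
                "Expected exactly one partition but got " + partitions.size() + ": " + partitions);
        }
        return partitions.iterator().next();
    }
}
```

With a helper of this shape, callers that previously used the Integer returned by partition() could keep their existing control flow while reading the value from partitions().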


Example Usage

Continuing the example from the motivation section, with the new interface, here's how users can send to even-numbered partitions:


Code Block
languagejava
import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Define a partitioner class which sends to even numbered partitions
public static class EvenPartitioner<K, V> implements StreamPartitioner<K, V> {

        @Override
        public Integer partition(String topic, K key, V value, int numPartitions) {
            return null;
        }

        // Returns Optional<Set<Integer>> to match the signature of partitions() in StreamPartitioner.
        @Override
        public Optional<Set<Integer>> partitions(String topic, K key, V value, int numPartitions) {
            final Set<Integer> partitions = new HashSet<>();
            for (int i = 0; i < numPartitions; i += 2) {
                partitions.add(i);
            }
            return Optional.of(partitions);
        }
    }

// Broadcasting
public static class BroadcastingPartitioner<K, V> implements StreamPartitioner<K, V> {

        @Override
        public Integer partition(String topic, K key, V value, int numPartitions) {
            return null;
        }

        @Override
        public Optional<Set<Integer>> partitions(String topic, K key, V value, int numPartitions) {
            return Optional.of(IntStream.range(0, numPartitions).boxed().collect(Collectors.toSet()));
        }
    }

// Don't send to any partitions
public static class DroppingPartitioner<K, V> implements StreamPartitioner<K, V> {

        @Override
        public Integer partition(String topic, K key, V value, int numPartitions) {
            return null;
        }

        @Override
        public Optional<Set<Integer>> partitions(String topic, K key, V value, int numPartitions) {
            return Optional.of(Collections.emptySet());
        }
    }


// Build Stream.
final KStream<String, String> inputStream = builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));

// Send to even numbered partitions (the partitioner instance is passed directly, not wrapped in a lambda)
inputStream.to("output-topic", Produced.with(Serdes.String(), Serdes.String(), new EvenPartitioner<>()));
// Broadcast
inputStream.to("output-topic", Produced.with(Serdes.String(), Serdes.String(), new BroadcastingPartitioner<>()));
// Don't send to any partitions
inputStream.to("output-topic", Produced.with(Serdes.String(), Serdes.String(), new DroppingPartitioner<>()));


This way users just need to define the partitioner, and the internal routing mechanism would take care of sending each record to the relevant partitions, or to none.
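The three possible outcomes of that routing (default partitioner, drop, multicast) can be summarised in a small, self-contained sketch. The class RoutingSketch and its route method are hypothetical names used for illustration only; they describe the decision as plain strings rather than calling the real RecordCollectorImpl.

Code Block
languagejava
```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;

class RoutingSketch {

    // Hypothetical helper: describes where a record would go for a given partitions() result.
    static List<String> route(final Optional<Set<Integer>> maybePartitions) {
        final List<String> actions = new ArrayList<>();
        if (!maybePartitions.isPresent()) {
            // Empty Optional: fall back to the default partitioner.
            actions.add("send via default partitioner");
        } else if (maybePartitions.get().isEmpty()) {
            // Optional of an empty set: the record is dropped.
            actions.add("drop record");
        } else {
            // Sorted copy only so that the sketch produces a deterministic order.
            for (final int partition : new TreeSet<Integer>(maybePartitions.get())) {
                actions.add("send to partition " + partition);
            }
        }
        return actions;
    }
}
```

For instance, under these assumptions a result of Optional.of({0, 2}) yields two "send to partition" actions, one per partition in the set.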

Compatibility, Deprecation, and Migration Plan

This KIP deprecates the partition() method in StreamPartitioner. Here is what we would be doing as part of this:

  • StreamPartitioner is also used in FK-join and IQ. The invocation of partition()  would be replaced by partitions()  and a runtime check would be added to ensure that the returned set is singleton. 
  • Adding sufficient logging

Rejected Alternatives

The first 3 alternatives were rejected as they focused only on broadcasting to all partitions, which seemed restrictive.

  • extend to() / addSink() with a "broadcast" option/config in KafkaStreams.
  • add toAllPartitions() / addBroadcastSink() methods in KafkaStreams.
  • allow StreamPartitioner to return `-1` for "all partitions".
  • Adding a separate class to handle multi casting. This was rejected in favour of enhancing the current StreamPartitioner interface.

Test Plan

  • Add new tests (unit/integration), similar to the existing partition() tests, for the newly introduced partitions() method.
  • Add tests that fail when a non-singleton set is returned for FK-joins and for IQ when querying.