Status

Current state: "Under Discussion"

Discussion thread

JIRA: KAFKA-13602

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Users often want to send a record to more than one partition of the sink topic. Currently, if a user wants to replicate a message into N partitions, the only way to do so is to duplicate the message N times and plug in a custom partitioner that writes each copy to a different partition. Since this seems to be a common requirement, this KIP aims to make the process simpler.

Public Interfaces

The StreamPartitioner interface would gain a new method called partitions(), and the current partition() method would be marked as deprecated. The partitions() method would be a default method that invokes partition() and wraps the result in a singleton list. Here is how the interface would look:

import java.util.Collections;
import java.util.List;

public interface StreamPartitioner<K, V> {

    /**
     * Determine the partition number for a record with the given key and value and the current number of partitions.
     *
     * @param topic the topic name this record is sent to
     * @param key the key of the record
     * @param value the value of the record
     * @param numPartitions the total number of partitions
     * @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used
     */
    @Deprecated
    Integer partition(String topic, K key, V value, int numPartitions);


    /**
     * Determine the list of partition numbers to which a record, with the given key and value and the current number
     * of partitions, should be multicast.
     *
     * @param topic the topic name this record is sent to
     * @param key the key of the record
     * @param value the value of the record
     * @param numPartitions the total number of partitions
     * @return a List of integers between 0 and {@code numPartitions-1}; {@code null} means the record is broadcast
     *         to all partitions of the topic, while an empty list means the record is not sent to any partition
     */
    default List<Integer> partitions(String topic, K key, V value, int numPartitions) {
        return Collections.singletonList(partition(topic, key, value, numPartitions));
    }
}      
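
As an illustration of how a user might override partitions(), here is a minimal sketch. It declares a local stand-in for the proposed interface so that it compiles without Kafka on the classpath; the PrimaryAndNeighborPartitioner class and its routing rule are hypothetical and not part of this KIP.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class MulticastPartitionerSketch {

    // Local stand-in mirroring the interface proposed above (assumption: same signatures).
    interface StreamPartitioner<K, V> {
        @Deprecated
        Integer partition(String topic, K key, V value, int numPartitions);

        default List<Integer> partitions(String topic, K key, V value, int numPartitions) {
            return Collections.singletonList(partition(topic, key, value, numPartitions));
        }
    }

    // Hypothetical partitioner: sends each record to its hash partition and to the next partition.
    static class PrimaryAndNeighborPartitioner implements StreamPartitioner<String, String> {
        @Override
        public Integer partition(String topic, String key, String value, int numPartitions) {
            // floorMod keeps the result non-negative even for negative hash codes
            return Math.floorMod(key.hashCode(), numPartitions);
        }

        @Override
        public List<Integer> partitions(String topic, String key, String value, int numPartitions) {
            int primary = partition(topic, key, value, numPartitions);
            int neighbor = (primary + 1) % numPartitions;
            List<Integer> result = new ArrayList<>();
            result.add(primary);
            if (neighbor != primary) {
                // with a single partition the neighbor is the primary itself, so avoid a duplicate write
                result.add(neighbor);
            }
            return result;
        }
    }

    public static void main(String[] args) {
        StreamPartitioner<String, String> partitioner = new PrimaryAndNeighborPartitioner();
        System.out.println(partitioner.partitions("orders", "customer-42", "payload", 4));
    }
}
```

A partitioner that does not override partitions() would keep its current single-partition behavior through the default method.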

Proposed Changes

  • RecordCollector and its implementation RecordCollectorImpl have a send() method that accepts a StreamPartitioner and pushes the record to the partitions returned by the partitioner. This is the core piece of logic that would allow multicasting and broadcasting records. Here are the high-level changes. Note that when partitions() returns an empty list, meaning the record is not sent to any partition, the droppedRecordsSensor would also be updated.
    @Override
        public <K, V> void send(final String topic,
                                final K key,
                                final V value,
                                final Headers headers,
                                final Long timestamp,
                                final Serializer<K> keySerializer,
                                final Serializer<V> valueSerializer,
                                final StreamPartitioner<? super K, ? super V> partitioner) {         
    		List<Integer> multicastPartitions;
    
            if (partitioner != null) {
                final List<PartitionInfo> partitions;
                try {
                    partitions = streamsProducer.partitionsFor(topic);
                } catch (final TimeoutException timeoutException) {
                    log.warn("Could not get partitions for topic {}, will retry", topic);
    
                    // re-throw to trigger `task.timeout.ms`
                    throw timeoutException;
                } catch (final KafkaException fatal) {
                    // here we cannot drop the message on the floor even if it is a transient timeout exception,
                    // so we treat everything the same as a fatal exception
                    throw new StreamsException("Could not determine the number of partitions for topic '" + topic +
                        "' for task " + taskId + " due to " + fatal,
                        fatal
                    );
                }
                if (partitions.size() > 0) {
                    multicastPartitions = partitioner.partitions(topic, key, value, partitions.size());
                    // check for null before calling isEmpty() to avoid a NullPointerException
                    if (multicastPartitions == null) {
                        // Broadcast to all partitions
                        multicastPartitions = partitions.stream().map(PartitionInfo::partition).collect(Collectors.toList());
                    }

                    if (multicastPartitions.isEmpty()) {
                        // an empty list means the record is intentionally not sent anywhere
                        log.info("Not sending the record to any partition");
                        droppedRecordsSensor.record();
                    } else {
                        // iterate as Integer so a null element (default partitioning) is not unboxed
                        for (final Integer p : multicastPartitions) {
                            send(topic, key, value, headers, p, timestamp, keySerializer, valueSerializer);
                        }
                    }
                } else {
                    throw new StreamsException("Could not get partition information for topic " + topic + " for task " + taskId +
                        ". This can happen if the topic does not exist.");
                }
            } else {
                send(topic, key, value, headers, null, timestamp, keySerializer, valueSerializer);
            }
          }
  • StreamPartitioner is also used in foreign-key (FK) joins and interactive queries. There, the invocation of partition() would be replaced by partitions(), and a runtime check would be added to ensure that the returned list contains exactly one partition.
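
The three possible outcomes of partitions() (null, an empty list, or an explicit list) can be sketched in isolation as follows. This is a simplified illustration, not the RecordCollectorImpl code: resolveTargets is a hypothetical helper, and it assumes partitions are numbered 0 to numPartitions-1 instead of reading them from PartitionInfo.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class MulticastResolutionSketch {

    // Hypothetical helper: maps the value returned by partitions() to concrete partition numbers.
    static List<Integer> resolveTargets(List<Integer> requested, int numPartitions) {
        if (requested == null) {
            // null means broadcast: expand to every partition (assumed to be 0..numPartitions-1)
            List<Integer> all = new ArrayList<>(numPartitions);
            for (int p = 0; p < numPartitions; p++) {
                all.add(p);
            }
            return all;
        }
        // an explicit list is used as-is; an empty list means the record is dropped
        return requested;
    }

    public static void main(String[] args) {
        System.out.println(resolveTargets(null, 3));                    // prints [0, 1, 2]
        System.out.println(resolveTargets(Collections.emptyList(), 3)); // prints []
    }
}
```

The null check has to come before any call on the returned list, since calling isEmpty() on a null result would throw.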

Compatibility, Deprecation, and Migration Plan

This KIP deprecates the partition() method in StreamPartitioner. As part of that deprecation, we would do the following:

  • StreamPartitioner is also used in foreign-key (FK) joins and interactive queries. There, the invocation of partition() would be replaced by partitions(), and a runtime check would be added to ensure that the returned list contains exactly one partition.
  • Adding sufficient logging
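
One possible shape for the runtime check on code paths that need exactly one partition is sketched here. The helper name requireSinglePartition and the choice of IllegalArgumentException are assumptions made for illustration; the KIP does not specify how the check is implemented or which exception it raises.

```java
import java.util.Collections;
import java.util.List;

public class SinglePartitionCheckSketch {

    // Hypothetical helper: accepts only lists with exactly one entry and returns that entry.
    // The return type is Integer because the default partitions() may wrap a null partition.
    static Integer requireSinglePartition(List<Integer> partitions) {
        if (partitions == null || partitions.size() != 1) {
            throw new IllegalArgumentException(
                "Expected exactly one partition but got: " + partitions);
        }
        return partitions.get(0);
    }

    public static void main(String[] args) {
        System.out.println(requireSinglePartition(Collections.singletonList(2))); // prints 2
    }
}
```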

Rejected Alternatives

  • Extend to() / addSink() with a "broadcast" option/config in KafkaStreams.
  • Add toAllPartitions() / addBroadcastSink() methods in KafkaStreams.
  • Allow StreamPartitioner to return `-1` for "all partitions".
  • Add a separate class to handle multicasting.


