
Status

Current state: "Under Discussion"

Discussion thread

JIRA: KAFKA-13602

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Users often want to send a record to more than one partition of the sink topic. Currently, the only way to replicate a message into N partitions is to copy the message N times and then plug in a custom partitioner that writes each of the N copies to a different partition. Since this seems to be a common requirement, this KIP aims to make the process simpler.

Public Interfaces


https://github.com/apache/kafka/blob/cfe642edee80977173279f4a41e23aa822b9d19f/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java

A new abstract class, MultiCastStreamPartitioner, would be added. It implements the StreamPartitioner interface and returns a List of integers indicating the partitions to which a given record should be multicast. The partition method is marked final and throws an UnsupportedOperationException. Otherwise, subclasses could implement both methods, and it would be ambiguous which one should be used. Also note that if the implementor returns all the partitions of the topic, the record is effectively broadcast to every partition.

import java.util.List;

import org.apache.kafka.streams.processor.StreamPartitioner;

public abstract class MultiCastStreamPartitioner<K, V> implements StreamPartitioner<K, V> {

    /**
     * Not supported: subclasses must implement {@link #partitions} instead.
     */
    @Override
    public final Integer partition(String topic, K key, V value, int numPartitions) {
        throw new UnsupportedOperationException("MultiCastStreamPartitioner does not support partition(); implement partitions() instead");
    }

    /**
     * Determine the List of partition numbers to which a record, with the given key and value and the current number
     * of partitions, should be multicast. Note that returning a single-valued list with value -1 is a shorthand
     * for broadcasting the record to all the partitions of the topic.
     *
     * @param topic the topic name this record is sent to
     * @param key the key of the record
     * @param value the value of the record
     * @param numPartitions the total number of partitions
     * @return a List of integers between 0 and {@code numPartitions-1}, or [-1] if the record should be pushed to all
     * the partitions of the topic.
     */
    public abstract List<Integer> partitions(String topic, K key, V value, int numPartitions);
}
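As an illustration, the sketch below shows a hypothetical subclass, ReplicaPartitioner, that multicasts each record to `replicas` consecutive partitions, starting at a partition derived from the key's hash code. The hashing scheme and the class name are assumptions made for this example, not part of the proposal. So that the snippet compiles on its own, it declares a minimal stand-in for StreamPartitioner (mirroring the Kafka Streams method signature) and a condensed copy of the proposed MultiCastStreamPartitioner; a real application would use the Kafka Streams types instead.

```java
import java.util.ArrayList;
import java.util.List;

public class MultiCastExample {

    // Stand-in for org.apache.kafka.streams.processor.StreamPartitioner (same method signature).
    interface StreamPartitioner<K, V> {
        Integer partition(String topic, K key, V value, int numPartitions);
    }

    // Condensed copy of the proposed abstract class, so this file compiles on its own.
    abstract static class MultiCastStreamPartitioner<K, V> implements StreamPartitioner<K, V> {
        @Override
        public final Integer partition(String topic, K key, V value, int numPartitions) {
            throw new UnsupportedOperationException("Use partitions() instead");
        }

        public abstract List<Integer> partitions(String topic, K key, V value, int numPartitions);
    }

    // Hypothetical partitioner: sends each record to `replicas` consecutive partitions,
    // starting from a partition derived from the key's hash code (assumes non-null keys).
    static class ReplicaPartitioner<V> extends MultiCastStreamPartitioner<String, V> {
        private final int replicas;

        ReplicaPartitioner(int replicas) {
            this.replicas = replicas;
        }

        @Override
        public List<Integer> partitions(String topic, String key, V value, int numPartitions) {
            // floorMod keeps the start index non-negative even for negative hash codes
            int start = Math.floorMod(key.hashCode(), numPartitions);
            // cap at numPartitions so the same partition is never returned twice
            int count = Math.min(replicas, numPartitions);
            List<Integer> result = new ArrayList<>(count);
            for (int i = 0; i < count; i++) {
                result.add((start + i) % numPartitions);
            }
            return result;
        }
    }

    public static void main(String[] args) {
        ReplicaPartitioner<String> partitioner = new ReplicaPartitioner<>(2);
        // "a".hashCode() is 97 and 97 mod 6 is 1, so the record goes to partitions 1 and 2
        System.out.println(partitioner.partitions("sink-topic", "a", "v", 6)); // prints [1, 2]
    }
}
```

When `replicas` is at least the number of partitions, this partitioner returns every partition of the topic, which is the broadcast case mentioned above.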

Proposed Changes

  • As part of this KIP, users would supply a custom partitioner that extends MultiCastStreamPartitioner and overrides the partitions method. An abstract class is added, instead of an extra method on the StreamPartitioner interface, to maintain backward compatibility: StreamPartitioner is used as a functional interface in multiple places in the AK codebase, and adding another abstract method to it would break those usages.
  • RecordCollector and its implementation RecordCollectorImpl have a send method that accepts a StreamPartitioner object and sends the record to the partition returned by its partition method. Since users would pass in an instance of a MultiCastStreamPartitioner subclass, the send method would also check for that type and, once the partitions are returned, send the record to each of them one by one. Here is the code snippet:
    @Override
    public <K, V> void send(final String topic,
                            final K key,
                            final V value,
                            final Headers headers,
                            final Long timestamp,
                            final Serializer<K> keySerializer,
                            final Serializer<V> valueSerializer,
                            final StreamPartitioner<? super K, ? super V> partitioner) {
        Integer partition = null;

        if (partitioner != null) {
            final List<PartitionInfo> partitions;
            try {
                partitions = streamsProducer.partitionsFor(topic);
            } catch (final TimeoutException timeoutException) {
                log.warn("Could not get partitions for topic {}, will retry", topic);

                // re-throw to trigger `task.timeout.ms`
                throw timeoutException;
            } catch (final KafkaException fatal) {
                // here we cannot drop the message on the floor even if it is a transient timeout exception,
                // so we treat everything the same as a fatal exception
                throw new StreamsException("Could not determine the number of partitions for topic '" + topic +
                        "' for task " + taskId + " due to " + fatal.toString(),
                        fatal
                );
            }
            if (partitions.size() > 0) {
                if (partitioner instanceof MultiCastStreamPartitioner) {
                    final List<Integer> multicastPartitions =
                            ((MultiCastStreamPartitioner<? super K, ? super V>) partitioner).partitions(topic, key, value, partitions.size());
                    if (multicastPartitions.size() == 1 && multicastPartitions.get(0) == -1) {
                        // [-1] is shorthand for broadcasting the record to every partition of the topic
                        for (int p = 0; p < partitions.size(); p++) {
                            send(topic, key, value, headers, p, timestamp, keySerializer, valueSerializer);
                        }
                    } else {
                        // send one copy of the record to each requested partition
                        for (final int p : multicastPartitions) {
                            send(topic, key, value, headers, p, timestamp, keySerializer, valueSerializer);
                        }
                    }
                    // every copy has been sent; skip the single-partition send below
                    return;
                } else {
                    partition = partitioner.partition(topic, key, value, partitions.size());
                }
            } else {
                throw new StreamsException("Could not get partition information for topic " + topic + " for task " + taskId +
                        ". This can happen if the topic does not exist.");
            }
        }

        send(topic, key, value, headers, partition, timestamp, keySerializer, valueSerializer);
    }
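To make the branching in send() easier to follow, the sketch below isolates only the step that decides which partitions a record goes to. It assumes the [-1] broadcast shorthand described in the partitions Javadoc. The helper name targetPartitions is hypothetical, and StreamPartitioner and MultiCastStreamPartitioner are minimal stand-ins so the file compiles without Kafka on the classpath. It also shows that a lambda-based StreamPartitioner keeps working unchanged, which is why an abstract class is added rather than a new interface method.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class TargetPartitions {

    // Stand-in for org.apache.kafka.streams.processor.StreamPartitioner (same method signature).
    interface StreamPartitioner<K, V> {
        Integer partition(String topic, K key, V value, int numPartitions);
    }

    // Condensed copy of the proposed abstract class.
    abstract static class MultiCastStreamPartitioner<K, V> implements StreamPartitioner<K, V> {
        @Override
        public final Integer partition(String topic, K key, V value, int numPartitions) {
            throw new UnsupportedOperationException("Use partitions() instead");
        }

        public abstract List<Integer> partitions(String topic, K key, V value, int numPartitions);
    }

    // Hypothetical helper mirroring the branching in send(): returns the partitions the record
    // would be written to. A single null entry means "let the producer pick the partition",
    // which is what send() does when partition() returns null.
    static <K, V> List<Integer> targetPartitions(StreamPartitioner<? super K, ? super V> partitioner,
                                                 String topic, K key, V value, int numPartitions) {
        if (partitioner instanceof MultiCastStreamPartitioner) {
            List<Integer> requested = ((MultiCastStreamPartitioner<? super K, ? super V>) partitioner)
                    .partitions(topic, key, value, numPartitions);
            if (requested.size() == 1 && requested.get(0) == -1) {
                // [-1] is the shorthand for broadcasting to every partition
                List<Integer> all = new ArrayList<>(numPartitions);
                for (int p = 0; p < numPartitions; p++) {
                    all.add(p);
                }
                return all;
            }
            return requested;
        }
        // Plain StreamPartitioner (including lambdas): exactly one target, possibly null
        return Collections.singletonList(partitioner.partition(topic, key, value, numPartitions));
    }

    public static void main(String[] args) {
        StreamPartitioner<String, String> single = (topic, key, value, n) -> 0;
        MultiCastStreamPartitioner<String, String> broadcast = new MultiCastStreamPartitioner<String, String>() {
            @Override
            public List<Integer> partitions(String topic, String key, String value, int n) {
                return Collections.singletonList(-1);
            }
        };
        System.out.println(targetPartitions(single, "t", "k", "v", 3));    // prints [0]
        System.out.println(targetPartitions(broadcast, "t", "k", "v", 3)); // prints [0, 1, 2]
    }
}
```

Keeping the resolution logic in one place like this would also be a natural spot to validate that every returned partition lies between 0 and numPartitions-1, although the proposal does not specify such a check.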

Compatibility, Deprecation, and Migration Plan

This KIP only adds a new abstract class and leaves the StreamPartitioner interface unchanged, so existing usages of StreamPartitioner (including lambdas) remain backward compatible. No deprecation or migration plan is needed.

Rejected Alternatives

  • extend to() / addSink() with a "broadcast" option/config in KafkaStreams.
  • add toAllPartitions() / addBroadcastSink() methods in KafkaStreams.
  • allow StreamPartitioner to return `-1` for "all partitions"

These alternatives were rejected because they support only a full broadcast. While they are simpler to implement, they are less flexible than letting the partitioner choose an arbitrary set of partitions.

