Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Status

Current state"AcceptedAdopted"

Discussion thread

JIRA: KAFKA-13602

...

  • RecordCollector and it's implementation RecordCollectorImpl has the method which accepts a StreamPartitioner object and pushes it to the partitions returned by the partition method. This is the core piece of logic which would allow multi/broadcasting records. Here are the high level changes. Note that when the partitions()  method return an empty set meaning we won't send the record to any partition, we would also be updating the droppedRecordsSensor. 


    Code Block
    languagejava
    @Override
        public <K, V> void send(final String topic,
                                final K key,
                                final V value,
                                final Headers headers,
                                final Long timestamp,
                                final Serializer<K> keySerializer,
                                final Serializer<V> valueSerializer,
                                final StreamPartitioner<? super K, ? super V> partitioner) {         
    
            if (partitioner != null) {
    
                // Existing logic to fetch partitions
    
    			// Changes to be made due to the KIP             
    			if (partitions.size() > 0) {
                    final Optional<Set<Integer>> maybeMulticastPartitions = partitioner.partitions(topic, key, value, partitions.size());
                    if (!maybeMulticastPartitions.isPresent()) {
                        // New change. Use default partitioner
                        send(topic, key, value, headers, null, timestamp, keySerializer, valueSerializer, processorNodeId, context);
                    } else {
                        Set<Integer> multicastPartitions = maybeMulticastPartitions.get();
                        if (multicastPartitions.isEmpty()) {
                            // If a record is not to be sent to any partition, mark it as dropped.
                            log.info("Not sending the record to any partition");
                            droppedRecordsSensor.record();
                        } else {
                            for (final int multicastPartition: multicastPartitions) {
                                send(topic, key, value, headers, multicastPartition, timestamp, keySerializer, valueSerializer, processorNodeId, context);
                            }
                        }
                    }
                } else {
                    throw new StreamsException("Could not get partition information for topic " + topic + " for task " + taskId +
                            ". This can happen if the topic does not exist.");
                }
      		} else {
                send(topic, key, value, headers, null, timestamp, keySerializer, valueSerializer, processorNodeId, context);
            }       
    	}


  • StreamPartitioner is also used in FK-join and Interactive queries. The For FK-join, the invocation of partition()  would be replaced by partitions()  and a runtime check would be added to ensure that the returned set is singleton. 

...