...
The return type is a Set so that for cases of a faulty implementation of partitions
method which might return same partition number multiple times, we would still record it only once.
Also, KeyQueryMetadata
class would be enhanced to also include a method called partitions
. This method would return all partitions for cases when a key is multicasted. When a record is sent to multiple partitions or is dropped via StreamPartitioner#partitions()
then the current partition()
method would return -1
Proposed Changes
- RecordCollector and it's implementation RecordCollectorImpl has the method which accepts a StreamPartitioner object and pushes it to the partitions returned by the partition method. This is the core piece of logic which would allow multi/broadcasting records. Here are the high level changes. Note that when the
partitions()
method return an empty set meaning we won't send the record to any partition, we would also be updating the droppedRecordsSensor.Code Block language java @Override public <K, V> void send(final String topic, final K key, final V value, final Headers headers, final Long timestamp, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final StreamPartitioner<? super K, ? super V> partitioner) { if (partitioner != null) { // Existing logic to fetch partitions // Changes to be made due to the KIP if (partitions.size() > 0) { final Optional<Set<Integer>> maybeMulticastPartitions = partitioner.partitions(topic, key, value, partitions.size()); if (!maybeMulticastPartitions.isPresent()) { // New change. Use default partitioner send(topic, key, value, headers, null, timestamp, keySerializer, valueSerializer, processorNodeId, context); } else { Set<Integer> multicastPartitions = maybeMulticastPartitions.get(); if (multicastPartitions.isEmpty()) { // If a record is not to be sent to any partition, mark it as dropped. log.info("Not sending the record to any partition"); droppedRecordsSensor.record(); } else { for (final int multicastPartition: multicastPartitions) { send(topic, key, value, headers, multicastPartition, timestamp, keySerializer, valueSerializer, processorNodeId, context); } } } } else { throw new StreamsException("Could not get partition information for topic " + topic + " for task " + taskId + ". This can happen if the topic does not exist."); } } else { send(topic, key, value, headers, null, timestamp, keySerializer, valueSerializer, processorNodeId, context); } }
- StreamPartitioner is also used in FK-join and Interactive queries. For FK-join and IQ, the invocation of
partition()
would be replaced bypartitions()
and a runtime check would be added to ensure that the returned set is singleton.
...
- StreamPartitioner is also used in FK-join and IQ. The invocation of
partition()
would be replaced bypartitions()
and a runtime check would be added to ensure that the returned set is singleton. - Adding sufficient logging
- For backward compatibility reasons, in
KeyQueryMetadata
if a record is sent to one partition viaStreamPartitioner#partitions()
thenpartition()
method would also return the single partition.
Rejected Alternatives
The first 3 alternatives were rejected as they focussed only broadcasting to all partitions which seemed restrictive.
...
- Add new tests(unit/integration) similar to the
partition
tests for the newly introducedpartitions
method. - Add tests to fail when a non singleton set is returned for FK joins and IQ when querying.