THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
- RecordCollector and it's implementation RecordCollectorImpl has the method which accepts a StreamPartitioner object and pushes it to the partitions returned by the partition method. This is the core piece of logic which would allow multi/broadcasting records. Here are the high level changes. Note that when the
partitions()
method return an empty set meaning we won't send the record to any partition, we would also be updating the droppedRecordsSensor.Code Block language java @Override public <K, V> void send(final String topic, final K key, final V value, final Headers headers, final Long timestamp, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final StreamPartitioner<? super K, ? super V> partitioner) { Set<Integer> multicastPartitions; if (partitioner != null) { // Existing logic to fetch partitions // Changes to be made due to the KIP if (partitions.size() > 0) { multicastPartitions = partitioner.partitions(topic, key, value, partitions.size()); if (multicastPartitions.isEmpty()) { // New change. If a record is not to be sent to any partitions, mark it being // as a dropped record. log.info("Not sending the record to any partition"); droppedRecordsSensor.record(); } else { // iterate over the all the partitions and send to the valid ones. for (int p: multicastPartitions) { // Check if valid partition number if (p >= 0 && p < partitions.size() - 1) { send(topic, key, value, headers, p, timestamp, keySerializer, valueSerializer); } else { log.debug("Not sending record to invalid partition number: {} for topic {}, p, topic); } } } } // Existing logic. }
- StreamPartitioner is also used in FK-join and Interactive queries. The invocation of
partition()
would be replaced bypartitions()
and a runtime check would be added to ensure that the returned set is singleton.
...
- extend to() / addSink() with a "broadcast" option/config in KafkaStreams.
- add toAllPartitions() / addBroadcastSink() methods in KafkaStreams.
- allow StreamPartitioner to return `-1` for "all partitions".
- Adding a separate class to handle multi casting. This was rejected in favour of enhancing the current
StreamPartitioner
interface.
Test Plan
- Add new tests(unit/integration) similar to the
partition
tests for the newly introducedpartitions
method. - Add tests to fail when a non singleton set is returned for FK joins/IQ cases.
...