...

In my organisation, we use Kafka as our publish-subscribe messaging system. Our goal is to send event-based (secure, encrypted) SQL messages reliably, and to perform data synchronisation based on those messages. For us, message keys carry metadata that we use either to ignore a message (if it loops back to its sender) or to log information. Our messaging use case is as follows:

1) A business object transaction occurs in the SQL Server/HANA database.

2) The event is captured at the ORM layer and published to multiple data centres around the world (10 in our deployment).

3) In each data centre, a consumer group with a unique group ID processes messages from its assigned partitions, with one consumer per partition.

Under these circumstances, we only need a guarantee that the same message will not be sent to multiple partitions. In other words, a partition will never be consumed by multiple consumers.

With DefaultPartitioner, we can achieve this only for records with NULL keys. Because we need keys for metadata, we lose round-robin partition selection: for a non-null key, DefaultPartitioner hashes the key to choose the partition. We need KafkaProducer to select partitions in a round-robin style regardless of whether the key is NULL or non-NULL.
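The following minimal sketch shows why a non-null key defeats round-robin selection. It reproduces the shape of DefaultPartitioner's keyed path, `toPositive(hash(keyBytes)) % numPartitions`. The real class uses Kafka's `Utils.murmur2` for the hash. This sketch assumes `Arrays.hashCode` as a stand-in, so it runs without Kafka on the classpath. `KeyedSelectionDemo` is a hypothetical name used only for illustration.

```java
import java.util.Arrays;

/**
 * Illustration only: DefaultPartitioner hashes keys with Utils.murmur2;
 * Arrays.hashCode is a stand-in hash so this runs without Kafka.
 */
final class KeyedSelectionDemo {
    // Same bit-masking as Kafka's Utils.toPositive (clears the sign bit).
    static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    // The result depends only on the key bytes and the partition count,
    // so every record carrying the same key lands on the same partition.
    static int keyedPartition(byte[] keyBytes, int numPartitions) {
        return toPositive(Arrays.hashCode(keyBytes)) % numPartitions;
    }
}
```

Calling `keyedPartition` repeatedly with the same metadata key always returns the same partition. Those records therefore pile onto one partition instead of being spread across all of them.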

Proposed Changes

We therefore propose an extension of DefaultPartitioner called "KeylessPartitioner". We use the name "Keyless" because the new partitioner ignores the record key when choosing a partition. Its code will be almost identical to the DefaultPartitioner.partition() method, except that it always executes DefaultPartitioner's "null key and no partition" logic. The following is the content of the partition() method for the new partitioner:

...

                return Utils.toPositive(nextValue) % numPartitions;
            }
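For reviewers, here is a standalone sketch of the whole null-key path that the fragment above ends with. It is written under stated assumptions and is not the final implementation. A real KeylessPartitioner would implement `org.apache.kafka.clients.producer.Partitioner` and read the partition lists from `Cluster`. Here the partition count and the available partitions are passed in as plain arguments, so the sketch runs without Kafka. The class name `KeylessPartitionerSketch` and its seeding constructor are hypothetical and exist only to make the sketch testable.

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical standalone sketch of the proposed round-robin logic. */
final class KeylessPartitionerSketch {
    // One counter per topic, as in DefaultPartitioner's null-key path.
    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();
    private final Integer initialCounter; // null means "start each topic at a random value"

    KeylessPartitionerSketch() {
        this.initialCounter = null;
    }

    // Fixed starting value (hypothetical, for deterministic tests only).
    KeylessPartitionerSketch(int initialCounter) {
        this.initialCounter = initialCounter;
    }

    // Mirrors Kafka's Utils.toPositive: masking avoids Math.abs(Integer.MIN_VALUE) < 0.
    static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    private int nextValue(String topic) {
        AtomicInteger counter = topicCounterMap.computeIfAbsent(topic, t -> new AtomicInteger(
                initialCounter != null ? initialCounter : ThreadLocalRandom.current().nextInt()));
        return counter.getAndIncrement();
    }

    /** keyBytes is accepted but deliberately ignored: keyed and null-key records are treated alike. */
    int partition(String topic, byte[] keyBytes, int numPartitions, List<Integer> availablePartitions) {
        int nextValue = nextValue(topic);
        if (!availablePartitions.isEmpty()) {
            // Rotate over the partitions that currently have a leader.
            int part = toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part);
        } else {
            // No partitions are available; fall back to a possibly unavailable partition.
            return toPositive(nextValue) % numPartitions;
        }
    }
}
```

The per-topic `AtomicInteger` keeps the rotation thread-safe when a single producer is shared across threads. Starting the counter at a random value follows DefaultPartitioner, so that several producers do not all begin on partition 0.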

We would also like to clarify that this is not code duplication. We do not wish to change the DefaultPartitioner class, but we want to reuse a portion of its logic to achieve our goal.

Public Interfaces

No interface changes are required. Producers simply set the existing partitioner.class producer configuration to the new class name. This is a producer setting, not a broker setting in server.properties. The default value, DefaultPartitioner, is not changed.
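The following minimal sketch shows how a producer could opt in to the new partitioner through configuration alone. It uses plain `java.util.Properties` with the standard producer config keys. The fully-qualified name `com.example.KeylessPartitioner` and the helper `KeylessProducerConfig` are hypothetical, because the proposal does not state the package.

```java
import java.util.Properties;

/** Hypothetical helper that builds producer settings selecting the proposed partitioner. */
final class KeylessProducerConfig {
    static Properties producerProperties(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Producer-side setting (ProducerConfig.PARTITIONER_CLASS_CONFIG); the class name is hypothetical.
        props.put("partitioner.class", "com.example.KeylessPartitioner");
        return props;
    }
}
```

These properties would then be passed to `new KafkaProducer<String, String>(props)`. Producers that omit `partitioner.class` keep the default partitioner.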

We will add unit tests, which will largely reuse the existing round-robin tests for DefaultPartitioner.

Compatibility, Deprecation, and Migration Plan

...