...


Kafka already has the concept of MessageFormatters, which are used for other internal topics that store data in a binary format. Existing examples are OffsetsMessageFormatter, GroupMetadataMessageFormatter and TransactionLogMessageFormatter; the console consumer tool can use them to dump the content of __consumer_offsets, for example.

The proposal is to provide similar Formatters for the MirrorMaker2 topics: checkpoints, heartbeats and offset-sync. At the same time, I propose publicly exposing a MessageFormatter interface that replaces the internal MessageFormatter trait we currently have. That way the new Formatters can be in the mirror project.

Additionally, having a public interface will also enable users to build their own formatters that can be reused with the console-consumer tool. For example, one could create a formatter that works with a schema registry, or a formatter that hides some fields based on the user identity.
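As an illustration of the field-hiding idea, here is a minimal, self-contained Java sketch. It deliberately does not use the proposed interface: RecordPrinter is a hypothetical stand-in that receives the raw key and value bytes plus a PrintStream, and the "name=value;name=value" payload layout is invented for the example. The hidden fields are also fixed at construction time rather than derived from a user identity. A real formatter would implement the MessageFormatter interface from this proposal instead.

```java
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class RedactingFormatterSketch {

    /** Hypothetical stand-in for the proposed formatter interface (not the real API). */
    interface RecordPrinter {
        void writeTo(byte[] key, byte[] value, PrintStream output);
    }

    /** Prints "name=value" pairs and masks the value of every hidden field. */
    static final class RedactingPrinter implements RecordPrinter {
        private final Set<String> hiddenFields;

        RedactingPrinter(String... hiddenFields) {
            this.hiddenFields = new HashSet<>(Arrays.asList(hiddenFields));
        }

        @Override
        public void writeTo(byte[] key, byte[] value, PrintStream output) {
            // The key is ignored in this sketch; only the value is printed.
            output.println(redact(new String(value, StandardCharsets.UTF_8)));
        }

        /** Assumes an invented "name=value;name=value" payload layout. */
        String redact(String payload) {
            StringBuilder line = new StringBuilder();
            for (String pair : payload.split(";")) {
                String[] parts = pair.split("=", 2);
                if (line.length() > 0) {
                    line.append(", ");
                }
                if (parts.length == 2 && hiddenFields.contains(parts[0])) {
                    line.append(parts[0]).append("=***");
                } else {
                    line.append(pair);
                }
            }
            return line.toString();
        }
    }

    public static void main(String[] args) {
        RecordPrinter printer = new RedactingPrinter("token");
        byte[] value = "user=alice;token=s3cret".getBytes(StandardCharsets.UTF_8);
        printer.writeTo(null, value, System.out);
    }
}
```

Keeping the masking logic in a separate redact method makes it testable without a console or a running consumer.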

Public Interfaces

1)

...

New MessageFormatter interface


It makes sense to reuse the existing MessageFormatter interface. However, at the moment it is not public and it lives in the core project. I propose making this interface public and moving it to the org.apache.kafka.common package in clients:

...

The 3 formatters will be in a new package named org.apache.kafka.connect.mirror.formatters in the mirror project:

  • HeartbeatFormatter

Example output: Source alias: B, Target alias: A, Timestamp: 1588502119726

  • CheckpointFormatter

Example output: Group: qwerty, TopicPartition: A.heartbeats-0, Upstream offset: 631, Downstream offset: 631, Metadata:

  • OffsetSyncFormatter

Example output: Topic: heartbeats-0, Upstream offset: 0, Downstream offset: 0
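The example outputs above can be reproduced with a small Java sketch. It covers only the final string-building step and assumes each record has already been decoded into plain fields; the class name, method names and parameter names are hypothetical, and the real formatters would first deserialize the record bytes using the mirror project's own classes.

```java
public class MirrorFormatSketch {

    /** Builds the heartbeat line from already-decoded fields. */
    static String heartbeat(String sourceAlias, String targetAlias, long timestamp) {
        return "Source alias: " + sourceAlias
                + ", Target alias: " + targetAlias
                + ", Timestamp: " + timestamp;
    }

    /** Builds the checkpoint line; metadata may be an empty string. */
    static String checkpoint(String group, String topicPartition,
                             long upstreamOffset, long downstreamOffset, String metadata) {
        return "Group: " + group
                + ", TopicPartition: " + topicPartition
                + ", Upstream offset: " + upstreamOffset
                + ", Downstream offset: " + downstreamOffset
                + ", Metadata: " + metadata;
    }

    /** Builds the offset-sync line from already-decoded fields. */
    static String offsetSync(String topicPartition, long upstreamOffset, long downstreamOffset) {
        return "Topic: " + topicPartition
                + ", Upstream offset: " + upstreamOffset
                + ", Downstream offset: " + downstreamOffset;
    }

    public static void main(String[] args) {
        System.out.println(heartbeat("B", "A", 1588502119726L));
        System.out.println(checkpoint("qwerty", "A.heartbeats-0", 631L, 631L, ""));
        System.out.println(offsetSync("heartbeats-0", 0L, 0L));
    }
}
```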

Proposed Changes

1) kafka.common.MessageFormatter

The existing Scala MessageFormatter trait will be deprecated and changed to extend the new MessageFormatter interface. This will allow users to keep their custom implementations and easily transition to the new interface. Finally, the trait will be deleted in the next major release.

package kafka.common

/**
  * Typical implementations of this interface convert a `ConsumerRecord` into a type that can then be passed to
  * a `PrintStream`.
  *
  * This is used by the `ConsoleConsumer`.
  */
@deprecated("This class is deprecated and will be replaced by org.apache.kafka.common.MessageFormatter.", "2.6.0")
trait MessageFormatter extends org.apache.kafka.common.MessageFormatter {
}

Compatibility, Deprecation, and Migration Plan

Existing MessageFormatter implementations will continue to work. This allows users to transition to the new interface before the trait is deleted in the next major release.

Rejected Alternatives

  • Provide a tool or a new mechanism to format binary topics: While this may not require adding a new class to the public API, it would not be consistent with the tools users and administrators are already used to.