Status
Current state: Accepted
Discussion thread: here
JIRA: KAFKA-10232
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
```java
package org.apache.kafka.common;

import java.io.Closeable;
import java.io.PrintStream;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;

/**
 * This interface allows defining formatters that can be used to parse and format records read by a
 * Consumer instance for display.
 * The kafka-console-consumer has built-in support for MessageFormatter, via the --formatter flag.
 *
 * Kafka provides a few implementations to display records of internal topics such as __consumer_offsets,
 * __transaction_state and the MirrorMaker2 topics.
 */
public interface MessageFormatter extends Configurable, Closeable {

    /**
     * Initializes the MessageFormatter
     * @param props Properties to configure the formatter
     * @deprecated Use {@link #configure(Map)} instead, this method is for backward compatibility with the older Formatter interface
     */
    @Deprecated
    default public void init(Properties props) {}

    /**
     * Configures the MessageFormatter
     * @param configs Map to configure the formatter
     */
    default public void configure(Map<String, ?> configs) {
        // Bridge to the deprecated init() so that older formatters are still configured
        Properties properties = new Properties();
        properties.putAll(configs);
        init(properties);
    }

    /**
     * Parses and formats a record for display
     * @param consumerRecord the record to format
     * @param output the print stream used to output the record
     */
    public void writeTo(ConsumerRecord<byte[], byte[]> consumerRecord, PrintStream output);

    /**
     * Closes the formatter
     */
    default public void close() {}
}
```
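To illustrate how a custom formatter might plug into the proposed interface, here is a minimal sketch. It deliberately does not depend on Kafka: `SimpleRecord` and `SimpleMessageFormatter` are hypothetical stand-ins for `ConsumerRecord<byte[], byte[]>` and `MessageFormatter`, and the `PrefixFormatter` class and its `separator` config key are invented for this example only.

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;

// Hypothetical stand-in for ConsumerRecord<byte[], byte[]>, so the sketch compiles without Kafka.
final class SimpleRecord {
    final String topic;
    final int partition;
    final long offset;
    final byte[] value;

    SimpleRecord(String topic, int partition, long offset, byte[] value) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.value = value;
    }
}

// Hypothetical, trimmed-down mirror of the proposed MessageFormatter interface.
interface SimpleMessageFormatter {
    default void configure(Map<String, ?> configs) {}

    void writeTo(SimpleRecord record, PrintStream output);

    default void close() {}
}

// Example formatter: prints "topic-partition@offset", a separator, then the value decoded as UTF-8.
final class PrefixFormatter implements SimpleMessageFormatter {
    private String separator = ": ";

    @Override
    public void configure(Map<String, ?> configs) {
        Object configured = configs.get("separator"); // hypothetical config key
        if (configured != null) {
            separator = configured.toString();
        }
    }

    @Override
    public void writeTo(SimpleRecord record, PrintStream output) {
        String value = record.value == null
                ? "null"
                : new String(record.value, StandardCharsets.UTF_8);
        output.println(record.topic + "-" + record.partition + "@" + record.offset + separator + value);
    }
}

final class FormatterDemo {
    // Runs the configure -> writeTo -> close lifecycle and returns the formatted text
    // with surrounding whitespace (including the trailing newline) trimmed.
    static String render(SimpleMessageFormatter formatter, Map<String, ?> configs, SimpleRecord record) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        PrintStream output = new PrintStream(buffer, true);
        formatter.configure(configs);
        formatter.writeTo(record, output);
        formatter.close();
        output.flush();
        return buffer.toString().trim();
    }

    public static void main(String[] args) {
        SimpleRecord record =
                new SimpleRecord("heartbeats", 0, 42L, "hello".getBytes(StandardCharsets.UTF_8));
        Map<String, String> configs = Collections.singletonMap("separator", " | ");
        System.out.println(render(new PrefixFormatter(), configs, record));
    }
}
```

A real implementation would implement `org.apache.kafka.common.MessageFormatter` directly and be passed to the console consumer through the `--formatter` flag.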
...
The 3 formatters will be in a new package named org.apache.kafka.connect.mirror.formatters in the mirror project:
- HeartbeatFormatter:
> ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic heartbeats --formatter org.apache.kafka.connect.mirror.formatters.HeartbeatFormatter --from-beginning
Heartbeat{sourceClusterAlias=B, targetClusterAlias=A, timestamp=1588502119726}
- CheckpointFormatter:
> ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic A.checkpoints.internal --formatter org.apache.kafka.connect.mirror.formatters.CheckpointFormatter --from-beginning
Checkpoint{consumerGroupId=qwert, topicPartition=A.heartbeat, upstreamOffset=631, downstreamOffset=631, metatadata=}
- OffsetSyncFormatter:
> ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mm2-offset-syncs.B.internal --formatter org.apache.kafka.connect.mirror.formatters.OffsetSyncFormatter --from-beginning
OffsetSync{topicPartition=heartbeat-0, upstreamOffset=0, downstreamOffset=0}
Proposed Changes
1) kafka.common.MessageFormatter
...
```scala
package kafka.common

/**
 * Typical implementations of this interface convert a `ConsumerRecord` into a type that can then be passed to
 * a `PrintStream`.
 *
 * This is used by the `ConsoleConsumer`.
 */
@deprecated("This class is deprecated and will be replaced by org.apache.kafka.common.MessageFormatter.", "2.6.0")
trait MessageFormatter extends org.apache.kafka.common.MessageFormatter {
}
```
2) Update existing formatters to use the new interface
Existing Formatters:
- DefaultMessageFormatter
- LoggingMessageFormatter
- NoOpMessageFormatter
- OffsetsMessageFormatter
- GroupMetadataMessageFormatter
- TransactionLogMessageFormatter
Compatibility, Deprecation, and Migration Plan
Existing MessageFormatter implementations will require no changes beyond recompilation. This allows users to transition to the new interface before the trait is deleted in the next major release.
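The compatibility path relies on the default `configure(Map)` delegating to the deprecated `init(Properties)`. The sketch below reproduces only that bridge in a Kafka-free form, under stated assumptions: `BridgedFormatter` is a hypothetical, trimmed-down copy of the proposed interface (without `writeTo` and `close`), and `LegacyFormatter` with its `line.separator` key is invented to stand for an older formatter that overrides only `init`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical, trimmed-down copy of the proposed interface; only the configuration bridge is kept.
interface BridgedFormatter {
    @Deprecated
    default void init(Properties props) {}

    // Same body as the proposed default: copy the map into Properties and call init().
    default void configure(Map<String, ?> configs) {
        Properties properties = new Properties();
        properties.putAll(configs);
        init(properties);
    }
}

// Legacy-style formatter: it only knows about init(Properties).
final class LegacyFormatter implements BridgedFormatter {
    String lineSeparator = "\n";

    @Override
    @SuppressWarnings("deprecation")
    public void init(Properties props) {
        // "line.separator" is used here purely as an illustrative key.
        lineSeparator = props.getProperty("line.separator", lineSeparator);
    }
}

final class LegacyBridgeDemo {
    // Configures a legacy formatter through the new entry point and reports what it picked up.
    static String configuredSeparator(Map<String, ?> configs) {
        LegacyFormatter formatter = new LegacyFormatter();
        formatter.configure(configs);
        return formatter.lineSeparator;
    }

    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put("line.separator", ";");
        System.out.println(configuredSeparator(configs));
    }
}
```

In this sketch the caller only ever invokes `configure`, yet the legacy `init` override still receives the settings, which is the behavior the migration plan depends on.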
...
- Provide a tool or a new mechanism to format binary topics: While this may not require adding a new class to the public API, it would not be consistent with the tools users and administrators are already used to.
- Keep the Scala MessageFormatter interface to retain full compatibility: Keeping both interfaces seems excessive, as the Scala interface was not part of the public API and was not mentioned in the console-consumer output.