...
```java
package org.apache.kafka.common;

import java.io.Closeable;
import java.io.PrintStream;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;

/**
 * This interface allows defining Formatters that can be used to parse and format records read by a
 * Consumer instance for display.
 * The kafka-console-consumer has built-in support for MessageFormatter, via the --formatter flag.
 *
 * Kafka provides a few implementations to display records of internal topics such as __consumer_offsets,
 * __transaction_state and the MirrorMaker2 topics.
 *
 */
public interface MessageFormatter extends Configurable, Closeable {

    /**
     * Initialises the MessageFormatter
     * @param props Properties to configure the formatter
     * @deprecated Use {@link #configure(Map)} instead, this method is for backward compatibility with the older Formatter interface
     */
    @Deprecated
    default public void init(Properties props) {}

    /**
     * Configures the MessageFormatter
     * @param configs Map to configure the formatter
     */
    default public void configure(Map<String, ?> configs) {
        Properties properties = new Properties();
        properties.putAll(configs);
        init(properties);
    }

    /**
     * Parses and formats a record for display
     * @param consumerRecord the record to format
     * @param output the print stream used to output the record
     */
    public void writeTo(ConsumerRecord<byte[], byte[]> consumerRecord, PrintStream output);

    /**
     * Closes the formatter
     */
    default public void close() {}
}
```
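The default `configure(Map)` copies its entries into a `Properties` object and calls `init(Properties)`, so a formatter written against the older interface should keep working when only `configure` is called. The following is a minimal sketch of that delegation, not the Kafka implementation. To run without Kafka on the classpath it uses simplified stand-ins: `RawRecord` replaces `ConsumerRecord<byte[], byte[]>`, the nested `MessageFormatter` omits `Configurable` and `Closeable`, and `LegacyFormatter`, `render` and the `separator` property are hypothetical names introduced for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class FormatterSketch {

    /** Hypothetical stand-in for ConsumerRecord<byte[], byte[]>, reduced to key and value. */
    public static class RawRecord {
        final byte[] key;
        final byte[] value;

        public RawRecord(byte[] key, byte[] value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Simplified copy of the proposed interface (Configurable and Closeable omitted). */
    public interface MessageFormatter {
        @Deprecated
        default void init(Properties props) {}

        // Same delegation as in the proposal: configure() forwards to init().
        default void configure(Map<String, ?> configs) {
            Properties properties = new Properties();
            properties.putAll(configs);
            init(properties);
        }

        void writeTo(RawRecord record, PrintStream output);

        default void close() {}
    }

    /** Legacy-style formatter that only overrides init(Properties). */
    public static class LegacyFormatter implements MessageFormatter {
        private String separator = "\t";

        @Override
        public void init(Properties props) {
            // "separator" is a hypothetical property name used only in this sketch.
            separator = props.getProperty("separator", separator);
        }

        @Override
        public void writeTo(RawRecord record, PrintStream output) {
            String key = new String(record.key, StandardCharsets.UTF_8);
            String value = new String(record.value, StandardCharsets.UTF_8);
            output.print(key + separator + value + "\n");
        }
    }

    /** Configures the formatter, formats one record and returns what was printed. */
    public static String render(MessageFormatter formatter, Map<String, ?> configs,
                                String key, String value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        // PrintStream and buffer.toString() both use the platform default charset.
        PrintStream output = new PrintStream(buffer, true);
        formatter.configure(configs);
        formatter.writeTo(new RawRecord(key.getBytes(StandardCharsets.UTF_8),
                                        value.getBytes(StandardCharsets.UTF_8)), output);
        formatter.close();
        output.flush();
        return buffer.toString();
    }

    public static void main(String[] args) {
        Map<String, String> configs = new HashMap<>();
        configs.put("separator", "=");
        System.out.print(render(new LegacyFormatter(), configs, "k", "v")); // prints k=v
    }
}
```

A formatter written directly against the new interface would instead override `configure(Map)` and leave `init` untouched.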
...
- HeartbeatFormatter:
> ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic heartbeats --formatter org.apache.kafka.connect.mirror.formatters.HeartbeatFormatter --from-beginning
Heartbeat{sourceClusterAlias=B, targetClusterAlias=A, timestamp=1588502119726}
- CheckpointFormatter:
> ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic A.checkpoints.internal --formatter org.apache.kafka.connect.mirror.formatters.CheckpointFormatter --from-beginning
Checkpoint{consumerGroupId=qwert, topicPartition=A.heartbeat, upstreamOffset=631, downstreamOffset=631, metatadata=}
- OffsetSyncFormatter:
> ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mm2-offset-syncs.B.internal --formatter org.apache.kafka.connect.mirror.formatters.OffsetSyncFormatter --from-beginning
OffsetSync{topicPartition=heartbeat-0, upstreamOffset=0, downstreamOffset=0}
Proposed Changes
1) kafka.common.MessageFormatter
...
```scala
package kafka.common

/**
  * Typical implementations of this interface convert a `ConsumerRecord` into a type that can then be passed to
  * a `PrintStream`.
  *
  * This is used by the `ConsoleConsumer`.
  */
@deprecated("This class is deprecated and will be replaced by org.apache.kafka.common.MessageFormatter.", "2.6.0")
trait MessageFormatter extends org.apache.kafka.common.MessageFormatter {
}
```
2) Update existing Formatters to use the new interface
Existing Formatters:
- DefaultMessageFormatter
- LoggingMessageFormatter
- NoOpMessageFormatter
- OffsetsMessageFormatter
- GroupMetadataMessageFormatter
- TransactionLogMessageFormatter
Compatibility, Deprecation, and Migration Plan
...
- Provide a tool or a new mechanism to format binary topics: While this may not require adding a new class to the public API, it would not be consistent with the tools users and administrators are already used to.
- Keep the Scala MessageFormatter interface to retain full compatibility: Keeping both interfaces seems excessive, as the Scala interface was not part of the public API and was not mentioned in the console-consumer output.