...

Code Block
language: java
package org.apache.kafka.common;

import java.io.Closeable;
import java.io.PrintStream;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;

/**
 * This interface allows defining formatters that parse and format records read by a
 * Consumer instance for display.
 * The kafka-console-consumer has built-in support for MessageFormatter, via the --formatter flag.
 * 
 * Kafka provides a few implementations to display records of internal topics such as __consumer_offsets, 
 * __transaction_state and the MirrorMaker2 topics.
 *
 */
public interface MessageFormatter extends Configurable, Closeable {

    /**
     * Initializes the MessageFormatter
     * @param props Properties to configure the formatter
     * @deprecated Use {@link #configure(Map)} instead; this method exists for backward compatibility with the older Formatter interface
     */
    @Deprecated
    default public void init(Properties props) {}

    /**
     * Configures the MessageFormatter
     * @param configs Map to configure the formatter
     */
    default public void configure(Map<String, ?> configs) {
        Properties properties = new Properties();
        properties.putAll(configs);
        init(properties);
    }

    /**
     * Parses and formats a record for display
     * @param consumerRecord the record to format
     * @param output the print stream used to output the record
     */
    public void writeTo(ConsumerRecord<byte[], byte[]> consumerRecord, PrintStream output);

    /**
     * Closes the formatter
     */
    default public void close() {}
}
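
The default `configure(Map)` above forwards to the deprecated `init(Properties)`, so a formatter written against the older interface should still receive its settings. The following is a minimal, self-contained sketch of that bridging. `ConfigBridgeFormatter`, `LegacyStyleFormatter`, and `ConfigBridgeDemo` are hypothetical stand-ins rather than Kafka classes, and the `key.separator` property is used purely for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical stand-in for the proposed interface; only the two
// configuration methods are reproduced here.
interface ConfigBridgeFormatter {
    @Deprecated
    default void init(Properties props) {}

    // Same bridging logic as the proposed default method: copy the map
    // into a Properties object and hand it to the legacy init method.
    default void configure(Map<String, ?> configs) {
        Properties properties = new Properties();
        properties.putAll(configs);
        init(properties);
    }
}

// A legacy-style formatter that only overrides init(Properties).
class LegacyStyleFormatter implements ConfigBridgeFormatter {
    String separator = "\t";

    @Override
    @Deprecated
    public void init(Properties props) {
        // Falls back to the current separator when the property is absent.
        separator = props.getProperty("key.separator", separator);
    }
}

class ConfigBridgeDemo {
    // Configures a legacy-style formatter through the new entry point
    // and returns the separator it ended up with.
    static String configuredSeparator(String value) {
        Map<String, Object> configs = new HashMap<>();
        configs.put("key.separator", value);
        LegacyStyleFormatter formatter = new LegacyStyleFormatter();
        formatter.configure(configs); // reaches init(Properties) via the default method
        return formatter.separator;
    }

    public static void main(String[] args) {
        System.out.println(configuredSeparator("|")); // prints "|"
    }
}
```

In this sketch the caller only ever invokes `configure(Map)`; the legacy formatter is unaware of the new method and still picks up its setting.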

...

  • HeartbeatFormatter:

  > ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic heartbeats --formatter org.apache.kafka.connect.mirror.formatters.HeartbeatFormatter --from-beginning

Heartbeat{sourceClusterAlias=B, targetClusterAlias=A, timestamp=1588502119726}

  • CheckpointFormatter:

  > ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic A.checkpoints.internal --formatter org.apache.kafka.connect.mirror.formatters.CheckpointFormatter --from-beginning

Checkpoint{consumerGroupId=qwert, topicPartition=A.heartbeat, upstreamOffset=631, downstreamOffset=631, metatadata=}

  • OffsetSyncFormatter:

  > ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mm2-offset-syncs.B.internal --formatter org.apache.kafka.connect.mirror.formatters.OffsetSyncFormatter --from-beginning

OffsetSync{topicPartition=heartbeat-0, upstreamOffset=0, downstreamOffset=0}

Proposed Changes

1) kafka.common.MessageFormatter

...

Code Block
language: scala
title: kafka.common.MessageFormatter
package kafka.common

/**
  * Typical implementations of this interface convert a `ConsumerRecord` into a type that can then be passed to
  * a `PrintStream`.
  *
  * This is used by the `ConsoleConsumer`.
  */
@deprecated("This class is deprecated and will be replaced by org.apache.kafka.common.MessageFormatter.", "2.6.0")
trait MessageFormatter extends org.apache.kafka.common.MessageFormatter {
}

2) Update existing Formatters to use the new interface

Existing Formatters:

  • DefaultMessageFormatter
  • LoggingMessageFormatter
  • NoOpMessageFormatter
  • OffsetsMessageFormatter
  • GroupMetadataMessageFormatter
  • TransactionLogMessageFormatter
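
As a rough illustration of what updating a formatter to the new interface could look like, the sketch below overrides `configure(Map)` directly instead of `init(Properties)` and writes to the supplied `PrintStream`. Everything in it is an assumption made for the example: `RawRecord` stands in for `ConsumerRecord<byte[], byte[]>`, `RecordFormatter` stands in for the proposed interface (without the deprecated `init` method), and `KeyValueFormatter` with its `key.separator` setting is not one of the formatters listed above.

```java
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;

// Hypothetical stand-in for ConsumerRecord<byte[], byte[]>, reduced to key and value.
class RawRecord {
    final byte[] key;
    final byte[] value;

    RawRecord(byte[] key, byte[] value) {
        this.key = key;
        this.value = value;
    }
}

// Hypothetical stand-in for the proposed interface (deprecated init omitted).
interface RecordFormatter extends Closeable {
    default void configure(Map<String, ?> configs) {}

    void writeTo(RawRecord consumerRecord, PrintStream output);

    @Override
    default void close() {}
}

// A formatter written against the new entry points only.
class KeyValueFormatter implements RecordFormatter {
    private String separator = "\t";

    @Override
    public void configure(Map<String, ?> configs) {
        Object configured = configs.get("key.separator");
        if (configured != null) {
            separator = configured.toString();
        }
    }

    @Override
    public void writeTo(RawRecord consumerRecord, PrintStream output) {
        output.print(decode(consumerRecord.key) + separator + decode(consumerRecord.value) + "\n");
    }

    private static String decode(byte[] bytes) {
        return bytes == null ? "null" : new String(bytes, StandardCharsets.UTF_8);
    }
}

class FormatterMigrationDemo {
    // Formats one record into a string; a null separator keeps the default.
    static String format(String key, String value, String separator) {
        Map<String, String> configs = separator == null
                ? Collections.<String, String>emptyMap()
                : Collections.singletonMap("key.separator", separator);
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (KeyValueFormatter formatter = new KeyValueFormatter();
             PrintStream output = new PrintStream(buffer, true)) {
            formatter.configure(configs);
            formatter.writeTo(new RawRecord(key.getBytes(StandardCharsets.UTF_8),
                    value.getBytes(StandardCharsets.UTF_8)), output);
        }
        return buffer.toString();
    }

    public static void main(String[] args) {
        System.out.print(format("k", "v", "|"));
    }
}
```

Because `close()` has a default implementation in the stand-in interface, the formatter can be used in a try-with-resources block without defining any cleanup logic of its own.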

Compatibility, Deprecation, and Migration Plan

...

  • Provide a tool or a new mechanism to format binary topics: While this may not require adding a new class to the public API, it would not be consistent with the tools users and administrators are already used to.
  • Keep the Scala MessageFormatter interface to retain full compatibility: Keeping both interfaces seems excessive, as the Scala interface was not part of the public API and was not mentioned in the console-consumer output.