...

package org.apache.kafka.common;

import java.io.PrintStream;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;

/**
 * This interface allows users to define formatters that parse and format records read by a
 * Consumer instance for display.
 * The kafka-console-consumer has built-in support for MessageFormatter, via the --formatter flag.
 * 
 * Kafka provides a few implementations to display records of internal topics such as __consumer_offsets, 
 * __transaction_state and the MirrorMaker2 topics.
 *
 */
public interface MessageFormatter {

    /**
     * Initializes the MessageFormatter
     * @param props Properties to configure the formatter
     */
    default public void init(Properties props) {}

    /**
     * Parses and formats a record for display
     * @param consumerRecord the record to format
     * @param output the print stream used to display the record
     */
    public void writeTo(ConsumerRecord<byte[], byte[]> consumerRecord, PrintStream output);

    /**
     * Closes the formatter
     */
    default public void close() {}
}
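
As an illustration of how an implementation might use the three methods above, here is a minimal sketch. It does not depend on Kafka: SimpleRecord and SketchFormatter are hypothetical stand-ins for ConsumerRecord<byte[], byte[]> and the proposed MessageFormatter, and the "separator" property name is an assumption made for this example only.

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

final class FormatterSketch {

    /** Hypothetical, reduced stand-in for ConsumerRecord<byte[], byte[]>. */
    static final class SimpleRecord {
        final String topic;
        final int partition;
        final long offset;
        final byte[] key;
        final byte[] value;

        SimpleRecord(String topic, int partition, long offset, byte[] key, byte[] value) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
            this.key = key;
            this.value = value;
        }
    }

    /** Stand-in with the same three methods as the proposed MessageFormatter. */
    interface SketchFormatter {
        default void init(Properties props) {}
        void writeTo(SimpleRecord record, PrintStream output);
        default void close() {}
    }

    /** Prints "topic-partition@offset", key and value, joined by a configurable separator. */
    static final class KeyValueFormatter implements SketchFormatter {
        private String separator = "\t";

        @Override
        public void init(Properties props) {
            // "separator" is an assumed property name, not an existing Kafka setting.
            separator = props.getProperty("separator", separator);
        }

        @Override
        public void writeTo(SimpleRecord record, PrintStream output) {
            output.println(record.topic + "-" + record.partition + "@" + record.offset
                    + separator + decode(record.key) + separator + decode(record.value));
        }

        private static String decode(byte[] bytes) {
            // Keys and values may be null (for example, tombstone records).
            return bytes == null ? "null" : new String(bytes, StandardCharsets.UTF_8);
        }
    }

    /** Runs the init / writeTo / close lifecycle once and returns the printed line. */
    static String render(SketchFormatter formatter, Properties props, SimpleRecord record) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        PrintStream output = new PrintStream(buffer);
        formatter.init(props);
        try {
            formatter.writeTo(record, output);
        } finally {
            formatter.close();
        }
        output.flush();
        return buffer.toString().trim();
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("separator", " | ");
        SimpleRecord record = new SimpleRecord("heartbeats", 0, 42L,
                "k".getBytes(StandardCharsets.UTF_8), "v".getBytes(StandardCharsets.UTF_8));
        System.out.println(render(new KeyValueFormatter(), props, record));
    }
}
```

A real formatter would implement org.apache.kafka.common.MessageFormatter directly and receive a ConsumerRecord<byte[], byte[]>. Because init and close have default bodies, a formatter that needs no configuration or cleanup only has to implement writeTo.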

...

The 3 formatters will be in a new package named org.apache.kafka.connect.mirror.formatters in the mirror project:

  • HeartbeatFormatter:

  > ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic heartbeats --formatter org.apache.kafka.connect.mirror.formatters.HeartbeatFormatter --from-beginning

Example output: Source alias: B, Target alias: A, Timestamp: 1588502119726

  • CheckpointFormatter:

  > ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic A.checkpoints.internal --formatter org.apache.kafka.connect.mirror.formatters.CheckpointFormatter --from-beginning

Example output: Group: qwerty, TopicPartition: A.heartbeats-0, Upstream offset: 631, Downstream offset: 631, Metadata:

  • OffsetSyncFormatter:

  > ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mm2-offset-syncs.B.internal --formatter org.apache.kafka.connect.mirror.formatters.OffsetSyncFormatter --from-beginning

Example output: Topic: heartbeats-0, Upstream offset: 0, Downstream offset: 0

...

Existing MessageFormatter implementations will continue to work and require no changes beyond recompilation. This allows users to transition to the new interface before the trait is deleted in the next major release.

...