Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  • Start the primary cluster:
    bin/zookeeper-server-start.sh config/zookeeper.properties
    bin/kafka-server-start.sh config/server.properties
  • Start the mirror cluster:
    bin/zookeeper-server-start.sh config/mirror-zookeeper.properties
    JMX_PORT=8888 bin/kafka-server-start.sh config/mirror-server.properties config/mirror-consumer.properties config/mirror-producer.properties
  • If you now send messages to topics on the source cluster, the mirror cluster will eventually mirror those topics.

...

In order to sustain a higher throughput, you would typically use an asynchronous embedded producer and it should be configured to be in blocking mode (i.e., queue.enqueueTimeout.ms=-1). This recommendation is to ensure that messages will not be lost. Otherwise, the default enqueue timeout of the asynchronous producer is zero which means if the producer's internal queue is full, then messages will be dropped due to QueueFullExceptions QueueFullExceptions. A blocking producer however, will wait if the queue is full, and effectively throttle back the embedded consumer's consumption rate. You can enable trace logging in the producer to observe the remaining queue size over time. If the producer's queue is consistently full, it indicates that the mirror cluster is bottle-necked on re-publishing messages to the local (mirror) cluster and/or flushing messages to disk.

...

If you do not wish to have full mirroring, you may use either the whitelist(mirror.topics.whitelist) configuration option to specify which topics to include, or the blacklist (mirror.topics.blacklist) configuration option to specify which topics to exclude. (It is invalid to specify both a whitelist and a blacklist.) These options accept a comma-separated list of topics.

...

Mirroring is often used in cross-DC scenarios, and there are a few configuration options that you may want to tune to help deal with inter-DC communication latencies and performance bottlenecks on your specific hardware. In general, you should set a high value for the socket buffer size on the mirror cluster's consumer configuration (socket.buffersize) and the source cluster's broker configuration (socket.send.buffer). Also, the embedded consumer's fetch size (fetch.size) should be higher than the consumer's socket buffer size. Note that the socket buffer size configurations are a hint to the underlying platform's networking code. If you enable trace logging, you can check the actual receive buffer size and determine whether the setting in the OS networking layer also needs to be adjusted.

...

The consumer offset checker tool is useful to gauge how well your mirror is keeping up with the source cluster. For example, the following was executed during the above demo:

Code Block

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group KafkaMirror --zkconnect localhost:2181 --topic test-topic

...


KafkaMirror,topic1,0-0 (Group,Topic,BrokerId-PartitionId)

...


            Owner = KafkaMirror_jkoshy-ld-1320972386342-beb4bfc9-0

...


  Consumer offset =

...

 561154288
                  = 561,154,288 (0.52G)

...


         Log size = 2231392259
                  = 2,231,392,259 (2.08G)

...


     Consumer lag = 1670237971
                  = 1,670,237,971 (1.56G)

...


BROKER INFO

...


0 -> 127.0.0.1:9092

Note that the --zkconnect argument should point to the source cluster's ZooKeeper. Also, if the topic is not specified, then the tool prints information for all topics under the given consumer group.

...

  • The embedded producer uses the default (random) partitioner. So if you use a custom partitioner in your source cluster, the mirror cluster is not capable of using the same partitioning scheme.
  • The embedded consumer instantiates the same number of message streams for each topic according to the mirror.consumer.numthreads configuration option. There is no per-topic override at the moment.
  • The whitelist and blacklist configuration options do not accept regular expressions and only one of these options can be used in a given mirror setup.