Here is some information on actually running Kafka as a production system. This page is meant as a place for users to record their operational and monitoring practices and to share knowledge about successfully running Kafka in production. Feel free to add a section for your configuration if you have anything you want to share. There is nothing magical about most of these configurations; you may be able to improve on them, but they should serve as a helpful starting place.

...

We have added two tuning changes: (1) we upped the number of file descriptors since we have lots of topics and lots of connections, and (2) we upped the max socket buffer size to enable high-performance data transfer between data centers (described here).
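Both changes above are OS-level settings. A hedged sketch of where they typically live on Linux (the user name and exact values below are our own illustrative guesses, not taken from this page; the buffer size simply mirrors the 2 MB socket buffers in the Kafka config further down, and you should tune all of these for your own workload):

```
# /etc/security/limits.conf -- raise the open file descriptor limit
# for the (hypothetical) user running the broker
kafka  soft  nofile  100000
kafka  hard  nofile  100000

# /etc/sysctl.conf -- raise the maximum socket buffer sizes so the
# application-requested 2 MB buffers are actually honored
# (apply with `sysctl -p`)
net.core.rmem_max = 2097152
net.core.wmem_max = 2097152
```

Note that raising `net.core.rmem_max`/`net.core.wmem_max` only raises the ceiling; the broker still has to request the larger buffers, which is what the `kafka.socket.*.buffer` settings below do.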

Java

Code Block

$ java -version
java version "1.6.0_21"
Java(TM) SE Runtime Environment (build 1.6.0_21-b06)
Java HotSpot(TM) 64-Bit Server VM (build 17.0-b16, mixed mode)

Here are our command line options:

Code Block

java -server -Xms3072m -Xmx3072m -XX:NewSize=256m -XX:MaxNewSize=256m -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70
     -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintTenuringDistribution -Xloggc:logs/gc.log -Djava.awt.headless=true
     -Dcom.sun.management.jmxremote -classpath <long list of jars>

 
In 0.8, the GC settings changed slightly to:

Code Block

-Xms3g -Xmx3g -XX:NewSize=256m -XX:MaxNewSize=256m -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:CMSInitiatingOccupancyFraction=30 -XX:+UseCMSInitiatingOccupancyOnly -XX:+CMSConcurrentMTEnabled -XX:+CMSScavengeBeforeRemark -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintTenuringDistribution -Xloggc:logs/gc.log

Kafka

We are running Kafka 0.7 right now but may move to trunk as we fix bugs.

...

Here is our server configuration:

Code Block

kafka.log.default.flush.interval.ms=10000
kafka.log.file.size=1073741824
kafka.log.default.flush.scheduler.interval.ms=2000
kafka.log.flush.interval=3000
kafka.socket.send.buffer=2097152
kafka.socket.receive.buffer=2097152
kafka.monitoring.period.secs=30
kafka.num.threads=8
kafka.log.cleanup.interval.mins=30
kafka.log.retention.hours=168
kafka.zookeeper.sessiontimeoutms=6000
kafka.zookeeper.connection.timeout=2000
kafka.num.partitions=1

...

Our monitoring is done through a centralized monitoring system custom to LinkedIn, but it keys off the JMX stats exposed by Kafka. To see what is available, the easiest thing is to start a Kafka broker and/or client, fire up JConsole, and take a look.
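The same beans JConsole shows can also be read programmatically, which is how a centralized poller would collect them. A minimal sketch against the local platform MBeanServer (our own illustration, assuming the 0.7 bean names listed below; run standalone it finds no `kafka` beans and just prints a count of zero, but with a broker in the same JVM, or via a remote JMX connection, the query would return the stats beans):

```java
import java.lang.management.ManagementFactory;
import java.util.Set;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class ListKafkaBeans {
    public static void main(String[] args) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();

        // Query every MBean registered under the "kafka" domain.
        Set<ObjectName> names = server.queryNames(new ObjectName("kafka:*"), null);
        for (ObjectName name : names) {
            System.out.println(name);
        }
        System.out.println("kafka beans found: " + names.size());

        // To poll one stat you would read its attribute, e.g. (hypothetical,
        // requires a broker exposing the bean in this JVM):
        // Double rate = (Double) server.getAttribute(
        //     new ObjectName("kafka:type=kafka.SocketServerStats"),
        //     "ProduceRequestsPerSecond");
    }
}
```

For a broker started with `-Dcom.sun.management.jmxremote` (as in our command line above), the same queries work over a remote `JMXConnector` instead of the platform server.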

Server Stats

  • bean name: kafka:type=kafka.SocketServerStats

    Code Block
    
      def getProduceRequestsPerSecond: Double
      def getFetchRequestsPerSecond: Double
      def getAvgProduceRequestMs: Double
      def getMaxProduceRequestMs: Double
      def getAvgFetchRequestMs: Double
      def getMaxFetchRequestMs: Double
      def getBytesReadPerSecond: Double
      def getBytesWrittenPerSecond: Double
      def getNumFetchRequests: Long
      def getNumProduceRequests: Long
      def getTotalBytesRead: Long
      def getTotalBytesWritten: Long
      def getTotalFetchRequestMs: Long
      def getTotalProduceRequestMs: Long
    
  • bean names: kafka:type=kafka.BrokerAllTopicStat and kafka:type=kafka.BrokerAllTopicStat.[topic]

    Code Block
    
      def getMessagesIn: Long
      def getBytesIn: Long
      def getBytesOut: Long
      def getFailedProduceRequest: Long
      def getFailedFetchRequest: Long
    
  • bean name: kafka:type=kafka.LogFlushStats

    Code Block
    
      def getFlushesPerSecond: Double
      def getAvgFlushMs: Double
      def getTotalFlushMs: Long
      def getMaxFlushMs: Double
      def getNumFlushes: Long
    

Producer Stats

  • bean name: kafka:type=kafka.KafkaProducerStats

    Code Block
    
      def getProduceRequestsPerSecond: Double
      def getAvgProduceRequestMs: Double
      def getMaxProduceRequestMs: Double
      def getNumProduceRequests: Long
    
  • bean name: kafka.producer.Producer:type=AsyncProducerStats

    Code Block
    
      def getAsyncProducerEvents: Int
      def getAsyncProducerDroppedEvents: Int
    

Consumer Stats

  • bean name: kafka:type=kafka.ConsumerStats

    Code Block
    
      def getPartOwnerStats: String
      def getConsumerGroup: String
      def getOffsetLag(topic: String, brokerId: Int, partitionId: Int): Long
      def getConsumedOffset(topic: String, brokerId: Int, partitionId: Int): Long
      def getLatestOffset(topic: String, brokerId: Int, partitionId: Int): Long
    
  • bean names: kafka:type=kafka.ConsumerAllTopicStat and kafka:type=kafka.ConsumerTopicStat.[topic]

    Code Block
    
      def getMessagesPerTopic: Long
      def getBytesPerTopic: Long
    
  • bean name: kafka:type=kafka.SimpleConsumerStats

    Code Block
    
      def getFetchRequestsPerSecond: Double
      def getAvgFetchRequestMs: Double
      def getMaxFetchRequestMs: Double
      def getNumFetchRequests: Long
      def getConsumerThroughput: Double
    

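The three offset methods on ConsumerStats are the ones to key lag alerting off of. A hedged sketch of the relationship as we understand it (our own illustration, not code from Kafka): the lag for a partition is the latest offset on the broker minus the offset the consumer has consumed up to.

```java
public class OffsetLag {
    // Hypothetical helper mirroring how getOffsetLag relates to
    // getLatestOffset and getConsumedOffset for one partition:
    // lag = latest offset on the broker - last consumed offset.
    static long lag(long latestOffset, long consumedOffset) {
        return latestOffset - consumedOffset;
    }

    public static void main(String[] args) {
        // A consumer that has consumed up to offset 950 of a log whose
        // head is at offset 1000 is 50 behind.
        System.out.println(lag(1000L, 950L)); // prints 50
    }
}
```

Alerting on this value per (topic, brokerId, partitionId) tuple catches stalled consumers before they fall outside the retention window.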
...