Here is some information on actually running Kafka as a production system. This page is meant as a place for people to record their operational and monitoring practices, to help others gather knowledge about successfully running Kafka in production. Feel free to add a section for your configuration if you have anything you want to share. There is nothing magical about most of these configurations; you may be able to improve on them, but they should serve as a helpful starting place.
...
We have added two tuning changes: (1) we upped the number of file descriptors since we have lots of topics and lots of connections, and (2) we upped the max socket buffer size to enable high-performance data transfer between data centers (described here).
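On Linux, these two changes would typically be made in `/etc/security/limits.conf` and `/etc/sysctl.conf`. The fragment below is a sketch only; the specific limits and buffer sizes are illustrative examples, not recommendations from this page.

```
# /etc/security/limits.conf -- raise the broker user's file descriptor limit
# (100000 is an example; size it to your topic and connection counts)
kafka  -  nofile  100000

# /etc/sysctl.conf -- raise the maximum socket buffer sizes so large TCP
# windows are possible on high-latency inter-datacenter links (example values)
net.core.rmem_max = 8388608
net.core.wmem_max = 8388608
```

After editing `/etc/sysctl.conf`, the kernel settings can be applied with `sysctl -p`; the limits take effect on the next login session of the broker user.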
Java
```
$ java -version
java version "1.6.0_21"
Java(TM) SE Runtime Environment (build 1.6.0_21-b06)
Java HotSpot(TM) 64-Bit Server VM (build 17.0-b16, mixed mode)
```
Here are our command line options:
```
java -server -Xms3072m -Xmx3072m -XX:NewSize=256m -XX:MaxNewSize=256m
  -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70
  -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintTenuringDistribution
  -Xloggc:logs/gc.log -Djava.awt.headless=true
  -Dcom.sun.management.jmxremote -classpath <long list of jars>
```

In 0.8, the GC settings changed slightly to:

```
-Xms3g -Xmx3g -XX:NewSize=256m -XX:MaxNewSize=256m
  -XX:+UseConcMarkSweepGC -XX:+UseParNewGC
  -XX:CMSInitiatingOccupancyFraction=30 -XX:+UseCMSInitiatingOccupancyOnly
  -XX:+CMSConcurrentMTEnabled -XX:+CMSScavengeBeforeRemark
  -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintTenuringDistribution
  -Xloggc:logs/gc.log
```
Kafka
We are running Kafka 0.7 right now but may move to trunk as we fix bugs.
...
Here is our server configuration:
```
kafka.log.default.flush.interval.ms=10000
kafka.log.file.size=1073741824
kafka.log.default.flush.scheduler.interval.ms=2000
kafka.log.flush.interval=3000
kafka.socket.send.buffer=2097152
kafka.socket.receive.buffer=2097152
kafka.monitoring.period.secs=30
kafka.num.threads=8
kafka.log.cleanup.interval.mins=30
kafka.log.retention.hours=168
kafka.zookeeper.sessiontimeoutms=6000
kafka.zookeeper.connection.timeout=2000
kafka.num.partitions=1
```
...
Our monitoring is done through a centralized monitoring system custom to LinkedIn, but it keys off the JMX stats exposed by Kafka. To see what is available, the easiest thing is to start a Kafka broker and/or client, fire up JConsole, and take a look.
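As a sketch of reading these stats programmatically rather than through JConsole, the snippet below registers a stand-in MBean on the local platform MBean server and reads one attribute back. The `DemoStats` class and its single getter are hypothetical, purely for illustration; against a real broker you would instead open a remote JMX connection and query the bean names listed in this section.

```java
import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class JmxReadDemo {
    // Hypothetical stand-in for a Kafka stats bean; Kafka's real beans
    // (e.g. kafka:type=kafka.SocketServerStats) expose getters like this one.
    public interface DemoStatsMBean {
        double getProduceRequestsPerSecond();
    }

    public static class DemoStats implements DemoStatsMBean {
        public double getProduceRequestsPerSecond() { return 42.0; }
    }

    public static void main(String[] args) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        ObjectName name = new ObjectName("kafka:type=kafka.SocketServerStats");
        server.registerMBean(new DemoStats(), name);

        // JMX strips the "get" prefix: the attribute is "ProduceRequestsPerSecond".
        Double rate = (Double) server.getAttribute(name, "ProduceRequestsPerSecond");
        System.out.println("ProduceRequestsPerSecond = " + rate);
    }
}
```

The same `getAttribute` call works over a remote `MBeanServerConnection` obtained from `JMXConnectorFactory`, which is how a centralized poller would scrape a broker started with `-Dcom.sun.management.jmxremote`.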
Server Stats
bean name: kafka:type=kafka.SocketServerStats
```scala
def getProduceRequestsPerSecond: Double
def getFetchRequestsPerSecond: Double
def getAvgProduceRequestMs: Double
def getMaxProduceRequestMs: Double
def getAvgFetchRequestMs: Double
def getMaxFetchRequestMs: Double
def getBytesReadPerSecond: Double
def getBytesWrittenPerSecond: Double
def getNumFetchRequests: Long
def getNumProduceRequests: Long
def getTotalBytesRead: Long
def getTotalBytesWritten: Long
def getTotalFetchRequestMs: Long
def getTotalProduceRequestMs: Long
```
bean name: kafka:type=kafka.BrokerAllTopicStat and kafka:type=kafka.BrokerAllTopicStat.[topic]
```scala
def getMessagesIn: Long
def getBytesIn: Long
def getBytesOut: Long
def getFailedProduceRequest: Long
def getFailedFetchRequest: Long
```
bean name: kafka:type=kafka.LogFlushStats
```scala
def getFlushesPerSecond: Double
def getAvgFlushMs: Double
def getTotalFlushMs: Long
def getMaxFlushMs: Double
def getNumFlushes: Long
```
Producer stats
bean name: kafka:type=kafka.KafkaProducerStats
```scala
def getProduceRequestsPerSecond: Double
def getAvgProduceRequestMs: Double
def getMaxProduceRequestMs: Double
def getNumProduceRequests: Long
```
bean name: kafka.producer.Producer:type=AsyncProducerStats
```scala
def getAsyncProducerEvents: Int
def getAsyncProducerDroppedEvents: Int
```
Consumer stats
bean name: kafka:type=kafka.ConsumerStats
```scala
def getPartOwnerStats: String
def getConsumerGroup: String
def getOffsetLag(topic: String, brokerId: Int, partitionId: Int): Long
def getConsumedOffset(topic: String, brokerId: Int, partitionId: Int): Long
def getLatestOffset(topic: String, brokerId: Int, partitionId: Int): Long
```
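A consumer's offset lag is simply the gap between the latest offset the broker has for a partition and the offset the consumer has consumed. The sketch below illustrates that relationship; the two helper methods and their return values are made-up stand-ins for the `getLatestOffset` and `getConsumedOffset` MBean calls above.

```java
public class OffsetLagDemo {
    // Hypothetical values; in practice these come from the ConsumerStats MBean.
    static long latestOffset(String topic, int brokerId, int partitionId) {
        return 1_050_000L;  // latest offset the broker has written
    }

    static long consumedOffset(String topic, int brokerId, int partitionId) {
        return 1_000_000L;  // offset the consumer group has consumed up to
    }

    public static void main(String[] args) {
        // Lag = how far the consumer trails the broker, in messages/bytes of log.
        long lag = latestOffset("events", 0, 0) - consumedOffset("events", 0, 0);
        System.out.println("offset lag = " + lag);  // prints "offset lag = 50000"
    }
}
```

A lag that grows without bound is the usual first sign that consumers cannot keep up with producers, which makes this one of the most useful stats to alert on.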
bean name: kafka:type=kafka.ConsumerAllTopicStat and kafka:type=kafka.ConsumerTopicStat.[topic]
```scala
def getMessagesPerTopic: Long
def getBytesPerTopic: Long
```
bean name: kafka:type=kafka.SimpleConsumerStats
```scala
def getFetchRequestsPerSecond: Double
def getAvgFetchRequestMs: Double
def getMaxFetchRequestMs: Double
def getNumFetchRequests: Long
def getConsumerThroughput: Double
```
...