...
Our monitoring is done through a centralized monitoring system custom to LinkedIn, but it keys off the JMX stats exposed by Kafka. To see what is available, the easiest thing is to start a Kafka broker and/or client, fire up JConsole, and take a look.
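As a programmatic alternative to browsing in JConsole, the same MBeans can be listed with a few lines against an MBean server. The sketch below queries the local platform MBean server of the current JVM; against a live broker (run in-process or reached through a remote JMX connector), beans in the kafka domain would appear in this listing:

```scala
import java.lang.management.ManagementFactory
import javax.management.ObjectName
import scala.jdk.CollectionConverters._

// Query the MBean server for all registered beans. On a broker JVM,
// the kafka:type=... beans described below show up alongside the
// standard java.lang beans.
val server = ManagementFactory.getPlatformMBeanServer
val names = server.queryNames(new ObjectName("*:*"), null).asScala

// Print each bean's object name; filter on "kafka:" to see only Kafka stats.
names.toList.map(_.toString).sorted.foreach(println)
```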
Server Stats
- JMX stats
  - Number of fetch and produce requests per second
  - MB produced and fetched per second
  - Average and max fetch and produce request time
  - Number of log flushes on the server
  - Average and max log flush time
- System stats
  - CPU
  - Network throughput
  - Disk throughput (MB/sec, reads and writes)
  - Disk space remaining
  - IO service time
  - IO operations/sec
  - File descriptors used/available
Application stats
These stats are monitored per application.
- bean name: kafka:type=kafka.SocketServerStats
```scala
def getProduceRequestsPerSecond: Double
def getFetchRequestsPerSecond: Double
def getAvgProduceRequestMs: Double
def getMaxProduceRequestMs: Double
def getAvgFetchRequestMs: Double
def getMaxFetchRequestMs: Double
def getBytesReadPerSecond: Double
def getBytesWrittenPerSecond: Double
def getNumFetchRequests: Long
def getNumProduceRequests: Long
def getTotalBytesRead: Long
def getTotalBytesWritten: Long
def getTotalFetchRequestMs: Long
def getTotalProduceRequestMs: Long
```
- bean names: kafka:type=kafka.BrokerAllTopicStat and kafka:type=kafka.BrokerAllTopicStat.[topic]
```scala
def getMessagesIn: Long
def getBytesIn: Long
def getBytesOut: Long
def getFailedProduceRequest: Long
def getFailedFetchRequest: Long
```
- bean name: kafka:type=kafka.LogFlushStats
```scala
def getFlushesPerSecond: Double
def getAvgFlushMs: Double
def getTotalFlushMs: Long
def getMaxFlushMs: Double
def getNumFlushes: Long
```
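To pull any of these values into an external monitoring system, you read the attribute by bean name through an MBeanServerConnection. By the usual JMX naming convention, a getter such as getBytesReadPerSecond is exposed as the BytesReadPerSecond attribute of its bean. The helper below is a sketch; it is demonstrated against the local platform MBean server, since the call works the same over a remote JMX connection to a broker:

```scala
import java.lang.management.ManagementFactory
import javax.management.{MBeanServerConnection, ObjectName}

// Read a single attribute from a named MBean. `conn` can be the local
// platform server or a remote connection obtained via a JMX connector.
def readAttribute(conn: MBeanServerConnection, bean: String, attr: String): Any =
  conn.getAttribute(new ObjectName(bean), attr)

// Example against a JVM-standard bean; on a broker you would use e.g.
// readAttribute(conn, "kafka:type=kafka.SocketServerStats", "BytesReadPerSecond")
val procs = readAttribute(
  ManagementFactory.getPlatformMBeanServer,
  "java.lang:type=OperatingSystem",
  "AvailableProcessors").asInstanceOf[Int]
```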
Producer stats
- bean name: kafka:type=kafka.KafkaProducerStats
```scala
def getProduceRequestsPerSecond: Double
def getAvgProduceRequestMs: Double
def getMaxProduceRequestMs: Double
def getNumProduceRequests: Long
```
- bean name: kafka.producer.Producer:type=AsyncProducerStats
```scala
def getAsyncProducerEvents: Int
def getAsyncProducerDroppedEvents: Int
```
Consumer stats
- bean name: kafka:type=kafka.ConsumerStats
```scala
def getPartOwnerStats: String
def getConsumerGroup: String
def getOffsetLag(topic: String, brokerId: Int, partitionId: Int): Long
def getConsumedOffset(topic: String, brokerId: Int, partitionId: Int): Long
def getLatestOffset(topic: String, brokerId: Int, partitionId: Int): Long
```
- bean names: kafka:type=kafka.ConsumerAllTopicStat and kafka:type=kafka.ConsumerTopicStat.[topic]
```scala
def getMessagesPerTopic: Long
def getBytesPerTopic: Long
```
- bean name: kafka:type=kafka.SimpleConsumerStats
```scala
def getFetchRequestsPerSecond: Double
def getAvgFetchRequestMs: Double
def getMaxFetchRequestMs: Double
def getNumFetchRequests: Long
def getConsumerThroughput: Double
```
- Number of produce and fetch requests sent
- Request latency
- Queued messages not yet sent (we use the async producer)
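The offset-lag figures above are the most direct measure of a consumer falling behind. Conceptually, the lag reported by getOffsetLag for a partition is just the gap between the broker's latest offset and the consumer's consumed offset; a minimal sketch of that computation (offsets simplified to plain Longs):

```scala
// Lag per partition: how far the consumed offset trails the latest offset.
def offsetLag(latestOffset: Long, consumedOffset: Long): Long =
  math.max(0L, latestOffset - consumedOffset)

// Total lag across a consumer's partitions, given (latest, consumed) pairs,
// e.g. to alert when a consumer group falls behind as a whole.
def totalLag(offsets: Seq[(Long, Long)]): Long =
  offsets.map { case (latest, consumed) => offsetLag(latest, consumed) }.sum
```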
Audit
The final alerting we do is on the correctness of the data delivery. We audit that every message that is sent is consumed by all consumers and measure the lag with which this occurs. For important topics we alert if a given completeness is not reached within a certain time period. The details of this are discussed in KAFKA-260.
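The details live in KAFKA-260, but the shape of the check can be sketched as follows: compare per-window counts of messages produced and consumed for a topic, and alert when the consumed fraction falls below a completeness threshold. The threshold and function here are illustrative, not the actual audit implementation:

```scala
// Hypothetical completeness check for one topic and time window:
// did consumers see at least `minCompleteness` of what was produced?
// Returns true when delivery is acceptable (no alert needed).
def auditOk(produced: Long, consumed: Long, minCompleteness: Double): Boolean =
  produced == 0 || consumed.toDouble / produced >= minCompleteness
```

A monitoring job would evaluate this per topic per window and page when it returns false for an important topic.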
...