Please help us keep this FAQ up-to-date. If there is an answer that you think can be improved, please help improve it. If you look for an answer that isn't here, and later figure it out, please add it. You don't need permission, it's a wiki.

Producers

Why do I get QueueFullException in my producer when running in async mode?

...

Code Block
ConsumerOffsetChecker

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group consumer-group1 --zkconnect zkhost:zkport --topic topic1
consumer-group1,topic1,0-0 (Group,Topic,BrokerId-PartitionId)
Owner = consumer-group1-consumer1
Consumer offset = 70121994703
= 70,121,994,703 (65.31G)
Log size = 70122018287
= 70,122,018,287 (65.31G)
Consumer lag = 23584
= 23,584 (0.00G)
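
The figures in this output are related by simple arithmetic, sketched below in Java. This is only an illustration: the ConsumerLag class and its methods are hypothetical and not part of Kafka, and the G figure is assumed to be the value divided by 1024^3, which is consistent with the numbers shown.

```java
import java.util.Locale;

// Hypothetical helper, not part of Kafka: reproduces the arithmetic behind
// the ConsumerOffsetChecker output shown above.
final class ConsumerLag {
    // Assumption: the "G" suffix means the value divided by 1024^3.
    private static final double PER_G = 1024.0 * 1024.0 * 1024.0;

    // Lag is how far the consumer offset trails the log size.
    static long lag(long logSize, long consumerOffset) {
        return logSize - consumerOffset;
    }

    // Formats a value with thousands separators and its size in G.
    static String describe(long value) {
        return String.format(Locale.US, "%,d (%.2fG)", value, value / PER_G);
    }

    public static void main(String[] args) {
        long consumerOffset = 70121994703L;
        long logSize = 70122018287L;
        System.out.println("Consumer offset = " + describe(consumerOffset));
        System.out.println("Log size = " + describe(logSize));
        System.out.println("Consumer lag = " + describe(lag(logSize, consumerOffset)));
    }
}
```

A lag that stays small relative to the log size, as here, suggests the consumer is keeping up with the log.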

...

Code Block
name := "OptionsWatcher"

version := "1.0-SNAPSHOT"

scalaVersion := "2.9.3"

resolvers += "Apache repo" at "https://repository.apache.org/content/repositories/releases"

libraryDependencies ++= Seq(
  jdbc,
  anorm,
  cache,
  "joda-time" % "joda-time" % "2.2",
  "org.joda" % "joda-convert" % "1.3.1",
  "ch.qos.logback" % "logback-classic" % "1.0.13",
  "org.mashupbots.socko" % "socko-webserver_2.9.2" % "0.2.2",
  "nl.grons" % "metrics-scala_2.9.2" % "3.0.0",
  "com.codahale.metrics" % "metrics-core" % "3.0.0",
  "io.backchat.jerkson" % "jerkson_2.9.2" % "0.7.0",
  "com.amazonaws" % "aws-java-sdk" % "1.3.8",
  "net.databinder.dispatch" %% "dispatch-core" % "0.9.5",
  "org.apache.kafka" % "kafka_2.9.2" % "0.8.0-beta1" excludeAll (
    ExclusionRule(organization = "com.sun.jdmk"),
    ExclusionRule(organization = "com.sun.jmx"),
    ExclusionRule(organization = "javax.jms"),
    ExclusionRule(organization = "org.slf4j")
  )
)

Unit testing

How do I write unit tests using Kafka?

First, add Kafka's test jar (the artifact with the test classifier) as a test-scoped dependency. If using Maven, this does the trick:

Code Block
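<dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka_2.10</artifactId>
   <!-- add a <version> matching your Kafka release unless it is managed elsewhere -->
   <classifier>test</classifier>
   <scope>test</scope>
</dependency>

I used Apache Curator's curator-test module to get an in-process ZooKeeper server for my tests:
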
Code Block
<dependency>
   <groupId>org.apache.curator</groupId>
   <artifactId>curator-test</artifactId>
   <!-- add a <version> unless it is managed elsewhere -->
   <scope>test</scope>
</dependency>

And my code looks like this:

Code Block
import java.io.IOException;
import java.util.Properties;

import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
import kafka.utils.TestUtils;

import org.apache.curator.test.TestingServer;

/** Runs one Kafka broker against an in-process ZooKeeper server, for use in tests. */
public class TestKafkaCluster {
    private final KafkaServerStartable kafkaServer;
    private final TestingServer zkServer;

    public TestKafkaCluster() throws Exception {
        // Start ZooKeeper first; the broker needs its connect string.
        zkServer = new TestingServer();
        KafkaConfig config = getKafkaConfig(zkServer.getConnectString());
        kafkaServer = new KafkaServerStartable(config);
        kafkaServer.startup();
    }

    private static KafkaConfig getKafkaConfig(final String zkConnectString) {
        // Take the single broker config generated by Kafka's own test utilities.
        scala.collection.Iterator<Properties> propsI =
            TestUtils.createBrokerConfigs(1).iterator();
        assert propsI.hasNext();
        Properties props = propsI.next();
        // Point the generated config at the test ZooKeeper server.
        assert props.containsKey("zookeeper.connect");
        props.put("zookeeper.connect", zkConnectString);
        return new KafkaConfig(props);
    }

    /** Returns the broker address as host:port. */
    public String getKafkaBrokerString() {
        return String.format("localhost:%d", getKafkaPort());
    }

    public String getZkConnectString() {
        return zkServer.getConnectString();
    }

    public int getKafkaPort() {
        return kafkaServer.serverConfig().port();
    }

    public void stop() throws IOException {
        // Shut down the broker before the ZooKeeper server it depends on.
        kafkaServer.shutdown();
        zkServer.stop();
    }
}
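
A test would typically pass getKafkaBrokerString() and getZkConnectString() to the client configuration. The following is a minimal sketch of that step, assuming the 0.8-era producer and high-level consumer (the property names are the ones those clients use; newer clients use different keys). The TestClientConfigs class and its method names are hypothetical and not part of Kafka or of the class above.

```java
import java.util.Properties;

// Hypothetical helper, not part of Kafka: builds client settings from the
// addresses exposed by a test cluster.
final class TestClientConfigs {
    // Settings for the 0.8-era producer (assumption: string messages).
    static Properties producerProps(String brokerString) {
        Properties props = new Properties();
        props.put("metadata.broker.list", brokerString);
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // Wait for the leader to acknowledge each message.
        props.put("request.required.acks", "1");
        return props;
    }

    // Settings for the 0.8-era high-level consumer.
    static Properties consumerProps(String zkConnectString, String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zkConnectString);
        props.put("group.id", groupId);
        // Start from the oldest message so a new group sees earlier test data.
        props.put("auto.offset.reset", "smallest");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder addresses; in a test they would come from
        // getKafkaBrokerString() and getZkConnectString().
        System.out.println(producerProps("localhost:9092"));
        System.out.println(consumerProps("localhost:2181", "test-group"));
    }
}
```

Whatever the test does with these settings, calling stop() on the cluster in a finally block or a teardown method keeps a failed test from leaving the broker and ZooKeeper running.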