THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
Please help us keep this FAQ up-to-date. If there is an answer that you think can be improved, please help improve it. If you look for an answer that isn't here, and later figure it out, please add it. You don't need permission, it's a wiki.
Table of Contents |
---|
Producers
Why do I get QueueFullException in my producer when running in async mode?
...
Code Block | ||||
---|---|---|---|---|
| ||||
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group consumer-group1 --zkconnect zkhost:zkport --topic topic1
consumer-group1,topic1,0-0 (Group,Topic,BrokerId-PartitionId)
Owner = consumer-group1-consumer1
Consumer offset = 70121994703
= 70,121,994,703 (65.31G)
Log size = 70122018287
= 70,122,018,287 (65.31G)
Consumer lag = 23584
= 23,584 (0.00G)
|
...
Code Block |
---|
name := "OptionsWatcher" version := "1.0-SNAPSHOT" scalaVersion := "2.9.3" resolvers += "Apache repo" at "https://repository.apache.org/content/repositories/releases" libraryDependencies ++= Seq( jdbc, anorm, cache, "joda-time" % "joda-time" % "2.2", "org.joda" % "joda-convert" % "1.3.1", "ch.qos.logback" % "logback-classic" % "1.0.13", "org.mashupbots.socko" % "socko-webserver_2.9.2" % "0.2.2", "nl.grons" % "metrics-scala_2.9.2" % "3.0.0", "com.codahale.metrics" % "metrics-core" % "3.0.0", "io.backchat.jerkson" % "jerkson_2.9.2" % "0.7.0", "com.amazonaws" % "aws-java-sdk" % "1.3.8", "net.databinder.dispatch" %% "dispatch-core" % "0.9.5", "org.apache.kafka" % "kafka_2.9.2" % "0.8.0-beta1" excludeAll ( ExclusionRule(organization = "com.sun.jdmk"), ExclusionRule(organization = "com.sun.jmx"), ExclusionRule(organization = "javax.jms"), ExclusionRule(organization = "org.slf4j") ) ) |
Unit testing
How do I write unit tests using Kafka?
First, you need to include the test stuff from Kafka. If using Maven, this
does the trick:
Code Block |
---|
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<classifier>test</classifier>
<scope>test</scope>
</dependency> |
I used Apache Curator to get my test ZooKeeper server:
Code Block |
---|
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
<scope>test</scope>
</dependency> |
And my code looks like this:
Code Block |
---|
import java.io.IOException;
import java.util.Properties;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
import kafka.utils.TestUtils;
import org.apache.curator.test.TestingServer;
public class TestKafkaCluster {
KafkaServerStartable kafkaServer;
TestingServer zkServer;
public TestKafkaCluster() throws Exception {
zkServer = new TestingServer();
KafkaConfig config = getKafkaConfig(zkServer.getConnectString());
kafkaServer = new KafkaServerStartable(config);
kafkaServer.startup();
}
private static KafkaConfig getKafkaConfig(final String
zkConnectString) {
scala.collection.Iterator<Properties> propsI =
TestUtils.createBrokerConfigs(1).iterator();
assert propsI.hasNext();
Properties props = propsI.next();
assert props.containsKey("zookeeper.connect");
props.put("zookeeper.connect", zkConnectString);
return new KafkaConfig(props);
}
public String getKafkaBrokerString() {
return String.format("localhost:%d",
kafkaServer.serverConfig().port());
}
public String getZkConnectString() {
return zkServer.getConnectString();
}
public int getKafkaPort() {
return kafkaServer.serverConfig().port();
}
public void stop() throws IOException {
kafkaServer.shutdown();
zkServer.stop();
}
} |