...

fetch.wait.max.ms controls how long a fetch request will wait on the broker in the normal case. The issue is that if there is a hard crash on the broker (host is down), the client may not realize this immediately, since TCP will try very hard to maintain the socket connection. By setting socket.timeout.ms, we allow the client to break out sooner in this case. Typically, socket.timeout.ms should be set to at least fetch.wait.max.ms or a bit larger. It's possible to specify an indefinite long poll by setting fetch.wait.max.ms to a very large value, but that's not recommended right now due to https://issues.apache.org/jira/browse/KAFKA-1016. The consumer-config documentation states that "The actual timeout set will be max.fetch.wait + socket.timeout.ms." However, that change seems to have been lost in the code a while ago. https://issues.apache.org/jira/browse/KAFKA-1147 is filed to fix it.
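
For example, a consumer configuration following this guideline might look like the sketch below (the values are illustrative, not defaults):

Code Block
import java.util.Properties;

public class ConsumerTimeoutExample {
    public static Properties timeoutProps() {
        Properties props = new Properties();
        // Illustrative values: socket.timeout.ms is set a bit larger than
        // fetch.wait.max.ms so a hard broker crash is noticed promptly.
        props.put("fetch.wait.max.ms", "500");
        props.put("socket.timeout.ms", "1000");
        return props;
    }
}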

How do I get exactly-once messaging from Kafka?

Exactly-once semantics has two parts: avoiding duplication during data
production and avoiding duplicates during data consumption.

There are two approaches to getting exactly-once semantics during data
production:

  1. Use a single writer per partition and, every time you get a network error, check the last message in that partition to see whether your last write succeeded.
  2. Include a primary key (UUID or something) in the message and deduplicate on the consumer, as sketched below.
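
A minimal sketch of the consumer-side deduplication in approach 2; the class and method names are hypothetical, and a production version would bound the set's memory (e.g. an expiring cache or a Bloom filter) rather than use an unbounded HashSet:

Code Block
import java.util.HashSet;
import java.util.Set;

// Hypothetical deduplicator keyed on a UUID the producer embeds in each message.
public class UuidDeduplicator {
    private final Set<String> seen = new HashSet<>();

    // Returns true exactly once per message id; replays return false.
    public synchronized boolean firstTimeSeen(String messageUuid) {
        return seen.add(messageUuid);
    }
}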


If you do one of these things, the log that Kafka hosts will be duplicate-free.
However, reading without duplicates depends on some cooperation from
the consumer too. If the consumer is periodically checkpointing its
position, then when it fails and restarts it will resume from the
checkpointed position. So if the data output and the checkpoint are not
written atomically, it will be possible to get duplicates here as well. This
problem is specific to your storage system. For example, if you are using
a database you could commit the data and the offset together in a transaction.
The HDFS loader Camus that LinkedIn wrote does something like this for Hadoop
loads. The other alternative that doesn't require a transaction is to store
the offset with the data loaded and deduplicate using the
topic/partition/offset combination.
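
The transactional variant might look like the following JDBC sketch; the table and column names are hypothetical, and error handling is reduced to a bare rollback:

Code Block
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

// Commit the message and its offset in one transaction, so a restart
// resumes from an offset that is consistent with the data already written.
public class TransactionalSink {
    public void store(Connection conn, String topic, int partition,
                      long offset, byte[] payload) throws SQLException {
        conn.setAutoCommit(false);
        try (PreparedStatement data = conn.prepareStatement(
                 "INSERT INTO events (kafka_topic, kafka_partition, kafka_offset, payload) "
                 + "VALUES (?, ?, ?, ?)");
             PreparedStatement checkpoint = conn.prepareStatement(
                 "UPDATE checkpoints SET kafka_offset = ? "
                 + "WHERE kafka_topic = ? AND kafka_partition = ?")) {
            data.setString(1, topic);
            data.setInt(2, partition);
            data.setLong(3, offset);
            data.setBytes(4, payload);
            data.executeUpdate();

            checkpoint.setLong(1, offset);
            checkpoint.setString(2, topic);
            checkpoint.setInt(3, partition);
            checkpoint.executeUpdate();

            conn.commit();   // data and offset become visible together
        } catch (SQLException e) {
            conn.rollback(); // neither the row nor the checkpoint persists
            throw e;
        }
    }
}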

I think there are two improvements that would make this a lot easier:

  1. Producer idempotence could be done automatically, and much more cheaply, by optionally integrating support for it on the server.
  2. The existing high-level consumer doesn't expose much of the finer-grained control over offsets (e.g. to reset your position). We will be working on that soon.

...

Code Block
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <classifier>test</classifier>
    <scope>test</scope>
</dependency>

 

I used Apache Curator to get my test ZooKeeper server:

 

Code Block
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-test</artifactId>
    <scope>test</scope>
</dependency>

 

And my code looks like this:

 

Code Block
import java.io.IOException;
import java.util.Properties;

import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
import kafka.utils.TestUtils;

import org.apache.curator.test.TestingServer;

public class TestKafkaCluster {
    KafkaServerStartable kafkaServer;
    TestingServer zkServer;

    public TestKafkaCluster() throws Exception {
        // Start an in-process ZooKeeper, then a single Kafka broker pointed at it.
        zkServer = new TestingServer();
        KafkaConfig config = getKafkaConfig(zkServer.getConnectString());
        kafkaServer = new KafkaServerStartable(config);
        kafkaServer.startup();
    }

    private static KafkaConfig getKafkaConfig(final String zkConnectString) {
        // TestUtils generates a complete broker config (including a free port);
        // we only override the ZooKeeper connect string to use our test server.
        scala.collection.Iterator<Properties> propsI =
            TestUtils.createBrokerConfigs(1).iterator();
        assert propsI.hasNext();
        Properties props = propsI.next();
        assert props.containsKey("zookeeper.connect");
        props.put("zookeeper.connect", zkConnectString);
        return new KafkaConfig(props);
    }

    public String getKafkaBrokerString() {
        return String.format("localhost:%d",
                kafkaServer.serverConfig().port());
    }

    public String getZkConnectString() {
        return zkServer.getConnectString();
    }

    public int getKafkaPort() {
        return kafkaServer.serverConfig().port();
    }

    public void stop() throws IOException {
        kafkaServer.shutdown();
        zkServer.stop();
    }
}
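
For completeness, a test might drive the cluster like this; a minimal sketch assuming the class above plus JUnit 4 on the classpath:

Code Block
import org.junit.Test;
import static org.junit.Assert.assertTrue;

public class TestKafkaClusterTest {
    @Test
    public void clusterStartsAndStops() throws Exception {
        TestKafkaCluster cluster = new TestKafkaCluster();
        try {
            // The broker string (e.g. "localhost:NNNN") can be handed to
            // producer/consumer configs in the test body.
            assertTrue(cluster.getKafkaBrokerString().startsWith("localhost:"));
            assertTrue(cluster.getKafkaPort() > 0);
        } finally {
            cluster.stop();
        }
    }
}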