
Please help us keep this FAQ up-to-date. If there is an answer that you think can be improved, please help improve it. If you look for an answer that isn't here, and later figure it out, please add it. You don't need permission; it's a wiki.

Table of Contents

...

Exactly-Once Processing

What is the difference between an "idempotent producer" and a "transactional producer"?

An idempotent producer guarantees that individual messages don't end up as duplicates if a write is internally retried by the producer. A transactional producer allows you to write multiple messages into different partitions across multiple topics atomically. Note: if you use transactions, you automatically get idempotent writes, too.
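
For illustration, a minimal sketch of the two configurations with the Java producer client (the broker address and `transactional.id` below are placeholders):

Code Block
import java.util.Properties;

public class ProducerConfigSketch {
    public static void main(String[] args) {
        // Idempotent producer: internal retries cannot introduce duplicate messages.
        Properties idempotentProps = new Properties();
        idempotentProps.put("bootstrap.servers", "localhost:9092");  // placeholder
        idempotentProps.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        idempotentProps.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        idempotentProps.put("enable.idempotence", "true");

        // Transactional producer: atomic writes across partitions and topics
        // (idempotence is enabled implicitly). The transactional.id must be unique.
        Properties transactionalProps = new Properties();
        transactionalProps.putAll(idempotentProps);
        transactionalProps.put("transactional.id", "my-app-producer-1");  // placeholder
    }
}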

Why do I still observe duplicate data in the final output topic with a "transactional producer", or with an "EOS-enabled" Streams client?

Most likely your consumer reading from the final output topic is not configured with `isolation.level = "read_committed"`. This is necessary so that transactional messages are read correctly and uncommitted data is not returned by the consumer. See below on the consumer's "read_committed" mode for more details.
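
For example, a minimal consumer sketch with the Java client (broker address, group id, and topic name are placeholders):

Code Block
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ReadCommittedConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // placeholder
        props.put("group.id", "my-consumer-group");         // placeholder
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Only committed transactional messages are returned; aborted data is filtered out.
        props.put("isolation.level", "read_committed");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("output-topic"));  // placeholder topic
        // ... poll() and process records as usual ...
    }
}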

How do I enable idempotent writes?

...

You need to provide a cluster-wide unique `transactional.id` for the producer and use the corresponding transactional producer calls (i.e., initTransactions(), beginTransaction(), commitTransaction(), etc.)
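
A minimal sketch of the transactional produce loop with the Java client (broker address, topic names, and the `transactional.id` are placeholders; error handling is simplified):

Code Block
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TransactionalProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");    // placeholder
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("transactional.id", "my-app-producer-1");  // must be unique per producer

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions();  // registers the transactional.id with the brokers
        try {
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("topic-a", "key", "value-1"));
            producer.send(new ProducerRecord<>("topic-b", "key", "value-2"));
            producer.commitTransaction();  // both writes become visible atomically
        } catch (Exception e) {
            // Simplified: fatal errors (e.g., a fenced producer) would require
            // closing the producer rather than aborting.
            producer.abortTransaction();
        } finally {
            producer.close();
        }
    }
}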

What are PIDs and sequence numbers, and how are they related to `transactional.id`?

If a producer is configured for idempotent writes, it is assigned a cluster-wide unique PID (producer ID). The producer also appends a sequence number to every message it writes (starting with sequence number zero). Different producers may therefore use the same sequence numbers; however, the PID/sequence-number pair is globally unique and allows brokers to identify duplicate writes (and filter/drop them). If an idempotent producer is stopped and restarted, it gets a new PID assigned, i.e., PIDs don't "survive" restarts.

A `transactional.id` is a user config, and thus on producer restart the same `transactional.id` is used. This allows brokers to identify the same producer across producer restarts. This identification is required to guarantee consistency in case of a failure: if a producer has an open transaction and crashes, on producer restart the brokers can detect the open transaction and abort it automatically.

...

You only need to configure a consumer with `isolation.level="read_committed"` if the topic contains transactional data, i.e., was written by a transactional producer. If data is written with an idempotent producer, no transactions are used, and thus using "read_uncommitted" or "read_committed" for the consumer makes no difference.

...

Code Block
<dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka_2.10</artifactId>
   <classifier>test</classifier>
   <scope>test</scope>
</dependency>

I used Apache Curator to get my test ZooKeeper server:

Code Block
<dependency>
   <groupId>org.apache.curator</groupId>
   <artifactId>curator-test</artifactId>
   <scope>test</scope>
</dependency>

And my code looks like this: 


Code Block
import java.io.IOException;
import java.util.Properties;

import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
import kafka.utils.TestUtils;

import org.apache.curator.test.TestingServer;

public class TestKafkaCluster {
    KafkaServerStartable kafkaServer;
    TestingServer zkServer;

    public TestKafkaCluster() throws Exception {
        // Start an in-process ZooKeeper server and a single Kafka broker that uses it.
        zkServer = new TestingServer();
        KafkaConfig config = getKafkaConfig(zkServer.getConnectString());
        kafkaServer = new KafkaServerStartable(config);
        kafkaServer.startup();
    }

    private static KafkaConfig getKafkaConfig(final String zkConnectString) {
        // Take a generated broker config and point it at the test ZooKeeper instance.
        scala.collection.Iterator<Properties> propsI =
            TestUtils.createBrokerConfigs(1).iterator();
        assert propsI.hasNext();
        Properties props = propsI.next();
        assert props.containsKey("zookeeper.connect");
        props.put("zookeeper.connect", zkConnectString);
        return new KafkaConfig(props);
    }

    public String getKafkaBrokerString() {
        return String.format("localhost:%d",
                kafkaServer.serverConfig().port());
    }

    public String getZkConnectString() {
        return zkServer.getConnectString();
    }

    public int getKafkaPort() {
        return kafkaServer.serverConfig().port();
    }

    public void stop() throws IOException {
        kafkaServer.shutdown();
        zkServer.stop();
    }
}
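
A hypothetical way to use this helper (class, topic, and variable names are illustrative only):

Code Block
// Spin up the in-process cluster, run test logic against it, then shut it down.
public class TestKafkaClusterExample {
    public static void main(String[] args) throws Exception {
        TestKafkaCluster cluster = new TestKafkaCluster();    // starts ZooKeeper + one broker
        try {
            String brokers = cluster.getKafkaBrokerString();  // e.g. "localhost:<port>"
            String zk = cluster.getZkConnectString();
            // ... configure producers/consumers against `brokers` and run assertions ...
            System.out.println("Kafka at " + brokers + ", ZooKeeper at " + zk);
        } finally {
            cluster.stop();
        }
    }
}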

Operationalizing ZooKeeper

The discussion below uses a 3-instance ZooKeeper cluster as an example. The findings apply to larger clusters as well, but you'll need to adjust the numbers.

Does it make sense to have a config with only 2 ZooKeeper instances? I.e., in the zookeeper.properties file, have entries for server 1 and server 2 only?

No. A setup with 2 ZooKeeper instances is not fault tolerant to even a single failure. If one of the ZooKeeper instances fails, the remaining one is not functional, since there is no quorum majority (1 out of 2 is not a majority). If you run a "stat" command against that remaining instance, you'll see the output "This ZooKeeper instance is not currently serving requests".
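
For reference, a minimal 3-instance ensemble configuration might look roughly like this (hostnames, ports, and paths are placeholders; each instance additionally needs a myid file in its dataDir containing its own server number):

Code Block
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/var/lib/zookeeper
clientPort=2181
server.1=zk1.example.com:2888:3888
server.2=zk2.example.com:2888:3888
server.3=zk3.example.com:2888:3888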

What if you end up with only 2 running ZooKeeper instances, e.g., you started with 3 but one failed? Isn't that the same as the case above?

No, it's not the same scenario. First of all, the 3-instance setup did tolerate 1 instance being down. The 2 remaining ZooKeeper instances will continue to function because the quorum majority (2 out of 3) is maintained.

I had a 3-instance ZooKeeper setup and one instance just failed. How should I recover?

Restart the failed instance with the same configuration it had before (i.e., the same "myid" ID file and the same IP address). It is not important to recover the data volume of the failed instance, but it is a bonus if you do so. Once the instance comes up, it will sync with the other 2 ZooKeeper instances and get all the data.

I had a 3-instance ZooKeeper setup and two instances failed. How should I recover? Is my ZooKeeper cluster even running at that point?

First of all, ZooKeeper is now unavailable and the remaining instance will show "This ZooKeeper instance is not currently serving requests" if probed. Second, you should make sure this situation is extremely rare; it should be possible to recover the first failed instance quickly, before the second instance fails. Third, bring up the two failed instances one by one without changing anything in their config. As in the case above, it is not important to recover the data volumes of the failed instances, but it is a bonus if you do so. Once each instance comes up, it will sync with the remaining ZooKeeper instance and get all the data.

I had a 3-instance ZooKeeper setup and two instances failed. I can't recover the failed instances for whatever reason. What should I do?

You will have to restart the remaining healthy ZooKeeper instance in "standalone" mode and restart all the brokers, pointing them to this standalone instance (instead of all 3 ZooKeeper instances).
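
As a rough sketch (hostnames, ports, and paths are placeholders): remove the server.N entries from the surviving instance's configuration so it starts in standalone mode, and point zookeeper.connect on every broker at that single instance:

Code Block
# zookeeper.properties on the surviving instance: with no server.N entries it runs standalone
dataDir=/var/lib/zookeeper
clientPort=2181

# server.properties on every broker: point at the standalone instance only
zookeeper.connect=zk3.example.com:2181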

The ZooKeeper cluster is unavailable (for any of the reasons mentioned above, e.g., no quorum, or all instances have failed). What is the impact on Kafka clients? What is the impact on brokers?

Applications will be able to continue producing and consuming, at least for a while. This holds if the ZooKeeper cluster is only temporarily unavailable and eventually comes back (after a few minutes). On the other hand, if the ZooKeeper cluster is permanently unavailable, applications will slowly start to see problems with producing/consuming, especially if some brokers fail, because partition leadership cannot be moved to other brokers. Taking one extreme, if the ZooKeeper cluster is down for a month, it is very likely that applications will get produce/consume errors. Admin tools (e.g., those that create topics, set ACLs, or change configs) will not work. Brokers will not be impacted by ZooKeeper being unavailable; they will periodically try to reconnect to the ZooKeeper cluster. If you take care to use the same IP address for a recovered ZooKeeper instance as it had before it failed, brokers will not need to be restarted.