** PLEASE NOTE ** The recommended producer for new development is the new Java producer from the latest stable release: http://kafka.apache.org/082/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html

 

Once you have confirmed you have a basic Kafka cluster setup (see 0.8 Quick Start) it is time to write some code!

Producers

The Producer class is used to create new messages for a specific Topic and optional Partition.

If using Java you need to include a few packages for the Producer and supporting classes:

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

The first step in your code is to define properties for how the Producer finds the cluster, serializes the messages and if appropriate directs the message to a specific Partition.

These properties are defined in the standard Java Properties object:

Properties props = new Properties();

props.put("metadata.broker.list", "broker1:9092,broker2:9092");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("partitioner.class", "example.producer.SimplePartitioner");
props.put("request.required.acks", "1");

ProducerConfig config = new ProducerConfig(props);

The first property, "metadata.broker.list", defines where the Producer can find one or more Brokers to determine the Leader for each topic. This does not need to be the full set of Brokers in your cluster, but should include at least two in case the first Broker is not available. There is no need to figure out which Broker is the Leader for a given topic and partition: the Producer connects to one of the listed Brokers, asks for the metadata, and then connects to the correct Broker. Note that the list is given as "host1:port1,host2:port2", not as Broker ids.

The second property “serializer.class” defines what Serializer to use when preparing the message for transmission to the Broker. In our example we use a simple String encoder provided as part of Kafka. Note that the encoder must accept the same type as defined in the KeyedMessage object in the next step.

It is possible to change the Serializer for the Key (see below) of the message by defining "key.serializer.class" appropriately. By default it is set to the same value as "serializer.class".

The third property, "partitioner.class", defines what class to use to determine which Partition in the Topic the message is to be sent to. This is optional, but for any non-trivial implementation you are going to want to implement a partitioning scheme. More about the implementation of this class later. If you include a value for the key but haven't defined a "partitioner.class", Kafka will use the default partitioner. If the key is null, the Producer will assign the message to a random Partition.

The last property, "request.required.acks", tells Kafka that you want your Producer to require an acknowledgement from the Broker that the message was received. Without this setting the Producer will 'fire and forget', possibly leading to data loss.

Next you define the Producer object itself:

Producer<String, String> producer = new Producer<String, String>(config);

Note that the Producer is a Java generic and you need to supply two type parameters. The first is the type of the Partition key, the second the type of the message. In this example both are Strings, which matches what we defined in the Properties above.

Now build your message:

Random rnd = new Random();

long runtime = new Date().getTime();
String ip = "192.168.2." + rnd.nextInt(255);
String msg = runtime + ",www.example.com," + ip;

 

In this example we are faking a message for a website visit by IP address. The first part of the comma-separated message is the timestamp of the event, the second is the website, and the third is the IP address of the requester. We use the Java Random class here to make the last octet of the IP vary so we can see how Partitioning works.
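The message format can be checked on its own, without a Kafka cluster. The sketch below (hypothetical class name, no Kafka dependency) builds the same "timestamp,site,ip" payload and shows that it parses back into its three fields:

```java
import java.util.Date;
import java.util.Random;

public class MessageBuilder {
    // Builds the same comma-separated "timestamp,site,ip" payload as the example above.
    public static String buildMessage(Random rnd) {
        long runtime = new Date().getTime();
        String ip = "192.168.2." + rnd.nextInt(255);
        return runtime + ",www.example.com," + ip;
    }

    public static void main(String[] args) {
        Random rnd = new Random();
        for (int i = 0; i < 3; i++) {
            // Each message splits into exactly three comma-separated fields.
            System.out.println(MessageBuilder.buildMessage(rnd));
        }
    }
}
```

Because the last octet is random, repeated runs produce different IPs, which is what lets us observe messages spreading across Partitions later.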

Finally write the message to the Broker:

KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);

producer.send(data);

Here "page_visits" is the Topic to write to, and we pass the IP as the partition key. Note that if you do not include a key, even if you've defined a partitioner class, Kafka will assign the message to a random partition.

Full Source:

import java.util.*;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class TestProducer {
    public static void main(String[] args) {
        long events = Long.parseLong(args[0]);
        Random rnd = new Random();

        Properties props = new Properties();
        props.put("metadata.broker.list", "broker1:9092,broker2:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("partitioner.class", "example.producer.SimplePartitioner");
        props.put("request.required.acks", "1");

        ProducerConfig config = new ProducerConfig(props);

        Producer<String, String> producer = new Producer<String, String>(config);

        for (long nEvents = 0; nEvents < events; nEvents++) {
            long runtime = new Date().getTime();
            String ip = "192.168.2." + rnd.nextInt(255);
            String msg = runtime + ",www.example.com," + ip;
            KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);
            producer.send(data);
        }
        producer.close();
    }
}

 

Partitioning Code:

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class SimplePartitioner implements Partitioner {
    public SimplePartitioner(VerifiableProperties props) {
    }

    // Partition on the last octet of the IP key, modulo the partition count.
    public int partition(Object key, int a_numPartitions) {
        int partition = 0;
        String stringKey = (String) key;
        int offset = stringKey.lastIndexOf('.');
        if (offset > 0) {
            partition = Integer.parseInt(stringKey.substring(offset + 1)) % a_numPartitions;
        }
        return partition;
    }
}

The logic takes the key, which we expect to be the IP address, finds the last octet, and takes it modulo the number of partitions defined within Kafka for the topic. The benefit of this partitioning logic is that all web visits from the same source IP end up in the same Partition. Of course other IPs land in the same Partitions too, but your consumer logic will need to know how to handle that.
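The partition assignment can be exercised without a running cluster. This sketch reproduces the last-octet-modulo logic of SimplePartitioner in a standalone class (the class name and a partition count of 5, matching the topic created below, are assumptions for illustration):

```java
public class PartitionDemo {
    // Same logic as SimplePartitioner.partition(): last octet of the IP, modulo the partition count.
    static int partitionFor(String key, int numPartitions) {
        int partition = 0;
        int offset = key.lastIndexOf('.');
        if (offset > 0) {
            partition = Integer.parseInt(key.substring(offset + 1)) % numPartitions;
        }
        return partition;
    }

    public static void main(String[] args) {
        int numPartitions = 5;
        // All visits from the same IP land in the same partition,
        // and different IPs can share a partition: 12 % 5 == 17 % 5 == 2.
        System.out.println(PartitionDemo.partitionFor("192.168.2.12", numPartitions)); // 2
        System.out.println(PartitionDemo.partitionFor("192.168.2.17", numPartitions)); // 2
        System.out.println(PartitionDemo.partitionFor("10.0.0.3", numPartitions));     // 3
    }
}
```

Note that a production partitioner would also need to handle keys whose last segment is not numeric; as written, Integer.parseInt would throw a NumberFormatException for such keys.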

Before running this, make sure you have created the Topic page_visits. From the command line:

bin/kafka-create-topic.sh --topic page_visits --replica 3 --zookeeper localhost:2181 --partition 5

Make sure you include a --partition option so you create more than one.

Now compile and run your Producer and data will be written to Kafka.

To confirm you have data, use the command line tool to see what was written:

 bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic page_visits --from-beginning

Maven

Add the following dependency to your pom.xml:

    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.9.2</artifactId>
      <version>0.8.1.1</version>
      <scope>compile</scope>
      <exclusions>
        <exclusion>
          <artifactId>jmxri</artifactId>
          <groupId>com.sun.jmx</groupId>
        </exclusion>
        <exclusion>
          <artifactId>jms</artifactId>
          <groupId>javax.jms</groupId>
        </exclusion>
        <exclusion>
          <artifactId>jmxtools</artifactId>
          <groupId>com.sun.jdmk</groupId>
        </exclusion>
      </exclusions>
    </dependency>

17 Comments

  1. Anonymous

    It would be good to explain the error handling code around the send call, which may fail if the broker is down or if a network partition occurs. In 0.7.2 the client code throws NoBrokersForPartitionException; there may be other possible exceptions, I don't know.

  2. Hi, excited to see the improvements in 0.8 – very cool!

    I'm curious, though, why there seems to be no option to simply specify the Zookeeper host(s)/chroot when configuring a producer. I know there's some momentum behind removing Zookeeper as a requirement (which I think is great), but since we do use ZK in our environment, it was nice with 0.7.2 to be able to simply point at the ZK cluster and keep the actual Kafka config decoupled from the app.

    Any chance that could be brought back?

    nate

    PS: Maybe it does still exist? Is there an updated configuration options page for 0.8?

  3. Anonymous

    In the producer example provided for 0.7, there is no requirement of creating the topic before sending the message. see here: http://kafka.apache.org/07/quickstart.html.

    I guess in 0.7 the new topic will be created on the fly when you send the first message of the topic?

    Does this logic get changed in 0.8?

  4. Anonymous

    I think it would be worth noting that the kafka jar file needed to write java code with is core/target/scala-2.8.0/kafka_2.8.0-0.8.0-SNAPSHOT.jar, assuming you've followed the 0.8 quickstart.

    Took me a bit to find that.

  5. When I send a message through the above producer code there is no exception, but the message is not received at the consumer.
    Please help...

  6. Anonymous

    We are currently using 0.7 and are in the process of upgrading to 0.8. One thing I notice is that the producer no longer gets the brokers' information from zookeeper. If I add a new broker into the cluster then I would have to add the new broker into the metadata.broker.list configuration and restart the producer. That's different behavior than in 0.7, am I right? I would like to know if there is any way of doing the reconfiguration at run-time rather than offline reconfiguration in 0.8.

    1. which is the stable version of Kafka or from where i got it...please provide me the link

  7. Anonymous

    When running a cluster, if the network is interrupted, the producer does not throw an exception (that I can handle) to the upper layer program, and at the same time it logs too many useless logs. Is there a solution so I can know the producer failed to send a message? Please help. Kafka version is 0.7.2.

  8. Anonymous

    metadata.broker.list accepts input in the form "host1:port1,host2:port2" and not brokerid:port (as it may seem from the example).

    1. Anonymous

      You are brilliant. Thank you for posting that!

  9. Anonymous

    It's better to give out a dependency pom.

  10. Anonymous

    Kafka Guys, reply back to the questions/comments.

  11. Anonymous

    Has anybody gotten the exception below? Any idea about it?
    Exception in thread "main" kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
    at kafka.producer.Producer.send(Producer.scala:74)
    at kafka.javaapi.producer.Producer.send(Producer.scala:32)
    at vinson.test.kafka.ProducerTest.main(ProducerTest.java:24)

    1. Anonymous

      Try changing the broker1:9092,broker2:9093 to localhost:9092,localhost:9093. That worked for me.

    2. Anonymous

      Also, make sure the brokers are running before you run the code. Otherwise, there will be no brokers to respond to the Producers in the code.

  12. Forgive my ignorance as I'm new to distributed systems, kafka and linux but I'm looking for an example of how to configure a producer to read from a log file as its source. Can anyone point me towards a good resource that is simple for a novice programmer to understand?

    1. Anonymous

      Just read a file into a buffer, and create a producer to send part of the buffer at one time or whatever you want. Is this right?