...

Code Block
           if (fetchResponse.hasError()) {
                numErrors++;
                // Something went wrong!
                short code = fetchResponse.errorCode(a_topic, a_partition);
                System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
                if (numErrors > 5) break;

                if (code == ErrorMapping.OffsetOutOfRangeCode())  {
                    // We asked for an invalid offset. For simple case ask for the last element to reset
                    readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);
                    continue;
                }
                consumer.close();
                consumer = null;
                leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
                continue;
            }

...

Code Block
            FetchRequest req = new FetchRequestBuilder()
                    .clientId(clientName)
                    .addFetch(a_topic, a_partition, readOffset, 100000)
                    .build();
            FetchResponse fetchResponse = consumer.fetch(req);

            if (fetchResponse.hasError()) {
                    // See code in previous section
            }
            numErrors = 0;

            long numRead = 0;
            for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
                long currentOffset = messageAndOffset.offset();
                if (currentOffset < readOffset) {
                    System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
                    continue;
                }
                readOffset = messageAndOffset.nextOffset();
                ByteBuffer payload = messageAndOffset.message().payload();

                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
                numRead++;
                a_maxReads--;
            }

            if (numRead == 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }

Note that ‘readOffset’ asks the last read message what the next offset would be. That way, when the block of messages has been processed, we know where to tell Kafka to start the next fetch.

Also note that we explicitly check that the offset being read is not less than the offset we requested. This check is needed because, if Kafka is compressing the messages, the fetch request will return an entire compressed block even when the requested offset is not at the beginning of that block. As a result, a message we saw previously may be returned again.
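To make this concrete, here is a small standalone sketch (plain Java, not the Kafka API; the offset values are invented for illustration). Suppose we asked for offset 7 but the broker returned a compressed block covering offsets 5 through 9: the guard skips 5 and 6, and processing resumes at 7.

Code Block
public class OldOffsetSkipSketch {
    public static void main(String[] args) {
        long readOffset = 7;                    // the offset we asked Kafka for
        long[] returnedBlock = {5, 6, 7, 8, 9}; // a compressed block may begin earlier

        for (long currentOffset : returnedBlock) {
            if (currentOffset < readOffset) {
                // Same guard as the consumer loop above: drop already-seen messages
                System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
                continue;
            }
            readOffset = currentOffset + 1;     // mirrors messageAndOffset.nextOffset()
            System.out.println("Processing offset " + currentOffset);
        }
        System.out.println("Next fetch starts at: " + readOffset);
    }
}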

Finally, we keep track of the number of messages read. If we didn't read anything on the last request, we go to sleep for a second so we aren't hammering Kafka when there is no data.
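As an aside, the empty catch block above swallows the interrupt. A minimal alternative, sketched here with a class and method name of our own (not part of the example), restores the interrupt flag so the enclosing fetch loop can be cancelled cleanly:

Code Block
public class BackoffSketch {
    // Sleep-based backoff; returns true if the full interval elapsed,
    // false if the thread was interrupted and should stop consuming.
    static boolean backoff(long millis) {
        try {
            Thread.sleep(millis);
            return true;
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("Slept fully: " + backoff(1000));
    }
}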

...

Code Block
package com.test.simple;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.*;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SimpleExample {
    public static void main(String args[]) {
        SimpleExample example = new SimpleExample();
        long maxReads = Long.parseLong(args[0]);
        String topic = args[1];
        int partition = Integer.parseInt(args[2]);
        List<String> seeds = new ArrayList<String>();
        seeds.add(args[3]);
        int port = Integer.parseInt(args[4]);
        try {
            example.run(maxReads, topic, partition, seeds, port);
        } catch (Exception e) {
            System.out.println("Oops:" + e);
            e.printStackTrace();
        }
    }

    private List<String> m_replicaBrokers = new ArrayList<String>();

    public SimpleExample() {
        m_replicaBrokers = new ArrayList<String>();
    }

    public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception {
        // find the meta data about the topic and partition we are interested in
        PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
        if (metadata == null) {
            System.out.println("Can't find metadata for Topic and Partition. Exiting");
            return;
        }
        String leadBroker = metadata.leader().host();
        String clientName = "Client_" + a_topic + "_" + a_partition;

        SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
        long readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName);

        int numErrors = 0;
        while (a_maxReads > 0) {
            if (consumer == null) {
                consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
            }
            FetchRequest req = new FetchRequestBuilder()
                    .clientId(clientName)
                    .addFetch(a_topic, a_partition, readOffset, 100000)
                    .build();
            FetchResponse fetchResponse = consumer.fetch(req);

            if (fetchResponse.hasError()) {
                numErrors++;
                // Something went wrong!
                short code = fetchResponse.errorCode(a_topic, a_partition);
                System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
                if (numErrors > 5) break;
                if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                    // We asked for an invalid offset. For simple case ask for the last element to reset
                    readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);
                    continue;
                }
                consumer.close();
                consumer = null;
                leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
                continue;
            }
            numErrors = 0;

            long numRead = 0;
            for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
                long currentOffset = messageAndOffset.offset();
                if (currentOffset < readOffset) {
                    System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
                    continue;
                }
                readOffset = messageAndOffset.nextOffset();
                ByteBuffer payload = messageAndOffset.message().payload();

                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
                numRead++;
                a_maxReads--;
            }

            if (numRead == 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        if (consumer != null) consumer.close();
    }

    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
                                     long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);

        if (response.hasError()) {
            System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition) );
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }

    private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {
        for (int i = 0; i < 3; i++) {
            boolean goToSleep = false;
            PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
            if (metadata == null) {
                goToSleep = true;
            } else if (metadata.leader() == null) {
                goToSleep = true;
            } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
                // first time through if the leader hasn't changed give ZooKeeper a second to recover
                // second time, assume the broker did recover before failover, or it was a non-Broker issue
                //
                goToSleep = true;
            } else {
                return metadata.leader().host();
            }
            if (goToSleep) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        System.out.println("Unable to find new leader after Broker failure. Exiting");
        throw new Exception("Unable to find new leader after Broker failure. Exiting");
    }

    private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) {
        PartitionMetadata returnMetaData = null;
        loop:
        for (String seed : a_seedBrokers) {
            SimpleConsumer consumer = null;
            try {
                consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
                List<String> topics = new ArrayList<String>();
                topics.add(a_topic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

                List<TopicMetadata> metaData = resp.topicsMetadata();
                for (TopicMetadata item : metaData) {
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        if (part.partitionId() == a_partition) {
                            returnMetaData = part;
                            break loop; // found the partition; stop scanning the remaining brokers
                        }
                    }
                }
            } catch (Exception e) {
                System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic
                        + ", " + a_partition + "] Reason: " + e);
            } finally {
                if (consumer != null) consumer.close();
            }
        }
        if (returnMetaData != null) {
            m_replicaBrokers.clear();
            for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
                m_replicaBrokers.add(replica.host());
            }
        }
        return returnMetaData;
    }
}
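For reference, main() expects five arguments in this order: the maximum number of messages to read, the topic, the partition, one seed broker host, and the broker port. A hypothetical driver (the topic, host, and port below are placeholders, not values from this example) could also call run() directly:

Code Block
package com.test.simple;

public class RunSimpleExample {
    public static void main(String[] args) throws Exception {
        // Read up to 25 messages from partition 0 of "page_visits" via
        // broker1.example.com:9092 (all placeholder values).
        SimpleExample example = new SimpleExample();
        java.util.List<String> seeds = new java.util.ArrayList<String>();
        seeds.add("broker1.example.com");
        example.run(25, "page_visits", 0, seeds, 9092);
    }
}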