...

Code Block
languagejava
public interface ProcessorContext {

    void send(String topic, Object key, Object value);  // send the key value-pair to a Kafka topic

    void schedule(long timestamp);                      // repeatedly schedule the punctuation function for the period

    void commit();                                      // commit the current state, along with the upstream offset and the downstream sent data

    String topic();                                     // return the Kafka record's topic of the current processing key-value pair

    int partition();                                    // return the Kafka record's partition id of the current processing key-value pair

    long offset();                                      // return the Kafka record's offset of the current processing key-value pair
}
 
public interface Processor<K, V> {

    void init(ProcessorContext context);           // initialize the processor

    void process(K key, V value);                  // process a key-value pair

    void punctuate();                              // process when the scheduled time has been reached

    void close();                                  // close the processor
}
 
public interface ProcessorDef {

    Processor instance();                          // create a new instance of the processor from its definition
}
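
For a quick sense of how these two interfaces fit together before the fuller stateful example below, a trivial stateless processor definition could look like the following sketch (illustrative only; the class name and the "topic-filtered" output topic are assumptions, and it relies only on the ProcessorContext.send() call declared above):

Code Block
languagejava
// Illustrative sketch only: a stateless processor that forwards long values to a topic.
// The class name and "topic-filtered" destination are assumptions, not part of the proposal.
public class FilterProcessorDef implements ProcessorDef {

    @Override
    public Processor<String, String> instance() {
        return new Processor<String, String>() {
            private ProcessorContext context;

            @Override
            public void init(ProcessorContext context) {
                this.context = context;                            // keep the context for send() / commit()
            }

            @Override
            public void process(String key, String value) {
                if (value != null && value.length() > 10)
                    context.send("topic-filtered", key, value);    // send() as declared on ProcessorContext above
            }

            @Override
            public void punctuate() { }                            // no periodic work in this stateless example

            @Override
            public void close() { }                                // nothing to clean up
        };
    }
}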

 

Users can then create their processing job with these processor interfaces as follows:

Code Block
languagejava
public class ProcessorJob {

    private static class MyProcessorDef implements ProcessorDef {

        @Override
        public Processor<String, Integer> instance() {
            return new Processor<String, Integer>() {
                private ProcessorContext context;
                private KeyValueStore<String, Integer> kvStore;

                @Override
                public void init(ProcessorContext context) {
                    this.context = context;
                    this.context.schedule(this, 1000);
                    this.kvStore = new InMemoryKeyValueStore<>("local-state", context);
                }

                @Override
                public void process(String key, Integer value) {
                    Integer oldValue = this.kvStore.get(key);
                    if (oldValue == null) {
                        this.kvStore.put(key, value);
                    } else {
                        int newValue = oldValue + value;
                        this.kvStore.put(key, newValue);
                    }

                    context.commit();
                }

                @Override
                public void punctuate(long streamTime) {
                    KeyValueIterator<String, Integer> iter = this.kvStore.all();

                    while (iter.hasNext()) {
                        Entry<String, Integer> entry = iter.next();

                        System.out.println("[" + entry.key() + ", " + entry.value() + "]");

                        context.forward(entry.key(), entry.value());
                    }
                }

                @Override
                public void close() {
                    this.kvStore.close();
                }
            };
        }
    }

    public static void main(String[] args) throws Exception {
        StreamingConfig config = new StreamingConfig(new Properties());

        TopologyBuilder builder = new TopologyBuilder();
        // ... add the source topic(s) and MyProcessorDef to the builder ...

        KafkaStreaming streaming = new KafkaStreaming(builder, config);
        streaming.start();
    }
}

 

This example demonstrates how the processor API abstracts away the low-level consumer / producer interfaces, such as consumer.poll() / commit(), producer.send(callback), producer.flush(), etc.
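
For comparison, the kind of driver loop that the processor client runs on the user's behalf would otherwise look roughly like the sketch below (illustrative only; the topic names, configuration, and trivial pass-through processing step are placeholders, not part of the proposal):

Code Block
languagejava
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Rough sketch of the loop the processor client hides from the user.
public class ManualProcessLoop {

    public static void main(String[] args) {
        Properties props = new Properties();   // bootstrap.servers, (de)serializer configs, group.id, etc.

        KafkaConsumer<String, Integer> consumer = new KafkaConsumer<>(props);
        KafkaProducer<String, Integer> producer = new KafkaProducer<>(props);

        consumer.subscribe(Collections.singletonList("topic-source"));

        while (true) {
            ConsumerRecords<String, Integer> records = consumer.poll(100);          // consumer.poll()

            for (ConsumerRecord<String, Integer> record : records) {
                Integer result = record.value();                                    // user processing logic goes here

                producer.send(new ProducerRecord<>("topic-sink", record.key(), result),
                              (metadata, exception) -> { });                        // producer.send(callback)
            }

            producer.flush();        // producer.flush(): make sure downstream sends complete ...
            consumer.commitSync();   // ... before committing the upstream offsets
        }
    }
}

The send-then-flush-then-commit ordering in this sketch mirrors the commit() semantics described above: downstream results are flushed before the upstream offsets are committed.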

...

With this high-level interface, the user program can be simplified as follows (using lambda expressions in Java 8):

Code Block
languagejava
public class KStreamJob {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        StreamingConfig config = new StreamingConfig(props);

        KStreamBuilder builder = new KStreamBuilder();

        StringSerializer stringSerializer = new StringSerializer();
        IntegerSerializer intSerializer = new IntegerSerializer();

        KStream<String, String> stream1 = builder.from(new StringDeserializer(), new StringDeserializer(), "topic1");

        KStream<String, Integer> stream2 =
            stream1.map((key, value) -> new KeyValue<>(key, new Integer(value)))
                   .filter((key, value) -> true);

        KStream<String, Integer>[] streams = stream2
            .branch((key, value) -> value > 10,
                    (key, value) -> value <= 10);

        streams[0].sendTo("topic2", stringSerializer, intSerializer);
        streams[1].sendTo("topic3", stringSerializer, intSerializer);

        KafkaStreaming kstream = new KafkaStreaming(builder, config);
        kstream.start();
    }
}

 

Architecture Design

...