THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block:
public interface ProcessorContext { void send(String topic, Object key, Object value); // send the key value-pair to a Kafka topic void schedule(long timestamp); // repeatedly schedule the punctuation function for the period void commit(); // commit the current state, along with the upstream offset and the downstream sent data String topic(); // return the Kafka record's topic of the current processing key-value pair int partition(); // return the Kafka record's partition id of the current processing key-value pair long offset(); // return the Kafka record's offset of the current processing key-value pair } public interface Processor<K1Processor<K, V1, K2, V2>V> { void init(ProcessorContext context); // initialize the processor void process(K1 key, V1 value); // process a key-value pair void forward(K2 key, V2 value);punctuate(); // forwardprocess awhen key-valuethe pairthe toscheduled thetime downstreamhas chained processorreached void punctuateclose(); // process when the the// scheduledclose timethe has reachedprocessor } public abstractinterface class KafkaProcessor<K1, V2, K2, V2> implements Processor<K1, V1, K2, V2> { @Override public final forward(K2 key, V2 value) { .... // implemented to trigger the downstream chained processor's process() } @Override public void punctuate() { // do nothing, can be overriden by users } @Override public void init(ProcessorContext context) { ProcessorDef { Processor instance(); // docreate nothing,a cannew beinstance overridenof bythe users processor from its } @Override public void close() { // do nothing, can be overriden by users } }definition } |
Users can then create their processing job with the constructed processor topology as follows:
Code Block:
public class StatefulProcessJobProcessorJob { private static class AggregateProcessorMyProcessorDef extends KafkaProcessor<String, Integer, String, Integer>implements ProcessorDef { private ProcessorContext context;@Override privatepublic KeyValueStore<StringProcessor<String, Integer> kvStore; instance() { public MyProcessor(String namereturn new Processor<String, Integer>() { super(name)private ProcessorContext context; } private KeyValueStore<String, Integer> @OverridekvStore; public void init(ProcessorContext context) {@Override this.context = context; public void init(ProcessorContext context) { this.context.schedule(this, 1000); this.kvStorecontext = new InMemoryKeyValueStore<>("local-state", context); } @Override this.context.schedule(this, 1000); public void process(String key, Integer value) { this.kvStore Integer= oldValue = this.kvStore.get(keynew InMemoryKeyValueStore<>("local-state", context); if (oldValue == null) { } @Override this.kvStore.put(key, value); public void process(String key, }Integer elsevalue) { int newValue =Integer oldValue + value= this.kvStore.get(key); this.kvStore.put(key, newValue); if (oldValue == null) { } contextthis.kvStore.commitput(key, value); } @Override } else { public void punctuate(long streamTime) { KeyValueIterator<String, Integer> iter = this.kvStore.all(); int newValue = oldValue + value; while (iter.hasNext()) { Entry<String, Integer> entry = iter.next( this.kvStore.put(key, newValue); forward(entry.key(), entry.value()); } } thiscontext.kvStore.clearcommit(); } @Override} public void close() { @Override this.kvStore.close(); } public void punctuate(long streamTime) }{ private static class FilterProcessor extends KafkaProcessor<String, Integer, String, Integer> { KeyValueIterator<String, Integer> iter public MyProcessor(String name) {= this.kvStore.all(); super(name); } while @Override (iter.hasNext()) { public void process(String key, Integer value) { Entry<String, Integer> entry if= (value > 10) iter.next(); 
forward(key, value) } } // create the topologySystem.out.println("[" + entry.key() + ", " + entry.value() + "]"); private static class MyPTopology extends ProcessorTopology { @Override context.forward(entry.key(), public void build() { entry.value()); KafkaProcessor<String, Integer, String, Integer> source;} KafkaProcessor<String, Integer, String, Integer> aggregate;} KafkaProcessor<String, Integer, String, Integer> filter;@Override source = addSource(new StringDeserializer(), new IntegerDeserializer(), "topic-source"); public void close() { aggregate = new AggregateProcessor("aggregate"this.kvStore.close(); filter = new FilterProcessor("filter"); } addProcessor(aggregate, source)}; } addProcessor(filter, aggregate);} public static void main(String[] args) throws Exception { } StreamingConfig config = } new StreamingConfig(new Properties()); public static void main(String[] args) throws Exception {TopologyBuilder builder = new TopologyBuilder(); KafkaProcessKafkaStreaming processstreaming = new KafkaProcessKafkaStreaming(MyPTopology.classbuilder, new ProcessorProperties(new Properties())config); processstreaming.start(); } } |
This example demonstrates how the API abstracts away the low-level consumer / producer interfaces, such as consumer.poll() / commit(), producer.send(callback), producer.flush(), etc.
...
With this high-level interface, the user program can be simplified as follows (using lambda expressions in Java 8):
Code Block:
public class KStreamJob { public class StatefulProcessJob { private static class MyKStreamTopology extends KStreamTopology { @Override public void build() { public static void main(String[] args) throws // create a source streamException { StreamingConfig KStream<String, String> stream1 config = from(new StringDeserializerStreamingConfig(), new StringDeserializer(), "topic-source")props); KStreamBuilder builder = // parse the value string to integernew KStreamBuilder(); StringSerializer stringSerializer = KStream<String, Integer> stream2 = new StringSerializer(); IntegerSerializer intSerializer = new IntegerSerializer(); stream1.map((String key KStream<String, StringString> value)stream1 -> return = builder.from(new KeyValue<>StringDeserializer(key), new IntegerStringDeserializer(value))), "topic1"); KStream<String, Integer> stream2 = stream1.filtermap((String key, Integer value) -> return value > 10)); new KeyValue<>(key, new Integer(value))) // branch two streams with odd / even values .filter(((key, value) -> true)); KStream<String, Integer>[] streams = stream2.branch( (String .branch((key, Integer value) -> return isOdd()value > 10, (String key, Integer value) -> return isEven()); // send result stream to Kafka topics value <= 10); streams[0].sendTo("topic-odd"topic2", stringSerializer, intSerializer); streams[1].sendTo("topic-even"topic3", stringSerializer, intSerializer); } } publicKafkaStreaming static void main(String[] args) throws Exception { KafkaProcess process kstream = new KafkaProcessKafkaStreaming(MyKStreamTopology.classbuilder, new ProcessorProperties(new Properties()))config); processkstream.start(); } } |
Architecture Design
...