THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||
---|---|---|
| ||
public final class TopologyBuilder { // make class final compared to old TopologyBuilder // existing public API from current TopologyBuilder class that will not be changed public enum AutoOffsetReset { EARLIEST , LATEST } public TopologyBuilder(); public synchronized TopologyBuilder addSource(String name, String... topics); public synchronized TopologyBuilder addSource(String name, Pattern topicPattern); public synchronized TopologyBuilder addSource(AutoOffsetReset offsetReset, String name, String... topics); public synchronized TopologyBuilder addSource(AutoOffsetReset offsetReset, String name, Pattern topicPattern); public synchronized TopologyBuilder addSource(String name, Deserializer keyDeserializer, Deserializer valDeserializer, String... topics); public synchronized TopologyBuilder addSource(String name, Deserializer keyDeserializer, Deserializer valDeserializer, Pattern topicPattern); public synchronized TopologyBuilder addSource(AutoOffsetReset offsetReset, String name, Deserializer keyDeserializer, Deserializer valDeserializer, String... topics); public synchronized TopologyBuilder addSource(AutoOffsetReset offsetReset, String name, Deserializer keyDeserializer, Deserializer valDeserializer, Pattern topicPattern; public synchronized TopologyBuilder addSink(String name, String topic, String... parentNames); public synchronized TopologyBuilder addSink(String name, String topic, Serializer keySerializer, Serializer valSerializer, String... parentNames); public synchronized TopologyBuilder addSink(String name, String topic, StreamPartitioner partitioner, String... parentNames); public synchronized <K, V> TopologyBuilder addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamPartitioner<? super K, ? super V> partitioner, String... parentNames); public synchronized TopologyBuilder addProcessor(String name, ProcessorSupplier supplier, String... 
parentNames); public synchronized TopologyBuilder addStateStore(StateStoreSupplier supplier, String... processorNames); public synchronized TopologyBuilder addGlobalStore(final StateStore store, final String sourceName, final Deserializer keyDeserializer, final Deserializer valueDeserializer, final String topic, final String processorName, final ProcessorSupplier stateUpdateSupplier); public synchronized TopologyBuilder connectProcessorAndStateStores(String processorName, String... stateStoreNames); // newly added method to support builder pattern // should be used instead of KafkaStreams() constructor // old: // TopologyBuilder builder = new TopologyBuilder(); // KafkaStreams streams = new KafkaStreams(builder, new StreamsConfig(...)); // new: // TopologyBuilder builder = new TopologyBuilder(); // KafkaStreams streams = builder.build(new StreamsConfig(...)); public synchronized KafkaStreams build(final StreamsConfig config); public synchronized KafkaStreams build(final StreamsConfig config, final KafkaClientSupplier clientSupplier); } |
New `org.apache.kafka.streams.KStreamBuilder` class:
Code Block | ||
---|---|---|
| ||
public final class KStreamBuilder { // existing public API from current TopologyBuilder class that will not be changed // small change: adding `synchronized` to all methods to align with TopologyBuilder public KStreamBuilder(); public synchronized <K, V> KStream<K, V> stream(final String... topics); public synchronized <K, V> KStream<K, V> stream(final Pattern topicPattern); public synchronized <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset, final String... topics); public synchronized <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset, final Pattern topicPattern); public synchronized <K, V> KStream<K, V> stream(final Serde<K> keySerde, final Serde<V> valSerde, final String... topics); public synchronized <K, V> KStream<K, V> stream(final Serde<K> keySerde, final Serde<V> valSerde, final Pattern topicPattern); public synchronized <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset, final Serde<K> keySerde, final Serde<V> valSerde, final String... topics); public synchronized <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset, final Serde<K> keySerde, final Serde<V> valSerde, final Pattern topicPattern); public synchronized <K, V> KTable<K, V> table(final String topic, final String storeName); public synchronized <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset, final String topic, final String storeName); public synchronized <K, V> KTable<K, V> table(final Serde<K> keySerde, final Serde<V> valSerde, final String topic, final String storeName); public synchronized <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset, final Serde<K> keySerde, final Serde<V> valSerde, final String topic, final String storeName); public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic, final String storeName); public synchronized <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde, final Serde<V> valSerde, final String topic, final String storeName); public synchronized <K, V> KStream<K, V> 
merge(final KStream<K, V>... streams); // newly added method to support builder pattern // should be used instead of KafkaStreams() constructor // old: // KStreamBuilder builder = new KStreamBuilder(); // KafkaStreams streams = new KafkaStreams(builder, new StreamsConfig(...)); // new: // KStreamBuilder builder = new KStreamBuilder(); // KafkaStreams streams = builder.build(new StreamsConfig(...)); public synchronized KafkaStreams build(final StreamsConfig config); public synchronized KafkaStreams build(final StreamsConfig config, final KafkaClientSupplier clientSupplier); // return the internally used TopologyBuilder public synchronized TopologyBuilder topologyBuilder(); } |
Proposed Changes
Refactor the code to remove internal methods from the public API.
...