...
After the improvements, we will replace the EasyMock StoreBuilder with our mock StoreBuilder.
Code Block |
---|
/**
 * Example of a topology test that uses a mock store builder produced by
 * {@code MockStoreFactory} instead of an EasyMock-generated {@code StoreBuilder}.
 */
public class TopologyTest {

    private final StoreBuilder storeBuilder = EasyMock.createNiceMock(StoreBuilder.class);
    private final Topology topology = new Topology();
    private final MockStoreFactory mockStoreFactory = new MockStoreFactory<>();

    // The factory reads the store name from the supplier, so the name is not
    // passed as a separate argument.
    private final KeyValueStoreBuilder keyValueStoreBuilder = mockStoreFactory.createKeyValueStoreBuilder(
        Stores.inMemoryKeyValueStore("store"),
        Serdes.Bytes(),
        Serdes.Bytes(),
        false);

    ...

    /** Adding a state store for a processor that was never added must be rejected. */
    @Test(expected = TopologyException.class)
    public void shouldNotAllowToAddStateStoreToNonExistingProcessor() {
        topology.addStateStore(keyValueStoreBuilder, "no-such-processor");
    }
} |
...
We add some new classes to the kafka-streams-test-utils
under a new org.apache.kafka.streams.state package.
We will provide a MockStoreFactory to generate mock store builders. I will use the KeyValueStoreBuilder as an example; Window and Session will have a similar structure.
The developers/users can provide their own store as the backend storage, and their own Serde of choice. For example, for simple testing, they can just use an InMemoryKeyValueStore.
Code Block |
---|
package org.apache.kafka.streams.internals;

/**
 * Factory for mock store builders. Every builder it creates is also recorded in
 * {@link #stateStores} under the store's name so it can be looked up later via
 * {@link #getStore(String)}.
 *
 * @param <K> key type of the serdes handed to the created builders
 * @param <V> value type of the serdes handed to the created builders
 */
public class MockStoreFactory<K, V> {

    // Builders created so far, keyed by store name, in insertion order.
    public final Map<String, StoreBuilder> stateStores = new LinkedHashMap<>();

    public MockStoreFactory () {
    }

    /**
     * Creates a mock key-value store builder and records it under the supplier's name.
     * Creating a second builder with the same name replaces the earlier map entry.
     *
     * @param keyValueBytesStoreSupplier supplies the store name and the backing store
     * @param keySerde                   serde passed to the builder for keys
     * @param valueSerde                 serde passed to the builder for values
     * @param persistent                 flag passed through to the builder
     * @return the builder that was just recorded, cast to {@code KeyValueStoreBuilder}
     */
    public KeyValueStoreBuilder createKeyValueStoreBuilder(KeyValueBytesStoreSupplier keyValueBytesStoreSupplier,
                                                           final Serde<K> keySerde,
                                                           final Serde<V> valueSerde,
                                                           boolean persistent){
        String storeName = keyValueBytesStoreSupplier.name();
        stateStores.put(storeName, new MockKeyValueStoreBuilder<>(storeName, keyValueBytesStoreSupplier, keySerde, valueSerde, persistent));
        // NOTE(review): this cast only succeeds if MockKeyValueStoreBuilder is a
        // KeyValueStoreBuilder; the MockKeyValueStoreBuilder shown on this page extends
        // AbstractStoreBuilder -- confirm the intended type hierarchy.
        return (KeyValueStoreBuilder)stateStores.get(storeName);
    }

    // Body elided in this document.
    // NOTE(review): takes a KeyValueBytesStoreSupplier -- confirm that is intended for window stores.
    public WindowStoreBuilder createWindowStoreBuilder(KeyValueBytesStoreSupplier keyValueBytesStoreSupplier,
                                                       final Serde<K> keySerde,
                                                       final Serde<V> valueSerde,
                                                       final Time time){
        ...
    }

    // Body elided in this document.
    // NOTE(review): takes a KeyValueBytesStoreSupplier -- confirm that is intended for session stores.
    public SessionStoreBuilder createSessionStoreBuilder(KeyValueBytesStoreSupplier keyValueBytesStoreSupplier,
                                                         final Serde<K> keySerde,
                                                         final Serde<V> valueSerde,
                                                         final Time time){
        ...
    }

    /**
     * Looks up a previously created builder.
     *
     * @param storeName name the builder was recorded under
     * @return the recorded builder, or {@code null} if none was recorded under that name
     */
    public StoreBuilder getStore(String storeName) {
        return stateStores.get(storeName);
    }
} |
I will use the KeyValueStoreBuilder as an example. Window and Session will have a similar structure.
Each Store builder will have a build method:
Code Block |
---|
package org.apache.kafka.streams.internals;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.MockTime;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.internals.AbstractStoreBuilder;

/**
 * Store builder whose {@link #build()} returns a {@link MockKeyValueStore} wrapped
 * around a store obtained from the configured supplier.
 *
 * @param <K> key type of the key serde
 * @param <V> value type of the value serde
 */
public class MockKeyValueStoreBuilder<K, V> extends AbstractStoreBuilder<K, V, KeyValueStore> {

    private final KeyValueBytesStoreSupplier storeSupplier;
    private final boolean persistent;

    /**
     * @param storeName     name given to the builder and to every store it builds
     * @param storeSupplier source of the backing store for each built mock store
     * @param keySerde      serde for keys, handed to the parent builder
     * @param valueSerde    serde for values, handed to the parent builder
     * @param persistent    value the built stores report from {@code persistent()}
     */
    public MockKeyValueStoreBuilder(final String storeName,
                                    final KeyValueBytesStoreSupplier storeSupplier,
                                    final Serde<K> keySerde,
                                    final Serde<V> valueSerde,
                                    final boolean persistent) {
        // The parent builder is given a mock clock starting at 0.
        super(storeName, keySerde, valueSerde, new MockTime(0));
        this.storeSupplier = storeSupplier;
        this.persistent = persistent;
    }

    /**
     * Builds a new mock store; the supplier is asked for a backing store on every call.
     *
     * @return a {@link MockKeyValueStore} carrying this builder's name and persistence flag
     */
    @Override
    public KeyValueStore build() {
        return new MockKeyValueStore<>(this.name, this.storeSupplier.get(), this.persistent);
    }
} |
Then, in the store, we will build a wrapper around the provided backend store and capture all the get/put/delete calls (excluding iterators).
Code Block |
---|
package org.apache.kafka.streams.internals;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * A {@link KeyValueStore} that delegates reads and writes to a wrapped backend store
 * while recording every get, put and delete call so tests can inspect them afterwards.
 * Iterator-returning calls ({@link #range} and {@link #all}) are delegated without
 * being recorded.
 *
 * <p>The store starts out closed; {@link #init} opens it and {@link #close()} closes it again.
 *
 * @param <K> key type
 * @param <V> value type
 */
public class MockKeyValueStore<K, V> implements KeyValueStore<K, V> {

    // keep a global counter of flushes and a local reference to which store had which
    // flush, so we can reason about the order in which stores get flushed.
    private static final AtomicInteger GLOBAL_FLUSH_COUNTER = new AtomicInteger(0);
    // -1 until this instance has been flushed at least once.
    private final AtomicInteger instanceLastFlushCount = new AtomicInteger(-1);

    private final String name;
    private final boolean persistent;

    public boolean initialized = false;
    public boolean flushed = false;
    public boolean closed = true;

    // One entry per recorded call, in call order. Get and delete entries carry the
    // value the backend store returned for the key.
    public final List<KeyValue<K, V>> capturedPutCalls = new LinkedList<>();
    public final List<KeyValue<K, V>> capturedGetCalls = new LinkedList<>();
    public final List<KeyValue<K, V>> capturedDeleteCalls = new LinkedList<>();

    public final KeyValueStore<K, V> innerKeyValueStore;

    /** Restore callback registered in {@link #init}; it ignores every record. */
    public final StateRestoreCallback stateRestoreCallback = new StateRestoreCallback() {
        @Override
        public void restore(final byte[] key, final byte[] value) {
        }
    };

    /**
     * @param name               name reported by {@link #name()}
     * @param innerKeyValueStore backend store that actually holds the data
     * @param persistent         value reported by {@link #persistent()}
     */
    public MockKeyValueStore(final String name,
                             final KeyValueStore<K, V> innerKeyValueStore,
                             final boolean persistent) {
        this.name = name;
        this.innerKeyValueStore = innerKeyValueStore;
        this.persistent = persistent;
    }

    @Override
    public String name() {
        return name;
    }

    /**
     * Registers {@code root} with the context using the no-op restore callback and
     * marks this store as initialized and open.
     */
    @Override
    public void init(final ProcessorContext context, final StateStore root) {
        // NOTE(review): the wrapped store's own init() is not invoked here -- confirm the
        // chosen backend store does not require it before use.
        context.register(root, stateRestoreCallback);
        initialized = true;
        closed = false;
    }

    /** Records this flush's position in the global flush order, then flushes the backend store. */
    @Override
    public void flush() {
        instanceLastFlushCount.set(GLOBAL_FLUSH_COUNTER.getAndIncrement());
        innerKeyValueStore.flush();
        flushed = true;
    }

    /**
     * @return the global flush counter value taken at this store's most recent flush,
     *         or -1 if it has never been flushed
     */
    public int getLastFlushCount() {
        return instanceLastFlushCount.get();
    }

    @Override
    public void close() {
        innerKeyValueStore.close();
        closed = true;
    }

    @Override
    public boolean persistent() {
        return persistent;
    }

    @Override
    public boolean isOpen() {
        return !closed;
    }

    /** Records the call, then writes the entry to the backend store. */
    @Override
    public void put(final K key, final V value) {
        capturedPutCalls.add(new KeyValue<>(key, value));
        innerKeyValueStore.put(key, value);
    }

    /**
     * Writes the entry only if the key currently maps to {@code null}.
     * The lookup goes through {@link #get}, so it is recorded as a get call as well.
     *
     * @return the value previously associated with the key, or {@code null} if there was none
     */
    @Override
    public V putIfAbsent(final K key, final V value) {
        final V originalValue = get(key);
        if (originalValue == null) {
            // put() already records the call; adding it here again would capture it twice.
            put(key, value);
        }
        return originalValue;
    }

    /** Deletes the key from the backend store and records the key with the removed value. */
    @Override
    public V delete(final K key) {
        final V value = innerKeyValueStore.delete(key);
        capturedDeleteCalls.add(new KeyValue<>(key, value));
        return value;
    }

    /** Writes every entry via {@link #put}, which records exactly one put call per entry. */
    @Override
    public void putAll(final List<KeyValue<K, V>> entries) {
        for (final KeyValue<K, V> entry : entries) {
            put(entry.key, entry.value);
        }
    }

    /** Reads the key from the backend store and records the key with the returned value. */
    @Override
    public V get(final K key) {
        final V value = innerKeyValueStore.get(key);
        capturedGetCalls.add(new KeyValue<>(key, value));
        return value;
    }

    @Override
    public KeyValueIterator<K, V> range(final K from, final K to) {
        return innerKeyValueStore.range(from, to);
    }

    @Override
    public KeyValueIterator<K, V> all() {
        return innerKeyValueStore.all();
    }

    @Override
    public long approximateNumEntries() {
        return innerKeyValueStore.approximateNumEntries();
    }
} |
...