Status
Current state: "Under Discussion"
Discussion thread: here
JIRA: KAFKA-6460
Motivation
This KIP adds mock testing support for StateStore, StoreBuilder, StoreSupplier, and other store-related components used in Streams unit testing.
We'd like to provide mocks for the different state store types (key-value, window, and session) that can record the put / get calls made by DSL operators under test. These mocks will make it easier for developers to write unit tests for Kafka Streams and other related modules.
For example, in the current streams/TopologyTest.java:
public class TopologyTest {
    private final StoreBuilder storeBuilder = EasyMock.createNiceMock(StoreBuilder.class);
    private final KeyValueStoreBuilder globalStoreBuilder = EasyMock.createNiceMock(KeyValueStoreBuilder.class);
    private final Topology topology = new Topology();
    ...

    @Test
    public void shouldFailIfSinkIsParent() {
        topology.addSource("source", "topic-1");
        topology.addSink("sink-1", "topic-2", "source");
        try {
            topology.addSink("sink-2", "topic-3", "sink-1");
            fail("Should throw TopologyException for using sink as parent");
        } catch (final TopologyException expected) { }
    }

    @Test(expected = TopologyException.class)
    public void shouldNotAllowToAddStateStoreToNonExistingProcessor() {
        mockStoreBuilder();
        EasyMock.replay(storeBuilder);
        topology.addStateStore(storeBuilder, "no-such-processor");
    }

    @Test
    public void shouldNotAllowToAddStateStoreToSource() {
        mockStoreBuilder();
        EasyMock.replay(storeBuilder);
        topology.addSource("source-1", "topic-1");
        try {
            topology.addStateStore(storeBuilder, "source-1");
            fail("Should have thrown TopologyException for adding store to source node");
        } catch (final TopologyException expected) { }
    }

    private void mockStoreBuilder() {
        EasyMock.expect(storeBuilder.name()).andReturn("store").anyTimes();
        EasyMock.expect(storeBuilder.logConfig()).andReturn(Collections.emptyMap());
        EasyMock.expect(storeBuilder.loggingEnabled()).andReturn(false);
    }
}
One of the goals is to replace the ad-hoc, in-test mockStoreBuilder with a general-purpose mock StoreBuilder class, which simplifies writing unit tests and can be reused later.
After the improvements, we will replace the EasyMock StoreBuilder with the mock StoreBuilder:
public class TopologyTest {
    private final StoreBuilder storeBuilder = EasyMock.createNiceMock(StoreBuilder.class);
    private final Topology topology = new Topology();
    private final MockStoreFactory mockStoreFactory = new MockStoreFactory<>();
    private final KeyValueStoreBuilder keyValueStoreBuilder = mockStoreFactory.createKeyValueStoreBuilder(
        Stores.inMemoryKeyValueStore("store"),
        Serdes.Bytes(),
        Serdes.Bytes(),
        false);
    ...

    @Test(expected = TopologyException.class)
    public void shouldNotAllowToAddStateStoreToNonExistingProcessor() {
        topology.addStateStore(keyValueStoreBuilder, "no-such-processor");
    }
}
Public Interfaces
We will add new classes to kafka-streams-test-utils under a new org.apache.kafka.streams.state package.
We will provide a MockStoreFactory to generate mock store builders:
package org.apache.kafka.streams.internals;

public class MockStoreFactory<K, V> {

    public final Map<String, StoreBuilder> stateStores = new LinkedHashMap<>();

    public MockStoreFactory() {
    }

    public KeyValueStoreBuilder createKeyValueStoreBuilder(final KeyValueBytesStoreSupplier keyValueBytesStoreSupplier,
                                                           final Serde<K> keySerde,
                                                           final Serde<V> valueSerde,
                                                           final boolean persistent) {
        final String storeName = keyValueBytesStoreSupplier.name();
        stateStores.put(storeName, new MockKeyValueStoreBuilder<>(storeName, keyValueBytesStoreSupplier, keySerde, valueSerde, persistent));
        return (KeyValueStoreBuilder) stateStores.get(storeName);
    }

    public WindowStoreBuilder createWindowStoreBuilder(final KeyValueBytesStoreSupplier keyValueBytesStoreSupplier,
                                                       final Serde<K> keySerde,
                                                       final Serde<V> valueSerde,
                                                       final Time time) {
        ...
    }

    public SessionStoreBuilder createSessionStoreBuilder(final KeyValueBytesStoreSupplier keyValueBytesStoreSupplier,
                                                         final Serde<K> keySerde,
                                                         final Serde<V> valueSerde,
                                                         final Time time) {
        ...
    }

    public StoreBuilder getStore(final String storeName) {
        return stateStores.get(storeName);
    }
}
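To make the registry pattern behind MockStoreFactory concrete, here is a minimal, dependency-free sketch: builders are created once, registered under the store name in an insertion-ordered map, and can be looked up again later. All class and method names here (SketchStoreFactory, SketchStoreBuilder) are illustrative stand-ins, not part of the proposed Kafka API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative stand-in for a store builder; only the name() accessor matters here.
class SketchStoreBuilder {
    private final String name;
    SketchStoreBuilder(final String name) { this.name = name; }
    String name() { return name; }
}

// Illustrative stand-in for MockStoreFactory's create-and-register behavior.
class SketchStoreFactory {
    // LinkedHashMap preserves creation order, mirroring the stateStores field above.
    final Map<String, SketchStoreBuilder> stateStores = new LinkedHashMap<>();

    // Create a builder, register it under its store name, and return it.
    SketchStoreBuilder createKeyValueStoreBuilder(final String storeName) {
        stateStores.put(storeName, new SketchStoreBuilder(storeName));
        return stateStores.get(storeName);
    }

    // Look up a previously created builder by store name.
    SketchStoreBuilder getStore(final String storeName) {
        return stateStores.get(storeName);
    }
}
```

The point of the registry is that a test can create a builder when wiring the topology and later retrieve the same builder (and its store) to make assertions.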
I will use the KeyValueStoreBuilder as an example; the window and session builders will have a similar structure.
Each store builder will have a build method:
package org.apache.kafka.streams.internals;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.MockTime;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.internals.AbstractStoreBuilder;

public class MockKeyValueStoreBuilder<K, V> extends AbstractStoreBuilder<K, V, KeyValueStore> {

    private final boolean persistent;
    private final KeyValueBytesStoreSupplier storeSupplier;

    public MockKeyValueStoreBuilder(final String storeName,
                                    final KeyValueBytesStoreSupplier storeSupplier,
                                    final Serde<K> keySerde,
                                    final Serde<V> valueSerde,
                                    final boolean persistent) {
        super(storeName, keySerde, valueSerde, new MockTime(0));
        this.persistent = persistent;
        this.storeSupplier = storeSupplier;
    }

    @Override
    public KeyValueStore build() {
        return new MockKeyValueStore<>(name, storeSupplier.get(), persistent);
    }
}
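A detail worth noting in the build method above is that it constructs a new mock store around a freshly supplied inner store on every call, so each test (or topology) starts from an independent, empty store. The following self-contained sketch illustrates that contract; SketchBuilder and SketchStore are hypothetical names, not the proposed Kafka classes.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative stand-in for the mock store: a named wrapper around an inner map.
class SketchStore {
    final String name;
    final Map<String, String> inner;   // stands in for storeSupplier.get()

    SketchStore(final String name, final Map<String, String> inner) {
        this.name = name;
        this.inner = inner;
    }
}

// Illustrative stand-in for MockKeyValueStoreBuilder.
class SketchBuilder {
    private final String storeName;

    SketchBuilder(final String storeName) { this.storeName = storeName; }

    // Mirrors build() above: a new wrapper around a newly supplied inner store
    // on every call, so no state leaks between builds.
    SketchStore build() {
        return new SketchStore(storeName, new HashMap<>());
    }
}
```

Because build() never caches the store it returns, two builds from the same builder share the name but nothing else.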
Then in the store, we build a wrapper around an InMemoryStore and capture all get/put calls (excluding iterators):
package org.apache.kafka.streams.internals;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class MockKeyValueStore<K, V> implements KeyValueStore<K, V> {
    // Keep a global counter of flushes and a local reference to which store had which
    // flush, so we can reason about the order in which stores get flushed.
    private static final AtomicInteger GLOBAL_FLUSH_COUNTER = new AtomicInteger(0);
    private final AtomicInteger instanceLastFlushCount = new AtomicInteger(-1);

    private final String name;
    private final boolean persistent;

    public boolean initialized = false;
    public boolean flushed = false;
    public boolean closed = true;

    public final List<KeyValue<K, V>> capturedPutCalls = new LinkedList<>();
    public final List<KeyValue<K, V>> capturedGetCalls = new LinkedList<>();
    public final List<KeyValue<K, V>> capturedDeleteCalls = new LinkedList<>();

    public final KeyValueStore<K, V> innerKeyValueStore;

    public MockKeyValueStore(final String name,
                             final KeyValueStore<K, V> innerKeyValueStore,
                             final boolean persistent) {
        this.name = name;
        this.innerKeyValueStore = innerKeyValueStore;
        this.persistent = persistent;
    }

    @Override
    public String name() {
        return name;
    }

    @Override
    public void init(final ProcessorContext context, final StateStore root) {
        context.register(root, stateRestoreCallback);
        initialized = true;
        closed = false;
    }

    @Override
    public void flush() {
        instanceLastFlushCount.set(GLOBAL_FLUSH_COUNTER.getAndIncrement());
        innerKeyValueStore.flush();
        flushed = true;
    }

    public int getLastFlushCount() {
        return instanceLastFlushCount.get();
    }

    @Override
    public void close() {
        innerKeyValueStore.close();
        closed = true;
    }

    @Override
    public boolean persistent() {
        return persistent;
    }

    @Override
    public boolean isOpen() {
        return !closed;
    }

    public final StateRestoreCallback stateRestoreCallback = new StateRestoreCallback() {
        @Override
        public void restore(final byte[] key, final byte[] value) {
        }
    };

    @Override
    public void put(final K key, final V value) {
        capturedPutCalls.add(new KeyValue<>(key, value));
        innerKeyValueStore.put(key, value);
    }

    @Override
    public V putIfAbsent(final K key, final V value) {
        final V originalValue = get(key);
        if (originalValue == null) {
            // put() already records the call, so we don't add to capturedPutCalls again here.
            put(key, value);
        }
        return originalValue;
    }

    @Override
    public V delete(final K key) {
        final V value = innerKeyValueStore.delete(key);
        capturedDeleteCalls.add(new KeyValue<>(key, value));
        return value;
    }

    @Override
    public void putAll(final List<KeyValue<K, V>> entries) {
        for (final KeyValue<K, V> entry : entries) {
            // put() records each entry, avoiding double-counting.
            put(entry.key, entry.value);
        }
    }

    @Override
    public V get(final K key) {
        final V value = innerKeyValueStore.get(key);
        capturedGetCalls.add(new KeyValue<>(key, value));
        return value;
    }

    @Override
    public KeyValueIterator<K, V> range(final K from, final K to) {
        return innerKeyValueStore.range(from, to);
    }

    @Override
    public KeyValueIterator<K, V> all() {
        return innerKeyValueStore.all();
    }

    @Override
    public long approximateNumEntries() {
        return innerKeyValueStore.approximateNumEntries();
    }
}
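The essential technique in the store above is "record, then delegate": every put/get is appended to a capture list before being forwarded to the inner store, so a test can later assert on the exact sequence of calls. Here is a minimal, self-contained sketch of that pattern; CapturingStore is a hypothetical name and a plain HashMap stands in for the inner KeyValueStore.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal sketch of the call-capturing wrapper: record each call, then delegate.
class CapturingStore {
    final Map<String, String> inner = new HashMap<>();   // stands in for innerKeyValueStore
    final List<Map.Entry<String, String>> capturedPutCalls = new ArrayList<>();
    final List<Map.Entry<String, String>> capturedGetCalls = new ArrayList<>();

    void put(final String key, final String value) {
        capturedPutCalls.add(new SimpleEntry<>(key, value));   // record first...
        inner.put(key, value);                                 // ...then delegate
    }

    String get(final String key) {
        final String value = inner.get(key);
        capturedGetCalls.add(new SimpleEntry<>(key, value));   // record the key and the returned value
        return value;
    }
}
```

A DSL-operator test would drive the topology as usual and then inspect the captured lists, e.g. asserting that an aggregation performed exactly one put per input record.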
Proposed Changes
Compatibility, Deprecation, and Migration Plan
Until we refactor with these new MockStateStores, there shouldn't be any compatibility issues. The next phase should be the refactoring itself, including:
1) Remove and refactor the redundant existing mock stores (e.g. org.apache.kafka.test.MockKeyValueStore).
2) Examine the current tests (e.g. org.apache.kafka.streams.TopologyTest), remove complicated test logic, and refactor them with the new MockStateStores.
Rejected Alternatives
1) Rebuilding MockStateStores from scratch vs. wrapping an InMemoryStore
- If we rebuilt the functionality of a store from scratch in memory, it would essentially duplicate an InMemoryStore, so we wrap one instead.
2) Tracking all calls in a single total order
- So far there has been no feature request to track calls in a total order, so we track each call type separately. If the need arises, we can adjust this.
3) Using the MockStateStoreFactory as the main entry point to access stores (keeping the state of all StoreBuilders)
- Currently the MockStateStoreFactory only provides build functionality, not access (e.g. getStore(String storeName)) to StoreBuilders and Stores; developers have to access these stores separately. If there is a request for access through the factory, we can adjust this by creating a mapping that keeps all the StoreBuilders.