...
For example, in the current streams/TopologyTest.java:
Code Block |
---|
public class TopologyTest {

    private final StoreBuilder storeBuilder = EasyMock.createNiceMock(StoreBuilder.class);
    private final KeyValueStoreBuilder globalStoreBuilder = EasyMock.createNiceMock(KeyValueStoreBuilder.class);
    private final Topology topology = new Topology();
    ...

    @Test
    public void shouldFailIfSinkIsParent() {
        topology.addSource("source", "topic-1");
        topology.addSink("sink-1", "topic-2", "source");
        try {
            topology.addSink("sink-2", "topic-3", "sink-1");
            fail("Should throw TopologyException for using sink as parent");
        } catch (final TopologyException expected) { }
    }

    @Test(expected = TopologyException.class)
    public void shouldNotAllowToAddStateStoreToNonExistingProcessor() {
        mockStoreBuilder();
        EasyMock.replay(storeBuilder);
        topology.addStateStore(storeBuilder, "no-such-processor");
    }

    @Test
    public void shouldNotAllowToAddStateStoreToSource() {
        mockStoreBuilder();
        EasyMock.replay(storeBuilder);
        topology.addSource("source-1", "topic-1");
        try {
            topology.addStateStore(storeBuilder, "source-1");
            fail("Should have thrown TopologyException for adding store to source node");
        } catch (final TopologyException expected) { }
    }

    private void mockStoreBuilder() {
        EasyMock.expect(storeBuilder.name()).andReturn("store").anyTimes();
        EasyMock.expect(storeBuilder.logConfig()).andReturn(Collections.emptyMap());
        EasyMock.expect(storeBuilder.loggingEnabled()).andReturn(false);
    }
} |
One of the goals is to replace the ad hoc, in-test mockStoreBuilder with a more general-purpose mock store builder class, which simplifies writing unit tests and can be reused later.
After the improvements, we will replace the EasyMock StoreBuilder with our mock store builder:
Code Block |
---|
public class TopologyTest {

    private final StoreBuilder storeBuilder = EasyMock.createNiceMock(StoreBuilder.class);
    private final Topology topology = new Topology();
    private final MockStoreFactory<Bytes, Bytes> mockStoreFactory = new MockStoreFactory<>();
    private final KeyValueStoreBuilder<Bytes, Bytes> keyValueStoreBuilder = mockStoreFactory.createKeyValueStoreBuilder(
        Stores.inMemoryKeyValueStore("store"),
        Serdes.Bytes(),
        Serdes.Bytes(),
        false,
        Time.SYSTEM);
    ...

    @Test(expected = TopologyException.class)
    public void shouldNotAllowToAddStateStoreToNonExistingProcessor() {
        topology.addStateStore(keyValueStoreBuilder, "no-such-processor");
    }
} |
Public Interfaces
We add some new classes to a state package (org.apache.kafka.streams.state) under streams/test-utils.
We will provide a MockStoreFactory to generate mock store builders. The KeyValueStoreBuilder is used as the example here; the Window and Session variants will have a similar structure.
Developers and users can provide their own store as the backend storage, along with the Serdes of their choice. For simple tests, for example, they can just use an InMemoryKeyValueStore.
Code Block |
---|
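package org.apache.kafka.streams.internals;

import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.MockKeyValueStoreBuilder;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
import org.apache.kafka.streams.state.internals.SessionStoreBuilder;
import org.apache.kafka.streams.state.internals.WindowStoreBuilder;

public class MockStoreFactory<K, V> {

    // All builders created by this factory, keyed by store name.
    public final Map<String, StoreBuilder> stateStores = new LinkedHashMap<>();

    public MockStoreFactory() {
    }

    // The Time argument is passed through because MockKeyValueStoreBuilder requires it.
    public KeyValueStoreBuilder<K, V> createKeyValueStoreBuilder(final KeyValueBytesStoreSupplier keyValueBytesStoreSupplier,
                                                                 final Serde<K> keySerde,
                                                                 final Serde<V> valueSerde,
                                                                 final boolean persistent,
                                                                 final Time time) {
        final String storeName = keyValueBytesStoreSupplier.name();
        final MockKeyValueStoreBuilder<K, V> builder =
            new MockKeyValueStoreBuilder<>(keyValueBytesStoreSupplier, keySerde, valueSerde, persistent, time);
        stateStores.put(storeName, builder);
        return builder;
    }

    public WindowStoreBuilder createWindowStoreBuilder(final KeyValueBytesStoreSupplier keyValueBytesStoreSupplier,
                                                       final Serde<K> keySerde,
                                                       final Serde<V> valueSerde,
                                                       final Time time) {
        ...
    }

    public SessionStoreBuilder createSessionStoreBuilder(final KeyValueBytesStoreSupplier keyValueBytesStoreSupplier,
                                                         final Serde<K> keySerde,
                                                         final Serde<V> valueSerde,
                                                         final Time time) {
        ...
    }

    public StoreBuilder getStore(final String storeName) {
        return stateStores.get(storeName);
    }
} |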
Each store builder has a build method that returns the corresponding mock store:
Code Block |
---|
package org.apache.kafka.streams.state;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;

public class MockKeyValueStoreBuilder<K, V> extends KeyValueStoreBuilder<K, V> {

    private final boolean persistent;
    private final KeyValueBytesStoreSupplier storeSupplier;
    final Serde<K> keySerde;
    final Serde<V> valueSerde;
    final Time time;

    public MockKeyValueStoreBuilder(final KeyValueBytesStoreSupplier storeSupplier,
                                    final Serde<K> keySerde,
                                    final Serde<V> valueSerde,
                                    final boolean persistent,
                                    final Time time) {
        super(storeSupplier, keySerde, valueSerde, time);
        this.persistent = persistent;
        this.storeSupplier = storeSupplier;
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
        this.time = time;
    }

    @Override
    public KeyValueStore<K, V> build() {
        return new MockKeyValueStore<>(storeSupplier, keySerde, valueSerde, persistent, time);
    }
} |
The mock store itself is a wrapper around the provided backend store. It captures every get/put/delete call so that users can write assertions against them, and it also tracks whether the store has been flushed or closed.
Code Block |
---|
package org.apache.kafka.streams.state;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.state.internals.WrappedStateStore;

public class MockKeyValueStore<K, V>
    extends WrappedStateStore<KeyValueStore<Bytes, byte[]>, K, V>
    implements KeyValueStore<K, V> {

    // keep a global counter of flushes and a local reference to which store had which
    // flush, so we can reason about the order in which stores get flushed.
    private static final AtomicInteger GLOBAL_FLUSH_COUNTER = new AtomicInteger(0);
    private final AtomicInteger instanceLastFlushCount = new AtomicInteger(-1);

    public boolean initialized = false;
    public boolean flushed = false;
    public boolean closed = true;

    public String name;
    public boolean persistent;

    protected final Time time;

    final Serde<K> keySerde;
    final Serde<V> valueSerde;
    StateSerdes<K, V> serdes;

    public final List<KeyValue<K, V>> capturedPutCalls = new LinkedList<>();
    public final List<KeyValue<K, V>> capturedGetCalls = new LinkedList<>();
    public final List<KeyValue<K, V>> capturedDeleteCalls = new LinkedList<>();

    public MockKeyValueStore(final KeyValueBytesStoreSupplier keyValueBytesStoreSupplier,
                             final Serde<K> keySerde,
                             final Serde<V> valueSerde,
                             final boolean persistent,
                             final Time time) {
        super(keyValueBytesStoreSupplier.get());
        this.name = keyValueBytesStoreSupplier.name();
        this.time = time != null ? time : Time.SYSTEM;
        this.persistent = persistent;
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
    }

    @SuppressWarnings("unchecked")
    void initStoreSerde(final ProcessorContext context) {
        serdes = new StateSerdes<>(
            ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
            keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
            valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
    }

    @Override
    public String name() {
        return name;
    }

    @Override
    public void init(final ProcessorContext context, final StateStore root) {
        // The serdes must be set up before any put/get/delete call uses them.
        initStoreSerde(context);
        context.register(root, stateRestoreCallback);
        initialized = true;
        closed = false;
    }

    @Override
    public void flush() {
        instanceLastFlushCount.set(GLOBAL_FLUSH_COUNTER.getAndIncrement());
        wrapped().flush();
        flushed = true;
    }

    public int getLastFlushCount() {
        return instanceLastFlushCount.get();
    }

    @Override
    public void close() {
        wrapped().close();
        closed = true;
    }

    @Override
    public boolean persistent() {
        return persistent;
    }

    @Override
    public boolean isOpen() {
        return !closed;
    }

    public final StateRestoreCallback stateRestoreCallback = new StateRestoreCallback() {
        @Override
        public void restore(final byte[] key, final byte[] value) {
        }
    };

    @Override
    public void put(final K key, final V value) {
        capturedPutCalls.add(new KeyValue<>(key, value));
        wrapped().put(keyBytes(key), serdes.rawValue(value));
    }

    @Override
    public V putIfAbsent(final K key, final V value) {
        final V originalValue = get(key);
        if (originalValue == null) {
            // put() already records the call, so it is not captured a second time here.
            put(key, value);
        }
        return originalValue;
    }

    @Override
    public V delete(final K key) {
        final V value = outerValue(wrapped().delete(keyBytes(key)));
        capturedDeleteCalls.add(new KeyValue<>(key, value));
        return value;
    }

    @Override
    public void putAll(final List<KeyValue<K, V>> entries) {
        for (final KeyValue<K, V> entry : entries) {
            // put() already records the call.
            put(entry.key, entry.value);
        }
    }

    @Override
    public V get(final K key) {
        final V value = outerValue(wrapped().get(keyBytes(key)));
        capturedGetCalls.add(new KeyValue<>(key, value));
        return value;
    }

    @SuppressWarnings("unchecked")
    @Override
    public KeyValueIterator<K, V> range(final K from, final K to) {
        return new MockKeyValueStore.MockKeyValueIterator(
            wrapped().range(Bytes.wrap(serdes.rawKey(from)), Bytes.wrap(serdes.rawKey(to))));
    }

    @SuppressWarnings("unchecked")
    @Override
    public KeyValueIterator<K, V> all() {
        return new MockKeyValueStore.MockKeyValueIterator(wrapped().all());
    }

    @Override
    public long approximateNumEntries() {
        return wrapped().approximateNumEntries();
    }

    private V outerValue(final byte[] value) {
        return value != null ? serdes.valueFrom(value) : null;
    }

    private Bytes keyBytes(final K key) {
        return Bytes.wrap(serdes.rawKey(key));
    }

    private class MockKeyValueIterator implements KeyValueIterator<K, V> {
        private final KeyValueIterator<Bytes, byte[]> iter;
        ....
    }
} |
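To make the capture-and-delegate idea easier to see in isolation, here is a minimal, dependency-free sketch. It is only an illustration: SimpleKeyValueStore, MapBackedStore, CapturingStore and CapturingStoreDemo are hypothetical names invented for this example and are not part of Kafka or of this proposal, and serdes and iterators are left out entirely.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified stand-in for a key-value store (not a Kafka interface).
interface SimpleKeyValueStore<K, V> {
    void put(K key, V value);
    V get(K key);
    V delete(K key);
    void flush();
    void close();
}

// Trivial in-memory backend, used only to keep the sketch self-contained.
class MapBackedStore<K, V> implements SimpleKeyValueStore<K, V> {
    private final Map<K, V> data = new HashMap<>();
    @Override public void put(final K key, final V value) { data.put(key, value); }
    @Override public V get(final K key) { return data.get(key); }
    @Override public V delete(final K key) { return data.remove(key); }
    @Override public void flush() { }
    @Override public void close() { }
}

// Delegates every call to the backend and records it for later assertions.
class CapturingStore<K, V> implements SimpleKeyValueStore<K, V> {
    // Shared by all instances so that flush order can be compared across stores.
    private static final AtomicInteger GLOBAL_FLUSH_COUNTER = new AtomicInteger(0);

    private final SimpleKeyValueStore<K, V> wrapped;
    private int lastFlushCount = -1; // -1 means "never flushed"

    final List<Map.Entry<K, V>> capturedPutCalls = new ArrayList<>();
    final List<Map.Entry<K, V>> capturedGetCalls = new ArrayList<>();
    final List<Map.Entry<K, V>> capturedDeleteCalls = new ArrayList<>();
    boolean flushed = false;
    boolean closed = false;

    CapturingStore(final SimpleKeyValueStore<K, V> wrapped) {
        this.wrapped = wrapped;
    }

    @Override
    public void put(final K key, final V value) {
        capturedPutCalls.add(new SimpleImmutableEntry<>(key, value));
        wrapped.put(key, value);
    }

    @Override
    public V get(final K key) {
        final V value = wrapped.get(key);
        // The recorded value is null when the key was absent.
        capturedGetCalls.add(new SimpleImmutableEntry<>(key, value));
        return value;
    }

    @Override
    public V delete(final K key) {
        final V value = wrapped.delete(key);
        capturedDeleteCalls.add(new SimpleImmutableEntry<>(key, value));
        return value;
    }

    @Override
    public void flush() {
        lastFlushCount = GLOBAL_FLUSH_COUNTER.getAndIncrement();
        wrapped.flush();
        flushed = true;
    }

    @Override
    public void close() {
        wrapped.close();
        closed = true;
    }

    int lastFlushCount() {
        return lastFlushCount;
    }
}

class CapturingStoreDemo {
    public static void main(final String[] args) {
        final CapturingStore<String, Integer> first = new CapturingStore<>(new MapBackedStore<String, Integer>());
        final CapturingStore<String, Integer> second = new CapturingStore<>(new MapBackedStore<String, Integer>());
        first.put("a", 1);
        first.get("a");
        first.delete("a");
        second.flush();
        first.flush();
        System.out.println("captured puts: " + first.capturedPutCalls.size());
        System.out.println("second flushed before first: " + (second.lastFlushCount() < first.lastFlushCount()));
    }
}
```

A test can then assert on the captured lists and on the flushed and closed flags instead of setting up mock expectations. Because the flush counter is shared by all instances, only the relative order of two flush counts is meaningful, not their absolute values.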
Proposed Changes
I propose to add:
- A MockStoreFactory class to produce mock state store builders.
- A mock StateStoreBuilder class for KV, Session and Window.
- A mock StateStore class for KV, Session and Window with tracking.
Compatibility, Deprecation, and Migration Plan
...
2) Examine the current tests (e.g. org.apache.kafka.streams.TopologyTest), remove the complicated test logic, and refactor them to use the new MockStateStores.
Rejected Alternatives
1) Rebuilding a MockStateStore from scratch vs. extending an InMemoryStore
- If we are rebuilding the functionality of a store from scratch in memory, it will basically act like an InMemoryStore.
2) Track all calls in a total order.
- So far there has been no feature request to track calls in a total order, so calls are tracked separately per operation type. If the need arises, this can be adjusted.
3) Using the MockStateStoreFactory as the main entry point to access stores (keeping the state of all StoreBuilders).
...
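The difference between per-operation tracking and the total-order tracking of alternative 2 can be sketched as follows. This is only an illustration under assumed names: StoreOp, RecordedCall, PerTypeTracker and TotalOrderTracker are hypothetical and do not appear in the proposal.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical operation types for this sketch.
enum StoreOp { PUT, GET, DELETE }

// One recorded call: which operation was invoked on which key.
final class RecordedCall {
    final StoreOp op;
    final String key;

    RecordedCall(final StoreOp op, final String key) {
        this.op = op;
        this.key = key;
    }
}

// Per-operation tracking: one list per call type, in the spirit of the captured*Calls lists.
class PerTypeTracker {
    final List<String> puts = new ArrayList<>();
    final List<String> gets = new ArrayList<>();
    final List<String> deletes = new ArrayList<>();

    void record(final StoreOp op, final String key) {
        switch (op) {
            case PUT:
                puts.add(key);
                break;
            case GET:
                gets.add(key);
                break;
            case DELETE:
                deletes.add(key);
                break;
        }
    }
}

// Total-order tracking: a single log that preserves the interleaving of all calls.
class TotalOrderTracker {
    final List<RecordedCall> calls = new ArrayList<>();

    void record(final StoreOp op, final String key) {
        calls.add(new RecordedCall(op, key));
    }

    // Position of the first matching call in the log, or -1 if there is none.
    int indexOf(final StoreOp op, final String key) {
        for (int i = 0; i < calls.size(); i++) {
            final RecordedCall call = calls.get(i);
            if (call.op == op && call.key.equals(key)) {
                return i;
            }
        }
        return -1;
    }
}

class TrackingDemo {
    public static void main(final String[] args) {
        final PerTypeTracker perType = new PerTypeTracker();
        final TotalOrderTracker totalOrder = new TotalOrderTracker();
        // The same call sequence is fed to both trackers: a get followed by a put.
        perType.record(StoreOp.GET, "k");
        totalOrder.record(StoreOp.GET, "k");
        perType.record(StoreOp.PUT, "k");
        totalOrder.record(StoreOp.PUT, "k");
        // Per-type lists only tell us that one get and one put happened.
        System.out.println("gets=" + perType.gets.size() + ", puts=" + perType.puts.size());
        // The single log can also tell us that the get came first.
        System.out.println("get before put: "
            + (totalOrder.indexOf(StoreOp.GET, "k") < totalOrder.indexOf(StoreOp.PUT, "k")));
    }
}
```

With separate lists, the order is preserved within each operation type but not across types, which is sufficient for tests that only check what was written, read or deleted.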
A discussion that has been brought up is tracked in a Jira issue.
Rejected Alternatives
- Using the current EasyMock implementation. There is no strong argument against the current EasyMock implementation; it is easy to use and lightweight. The main argument for this KIP is that in-house mock state store support makes it possible to write better tests.