...
Code Block |
---|
public class TopologyTest {
    private final StoreBuilder storeBuilder = EasyMock.createNiceMock(StoreBuilder.class);
    private final Topology topology = new Topology();
    private final MockStoreFactory mockStoreFactory = new MockStoreFactory<>();
    // Mock builder backed by an in-memory store; "false" marks it as non-persistent.
    private final KeyValueStoreBuilder keyValueStoreBuilder = mockStoreFactory.createKeyValueStoreBuilder(
        Stores.inMemoryKeyValueStore("store"),
        Serdes.Bytes(),
        Serdes.Bytes(),
        false,
        Time.SYSTEM);
    ...
    @Test(expected = TopologyException.class)
    public void shouldNotAllowToAddStateStoreToNonExistingProcessor() {
        topology.addStateStore(keyValueStoreBuilder, "no-such-processor");
    }
} |
...
We add some new classes to the state package (org.apache.kafka.streams.state) in streams/test-utils, under internal.
We will provide a MockStoreFactory to generate mock store builders. The KeyValueStoreBuilder is used as the example here; the Window and Session builders will have a similar structure.
...
Code Block |
---|
package org.apache.kafka.streams.state;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;

public class MockKeyValueStoreBuilder<K, V> extends KeyValueStoreBuilder<K, V> {

    private final boolean persistent;
    private final KeyValueBytesStoreSupplier storeSupplier;
    final Serde<K> keySerde;
    final Serde<V> valueSerde;
    final Time time;

    public MockKeyValueStoreBuilder(final KeyValueBytesStoreSupplier storeSupplier,
                                    final Serde<K> keySerde,
                                    final Serde<V> valueSerde,
                                    final boolean persistent,
                                    final Time time) {
        super(storeSupplier, keySerde, valueSerde, time);
        this.persistent = persistent;
        this.storeSupplier = storeSupplier;
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
        this.time = time;
    }

    @Override
    public KeyValueStore<K, V> build() {
        return new MockKeyValueStore<>(storeSupplier, keySerde, valueSerde, persistent, time);
    }
} |
Then, in the store, we will build a wrapper around the provided backend store. The wrapper captures each get/put/delete call so that users can write tests against the recorded calls. It also tracks whether the store has been flushed or closed.
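The wrapping idea can be illustrated with a minimal, self-contained sketch. It does not use the real Kafka Streams interfaces: SimpleKeyValueStore, InMemoryStore, TrackingKeyValueStore and all field names below are hypothetical stand-ins chosen for illustration, and the actual MockKeyValueStore may be structured differently.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for the KeyValueStore interface.
interface SimpleKeyValueStore<K, V> {
    V get(K key);
    void put(K key, V value);
    V delete(K key);
    void flush();
    void close();
}

// Hypothetical in-memory backend, present only to make the sketch runnable.
class InMemoryStore<K, V> implements SimpleKeyValueStore<K, V> {
    private final Map<K, V> map = new HashMap<>();

    @Override public V get(final K key) { return map.get(key); }
    @Override public void put(final K key, final V value) { map.put(key, value); }
    @Override public V delete(final K key) { return map.remove(key); }
    @Override public void flush() { }
    @Override public void close() { }
}

// Wrapper that delegates every call to the backend store and records it.
class TrackingKeyValueStore<K, V> implements SimpleKeyValueStore<K, V> {
    private final SimpleKeyValueStore<K, V> inner;

    final List<K> capturedGetCalls = new ArrayList<>();
    final List<Map.Entry<K, V>> capturedPutCalls = new ArrayList<>();
    final List<K> capturedDeleteCalls = new ArrayList<>();
    boolean flushed = false;
    boolean closed = false;

    TrackingKeyValueStore(final SimpleKeyValueStore<K, V> inner) {
        this.inner = inner;
    }

    @Override
    public V get(final K key) {
        capturedGetCalls.add(key);
        return inner.get(key);
    }

    @Override
    public void put(final K key, final V value) {
        capturedPutCalls.add(new AbstractMap.SimpleImmutableEntry<>(key, value));
        inner.put(key, value);
    }

    @Override
    public V delete(final K key) {
        capturedDeleteCalls.add(key);
        return inner.delete(key);
    }

    @Override
    public void flush() {
        flushed = true;
        inner.flush();
    }

    @Override
    public void close() {
        closed = true;
        inner.close();
    }
}
```

A test would wrap a backend store, run the code under test against the wrapper, and then assert on the captured call lists and on the flushed and closed flags.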
...
Proposed Changes
I propose to add:
- A MockStoreFactory class to produce mock state store builders.
- A mock StateStoreBuilder class for KV, Session and Window.
- A mock StateStore class for KV, Session and Window with tracking.
...