...
After the improvements, we will replace the EasyMock StoreBuilder with our MockStoreBuilder.
Code Block
public class TopologyTest {

    private final Topology topology = new Topology();
    private final MockStoreFactory<Bytes, Bytes> mockStoreFactory = new MockStoreFactory<>();
    private final KeyValueStoreBuilder keyValueStoreBuilder = mockStoreFactory.createKeyValueStoreBuilder(
            Stores.inMemoryKeyValueStore("store"),
            Serdes.Bytes(),
            Serdes.Bytes(),
            false,
            Time.SYSTEM);

    ...

    @Test(expected = TopologyException.class)
    public void shouldNotAllowToAddStateStoreToNonExistingProcessor() {
        topology.addStateStore(keyValueStoreBuilder, "no-such-processor");
    }
}
...
We add some new classes to kafka-streams-test-utils under a new state package (org.apache.kafka.streams.state) under streams/test-utils.
We will provide a MockStoreFactory to generate mock store builders. I will use the KeyValueStoreBuilder as an example; Window and Session will have a similar structure.
Developers and users can provide their own store as the backend storage, along with Serdes of their choice. For example, for simple testing they can just use an InMemoryKeyValueStore.
Code Block
package org.apache.kafka.streams.state;

public class MockStoreFactory<K, V> {

    public final Map<String, StoreBuilder> stateStores = new LinkedHashMap<>();

    public MockStoreFactory() {
    }

    public KeyValueStoreBuilder createKeyValueStoreBuilder(final KeyValueBytesStoreSupplier keyValueBytesStoreSupplier,
                                                           final Serde<K> keySerde,
                                                           final Serde<V> valueSerde,
                                                           final boolean persistent,
                                                           final Time time) {
        final String storeName = keyValueBytesStoreSupplier.name();
        stateStores.put(storeName, new MockKeyValueStoreBuilder<>(keyValueBytesStoreSupplier, keySerde, valueSerde, persistent, time));
        return (KeyValueStoreBuilder) stateStores.get(storeName);
    }

    public WindowStoreBuilder createWindowStoreBuilder(final KeyValueBytesStoreSupplier keyValueBytesStoreSupplier,
                                                       final Serde<K> keySerde,
                                                       final Serde<V> valueSerde,
                                                       final Time time) {
        ...
    }

    public SessionStoreBuilder createSessionStoreBuilder(final KeyValueBytesStoreSupplier keyValueBytesStoreSupplier,
                                                         final Serde<K> keySerde,
                                                         final Serde<V> valueSerde,
                                                         final Time time) {
        ...
    }

    public StoreBuilder getStore(final String storeName) {
        return stateStores.get(storeName);
    }
}
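The factory's bookkeeping above boils down to a named registry of builders: create once, register under the store name, and let tests look builders up later. A minimal plain-Java sketch of that registry pattern (hypothetical names, no Kafka dependencies):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of the MockStoreFactory registry idea: builders are registered
// under their store name in insertion order and can be looked up by tests.
class BuilderRegistry<B> {
    private final Map<String, B> builders = new LinkedHashMap<>();

    // Register a builder and return the stored instance, mirroring
    // createKeyValueStoreBuilder's put-then-get shape.
    B register(String storeName, B builder) {
        builders.put(storeName, builder);
        return builders.get(storeName);
    }

    // Mirrors MockStoreFactory.getStore; returns null for unknown names.
    B get(String storeName) {
        return builders.get(storeName);
    }
}
```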
Each store builder will have a build method:
Code Block
package org.apache.kafka.streams.state;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;

public class MockKeyValueStoreBuilder<K, V> extends KeyValueStoreBuilder<K, V> {

    private final boolean persistent;
    private final KeyValueBytesStoreSupplier storeSupplier;
    final Serde<K> keySerde;
    final Serde<V> valueSerde;
    final Time time;

    public MockKeyValueStoreBuilder(final KeyValueBytesStoreSupplier storeSupplier,
                                    final Serde<K> keySerde,
                                    final Serde<V> valueSerde,
                                    final boolean persistent,
                                    final Time time) {
        super(storeSupplier, keySerde, valueSerde, time);
        this.persistent = persistent;
        this.storeSupplier = storeSupplier;
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
        this.time = time;
    }

    @Override
    public KeyValueStore<K, V> build() {
        return new MockKeyValueStore<>(storeSupplier, keySerde, valueSerde, persistent, time);
    }
}
Then, in the store, we will build a wrapper around the provided backend store. We will capture each get/put/delete call so the user can write tests against them. We will also track whether the store has been flushed or closed.
Code Block
package org.apache.kafka.streams.state;
public class MockKeyValueStore<K, V> extends WrappedStateStore<KeyValueStore<Bytes, byte[]>, K, V> implements KeyValueStore<K, V> {
// keep a global counter of flushes and a local reference to which store had which
// flush, so we can reason about the order in which stores get flushed.
private static final AtomicInteger GLOBAL_FLUSH_COUNTER = new AtomicInteger(0);
private final AtomicInteger instanceLastFlushCount = new AtomicInteger(-1);
public boolean initialized = false;
public boolean flushed = false;
public boolean closed = true;
public String name;
public boolean persistent;
protected final Time time;
final Serde<K> keySerde;
final Serde<V> valueSerde;
StateSerdes<K, V> serdes;
public final List<KeyValue<K, V>> capturedPutCalls = new LinkedList<>();
public final List<KeyValue<K, V>> capturedGetCalls = new LinkedList<>();
public final List<KeyValue<K, V>> capturedDeleteCalls = new LinkedList<>();
public MockKeyValueStore(final KeyValueBytesStoreSupplier keyValueBytesStoreSupplier,
final Serde<K> keySerde,
final Serde<V> valueSerde,
final boolean persistent,
final Time time) {
super(keyValueBytesStoreSupplier.get());
this.name = keyValueBytesStoreSupplier.name();
this.time = time != null ? time : Time.SYSTEM;
this.persistent = persistent;
this.keySerde = keySerde;
this.valueSerde = valueSerde;
}
@SuppressWarnings("unchecked")
void initStoreSerde(final ProcessorContext context) {
serdes = new StateSerdes<>(
ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
}
@Override
public String name() {
return name;
}
@Override
public void init(final ProcessorContext context,
final StateStore root) {
context.register(root, stateRestoreCallback);
initialized = true;
closed = false;
}
@Override
public void flush() {
instanceLastFlushCount.set(GLOBAL_FLUSH_COUNTER.getAndIncrement());
wrapped().flush();
flushed = true;
}
public int getLastFlushCount() {
return instanceLastFlushCount.get();
}
@Override
public void close() {
wrapped().close();
closed = true;
}
@Override
public boolean persistent() {
return persistent;
}
@Override
public boolean isOpen() {
return !closed;
}
public final StateRestoreCallback stateRestoreCallback = new StateRestoreCallback() {
@Override
public void restore(final byte[] key,
final byte[] value) {
}
};
@Override
public void put(final K key, final V value) {
capturedPutCalls.add(new KeyValue<>(key, value));
wrapped().put(keyBytes(key), serdes.rawValue(value));
}
@Override
public V putIfAbsent(final K key, final V value) {
final V originalValue = get(key);
if (originalValue == null) {
put(key, value);
capturedPutCalls.add(new KeyValue<>(key, value));
}
return originalValue;
}
@Override
public V delete(final K key) {
V value = outerValue(wrapped().delete(keyBytes(key)));
capturedDeleteCalls.add(new KeyValue<>(key, value));
return value;
}
@Override
public void putAll(final List<KeyValue<K, V>> entries) {
for (final KeyValue<K, V> entry : entries) {
put(entry.key, entry.value);
capturedPutCalls.add(entry);
}
}
@Override
public V get(final K key) {
V value = outerValue(wrapped().get(keyBytes(key)));
capturedGetCalls.add(new KeyValue<>(key, value));
return value;
}
@SuppressWarnings("unchecked")
@Override
public KeyValueIterator<K,V> range(final K from, final K to) {
return new MockKeyValueStore.MockKeyValueIterator(
wrapped().range(Bytes.wrap(serdes.rawKey(from)), Bytes.wrap(serdes.rawKey(to))));
}
@SuppressWarnings("unchecked")
@Override
public KeyValueIterator<K,V> all() {
return new MockKeyValueStore.MockKeyValueIterator(wrapped().all());
}
@Override
public long approximateNumEntries() {
return wrapped().approximateNumEntries();
}
private V outerValue(final byte[] value) {
return value != null ? serdes.valueFrom(value) : null;
}
private Bytes keyBytes(final K key) {
return Bytes.wrap(serdes.rawKey(key));
}
private class MockKeyValueIterator implements KeyValueIterator<K, V> {
private final KeyValueIterator<Bytes, byte[]> iter;
....
}
}
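The capture-and-delegate pattern used throughout MockKeyValueStore can be illustrated independently of the Kafka Streams APIs. The sketch below uses hypothetical names and a plain HashMap standing in for the wrapped byte store; it only demonstrates the recording idea, not the real Serde handling:

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal sketch of the capture-and-delegate pattern: every call is
// recorded in a public list, then forwarded to the backing store.
class CapturingStore<K, V> {
    private final Map<K, V> backend = new HashMap<>();
    public final List<Map.Entry<K, V>> capturedPutCalls = new ArrayList<>();
    public final List<Map.Entry<K, V>> capturedGetCalls = new ArrayList<>();
    public boolean flushed = false;

    public void put(K key, V value) {
        capturedPutCalls.add(new AbstractMap.SimpleEntry<>(key, value));
        backend.put(key, value);
    }

    public V get(K key) {
        V value = backend.get(key);
        // Record the returned value too, so tests can assert on misses.
        capturedGetCalls.add(new AbstractMap.SimpleEntry<>(key, value));
        return value;
    }

    public void flush() {
        // Nothing to flush in a HashMap; just record that it happened.
        flushed = true;
    }
}
```

A test exercises the code under test, then asserts on the captured lists and flags rather than on mock expectations set up in advance.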
Proposed Changes
I propose to add:
- A MockStoreFactory class to produce mock state store builders.
- A mock StateStoreBuilder class for KV, Session and Window.
- A mock StateStore class for KV, Session and Window with tracking.
Compatibility, Deprecation, and Migration Plan
...
2) Examine the current tests (e.g. org.apache.kafka.streams.TopologyTest), remove complicated testing logic, and refactor them with the new mock state stores.
Rejected Alternatives
1) Rebuilding a mock state store from scratch vs. extending an InMemoryStore
- If we rebuilt the functionality of a store from scratch in memory, it would basically act like an InMemoryStore anyway.
2) Tracking all calls in a total order.
- There has been no feature request so far to track calls in a total order, so we track each kind of call separately. If the need arises, we can adjust this.
3) Using the MockStoreFactory as the main entry point to access stores (keeping the state of all StoreBuilders).
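The "track the order separately" choice in (2) is what the GLOBAL_FLUSH_COUNTER in MockKeyValueStore implements: instead of a total order over all get/put/delete calls, each store only stamps its last flush with a shared, monotonically increasing sequence number. A self-contained sketch of that idea (hypothetical class name):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the shared-counter idea: each store stamps its last flush with
// a globally increasing sequence number, so a test can compare the relative
// order in which different stores were flushed.
class FlushOrderStore {
    private static final AtomicInteger GLOBAL_FLUSH_COUNTER = new AtomicInteger(0);
    private final AtomicInteger instanceLastFlushCount = new AtomicInteger(-1);

    void flush() {
        instanceLastFlushCount.set(GLOBAL_FLUSH_COUNTER.getAndIncrement());
    }

    // -1 means this instance has never been flushed.
    int getLastFlushCount() {
        return instanceLastFlushCount.get();
    }
}
```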
...
A related discussion has been brought up in the corresponding Jira ticket.
4) Using the current EasyMock implementation. There is no strong argument against the current EasyMock implementation; it is easy to use and lightweight. The main argument for this KIP is to enable better tests with in-house mock state store support.
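To make the comparison concrete: a generic "nice mock" can be approximated with a JDK dynamic proxy, as in the sketch below (hypothetical SimpleStore interface). Such a mock accepts any call and returns default values, but unlike the proposed MockKeyValueStore it retains no state and captures nothing, which is why in-house mocks make store-related tests more expressive:

```java
// Hypothetical store interface used only for this illustration.
interface SimpleStore {
    void put(String key, String value);
    String get(String key);
}

class NiceMocks {
    // A "nice" mock in the EasyMock sense: every call succeeds and returns
    // a default value (false for booleans, null otherwise), but no state is
    // retained between calls.
    @SuppressWarnings("unchecked")
    static <T> T niceMock(Class<T> iface) {
        return (T) java.lang.reflect.Proxy.newProxyInstance(
                iface.getClassLoader(),
                new Class<?>[]{iface},
                (proxy, method, args) ->
                        method.getReturnType() == boolean.class ? false : null);
    }
}
```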