...
Code Block |
---|
public class TopologyTest {
    private final StoreBuilder storeBuilder = EasyMock.createNiceMock(StoreBuilder.class);
    private final Topology topology = new Topology();
    private final MockStoreFactory mockStoreFactory = new MockStoreFactory<>();
    private final KeyValueStoreBuilder keyValueStoreBuilder = mockStoreFactory.createKeyValueStoreBuilder(
        "store",
        Stores.inMemoryKeyValueStore("store"),
        Serdes.Bytes(),
        Serdes.Bytes(),
        false,
        Time.SYSTEM);

    ...

    @Test(expected = TopologyException.class)
    public void shouldNotAllowToAddStateStoreToNonExistingProcessor() {
        topology.addStateStore(keyValueStoreBuilder, "no-such-processor");
    }
} |
...
We add some new classes to the state package (org.apache.kafka.streams.state) under streams/test-utils.
We will provide a MockStoreFactory to generate mock store builders. I will use the KeyValueStoreBuilder as an example; the Window and Session variants will have a similar structure.
...
Code Block |
---|
package org.apache.kafka.streams.state;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;

public class MockKeyValueStoreBuilder<K, V> extends KeyValueStoreBuilder<K, V> {

    private final boolean persistent;
    private final KeyValueBytesStoreSupplier storeSupplier;
    final Serde<K> keySerde;
    final Serde<V> valueSerde;
    final Time time;

    public MockKeyValueStoreBuilder(final KeyValueBytesStoreSupplier storeSupplier,
                                    final Serde<K> keySerde,
                                    final Serde<V> valueSerde,
                                    final boolean persistent,
                                    final Time time) {
        super(storeSupplier, keySerde, valueSerde, time);
        this.persistent = persistent;
        this.storeSupplier = storeSupplier;
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
        this.time = time;
    }

    @Override
    public KeyValueStore<K, V> build() {
        return new MockKeyValueStore<>(storeSupplier, keySerde, valueSerde, persistent, time);
    }
} |
Then in the store, we will build a wrapper around the provided backend store. We will capture each get/put/delete call so that the user can write tests against the recorded calls. We will also track whether the store has been flushed or closed.
Code Block |
---|
package org.apache.kafka.streams.state;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.state.internals.WrappedStateStore;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class MockKeyValueStore<K, V>
    extends WrappedStateStore<KeyValueStore<Bytes, byte[]>, K, V>
    implements KeyValueStore<K, V> {
    // keep a global counter of flushes and a local reference to which store had which
    // flush, so we can reason about the order in which stores get flushed.
    private static final AtomicInteger GLOBAL_FLUSH_COUNTER = new AtomicInteger(0);
    private final AtomicInteger instanceLastFlushCount = new AtomicInteger(-1);

    public boolean initialized = false;
    public boolean flushed = false;
    public boolean closed = true;
    public String name;
    public boolean persistent;

    protected final Time time;

    final Serde<K> keySerde;
    final Serde<V> valueSerde;
    StateSerdes<K, V> serdes;

    public final List<KeyValue<K, V>> capturedPutCalls = new LinkedList<>();
    public final List<KeyValue<K, V>> capturedGetCalls = new LinkedList<>();
    public final List<KeyValue<K, V>> capturedDeleteCalls = new LinkedList<>();

    public MockKeyValueStore(final KeyValueBytesStoreSupplier keyValueBytesStoreSupplier,
                             final Serde<K> keySerde,
                             final Serde<V> valueSerde,
                             final boolean persistent,
                             final Time time) {
        super(keyValueBytesStoreSupplier.get());
        this.name = keyValueBytesStoreSupplier.name();
        this.time = time != null ? time : Time.SYSTEM;
        this.persistent = persistent;
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
    }

    @SuppressWarnings("unchecked")
    public void initStoreSerde(final ProcessorContext context) {
        // fall back to the context's default serdes when none were supplied
        serdes = new StateSerdes<>(
            ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
            keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
            valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
    }

    @Override
    public String name() {
        return name;
    }

    @Override
    public void init(final ProcessorContext context, final StateStore root) {
        context.register(root, stateRestoreCallback);
        initialized = true;
        closed = false;
    }

    @Override
    public void flush() {
        instanceLastFlushCount.set(GLOBAL_FLUSH_COUNTER.getAndIncrement());
        wrapped().flush();
        flushed = true;
    }

    public int getLastFlushCount() {
        return instanceLastFlushCount.get();
    }

    @Override
    public void close() {
        wrapped().close();
        closed = true;
    }

    @Override
    public boolean persistent() {
        return persistent;
    }

    @Override
    public boolean isOpen() {
        return !closed;
    }

    public final StateRestoreCallback stateRestoreCallback = new StateRestoreCallback() {
        @Override
        public void restore(final byte[] key, final byte[] value) {
        }
    };

    @Override
    public void put(final K key, final V value) {
        capturedPutCalls.add(new KeyValue<>(key, value));
        wrapped().put(keyBytes(key), serdes.rawValue(value));
    }

    @Override
    public V putIfAbsent(final K key, final V value) {
        final V originalValue = get(key);
        if (originalValue == null) {
            put(key, value);
            capturedPutCalls.add(new KeyValue<>(key, value));
        }
        return originalValue;
    }
    @Override
    public V delete(final K key) {
        final V value = outerValue(wrapped().delete(keyBytes(key)));
        capturedDeleteCalls.add(new KeyValue<>(key, value));
        return value;
    }

    @Override
    public void putAll(final List<KeyValue<K, V>> entries) {
        for (final KeyValue<K, V> entry : entries) {
            put(entry.key, entry.value);
            capturedPutCalls.add(entry);
        }
    }

    @Override
    public V get(final K key) {
        final V value = outerValue(wrapped().get(keyBytes(key)));
        capturedGetCalls.add(new KeyValue<>(key, value));
        return value;
    }

    @SuppressWarnings("unchecked")
    @Override
    public KeyValueIterator<K, V> range(final K from, final K to) {
        return new MockKeyValueStore.MockKeyValueIterator(
            wrapped().range(Bytes.wrap(serdes.rawKey(from)), Bytes.wrap(serdes.rawKey(to))));
    }

    @SuppressWarnings("unchecked")
    @Override
    public KeyValueIterator<K, V> all() {
        return new MockKeyValueStore.MockKeyValueIterator(wrapped().all());
    }

    @Override
    public long approximateNumEntries() {
        return wrapped().approximateNumEntries();
    }

    // deserialize a raw value read from the wrapped bytes store
    private V outerValue(final byte[] value) {
        return value != null ? serdes.valueFrom(value) : null;
    }

    // serialize a typed key into the form the wrapped bytes store expects
    private Bytes keyBytes(final K key) {
        return Bytes.wrap(serdes.rawKey(key));
    }

    private class MockKeyValueIterator implements KeyValueIterator<K, V> {
        private final KeyValueIterator<Bytes, byte[]> iter;
        ....
    }
} |
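The capture-then-delegate idea behind the store above can be tried out without any Kafka dependency. The following is a minimal sketch under stated assumptions: `SimpleStore`, `MapBackedStore`, `CapturingStore` and `CapturingStoreDemo` are hypothetical names invented for this illustration and are not part of Kafka Streams or of this proposal. Only the shared flush counter and the three captured-call lists mirror the class shown above.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified store interface (NOT the Kafka Streams KeyValueStore).
interface SimpleStore<K, V> {
    void put(K key, V value);
    V get(K key);
    V delete(K key);
    void flush();
}

// Plain in-memory backend that plays the role of the wrapped store.
class MapBackedStore<K, V> implements SimpleStore<K, V> {
    private final Map<K, V> data = new HashMap<>();

    @Override public void put(final K key, final V value) { data.put(key, value); }
    @Override public V get(final K key) { return data.get(key); }
    @Override public V delete(final K key) { return data.remove(key); }
    @Override public void flush() { }
}

// Records every call in a per-operation list, then delegates to the backend.
class CapturingStore<K, V> implements SimpleStore<K, V> {
    // Shared by all instances so the flush order of different stores can be compared.
    private static final AtomicInteger GLOBAL_FLUSH_COUNTER = new AtomicInteger(0);
    private final AtomicInteger instanceLastFlushCount = new AtomicInteger(-1);
    private final SimpleStore<K, V> inner;

    final List<Map.Entry<K, V>> capturedPutCalls = new ArrayList<>();
    final List<Map.Entry<K, V>> capturedGetCalls = new ArrayList<>();
    final List<Map.Entry<K, V>> capturedDeleteCalls = new ArrayList<>();

    CapturingStore(final SimpleStore<K, V> inner) {
        this.inner = inner;
    }

    @Override
    public void put(final K key, final V value) {
        capturedPutCalls.add(new SimpleImmutableEntry<>(key, value));
        inner.put(key, value);
    }

    @Override
    public V get(final K key) {
        final V value = inner.get(key);
        // SimpleImmutableEntry tolerates a null value (a miss), unlike Map.entry.
        capturedGetCalls.add(new SimpleImmutableEntry<>(key, value));
        return value;
    }

    @Override
    public V delete(final K key) {
        final V value = inner.delete(key);
        capturedDeleteCalls.add(new SimpleImmutableEntry<>(key, value));
        return value;
    }

    @Override
    public void flush() {
        instanceLastFlushCount.set(GLOBAL_FLUSH_COUNTER.getAndIncrement());
        inner.flush();
    }

    // Returns -1 until this instance has been flushed at least once.
    int getLastFlushCount() {
        return instanceLastFlushCount.get();
    }
}

class CapturingStoreDemo {
    public static void main(final String[] args) {
        final CapturingStore<String, String> store = new CapturingStore<>(new MapBackedStore<>());
        store.put("k", "v");
        store.get("k");
        store.flush();
        System.out.println("puts=" + store.capturedPutCalls + " gets=" + store.capturedGetCalls);
    }
}
```

A test can then assert on the captured lists directly, and compare `getLastFlushCount()` across two stores to check which one was flushed first.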
Proposed Changes
I propose to add:
- A MockStoreFactory class to produce mock state store builders.
- A mock StateStoreBuilder class for KV, Session and Window.
- A mock StateStore class for KV, Session and Window with tracking.
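How the three pieces listed above relate can be sketched in a few lines. This is an illustration under stated assumptions, not the proposed API: `SketchStore`, `SketchStoreBuilder`, `SketchStoreFactory` and `SketchStoreFactoryDemo` are hypothetical names, the backend is a plain `Map` supplied through a `Supplier`, and serdes and time are left out entirely.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical mock store: holds a name, a persistence flag and its own backend.
class SketchStore {
    final String name;
    final boolean persistent;
    final Map<String, String> backend;

    SketchStore(final String name, final Map<String, String> backend, final boolean persistent) {
        this.name = name;
        this.backend = backend;
        this.persistent = persistent;
    }
}

// Hypothetical builder: remembers the configuration and creates a fresh store per build().
class SketchStoreBuilder {
    private final String name;
    private final Supplier<Map<String, String>> backendSupplier;
    private final boolean persistent;

    SketchStoreBuilder(final String name,
                       final Supplier<Map<String, String>> backendSupplier,
                       final boolean persistent) {
        this.name = name;
        this.backendSupplier = backendSupplier;
        this.persistent = persistent;
    }

    SketchStore build() {
        // Each call asks the supplier for a new backend, so stores do not share state.
        return new SketchStore(name, backendSupplier.get(), persistent);
    }
}

// Hypothetical factory: the single place where tests obtain builders.
class SketchStoreFactory {
    SketchStoreBuilder createKeyValueStoreBuilder(final String name,
                                                  final Supplier<Map<String, String>> backendSupplier,
                                                  final boolean persistent) {
        return new SketchStoreBuilder(name, backendSupplier, persistent);
    }
}

class SketchStoreFactoryDemo {
    public static void main(final String[] args) {
        final SketchStoreBuilder builder =
            new SketchStoreFactory().createKeyValueStoreBuilder("store", HashMap::new, false);
        final SketchStore store = builder.build();
        System.out.println(store.name + " persistent=" + store.persistent);
    }
}
```

Keeping the backend behind a supplier means the builder can be handed to a topology several times and still produce independent stores.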
Compatibility, Deprecation, and Migration Plan
Until we start refactoring existing tests to use these new MockStateStores, there should not be any compatibility issues. The next phase should be that refactoring, including:
...
2) Examine the current tests (i.e. org.apache.kafka.streams.TopologyTest), remove overly complicated testing logic, and refactor them with the new MockStateStores.
Rejected Alternatives
1) Rebuilding a MockStateStore from scratch vs. extending an InMemoryStore
- If we are rebuilding the functionality of a store from scratch in memory, it will basically act like an InMemoryStore.
2) Track all calls in a total order.
- So far I have not received a feature request to track calls in a total order, so for now we track the order of each kind of call separately. If there is a need, we can adjust it.
3) Using the MockStateStoreFactory as a main entry point to access stores (keeping the state of all StoreBuilders).
...
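For the total-order alternative in item 2), the difference from per-operation lists can be made concrete. The sketch below is hypothetical and is not what this KIP proposes: `CallType`, `RecordedCall`, `TotalOrderCallLog` and `TotalOrderCallLogDemo` are invented names. It shows a single append-only log that preserves the interleaving of puts, gets and deletes, from which the per-operation views can still be derived.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical call kinds; the names are illustrative only.
enum CallType { PUT, GET, DELETE }

// One recorded call: its kind plus the key it touched.
final class RecordedCall<K> {
    final CallType type;
    final K key;

    RecordedCall(final CallType type, final K key) {
        this.type = type;
        this.key = key;
    }
}

// A single append-only log gives a total order across all call kinds.
class TotalOrderCallLog<K> {
    private final List<RecordedCall<K>> calls = new ArrayList<>();

    void record(final CallType type, final K key) {
        calls.add(new RecordedCall<>(type, key));
    }

    // The kinds of all calls, in the sequence they happened.
    List<CallType> typesInOrder() {
        final List<CallType> types = new ArrayList<>();
        for (final RecordedCall<K> call : calls) {
            types.add(call.type);
        }
        return types;
    }

    // Per-kind view, equivalent to keeping one list per kind of call.
    List<K> keysFor(final CallType type) {
        final List<K> keys = new ArrayList<>();
        for (final RecordedCall<K> call : calls) {
            if (call.type == type) {
                keys.add(call.key);
            }
        }
        return keys;
    }
}

class TotalOrderCallLogDemo {
    public static void main(final String[] args) {
        final TotalOrderCallLog<String> log = new TotalOrderCallLog<>();
        log.record(CallType.PUT, "a");
        log.record(CallType.GET, "a");
        log.record(CallType.DELETE, "a");
        System.out.println(log.typesInOrder());
    }
}
```

With separate lists, a test can check which keys were put but not whether a get happened before or after a given put. The single log answers both questions at the cost of one extra record type.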
A discussion that has been brought up is
Rejected Alternatives
- Using the current EasyMock implementation. There is no strong argument against the current EasyMock implementation; it is easy to use and lightweight. The main argument for this KIP is that in-house mock state store support lets us write better tests.