...
Code Block |
---|
public class TopologyTest {
    private final StoreBuilder storeBuilder = EasyMock.createNiceMock(StoreBuilder.class);
    private final Topology topology = new Topology();
    private final MockStoreFactory mockStoreFactory = new MockStoreFactory<>();
    private final KeyValueStoreBuilder keyValueStoreBuilder = mockStoreFactory.createKeyValueStoreBuilder(
        Stores.inMemoryKeyValueStore("store"),
        Serdes.Bytes(),
        Serdes.Bytes(),
        false,
        Time.SYSTEM);
    ...
    // Attaching a store to a processor name that was never added must fail.
    @Test(expected = TopologyException.class)
    public void shouldNotAllowToAddStateStoreToNonExistingProcessor() {
        topology.addStateStore(keyValueStoreBuilder, "no-such-processor");
    }
} |
...
We add some new classes to a state package (org.apache.kafka.streams.state) under streams/test-utils.
We will provide a MockStoreFactory to generate mock store builders. The KeyValueStoreBuilder is used as the example here; the Window and Session variants will have a similar structure.
...
Code Block |
---|
package org.apache.kafka.streams.state;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;

public class MockKeyValueStoreBuilder<K, V> extends KeyValueStoreBuilder<K, V> {

    private final boolean persistent;
    private final KeyValueBytesStoreSupplier storeSupplier;
    final Serde<K> keySerde;
    final Serde<V> valueSerde;
    final Time time;

    public MockKeyValueStoreBuilder(final KeyValueBytesStoreSupplier storeSupplier,
                                    final Serde<K> keySerde,
                                    final Serde<V> valueSerde,
                                    final boolean persistent,
                                    final Time time) {
        super(storeSupplier, keySerde, valueSerde, time);
        this.persistent = persistent;
        this.storeSupplier = storeSupplier;
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
        this.time = time;
    }

    // Each call returns a new mock store wrapping a fresh backend store from the supplier.
    @Override
    public KeyValueStore<K, V> build() {
        return new MockKeyValueStore<>(storeSupplier, keySerde, valueSerde, persistent, time);
    }
} |
In the store itself, we build a wrapper around the provided backend store. The wrapper captures each get/put/delete call so that users can write assertions against them, and it tracks whether the store has been flushed or closed.
Code Block |
---|
package org.apache.kafka.streams.state;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.state.internals.WrappedStateStore;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class MockKeyValueStore<K, V>
    extends WrappedStateStore<KeyValueStore<Bytes, byte[]>, K, V>
    implements KeyValueStore<K, V> {

    // keep a global counter of flushes and a local reference to which store had which
    // flush, so we can reason about the order in which stores get flushed.
    private static final AtomicInteger GLOBAL_FLUSH_COUNTER = new AtomicInteger(0);
    private final AtomicInteger instanceLastFlushCount = new AtomicInteger(-1);

    public boolean initialized = false;
    public boolean flushed = false;
    public boolean closed = true;

    public String name;
    public boolean persistent;

    protected final Time time;

    final Serde<K> keySerde;
    final Serde<V> valueSerde;
    StateSerdes<K, V> serdes;

    public final List<KeyValue<K, V>> capturedPutCalls = new LinkedList<>();
    public final List<KeyValue<K, V>> capturedGetCalls = new LinkedList<>();
    public final List<KeyValue<K, V>> capturedDeleteCalls = new LinkedList<>();

    public MockKeyValueStore(final KeyValueBytesStoreSupplier keyValueBytesStoreSupplier,
                             final Serde<K> keySerde,
                             final Serde<V> valueSerde,
                             final boolean persistent,
                             final Time time) {
        super(keyValueBytesStoreSupplier.get());
        this.name = keyValueBytesStoreSupplier.name();
        this.time = time != null ? time : Time.SYSTEM;
        this.persistent = persistent;
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
    }

    // Fall back to the context's default serdes when none were supplied.
    @SuppressWarnings("unchecked")
    void initStoreSerde(final ProcessorContext context) {
        serdes = new StateSerdes<>(
            ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
            keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
            valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
    }

    @Override
    public String name() {
        return name;
    }

    @Override
    public void init(final ProcessorContext context,
                     final StateStore root) {
        // Resolve the serdes first; put/get/delete dereference them.
        initStoreSerde(context);
        context.register(root, stateRestoreCallback);
        initialized = true;
        closed = false;
    }

    @Override
    public void flush() {
        instanceLastFlushCount.set(GLOBAL_FLUSH_COUNTER.getAndIncrement());
        wrapped().flush();
        flushed = true;
    }

    // Not part of any interface, so no @Override here.
    public int getLastFlushCount() {
        return instanceLastFlushCount.get();
    }

    @Override
    public void close() {
        wrapped().close();
        closed = true;
    }

    @Override
    public boolean persistent() {
        return persistent;
    }

    @Override
    public boolean isOpen() {
        return !closed;
    }

    // Restoration is a no-op for the mock store.
    public final StateRestoreCallback stateRestoreCallback = new StateRestoreCallback() {
        @Override
        public void restore(final byte[] key,
                            final byte[] value) {
        }
    };

    @Override
    public void put(final K key, final V value) {
        capturedPutCalls.add(new KeyValue<>(key, value));
        wrapped().put(keyBytes(key), serdes.rawValue(value));
    }

    @Override
    public V putIfAbsent(final K key, final V value) {
        final V originalValue = get(key);
        if (originalValue == null) {
            // put() already records the call, so it is not captured a second time here.
            put(key, value);
        }
        return originalValue;
    }

    @Override
    public V delete(final K key) {
        final V value = outerValue(wrapped().delete(keyBytes(key)));
        capturedDeleteCalls.add(new KeyValue<>(key, value));
        return value;
    }

    @Override
    public void putAll(final List<KeyValue<K, V>> entries) {
        for (final KeyValue<K, V> entry : entries) {
            // put() already records the call, so each entry is captured exactly once.
            put(entry.key, entry.value);
        }
    }

    @Override
    public V get(final K key) {
        final V value = outerValue(wrapped().get(keyBytes(key)));
        capturedGetCalls.add(new KeyValue<>(key, value));
        return value;
    }

    @Override
    public KeyValueIterator<K, V> range(final K from, final K to) {
        return new MockKeyValueIterator(wrapped().range(keyBytes(from), keyBytes(to)));
    }

    @Override
    public KeyValueIterator<K, V> all() {
        return new MockKeyValueIterator(wrapped().all());
    }

    @Override
    public long approximateNumEntries() {
        return wrapped().approximateNumEntries();
    }

    // Deserialize a raw value from the wrapped byte store; null means "no entry".
    private V outerValue(final byte[] value) {
        return value != null ? serdes.valueFrom(value) : null;
    }

    private Bytes keyBytes(final K key) {
        return Bytes.wrap(serdes.rawKey(key));
    }

    private class MockKeyValueIterator implements KeyValueIterator<K, V> {
        private final KeyValueIterator<Bytes, byte[]> iter;
        ....
    }
} |
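The capture-and-track idea can be tried out without a Kafka dependency. The following is a minimal sketch, not the KIP's implementation: `CapturingStore`, `flushOrder()` and the `HashMap` backend are hypothetical stand-ins for `MockKeyValueStore`, `getLastFlushCount()` and the wrapped byte store. It illustrates how a test might assert on the recorded calls and on the relative order in which two stores were flushed.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for MockKeyValueStore: wraps a backend map,
// records every put/get/delete, and tracks flush order and close state.
final class CapturingStore<K, V> {
    // Shared by all instances so flush order can be compared across stores.
    private static final AtomicInteger GLOBAL_FLUSH_COUNTER = new AtomicInteger(0);

    private final Map<K, V> backend = new HashMap<>();
    private int lastFlushCount = -1; // -1 means "never flushed"
    boolean closed = false;

    final List<Map.Entry<K, V>> capturedPutCalls = new ArrayList<>();
    final List<K> capturedGetCalls = new ArrayList<>();
    final List<K> capturedDeleteCalls = new ArrayList<>();

    void put(final K key, final V value) {
        capturedPutCalls.add(new SimpleImmutableEntry<>(key, value));
        backend.put(key, value);
    }

    V get(final K key) {
        capturedGetCalls.add(key);
        return backend.get(key);
    }

    V delete(final K key) {
        capturedDeleteCalls.add(key);
        return backend.remove(key);
    }

    void flush() {
        // Take the next global ticket; a smaller ticket means an earlier flush.
        lastFlushCount = GLOBAL_FLUSH_COUNTER.getAndIncrement();
    }

    int flushOrder() {
        return lastFlushCount;
    }

    void close() {
        closed = true;
    }

    public static void main(final String[] args) {
        final CapturingStore<String, Long> first = new CapturingStore<>();
        final CapturingStore<String, Long> second = new CapturingStore<>();

        first.put("a", 1L);
        first.get("a");
        first.delete("a");

        // Flush "second" before "first" and check that the order is observable.
        second.flush();
        first.flush();

        System.out.println("puts captured: " + first.capturedPutCalls.size());
        System.out.println("second flushed before first: "
            + (second.flushOrder() < first.flushOrder()));
    }
}
```

Because the counter is static, only the relative order of two flush tickets is meaningful; absolute values depend on how many flushes happened earlier in the same JVM.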
Proposed Changes
I propose to add:
- A MockStoreFactory class to produce mock state store builders.
- A mock StateStoreBuilder class for KV, Session and Window.
- A mock StateStore class for KV, Session and Window with tracking.
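The relationship between the three pieces listed above (factory, builder, tracked store) can be sketched in plain Java. This is only an illustration under stated assumptions: `StoreFactorySketch`, `TrackedStoreBuilder` and `TrackedStore` are hypothetical names, the factory method takes a name instead of a store supplier and serdes, and only the key-value variant is shown.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for MockStoreFactory: one factory method per store kind.
final class StoreFactorySketch {

    // Hypothetical mock store: carries only the flags a test would inspect.
    static final class TrackedStore<K, V> {
        final String name;
        final boolean persistent;
        final Map<K, V> backend = new HashMap<>();
        boolean flushed = false;
        boolean closed = false;

        TrackedStore(final String name, final boolean persistent) {
            this.name = name;
            this.persistent = persistent;
        }

        void flush() {
            flushed = true;
        }

        void close() {
            closed = true;
        }
    }

    // Hypothetical mock builder: holds configuration; every build() yields a fresh store.
    static final class TrackedStoreBuilder<K, V> {
        private final String name;
        private final boolean persistent;

        TrackedStoreBuilder(final String name, final boolean persistent) {
            this.name = name;
            this.persistent = persistent;
        }

        TrackedStore<K, V> build() {
            return new TrackedStore<>(name, persistent);
        }
    }

    <K, V> TrackedStoreBuilder<K, V> createKeyValueStoreBuilder(final String name,
                                                                final boolean persistent) {
        return new TrackedStoreBuilder<>(name, persistent);
    }

    public static void main(final String[] args) {
        final StoreFactorySketch factory = new StoreFactorySketch();
        final TrackedStoreBuilder<String, String> builder =
            factory.createKeyValueStoreBuilder("store", false);
        final TrackedStore<String, String> store = builder.build();
        store.flush();
        System.out.println(store.name + " flushed=" + store.flushed + " closed=" + store.closed);
    }
}
```

Keeping the tracking state on the store rather than on the builder means that two stores built from the same builder can be asserted on independently.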
...
Until existing tests are refactored to use the new MockStateStores, there should be no compatibility issues. The next phase is that refactoring, which includes:
1) Remove and refactor redundant MockStores (e.g. org.apache.kafka.test.MockKeyValueStore).
2) Examine the current tests (e.g. org.apache.kafka.streams.TopologyTest) and refactor the test logic to use the new MockStateStores.
A related discussion has been brought up in Jira.
Rejected Alternatives
- Using the current EasyMock implementation. There is no strong argument against the current EasyMock implementation: it is easy to use and lightweight. The main argument for this KIP is that in-house mock state store support allows better tests to be written.
...