THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block |
---|
package org.apache.kafka.streams.internals; public class MockKeyValueStoreBuilder<K, V> extends AbstractStoreBuilder<K, V, StateStore> { public MockKeyValueStoreBuilder(final String storeName, final Serde<K> keySerde, final Serde<V> valueSerde, final Time time) { super(storeName, keySerde, valueSerde, time); } @Override public KeyValueStore build() { return new MockKeyValueStore(name); } } |
The mock store itself extends InMemoryKeyValueStore and captures every get, put and delete call made on it (reads performed through iterators are not captured):
Code Block |
---|
public class MockKeyValueStore extends InMemoryKeyValueStore {
private final boolean persistent;
public boolean initialized = false;
public boolean flushed = false;
public final List<KeyValue> capturedPutCalls = new LinkedList<>();
public final List<KeyValue> capturedGetCalls = new LinkedList<>();
public final List<KeyValue> capturedDeleteCalls = new LinkedList<>();
public MockKeyValueStore(final String name,
final boolean persistent) {
super(name);
this.persistent = persistent;
}
@Override
public void flush() {
flushed = true;
super.flush();
}
@Override
public boolean persistent() {
return persistent;
}
@Override
public void put(final Bytes key, final byte[] value) {
super.put(key, value);
capturedPutCalls.add(new KeyValue(key, value));
}
@Override
public byte[] get(final Bytes key) {
byte[] value = super.get(key);
capturedGetCalls.add(new KeyValue(key, value));
return value;
}
@Override
public byte[] delete(final Bytes key) {
byte[] value = super.delete(key);
capturedDeleteCalls.add(new KeyValue(key, value));
return value;
}
@Override
public byte[] putIfAbsent(final Bytes key, final byte[] value) {
final byte[] originalValue = get(key);
if (originalValue == null) {
put(key, value);
capturedPutCalls.add(new KeyValue(key, value));
}
return originalValue;
}
@Override
public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
for (final KeyValue<Bytes, byte[]> entry : entries) {
put(entry.key, entry.value);
capturedPutCalls.add(entry);
}
}
} |
Compatibility, Deprecation, and Migration Plan
...