THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block |
---|
package org.apache.kafka.streams.internals; public class MockKeyValueStoreBuilder<K, V> extends AbstractStoreBuilder<K, V, StateStore> { final Boolean persistent; public MockKeyValueStoreBuilder(final String storeName, import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.MockTime; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.internals.AbstractStoreBuilder; public class MockKeyValueStoreBuilder<K, V> extends AbstractStoreBuilder<K, V, KeyValueStore> { private final boolean persistent; private final KeyValueBytesStoreSupplier storeSupplier; public MockKeyValueStoreBuilder(final String storeName, final KeyValueBytesStoreSupplier storeSupplier, final Serde<K> keySerde, final Serde<V> valueSerde, final boolean persistent) { super(storeName, keySerde, valueSerde, new MockTime(0)); this.persistent = persistent; this.storeSupplier = storeSupplier; } @Override public KeyValueStore build() { return new MockKeyValueStore<>(name, storeSupplier.get(), persistent); } } |
Then in the Store, we will build a wrapper around a InMemoryStore, and capture all the get/put calls (excluding Iterators)
Code Block |
---|
package org.apache.kafka.streams.internals; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import java.util.LinkedList; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; public class MockKeyValueStore<K, V> implements KeyValueStore<K, V> { // keep a global counter of flushes and a local reference to which store had which // flush, so we can reason about the order in which stores get flushed. private static final AtomicInteger GLOBAL_FLUSH_COUNTER = new AtomicInteger(0); private final AtomicInteger instanceLastFlushCount = new AtomicInteger(-1); private final String name; private final boolean persistent; public boolean initialized = false; public boolean flushed = false; public boolean closed = true; public final List<KeyValue<K, V>> capturedPutCalls = new LinkedList<>(); public final List<KeyValue<K, V>> capturedGetCalls = new LinkedList<>(); public final List<KeyValue<K, V>> capturedDeleteCalls = new LinkedList<>(); public final KeyValueStore<K, V> innerKeyValueStore; public MockKeyValueStore(final String name, final KeyValueStore<K, V> innerKeyValueStore, final boolean persistent) { this.name = name; this.innerKeyValueStore = innerKeyValueStore; this.persistent = persistent; } @Override public String name() { return name; } @Override public void init(final ProcessorContext context, final StateStore root) { context.register(root, stateRestoreCallback); initialized = true; closed = false; final Serde<K> keySerde,} @Override public void flush() { instanceLastFlushCount.set(GLOBAL_FLUSH_COUNTER.getAndIncrement()); innerKeyValueStore.flush(); flushed final Serde<V> valueSerde,= true; } public int getLastFlushCount() { return instanceLastFlushCount.get(); } @Override public final Time timevoid close() { super(storeName, keySerde, valueSerde, timeinnerKeyValueStore.close(); this.persistentclosed = persistenttrue; } @Override public KeyValueStoreboolean buildpersistent() { return new MockKeyValueStore(name)persistent; } } |
Then in the Store, we will build a wrapper around a InMemoryStore, and capture all the get/put calls (excluding Iterators)
Code Block |
---|
public class MockKeyValueStore extends InMemoryKeyValueStore { @Override public boolean isOpen() { private final booleanreturn persistent;!closed; } public final booleanStateRestoreCallback initializedstateRestoreCallback = false; new StateRestoreCallback() { public boolean flushed = false; @Override public final List<KeyValue> capturedPutCalls =public newvoid LinkedList<>();restore(final byte[] key, public final List<KeyValue> capturedGetCalls = new LinkedList<>(); public final List<KeyValue> capturedDeleteCalls = new LinkedList<>(); public MockKeyValueStore(final String name, final byte[] value) { } }; @Override public void put(final K key, final booleanV persistentvalue) { capturedPutCalls.add(new super(nameKeyValue<>(key, value)); this.persistent = persistentinnerKeyValueStore.put(key, value); } @Override public voidV flushputIfAbsent()final { K key, final V value) { flushed = true; final V originalValue = super.flushget(key); } @Override if public(originalValue boolean== persistent(null) { return persistent; } put(key, value); @Override public void putcapturedPutCalls.add(finalnew Bytes KeyValue<>(key, final byte[] value)) {; super.put(key, value); } capturedPutCalls.add(new KeyValue(key, value))return originalValue; } @Override public byte[]V getdelete(final BytesK key) { byte[]V value = superinnerKeyValueStore.getdelete(key); capturedGetCallscapturedDeleteCalls.add(new KeyValueKeyValue<>(key, value)); return value; } @Override public byte[]void delete(final Bytes key) { byte[] value = super.delete(key); putAll(final List<KeyValue<K, V>> entries) { for capturedDeleteCalls.add(newfinal KeyValue(keyKeyValue<K, value)); V> entry : entries) { return value; } @Override put(entry.key, entry.value); public byte[] putIfAbsent(final Bytes key, final byte[] value) { capturedPutCalls.add(entry); final} byte[] originalValue = get(key); } @Override public ifV get(originalValuefinal ==K nullkey) { V value = putinnerKeyValueStore.get(key, value); capturedPutCallscapturedGetCalls.add(new KeyValueKeyValue<>(key, value)); } return originalValuevalue; } @Override public voidKeyValueIterator<K,V> putAllrange(final K List<KeyValue<Bytesfrom, byte[]>> entries) { final K to) { return innerKeyValueStore.range(from, to); } @Override for (final KeyValue<Bytes, byte[]> entry : entriespublic KeyValueIterator<K,V> all() { put(entry.key, entry.valuereturn innerKeyValueStore.all(); } @Override public long capturedPutCalls.addapproximateNumEntries(entry); { }return innerKeyValueStore.approximateNumEntries(); } } |
Proposed Changes
Compatibility, Deprecation, and Migration Plan
...