THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block |
---|
package org.apache.kafka.streams.internalsstate; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import java.util.LinkedList; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; public class MockKeyValueStore<K, V> implements KeyValueStore<K, V> { // keep a global counter of flushes and a local reference to which store had which // flush, so we can reason about the order in which stores get flushed. private static final AtomicInteger GLOBAL_FLUSH_COUNTER = new AtomicInteger(0); private final AtomicInteger instanceLastFlushCount = new AtomicInteger(-1); private final String name; private final boolean persistent; public boolean initialized = false; public boolean flushed = false; public boolean closed = true; public final List<KeyValue<K, V>> capturedPutCalls = new LinkedList<>(); public final List<KeyValue<K, V>> capturedGetCalls = new LinkedList<>(); public final List<KeyValue<K, V>> capturedDeleteCalls = new LinkedList<>(); public final KeyValueStore<K, V> innerKeyValueStore; public MockKeyValueStore(final String namepublic class MockKeyValueStore<K, V> extends WrappedStateStore<KeyValueStore<Bytes, byte[]>, K, V> implements KeyValueStore<K, V> { // keep a global counter of flushes and a local reference to which store had which // flush, so we can reason about the order in which stores get flushed. private static final AtomicInteger GLOBAL_FLUSH_COUNTER = new AtomicInteger(0); private final AtomicInteger instanceLastFlushCount = new AtomicInteger(-1); public boolean initialized = false; public boolean flushed = false; public boolean closed = true; public String name; public boolean persistent; protected final Time time; final Serde<K> keySerde; final Serde<V> valueSerde; StateSerdes<K, V> serdes; public final List<KeyValue<K, V>> capturedPutCalls = new LinkedList<>(); public final List<KeyValue<K, V>> capturedGetCalls = new LinkedList<>(); public final List<KeyValue<K, V>> capturedDeleteCalls = new LinkedList<>(); public MockKeyValueStore(final KeyValueBytesStoreSupplier keyValueBytesStoreSupplier, final Serde<K> keySerde, final Serde<V> valueSerde, final boolean persistent, final Time time) { super(keyValueBytesStoreSupplier.get()); this.name = keyValueBytesStoreSupplier.name(); this.time = time != null ? time : Time.SYSTEM; this.persistent = persistent; this.keySerde = keySerde; this.valueSerde = valueSerde; } @SuppressWarnings("unchecked") void initStoreSerde(final ProcessorContext context) { serdes = new StateSerdes<>( ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()), keySerde == null ? (Serde<K>) context.keySerde() : keySerde, valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde); } @Override public String name() { return name; } @Override public void init(final ProcessorContext context, final StateStore root) { final KeyValueStore<K, V> innerKeyValueStore,context.register(root, stateRestoreCallback); initialized = true; closed = false; } @Override finalpublic booleanvoid persistentflush() { this.name = name; instanceLastFlushCount.set(GLOBAL_FLUSH_COUNTER.getAndIncrement()); this.innerKeyValueStore = innerKeyValueStore wrapped().flush(); this.persistentflushed = persistenttrue; } @Override public Stringint namegetLastFlushCount() { return nameinstanceLastFlushCount.get(); } @Override public void initclose(final ProcessorContext context,) { wrapped().close(); final StateStore root) { closed = true; context.register(root, stateRestoreCallback); } @Override public initializedboolean = true;persistent() { closedreturn = falsepersistent; } @Override public voidboolean flushisOpen() { instanceLastFlushCount.set(GLOBAL_FLUSH_COUNTER.getAndIncrement())return !closed; } public final StateRestoreCallback stateRestoreCallback = new innerKeyValueStore.flushStateRestoreCallback(); { flushed = true;@Override } public intvoid getLastFlushCount() {restore(final byte[] key, return instanceLastFlushCount.get(); } @Override public void close() { final innerKeyValueStore.close();byte[] value) { closed = true;} }; @Override public booleanvoid persistent() {put(final K key, final V value) { capturedPutCalls.add(new KeyValue<>(key, value)); return persistentwrapped().put(keyBytes(key), serdes.rawValue(value)); } @Override public booleanV isOpenputIfAbsent()final { K key, final V return !closed;value) { } public final StateRestoreCallbackV stateRestoreCallbackoriginalValue = new StateRestoreCallbackget(key) { ; @Override if (originalValue == null) { public void restore(final byte[] key, put(key, value); capturedPutCalls.add(new KeyValue<>(key, value)); final byte[] value) {} return }originalValue; }; @Override public voidV putdelete(final K key, final V value) {) { V value = outerValue(wrapped().delete(keyBytes(key))); capturedPutCallscapturedDeleteCalls.add(new KeyValue<>(key, value)); innerKeyValueStore.put(key,return value); } @Override public Vvoid putIfAbsentputAll(final K keyList<KeyValue<K, finalV>> V valueentries) { final V originalValue = get(key); if (originalValue == nullfor (final KeyValue<K, V> entry : entries) { put(entry.key, entry.value); capturedPutCalls.add(new KeyValue<>(key, value)entry); } return originalValue; } @Override public V deleteget(final K key) { V value = innerKeyValueStore.deleteouterValue(wrapped().get(keyBytes(key))); capturedDeleteCallscapturedGetCalls.add(new KeyValue<>(key, value)); return value; } @Override@SuppressWarnings("unchecked") public void putAll(final List<KeyValue<K, V>> entries) { @Override public forKeyValueIterator<K,V> range(final K KeyValue<Kfrom, V>final entryK : entriesto) { return new MockKeyValueStore.MockKeyValueIterator( put(entry.key, entry.value); capturedPutCalls.add(entrywrapped().range(Bytes.wrap(serdes.rawKey(from)), Bytes.wrap(serdes.rawKey(to)))); } } @SuppressWarnings("unchecked") @Override public V get(final K keyKeyValueIterator<K,V> all() { Vreturn value = innerKeyValueStore.get(key); new MockKeyValueStore.MockKeyValueIterator(wrapped().all()); } @Override capturedGetCalls.add(new KeyValue<>(key, value));public long approximateNumEntries() { return valuewrapped().approximateNumEntries(); } @Override private public KeyValueIterator<K,V> rangeV outerValue(final K from, final K tobyte[] value) { return innerKeyValueStore.range(from, to); } value != null ? serdes.valueFrom(value) : null; @Override} public KeyValueIterator<K,V> all(private Bytes keyBytes(final K key) { return innerKeyValueStore.all()Bytes.wrap(serdes.rawKey(key)); } @Override private class MockKeyValueIterator implements publicKeyValueIterator<K, longV> approximateNumEntries() { return innerKeyValueStore.approximateNumEntries();private final KeyValueIterator<Bytes, byte[]> iter; .... } } |
Proposed Changes
I proposed to add:
...