Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
package org.apache.kafka.streams.internalsstate;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class MockKeyValueStore<K, V> implements KeyValueStore<K, V>  {
    // keep a global counter of flushes and a local reference to which store had which
    // flush, so we can reason about the order in which stores get flushed.
    private static final AtomicInteger GLOBAL_FLUSH_COUNTER = new AtomicInteger(0);
    private final AtomicInteger instanceLastFlushCount = new AtomicInteger(-1);
    private final String name;
    private final boolean persistent;

    public boolean initialized = false;
    public boolean flushed = false;
    public boolean closed = true;

    public final List<KeyValue<K, V>> capturedPutCalls = new LinkedList<>();
    public final List<KeyValue<K, V>> capturedGetCalls = new LinkedList<>();
    public final List<KeyValue<K, V>> capturedDeleteCalls = new LinkedList<>();

    public final KeyValueStore<K, V> innerKeyValueStore;

    public MockKeyValueStore(final String namepublic class MockKeyValueStore<K, V>
        extends WrappedStateStore<KeyValueStore<Bytes, byte[]>, K, V>
        implements KeyValueStore<K, V>  {
    // keep a global counter of flushes and a local reference to which store had which
    // flush, so we can reason about the order in which stores get flushed.
    private static final AtomicInteger GLOBAL_FLUSH_COUNTER = new AtomicInteger(0);
    private final AtomicInteger instanceLastFlushCount = new AtomicInteger(-1);


    // Publicly visible lifecycle flags. init(...) sets `initialized` to true and `closed` to false;
    // a fresh instance therefore starts out as "not initialized, closed".
    public boolean initialized = false;
    public boolean flushed = false;
    public boolean closed = true;

    // Assigned in the constructor: `name` from the store supplier's name(), `persistent` from
    // the constructor argument of the same name.
    public String name;
    public boolean persistent;

    // Set in the constructor; falls back to Time.SYSTEM when the caller passes null.
    protected final Time time;


    // Serdes handed to the constructor. Either may be null, in which case initStoreSerde(...)
    // substitutes the corresponding serde from the processor context.
    final Serde<K> keySerde;
    final Serde<V> valueSerde;
    // Built by initStoreSerde(...); remains unset until that method has run.
    StateSerdes<K, V> serdes;

    // Publicly readable logs of key/value pairs captured from calls made against this store
    // (presumably one list per operation kind, as the names suggest -- verify against the methods).
    public final List<KeyValue<K, V>> capturedPutCalls = new LinkedList<>();
    public final List<KeyValue<K, V>> capturedGetCalls = new LinkedList<>();
    public final List<KeyValue<K, V>> capturedDeleteCalls = new LinkedList<>();


    /**
     * Creates a mock store that wraps the bytes store obtained from the given supplier.
     *
     * @param keyValueBytesStoreSupplier supplier providing both the wrapped store and this store's name
     * @param keySerde                   key serde; may be {@code null}, see {@code initStoreSerde}
     * @param valueSerde                 value serde; may be {@code null}, see {@code initStoreSerde}
     * @param persistent                 value stored in the {@code persistent} field
     * @param time                       clock to use; {@code Time.SYSTEM} is used when {@code null}
     */
    public MockKeyValueStore(final KeyValueBytesStoreSupplier keyValueBytesStoreSupplier,
                             final Serde<K> keySerde,
                             final Serde<V> valueSerde,
                             final boolean persistent,
                             final Time time) {
        super(keyValueBytesStoreSupplier.get());
        this.name = keyValueBytesStoreSupplier.name();

        // Fall back to the system clock when the caller does not supply one.
        if (time == null) {
            this.time = Time.SYSTEM;
        } else {
            this.time = time;
        }

        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
        this.persistent = persistent;
    }

    /**
     * Builds the {@code serdes} field for this store.
     *
     * The changelog topic name is derived from the context's application id and this store's
     * name. A serde passed to the constructor wins; when it is {@code null} the corresponding
     * serde from the context is used instead (unchecked cast to the store's key/value type).
     *
     * @param context processor context supplying the application id and the fallback serdes
     */
    @SuppressWarnings("unchecked")
    void initStoreSerde(final ProcessorContext context) {
        final String changelogTopic =
                ProcessorStateManager.storeChangelogTopic(context.applicationId(), name());
        final Serde<K> effectiveKeySerde =
                keySerde != null ? keySerde : (Serde<K>) context.keySerde();
        final Serde<V> effectiveValueSerde =
                valueSerde != null ? valueSerde : (Serde<V>) context.valueSerde();

        serdes = new StateSerdes<>(changelogTopic, effectiveKeySerde, effectiveValueSerde);
    }

    /**
     * Returns the name of this store, as assigned in the constructor.
     *
     * @return the current value of the {@code name} field
     */
    @Override
    public String name() {
        return this.name;
    }

    @Override
    public void init(final ProcessorContext context,
                     final StateStore root) {
     final KeyValueStore<K, V> innerKeyValueStore,context.register(root, stateRestoreCallback);
        initialized = true;
        closed = false;
    }

    @Override
    finalpublic booleanvoid persistentflush() {
        this.name = name;
  instanceLastFlushCount.set(GLOBAL_FLUSH_COUNTER.getAndIncrement());
      this.innerKeyValueStore = innerKeyValueStore wrapped().flush();
        this.persistentflushed = persistenttrue;

    }

    @Override
    public Stringint namegetLastFlushCount() {
        return nameinstanceLastFlushCount.get();
    }

    @Override
    public void initclose(final ProcessorContext context,) {
         wrapped().close();
            final StateStore root) {
   closed = true;
   context.register(root, stateRestoreCallback);
 }

    @Override
    public initializedboolean = true;persistent() {
        closedreturn = falsepersistent;
    }

    @Override
    public voidboolean flushisOpen() {
        instanceLastFlushCount.set(GLOBAL_FLUSH_COUNTER.getAndIncrement())return !closed;
    }

    public final StateRestoreCallback stateRestoreCallback = new innerKeyValueStore.flushStateRestoreCallback(); {
        flushed = true;@Override
    }

    public intvoid getLastFlushCount() {restore(final byte[] key,
        return instanceLastFlushCount.get();
    }

    @Override
    public void close() {
      final  innerKeyValueStore.close();byte[] value) {
        closed = true;}
    };

    @Override
    public booleanvoid persistent() {put(final K key, final V value) {
        capturedPutCalls.add(new KeyValue<>(key, value));
        return persistentwrapped().put(keyBytes(key), serdes.rawValue(value));
    }

    @Override
    public booleanV isOpenputIfAbsent()final {
K key, final V     return !closed;value) {
    }

    public final StateRestoreCallbackV stateRestoreCallbackoriginalValue = new StateRestoreCallbackget(key) {
;
        @Override
if (originalValue == null) {
    public void restore(final byte[] key,
        put(key, value);
            capturedPutCalls.add(new KeyValue<>(key, value));
     final byte[] value) {}
        return }originalValue;
    };

    @Override
    public voidV putdelete(final K key, final V value) {) {
        V value = outerValue(wrapped().delete(keyBytes(key)));
        capturedPutCallscapturedDeleteCalls.add(new KeyValue<>(key, value));
        innerKeyValueStore.put(key,return value);
    }

    @Override
    public Vvoid putIfAbsentputAll(final K keyList<KeyValue<K, finalV>> V valueentries) {
        final V originalValue = get(key);
        if (originalValue == nullfor (final KeyValue<K, V> entry : entries) {
            put(entry.key,  entry.value);
            capturedPutCalls.add(new KeyValue<>(key, value)entry);
        }
        return originalValue;
    }

    @Override
    public V deleteget(final K key) {
        V value = innerKeyValueStore.deleteouterValue(wrapped().get(keyBytes(key)));
        capturedDeleteCallscapturedGetCalls.add(new KeyValue<>(key, value));
        return value;
    }

    @Override@SuppressWarnings("unchecked")
    public void putAll(final List<KeyValue<K, V>> entries) {
   @Override
    public forKeyValueIterator<K,V> range(final K KeyValue<Kfrom, V>final entryK : entriesto) {
        return new MockKeyValueStore.MockKeyValueIterator(
  put(entry.key, entry.value);
               capturedPutCalls.add(entrywrapped().range(Bytes.wrap(serdes.rawKey(from)), Bytes.wrap(serdes.rawKey(to))));
        }

    }
@SuppressWarnings("unchecked")
    @Override
    public V get(final K keyKeyValueIterator<K,V>  all() {
        Vreturn value = innerKeyValueStore.get(key);
new MockKeyValueStore.MockKeyValueIterator(wrapped().all());
    }

    @Override
    capturedGetCalls.add(new KeyValue<>(key, value));public long approximateNumEntries() {
        return valuewrapped().approximateNumEntries();
    }

    @Override
private    public KeyValueIterator<K,V> rangeV outerValue(final K from, final K tobyte[] value) {
        return innerKeyValueStore.range(from, to);
    }
 value != null ? serdes.valueFrom(value) : null;
    @Override}

    public KeyValueIterator<K,V>  all(private Bytes keyBytes(final K key) {
        return innerKeyValueStore.all()Bytes.wrap(serdes.rawKey(key));
    }

    @Override
private class MockKeyValueIterator implements publicKeyValueIterator<K, longV> approximateNumEntries() {

        return innerKeyValueStore.approximateNumEntries();private final KeyValueIterator<Bytes, byte[]> iter;
		....
    }
}


Proposed Changes

I propose to add:

...