Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
package org.apache.kafka.streams.internals;

public class MockKeyValueStoreBuilder<K, V>  extends AbstractStoreBuilder<K, V, StateStore> {

    final Boolean persistent;

    public MockKeyValueStoreBuilder(final String storeName,
     import org.apache.kafka.common.serialization.Serde;

import org.apache.kafka.streams.MockTime;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.internals.AbstractStoreBuilder;



public class MockKeyValueStoreBuilder<K, V> extends AbstractStoreBuilder<K, V, KeyValueStore> {

    private final boolean persistent;
    private final KeyValueBytesStoreSupplier storeSupplier;

    public MockKeyValueStoreBuilder(final String storeName,
                                    final KeyValueBytesStoreSupplier storeSupplier,
                                    final Serde<K> keySerde,
                                    final Serde<V> valueSerde,
                                    final boolean persistent) {
        super(storeName, keySerde, valueSerde, new MockTime(0));
        this.persistent = persistent;
        this.storeSupplier = storeSupplier;

    }

    @Override
    public KeyValueStore build() {
        return new MockKeyValueStore<>(name, storeSupplier.get(), persistent);
    }
}


Then, in the store, we build a wrapper around an InMemoryStore and capture all the get/put/delete calls (calls made through iterators are not captured).

Code Block
package org.apache.kafka.streams.internals;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class MockKeyValueStore<K, V> implements KeyValueStore<K, V>  {
    // keep a global counter of flushes and a local reference to which store had which
    // flush, so we can reason about the order in which stores get flushed.
    private static final AtomicInteger GLOBAL_FLUSH_COUNTER = new AtomicInteger(0);
    private final AtomicInteger instanceLastFlushCount = new AtomicInteger(-1);
    private final String name;
    private final boolean persistent;

    public boolean initialized = false;
    public boolean flushed = false;
    public boolean closed = true;

    public final List<KeyValue<K, V>> capturedPutCalls = new LinkedList<>();
    public final List<KeyValue<K, V>> capturedGetCalls = new LinkedList<>();
    public final List<KeyValue<K, V>> capturedDeleteCalls = new LinkedList<>();

    public final KeyValueStore<K, V> innerKeyValueStore;

    public MockKeyValueStore(final String name,
                             final KeyValueStore<K, V> innerKeyValueStore,
                             final boolean persistent) {
        this.name = name;
        this.innerKeyValueStore = innerKeyValueStore;
        this.persistent = persistent;

    }

    @Override
    public String name() {
        return name;
    }

    @Override
    public void init(final ProcessorContext context,
                     final StateStore root) {
        context.register(root, stateRestoreCallback);
        initialized = true;
        closed = false;
  final Serde<K> keySerde,}

    @Override
    public void flush() {
        instanceLastFlushCount.set(GLOBAL_FLUSH_COUNTER.getAndIncrement());
        innerKeyValueStore.flush();
        flushed final Serde<V> valueSerde,= true;
    }

    public int getLastFlushCount() {
        return instanceLastFlushCount.get();
    }

    @Override
    public    final Time timevoid close() {
        super(storeName, keySerde, valueSerde, timeinnerKeyValueStore.close();
        this.persistentclosed = persistenttrue;
    }

    @Override
    public KeyValueStoreboolean buildpersistent() {
        return new MockKeyValueStore(name)persistent;
    }
}

Then in the Store, we will build a wrapper around a InMemoryStore, and capture all the get/put calls (excluding Iterators)

Code Block
public class MockKeyValueStore extends InMemoryKeyValueStore {


    @Override
    public boolean isOpen() {
      private final booleanreturn persistent;!closed;
    }

    public final booleanStateRestoreCallback initializedstateRestoreCallback = false; new StateRestoreCallback() {

    public boolean flushed = false;
@Override
    public final List<KeyValue> capturedPutCalls =public newvoid LinkedList<>();restore(final byte[] key,
    public final List<KeyValue> capturedGetCalls = new LinkedList<>();
    public final List<KeyValue> capturedDeleteCalls = new LinkedList<>();

    public MockKeyValueStore(final String name,
 final byte[] value) {
        }
    };

    @Override
    public void put(final K key, final booleanV persistentvalue) {
        capturedPutCalls.add(new super(nameKeyValue<>(key, value));
        this.persistent = persistentinnerKeyValueStore.put(key, value);
    }

    @Override
    public voidV flushputIfAbsent()final {
K key, final V value) {
   flushed = true;
   final V originalValue =  super.flushget(key);
    }


     @Override
   if public(originalValue boolean== persistent(null) {
        return persistent;
    }

put(key, value);
      @Override
    public void putcapturedPutCalls.add(finalnew Bytes KeyValue<>(key, final byte[] value)) {;
        super.put(key, value); }
        capturedPutCalls.add(new KeyValue(key, value))return originalValue;
    }

    @Override
    public byte[]V getdelete(final BytesK key) {
        byte[]V value = superinnerKeyValueStore.getdelete(key);
        capturedGetCallscapturedDeleteCalls.add(new KeyValueKeyValue<>(key, value));
        return value;
    }

    @Override
    public byte[]void delete(final Bytes key) {
        byte[] value = super.delete(key);
putAll(final List<KeyValue<K, V>> entries) {
        for capturedDeleteCalls.add(newfinal KeyValue(keyKeyValue<K, value));
V> entry : entries) {
    return value;
    }

    @Override
put(entry.key, entry.value);
     public byte[] putIfAbsent(final Bytes key, final byte[] value) { capturedPutCalls.add(entry);
        final}
 byte[] originalValue = get(key);
 }

    @Override
    public ifV get(originalValuefinal ==K nullkey) {
        V value =  putinnerKeyValueStore.get(key, value);
            capturedPutCallscapturedGetCalls.add(new KeyValueKeyValue<>(key, value));
        }
        return originalValuevalue;
    }

    @Override
    public voidKeyValueIterator<K,V> putAllrange(final K List<KeyValue<Bytesfrom, byte[]>> entries) { final K to) {
        return innerKeyValueStore.range(from, to);
    }

    @Override
    for (final KeyValue<Bytes, byte[]> entry : entriespublic KeyValueIterator<K,V>  all() {
            put(entry.key, entry.valuereturn innerKeyValueStore.all();
    }

    @Override
    public long capturedPutCalls.addapproximateNumEntries(entry); {
         }return innerKeyValueStore.approximateNumEntries();
    }
}


Proposed Changes


Compatibility, Deprecation, and Migration Plan

...