Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
package org.apache.kafka.streams.internals;

public class MockKeyValueStoreBuilder<K, V>  extends AbstractStoreBuilder<K, V, StateStore> {

    public MockKeyValueStoreBuilder(final String storeName,
                                    final Serde<K> keySerde,
                                    final Serde<V> valueSerde,
                                    final Time time) {
        super(storeName, keySerde, valueSerde, time);
    }

    @Override
    public KeyValueStore build() {
        return new MockKeyValueStore(name);
    }
}


Then in the Store, we will be a wrapper around a InMemoryStore, and capture all the get/put calls (excluding Iterators)

Code Block
public class MockKeyValueStore extends InMemoryKeyValueStore {
    private final boolean persistent;

    public boolean initialized = false;
    public boolean flushed = false;

    public final List<KeyValue> capturedPutCalls = new LinkedList<>();
    public final List<KeyValue> capturedGetCalls = new LinkedList<>();
    public final List<KeyValue> capturedDeleteCalls = new LinkedList<>();

    public MockKeyValueStore(final String name,
                             final boolean persistent) {
        super(name);
        this.persistent = persistent;
    }

    @Override
    public void flush() {
        flushed = true;
        super.flush();
    }

    @Override
    public boolean persistent() {
        return persistent;
    }

    @Override
    public void put(final Bytes key, final byte[] value) {
        super.put(key, value);
        capturedPutCalls.add(new KeyValue(key, value));
    }

    @Override
    public byte[] get(final Bytes key) {
        byte[] value = super.get(key);
        capturedGetCalls.add(new KeyValue(key, value));
        return value;
    }

    @Override
    public byte[] delete(final Bytes key) {
        byte[] value = super.delete(key);
        capturedDeleteCalls.add(new KeyValue(key, value));
        return value;
    }

    @Override
    public byte[] putIfAbsent(final Bytes key, final byte[] value) {
        final byte[] originalValue = get(key);
        if (originalValue == null) {
            put(key, value);
            capturedPutCalls.add(new KeyValue(key, value));
        }
        return originalValue;
    }

    @Override
    public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
        for (final KeyValue<Bytes, byte[]> entry : entries) {
            put(entry.key, entry.value);
            capturedPutCalls.add(entry);
        }
    }
}


Compatibility, Deprecation, and Migration Plan

...