Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

After the improvements, we will replace the EasyMock StoreBuilder with our mock StoreBuilder.

Code Block
/**
 * Example test that uses a mock store builder obtained from {@code MockStoreFactory}
 * when adding a state store to a {@code Topology}.
 */
public class TopologyTest {

    private final StoreBuilder storeBuilder = EasyMock.createNiceMock(StoreBuilder.class);
    private final Topology topology = new Topology();
    private final MockStoreFactory mockStoreFactory = new MockStoreFactory<>();
    // The factory reads the store name from the supplier, so the call passes only the
    // supplier, the key/value serdes and the persistence flag.
    private final KeyValueStoreBuilder keyValueStoreBuilder = mockStoreFactory.createKeyValueStoreBuilder(
            Stores.inMemoryKeyValueStore("store"),
            Serdes.Bytes(),
            Serdes.Bytes(),
            false);

...
    /**
     * Adding a state store that is connected to a processor name which was never added to
     * the topology is expected to fail with a {@code TopologyException}.
     */
    @Test(expected = TopologyException.class)
    public void shouldNotAllowToAddStateStoreToNonExistingProcessor() {
        topology.addStateStore(keyValueStoreBuilder, "no-such-processor");
    }
}

...

We add some new classes to the kafka-streams-test-utils under a new org.apache.kafka.streams.state package. 

We will provide a MockStoreFactory to generate mock store builders. I will use the KeyValueStoreBuilder as an example; Window and Session will have a similar structure.

The developers/users can provide their own store as the backend storage, and their own Serde of choice. For example, for simple testing, they can just use an InMemoryKeyValueStore.

Code Block
package org.apache.kafka.streams.internals;

/**
 * Factory that creates mock store builders and remembers each one by store name so that it
 * can be looked up again through {@link #getStore(String)}.
 *
 * @param <K> key type handled by the serdes given to the created builders
 * @param <V> value type handled by the serdes given to the created builders
 */
public class MockStoreFactory<K, V> {

    /** Every builder created by this factory, keyed by store name, in creation order. */
    public final Map<String, StoreBuilder> stateStores = new LinkedHashMap<>();

    public MockStoreFactory() {
    }

    /**
     * Creates a mock key-value store builder, registers it under the supplier's name and
     * returns it.
     *
     * @param keyValueBytesStoreSupplier supplier of the backing store; its name becomes the store name
     * @param keySerde                   serde handed to the mock builder for keys
     * @param valueSerde                 serde handed to the mock builder for values
     * @param persistent                 persistence flag handed to the mock builder
     * @return the builder that was registered under the supplier's name
     */
    public KeyValueStoreBuilder createKeyValueStoreBuilder(KeyValueBytesStoreSupplier keyValueBytesStoreSupplier,
                                                           final Serde<K> keySerde,
                                                           final Serde<V> valueSerde,
                                                           boolean persistent) {
        final String name = keyValueBytesStoreSupplier.name();
        final StoreBuilder builder =
                new MockKeyValueStoreBuilder<>(name, keyValueBytesStoreSupplier, keySerde, valueSerde, persistent);
        stateStores.put(name, builder);
        // NOTE(review): MockKeyValueStoreBuilder is declared as extending AbstractStoreBuilder;
        // confirm that this cast to KeyValueStoreBuilder can actually succeed at runtime.
        return (KeyValueStoreBuilder) builder;
    }

    /** Creates a mock window store builder (body elided in this excerpt). */
    public WindowStoreBuilder createWindowStoreBuilder(KeyValueBytesStoreSupplier keyValueBytesStoreSupplier,
                                                       final Serde<K> keySerde,
                                                       final Serde<V> valueSerde,
                                                       final Time time) {
    ...
    }

    /** Creates a mock session store builder (body elided in this excerpt). */
    public SessionStoreBuilder createSessionStoreBuilder(KeyValueBytesStoreSupplier keyValueBytesStoreSupplier,
                                                         final Serde<K> keySerde,
                                                         final Serde<V> valueSerde,
                                                         final Time time) {
    ...
    }

    /**
     * Looks up a previously created builder.
     *
     * @param storeName name the builder was registered under
     * @return the registered builder, or {@code null} if none was registered under that name
     */
    public StoreBuilder getStore(String storeName) {
        return stateStores.get(storeName);
    }
}

I will use the KeyValueStoreBuilder as an example. Window and Session will have a similar structure.


Each Store builder will have a build method:

Code Block
package org.apache.kafka.streams.internals;

import org.apache.kafka.common.serialization.Serde;

import org.apache.kafka.streams.MockTime;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.internals.AbstractStoreBuilder;



/**
 * Store builder that produces {@code MockKeyValueStore} instances wrapping the store obtained
 * from a caller-provided {@code KeyValueBytesStoreSupplier}.
 *
 * @param <K> key type of the serde passed to the parent builder
 * @param <V> value type of the serde passed to the parent builder
 */
public class MockKeyValueStoreBuilder<K, V> extends AbstractStoreBuilder<K, V, KeyValueStore> {

    private final boolean persistent;
    private final KeyValueBytesStoreSupplier storeSupplier;

    /**
     * @param storeName     name passed to the parent builder
     * @param storeSupplier supplier of the backing store that each built mock store wraps
     * @param keySerde      key serde passed to the parent builder
     * @param valueSerde    value serde passed to the parent builder
     * @param persistent    persistence flag handed to every built mock store
     */
    public MockKeyValueStoreBuilder(final String storeName,
                                    final KeyValueBytesStoreSupplier storeSupplier,
                                    final Serde<K> keySerde,
                                    final Serde<V> valueSerde,
                                    final boolean persistent) {
        // The parent builder is always given a MockTime created with the argument 0.
        super(storeName, keySerde, valueSerde, new MockTime(0));
        this.storeSupplier = storeSupplier;
        this.persistent = persistent;
    }

    /**
     * Builds a new mock store around a store freshly obtained from the supplier.
     *
     * @return a {@code MockKeyValueStore} carrying this builder's name and persistence flag
     */
    @Override
    public KeyValueStore build() {
        return new MockKeyValueStore<>(
                name,
                storeSupplier.get(),
                persistent);
    }
}


Then, in the store, we will build a wrapper around the provided backend store (for example, an InMemoryStore) and capture all the get/put calls (excluding iterators).

Code Block
package org.apache.kafka.streams.internals;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * A {@link KeyValueStore} wrapper that delegates reads and writes to an inner store and
 * records every {@code put}, {@code get} and {@code delete} call so that tests can inspect
 * them afterwards. Iterator-returning calls ({@code range}, {@code all}) are delegated
 * without being recorded.
 *
 * @param <K> key type
 * @param <V> value type
 */
public class MockKeyValueStore<K, V> implements KeyValueStore<K, V>  {
    // keep a global counter of flushes and a local reference to which store had which
    // flush, so we can reason about the order in which stores get flushed.
    private static final AtomicInteger GLOBAL_FLUSH_COUNTER = new AtomicInteger(0);
    private final AtomicInteger instanceLastFlushCount = new AtomicInteger(-1);
    private final String name;
    private final boolean persistent;

    // Lifecycle flags; the store counts as closed until init() has been called.
    public boolean initialized = false;
    public boolean flushed = false;
    public boolean closed = true;

    // One entry per recorded call, in call order. For get and delete the recorded value is
    // whatever the inner store returned.
    public final List<KeyValue<K, V>> capturedPutCalls = new LinkedList<>();
    public final List<KeyValue<K, V>> capturedGetCalls = new LinkedList<>();
    public final List<KeyValue<K, V>> capturedDeleteCalls = new LinkedList<>();

    public final KeyValueStore<K, V> innerKeyValueStore;

    /**
     * @param name               name reported by {@link #name()}
     * @param innerKeyValueStore store that all data operations are delegated to
     * @param persistent         value reported by {@link #persistent()}
     */
    public MockKeyValueStore(final String name,
                             final KeyValueStore<K, V> innerKeyValueStore,
                             final boolean persistent) {
        this.name = name;
        this.innerKeyValueStore = innerKeyValueStore;
        this.persistent = persistent;
    }

    @Override
    public String name() {
        return name;
    }

    /**
     * Registers {@code root} with the context using the no-op restore callback and marks this
     * store as initialized and open.
     */
    @Override
    public void init(final ProcessorContext context,
                     final StateStore root) {
        // NOTE(review): the inner store is not initialized here although flush() and close()
        // are delegated to it; confirm whether the backend store needs its own init call.
        context.register(root, stateRestoreCallback);
        initialized = true;
        closed = false;
    }

    /**
     * Records this store's position in the global flush order, flushes the inner store and
     * marks this store as flushed.
     */
    @Override
    public void flush() {
        instanceLastFlushCount.set(GLOBAL_FLUSH_COUNTER.getAndIncrement());
        innerKeyValueStore.flush();
        flushed = true;
    }

    /**
     * @return the global flush counter value captured by the most recent {@link #flush()},
     *         or {@code -1} if this store has never been flushed
     */
    public int getLastFlushCount() {
        return instanceLastFlushCount.get();
    }

    /** Closes the inner store and marks this store as closed. */
    @Override
    public void close() {
        innerKeyValueStore.close();
        closed = true;
    }

    @Override
    public boolean persistent() {
        return persistent;
    }

    @Override
    public boolean isOpen() {
        return !closed;
    }

    /** Restore callback registered in {@link #init}; it ignores every restored record. */
    public final StateRestoreCallback stateRestoreCallback = new StateRestoreCallback() {

        @Override
        public void restore(final byte[] key,
                            final byte[] value) {
        }
    };

    /** Records the call in {@link #capturedPutCalls} and writes through to the inner store. */
    @Override
    public void put(final K key, final V value) {
        capturedPutCalls.add(new KeyValue<>(key, value));
        innerKeyValueStore.put(key, value);
    }

    /**
     * Writes {@code value} only when no value is currently stored for {@code key}.
     * The lookup goes through {@link #get}, so it is recorded as a get call; the write, if it
     * happens, goes through {@link #put} and is recorded exactly once as a put call.
     *
     * @return the value that was stored before the call, or {@code null} if there was none
     */
    @Override
    public V putIfAbsent(final K key, final V value) {
        final V originalValue = get(key);
        if (originalValue == null) {
            // put() already records the call; recording it again here would double-count it.
            put(key, value);
        }
        return originalValue;
    }

    /**
     * Deletes {@code key} from the inner store and records the key together with the value
     * the inner store returned.
     *
     * @return the value returned by the inner store's delete
     */
    @Override
    public V delete(final K key) {
        final V value = innerKeyValueStore.delete(key);
        capturedDeleteCalls.add(new KeyValue<>(key, value));
        return value;
    }

    /** Writes every entry through {@link #put}, which records each one exactly once. */
    @Override
    public void putAll(final List<KeyValue<K, V>> entries) {
        for (final KeyValue<K, V> entry : entries) {
            // put() already records the call; recording it again here would double-count it.
            put(entry.key, entry.value);
        }
    }

    /**
     * Reads {@code key} from the inner store and records the key together with the value
     * that was returned.
     *
     * @return the value returned by the inner store
     */
    @Override
    public V get(final K key) {
        final V value = innerKeyValueStore.get(key);
        capturedGetCalls.add(new KeyValue<>(key, value));
        return value;
    }

    /** Delegates to the inner store; range scans are not recorded. */
    @Override
    public KeyValueIterator<K,V> range(final K from, final K to) {
        return innerKeyValueStore.range(from, to);
    }

    /** Delegates to the inner store; full scans are not recorded. */
    @Override
    public KeyValueIterator<K,V>  all() {
        return innerKeyValueStore.all();
    }

    @Override
    public long approximateNumEntries() {
        return innerKeyValueStore.approximateNumEntries();
    }
}

...