Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
public class TopologyTest {

    private final StoreBuilder storeBuilder = EasyMock.createNiceMock(StoreBuilder.class);
    private final Topology topology = new Topology();
    private final MockStoreFactory mockStoreFactory = new MockStoreFactory<>();
    private final KeyValueStoreBuilder keyValueStoreBuilder = mockStoreFactory.createKeyValueStoreBuilder(
            "store",
            Stores.inMemoryKeyValueStore("store"),
            Serdes.Bytes(),
            Serdes.Bytes(),
            false,
			Time.System);

...
    @Test(expected = TopologyException.class)
    public void shouldNotAllowToAddStateStoreToNonExistingProcessor() {
        topology.addStateStore(keyValueStoreBuilder, "no-such-processor");
    }
}

...

We add some new classes to the kafka-streams-a state package (org.apache.kafka.streams.stateunder streams/test-utils .


We will provide a MockStoreFactory to generate mock store builders, I will use the KeyValueStoreBuilder as an example. Window and Session will have a similar structure.

...

Code Block
package org.apache.kafka.streams.internals;

public class MockStoreFactory<K, V> {

    public final Map<String, StoreBuilder> stateStores = new LinkedHashMap<>();

    public MockStoreFactory () {
    }

    public KeyValueStoreBuilder createKeyValueStoreBuilder(KeyValueBytesStoreSupplier keyValueBytesStoreSupplier,
                                                           final Serde<K> keySerde,
                                                           final Serde<V> valueSerde,
                                                           boolean persistent){
		String storeName = keyValueBytesStoreSupplier.name();
        stateStores.put(storeName, new MockKeyValueStoreBuilder<>(storeName, keyValueBytesStoreSupplier, keySerde, valueSerde, persistent));
        return (KeyValueStoreBuilder)stateStores.get(storeName);
    }

	public WindowStoreBuilder createWindowStoreBuilder(KeyValueBytesStoreSupplier keyValueBytesStoreSupplier,
                                                           final Serde<K> keySerde,
                                                           final Serde<V> valueSerde,
                                                           final Time time){
	...
	}

	public SessionStoreBuilder createSessionStoreBuilder(KeyValueBytesStoreSupplier keyValueBytesStoreSupplier,
                                                           final Serde<K> keySerde,
                                                           final Serde<V> valueSerde,
                                                           final Time time){
	...
	}

    public StoreBuilder getStore(String storeName) {
        return stateStores.get(storeName);
    }
}

...

Code Block
package org.apache.kafka.streams.internalsstate;

import org.apache.kafka.common.serialization.Serde;

import org.apache.kafka.common.streamsutils.MockTimeTime;
import org.apache.kafka.streams.state.internals.KeyValueBytesStoreSupplierKeyValueStoreBuilder;


import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.internals.AbstractStoreBuilder;



public class public class MockKeyValueStoreBuilder<K, V> extends AbstractStoreBuilder<KKeyValueStoreBuilder<K, V, KeyValueStore>V> {

    private final boolean persistent;
    private final KeyValueBytesStoreSupplier storeSupplier;

    public MockKeyValueStoreBuilder(final StringSerde<K> storeName,keySerde;
    final Serde<V> valueSerde;
    final Time time;

    public MockKeyValueStoreBuilder(final KeyValueBytesStoreSupplier storeSupplier,
                                    final KeyValueBytesStoreSupplierSerde<K> storeSupplierkeySerde,
                                    final Serde<K>Serde<V> keySerdevalueSerde,
                                    final Serde<V>boolean valueSerdepersistent,
                                    final booleanTime persistenttime) {
        super(storeNamestoreSupplier, keySerde, valueSerde, new MockTime(0)time);
        this.persistent = persistent;
        this.storeSupplier = storeSupplier;

    }

    @Override
this.keySerde = keySerde;
  public KeyValueStore build() {
   this.valueSerde = valueSerde;
     return new MockKeyValueStore<>(name, storeSupplier.get(), persistent)this.time = time;
    }
}

Then in the store, we will build a wrapper around the provided backend store.


    @Override
    public KeyValueStore<K, V> build() {
        return new MockKeyValueStore<>(storeSupplier, keySerde, valueSerde, persistent, time);
    }
}


Then in the store, we will build a wrapper around the provided backend store. We will capture each get/put/delete call, the user can write tests accordingly. We will also track if the store has been flushed or closed.

Code Block
package
Code Block
package org.apache.kafka.streams.internals;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStorestate;

import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class MockKeyValueStore<K, V> implements KeyValueStore<K, V>  public class MockKeyValueStore<K, V>
        extends WrappedStateStore<KeyValueStore<Bytes, byte[]>, K, V>
        implements KeyValueStore<K, V>  {
    // keep a global counter of flushes and a local reference to which store had which
    // flush, so we can reason about the order in which stores get flushed.
    private static final AtomicInteger GLOBAL_FLUSH_COUNTER = new AtomicInteger(0);
    private final AtomicInteger instanceLastFlushCount = new AtomicInteger(-1);
    private final String name;
    private final boolean persistent;

    public boolean initialized = false;
    public boolean flushed = false;
    public boolean closed = true;

    public String finalname;
 List<KeyValue<K, V>> capturedPutCalls =public newboolean LinkedList<>()persistent;

    protected final Time time;


    final Serde<K> keySerde;
    final Serde<V> valueSerde;
    StateSerdes<K, V> serdes;

    public final List<KeyValue<K, V>> capturedGetCallscapturedPutCalls = new LinkedList<>();
    public final List<KeyValue<K, V>> capturedDeleteCallscapturedGetCalls = new LinkedList<>();

    public final KeyValueStore<KList<KeyValue<K, V> innerKeyValueStoreV>> capturedDeleteCalls = new LinkedList<>();


    public MockKeyValueStore(final StringKeyValueBytesStoreSupplier namekeyValueBytesStoreSupplier,
                             final KeyValueStore<K, V> innerKeyValueStoreSerde<K> keySerde,
                             final boolean persistent) {
Serde<V> valueSerde,
         this.name = name;
        this.innerKeyValueStore = innerKeyValueStore;
        this.persistentfinal =boolean persistent;
,
    }

    @Override
    public String name() {
        return name;
    }

 final Time time) @Override{
    public void init(final ProcessorContext context, super(keyValueBytesStoreSupplier.get());
        this.name = keyValueBytesStoreSupplier.name();
        this.time = time != finalnull ? StateStoretime root) {: Time.SYSTEM;
        context.register(root, stateRestoreCallback)this.persistent = persistent;
        initializedthis.keySerde = truekeySerde;
        closedthis.valueSerde = falsevalueSerde;
    }

    @Override@SuppressWarnings("unchecked")
    public void flushinitStoreSerde(final ProcessorContext context) {
        instanceLastFlushCount.set(GLOBAL_FLUSH_COUNTER.getAndIncrement());
serdes = new StateSerdes<>(
          innerKeyValueStore.flush();      ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
        flushed = true;
    }

  keySerde == null public? int getLastFlushCount(Serde<K>) context.keySerde() {
: keySerde,
          return instanceLastFlushCount.get(      valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
    }

    @Override
    public voidString closename() {
        return innerKeyValueStore.close()name;
    }

    closed = true;@Override
    }

public void init(final ProcessorContext @Overridecontext,
    public boolean persistent() {
        return persistent;
    }

 final StateStore root) @Override{
    public   boolean isOpen() {context.register(root, stateRestoreCallback);
        return !closedinitialized = true;
    }

    publicclosed final= StateRestoreCallbackfalse;
 stateRestoreCallback = new StateRestoreCallback() {}

    @Override
    @Override
  public void flush() {
      public void restore(final byte[] key, instanceLastFlushCount.set(GLOBAL_FLUSH_COUNTER.getAndIncrement());
        wrapped().flush();
        flushed = true;
    }

    public  final byte[] valueint getLastFlushCount() {
         }return instanceLastFlushCount.get();
    };

    @Override
    public void put(final K key, final V valueclose() {
        capturedPutCallswrapped().add(new KeyValue<>(key, value)close();
         innerKeyValueStore.put(key, value)closed = true;
    }

    @Override
    public Vboolean putIfAbsent(final K key, final V value) {persistent() {
        return persistent;
    }

    final@Override
   V originalValuepublic =boolean getisOpen(key); {
        return if (originalValue == null) {!closed;
    }

    public final StateRestoreCallback stateRestoreCallback = new   put(key, value);
StateRestoreCallback() {
        @Override
     capturedPutCalls.add(new KeyValue<>(key, value));   public void restore(final byte[] key,
        }
        return originalValue;
    }

    @Override
    public V delete(final Kbyte[] keyvalue) {
        }
 V value = innerKeyValueStore.delete(key) };

    @Override
    capturedDeleteCalls.add(new public void put(final K key, final V value) {
        capturedPutCalls.add(new KeyValue<>(key, value));
        return valuewrapped().put(keyBytes(key), serdes.rawValue(value));
    }

    @Override
    public voidV putAllputIfAbsent(final List<KeyValue<KK key, V>>final V entriesvalue) {
        for (final KeyValue<K, V> entry : entriesV originalValue = get(key);
        if (originalValue == null) {
            put(entry.key, entry.value);
            capturedPutCalls.add(entrynew KeyValue<>(key, value));
        }
        return originalValue;
    }

    @Override
    public V getdelete(final K key) {
        V value = innerKeyValueStore.get(key)outerValue(wrapped().delete(keyBytes(key)));
        capturedGetCallscapturedDeleteCalls.add(new KeyValue<>(key, value));
        return value;
    }

    @Override
    public KeyValueIterator<K,V>void rangeputAll(final K fromList<KeyValue<K, finalV>> K toentries) {
        returnfor innerKeyValueStore.range(from(final KeyValue<K, to);
V> entry : entries) }{

     @Override
    public KeyValueIterator<K,V>  all() {
put(entry.key, entry.value);
         return innerKeyValueStore  capturedPutCalls.alladd(entry);
        }
    }

    @Override
    public longV approximateNumEntriesget(final K key) {
        V return innerKeyValueStore.approximateNumEntries()value = outerValue(wrapped().get(keyBytes(key)));
        capturedGetCalls.add(new  }
}

Proposed Changes

KeyValue<>(key, value));
        return value;
    }

    @SuppressWarnings("unchecked")
    @Override
    public KeyValueIterator<K,V> range(final K from, final K to) {
        return new MockKeyValueStore.MockKeyValueIterator(
                wrapped().range(Bytes.wrap(serdes.rawKey(from)), Bytes.wrap(serdes.rawKey(to))));
    }

    @SuppressWarnings("unchecked")
    @Override
    public KeyValueIterator<K,V>  all() {
        return new MockKeyValueStore.MockKeyValueIterator(wrapped().all());
    }

    @Override
    public long approximateNumEntries() {
        return wrapped().approximateNumEntries();
    }

    private V outerValue(final byte[] value) {
        return value != null ? serdes.valueFrom(value) : null;
    }

    private Bytes keyBytes(final K key) {
        return Bytes.wrap(serdes.rawKey(key));
    }

    private class MockKeyValueIterator implements KeyValueIterator<K, V> {

        private final KeyValueIterator<Bytes, byte[]> iter;
		....
    }
}


Proposed Changes

I proposed to add:

  1. A MockStoreFactory class to produce mock state store builders.
  2. A mock StateStoreBuilder class for KV, Session and Window.
  3. A mock StateStore class for KV, Session and Window with tracking.

Compatibility, Deprecation, and Migration Plan

...

2) Examine the current tests (i.e. org.apache.kafka.streams.TopologyTest ), remove complicate and refactor the testing code logics and refactor with the new MockStateStores.

Rejected Alternatives

1) Rebuilding a MockStateStores vs extending an InMemoryStore

  • If we are rebuilding the functionality of a store from scratch in memory, it will basically act like an InMemoryStore.

2) Track all calls in a total order.

  • Now I haven't got a feature request to track calls within a total order, so now we track the order separately. If there is a need, we can definitely adjust it.

3)Using the MockStateStoreFactory as a main entry point to access stores. (Keeping the state of all StoreBuilder)

...

A discussion that has been brought up is 

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyKAFKA-8630
. In-memory Window/Session store doesn't work well in tests because in the init() method, it has internally cast ProcessorContext into InternalProcessorContext, that way the tester couldn't use MockProcessorContext. Either we twist the init() method in the mock store to accommodate or we can somehow use InternalMockProcessorContext instead?

Rejected Alternatives

  1. Using the current EasyMock implementation. There is no strong argument against the current EasyMock implementation, it is easy to use and lightweight. The main argument for this KIP is to write better tests with an in-house mock state store support.