Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
public class TopologyTest {

    private final StoreBuilder storeBuilder = EasyMock.createNiceMock(StoreBuilder.class);
    private final Topology topology = new Topology();
    private final MockStoreFactory mockStoreFactory = new MockStoreFactory<>();
    private final KeyValueStoreBuilder keyValueStoreBuilder = mockStoreFactory.createKeyValueStoreBuilder(
            Stores.inMemoryKeyValueStore("store"),
            Serdes.Bytes(),
            Serdes.Bytes(),
            false,
			Time.System);

...
    @Test(expected = TopologyException.class)
    public void shouldNotAllowToAddStateStoreToNonExistingProcessor() {
        topology.addStateStore(keyValueStoreBuilder, "no-such-processor");
    }
}

...

We add some new classes to the kafka-streams-a state package (org.apache.kafka.streams.stateunder streams/test-utils under internal.


We will provide a MockStoreFactory to generate mock store builders, I will use the KeyValueStoreBuilder as an example. Window and Session will have a similar structure.

...

Code Block
package org.apache.kafka.streams.internalsstate;

import org.apache.kafka.common.serialization.Serde;

import org.apache.kafka.streams.MockTime;
import org.apache.kafka.streams.state.KeyValueBytesStoreSuppliercommon.utils.Time;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.internals.AbstractStoreBuilderKeyValueStoreBuilder;



public class MockKeyValueStoreBuilder<K, V> extends AbstractStoreBuilder<KKeyValueStoreBuilder<K, V,V> KeyValueStore> {

    private final boolean persistent;
    private final KeyValueBytesStoreSupplier storeSupplier;
    final Serde<K> keySerde;
    final Serde<V> valueSerde;
    final Time time;

    public MockKeyValueStoreBuilder(final KeyValueBytesStoreSupplier storeSupplier,
                                    final Serde<K> keySerde,
                                    final Serde<V> valueSerde,
                                    final boolean persistent,
                                    final Time time) {
        super(storeSupplier, keySerde, valueSerde, time);
        this.persistent = persistent;
        finalthis.storeSupplier boolean persistent) {= storeSupplier;
        super(storeSupplier.name(), keySerde, valueSerde, new MockTime(0))this.keySerde = keySerde;
        this.persistentvalueSerde = persistentvalueSerde;
        this.storeSuppliertime = storeSuppliertime;

    }

    @Override
    public KeyValueStoreKeyValueStore<K, V> build() {
        return new MockKeyValueStore<>(namestoreSupplier, keySerde, storeSupplier.get()valueSerde, persistent, time);
    }
}


Then in the store, we will build a wrapper around the provided backend store. We will capture each get/put/delete call, the user can write tests accordingly. We will also track if the store has been flushed or closed.

...

Proposed Changes

I proposed to add:

  1. a MockStoreFactory A MockStoreFactory class to produce mock state store builders.
  2. A mock StateStoreBuilder class for KV, Session and Window.
  3. A mock StateStore class for KV, Session and Window with tracking.

...