Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
public class TopologyTest {

    private final StoreBuilder storeBuilder = EasyMock.createNiceMock(StoreBuilder.class);
    private final Topology topology = new Topology();
    private final MockStoreFactory mockStoreFactory = new MockStoreFactory<>();
    private final KeyValueStoreBuilder keyValueStoreBuilder = mockStoreFactory.createKeyValueStoreBuilder(
            Stores.inMemoryKeyValueStore("store"),
            Serdes.Bytes(),
            Serdes.Bytes(),
            false,
			Time.System);

...
    @Test(expected = TopologyException.class)
    public void shouldNotAllowToAddStateStoreToNonExistingProcessor() {
        topology.addStateStore(keyValueStoreBuilder, "no-such-processor");
    }
}

...

We add some new classes to the kafka-streams-a state package (org.apache.kafka.streams.stateunder streams/test-utils under internal.


We will provide a MockStoreFactory to generate mock store builders, I will use the KeyValueStoreBuilder as an example. Window and Session will have a similar structure.

...

Code Block
package org.apache.kafka.streams.internalsstate;

import org.apache.kafka.common.serialization.Serde;

import org.apache.kafka.streams.MockTime;
import org.apache.kafka.streams.state.KeyValueBytesStoreSuppliercommon.utils.Time;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.internals.AbstractStoreBuilderKeyValueStoreBuilder;



public class MockKeyValueStoreBuilder<K, V> extends AbstractStoreBuilder<KKeyValueStoreBuilder<K, V,V> KeyValueStore> {

    private final boolean persistent;
    private final KeyValueBytesStoreSupplier storeSupplier;
    final Serde<K> keySerde;
    final Serde<V> valueSerde;
    final Time time;

    public MockKeyValueStoreBuilder(final KeyValueBytesStoreSupplier storeSupplier,
                                    final Serde<K> keySerde,
                                    final Serde<V> valueSerde,
                                    final boolean persistent),
 {
        super(storeSupplier.name(), keySerde, valueSerde, new MockTime(0));
        this.persistent = persistent;
        this.storeSupplier = storeSupplier;

     final Time time) {
       }

    @Override super(storeSupplier, keySerde, valueSerde, time);
    public KeyValueStore build() {
 this.persistent = persistent;
     return new MockKeyValueStore<>(name, storeSupplier.get(), persistent)this.storeSupplier = storeSupplier;
    }
}    this.keySerde = keySerde;
        this.valueSerde = valueSerde;
        this.time = time;
    }

    @Override
    public KeyValueStore<K, V> build() {
        return new MockKeyValueStore<>(storeSupplier, keySerde, valueSerde, persistent, time);
    }
}


Then in the store, we will build a wrapper around the provided backend store. We will capture each get/put/delete call, the user can write tests accordingly. We will also track if the store has been flushed or closed.

...

Proposed Changes

I proposed to add:

  1. a MockStoreFactory A MockStoreFactory class to produce mock state store builders.
  2. A mock StateStoreBuilder class for KV, Session and Window.
  3. A mock StateStore class for KV, Session and Window with tracking.

...

Until we are refactoring with these new MockStateStores, there shouldn't be any compatibility issues. But the next phase should be refactoring, including:

1) Remove and refactor redundant MockStores refactor redundant MockStores (i.e. org.apache.kafka.test.MockKeyValueStore)

2) Examine the current tests (i.e. org.apache.kafka.test.MockKeyValueStore)2) Examine the current tests (i.e. org.apache.kafka.streams.TopologyTest and refactor the testing code logics with the new MockStateStores..streams.TopologyTest and refactor the testing code logics with the new MockStateStores.

A discussion that has been brought up is 

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyKAFKA-8630
. In-memory Window/Session store doesn't work well in tests because in the init() method, it has internally cast ProcessorContext into InternalProcessorContext, that way the tester couldn't use MockProcessorContext. Either we twist the init() method in the mock store to accommodate or we can somehow use InternalMockProcessorContext instead?

Rejected Alternatives

  1. Using the current EasyMock implementation. There is no strong argument against the current EasyMock implementation, it is easy to use and lightweight. The main argument for this KIP is to write better tests with a an in-house mock state store support.

...