...

For example, in the current streams/TopologyTest.java:

Code Block
public class TopologyTest {

    private final StoreBuilder storeBuilder = EasyMock.createNiceMock(StoreBuilder.class);
    private final KeyValueStoreBuilder globalStoreBuilder = EasyMock.createNiceMock(KeyValueStoreBuilder.class);
    private final Topology topology = new Topology();
...

    @Test
    public void shouldFailIfSinkIsParent() {
        topology.addSource("source", "topic-1");
        topology.addSink("sink-1", "topic-2", "source");
        try {
            topology.addSink("sink-2", "topic-3", "sink-1");
            fail("Should throw TopologyException for using sink as parent");
        } catch (final TopologyException expected) { }
    }

    @Test(expected = TopologyException.class)
    public void shouldNotAllowToAddStateStoreToNonExistingProcessor() {
        mockStoreBuilder();
        EasyMock.replay(storeBuilder);
        topology.addStateStore(storeBuilder, "no-such-processor");
    }

    @Test
    public void shouldNotAllowToAddStateStoreToSource() {
        mockStoreBuilder();
        EasyMock.replay(storeBuilder);
        topology.addSource("source-1", "topic-1");
        try {
            topology.addStateStore(storeBuilder, "source-1");
            fail("Should have thrown TopologyException for adding store to source node");
        } catch (final TopologyException expected) { }
    }

    private void mockStoreBuilder() {
        EasyMock.expect(storeBuilder.name()).andReturn("store").anyTimes();
        EasyMock.expect(storeBuilder.logConfig()).andReturn(Collections.emptyMap());
        EasyMock.expect(storeBuilder.loggingEnabled()).andReturn(false);
    }
}

One of the goals is to replace the in-test mockStoreBuilder() helper with a more general-purpose mock StoreBuilder class, which simplifies writing unit tests and can be reused later.
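The idea of a reusable mock can be sketched without any mocking library. The example below is only an illustration under stated assumptions: SimpleStoreBuilder is a hypothetical, trimmed-down stand-in for the real StoreBuilder interface (only the three methods stubbed in the test above are modeled), and MockSimpleStoreBuilder is a hypothetical name, not a class proposed here.

```java
import java.util.Collections;
import java.util.Map;

public class MockStoreBuilderSketch {

    // Hypothetical stand-in for Kafka Streams' StoreBuilder (assumption):
    // only the methods stubbed via EasyMock in the test above are modeled.
    interface SimpleStoreBuilder {
        String name();
        Map<String, String> logConfig();
        boolean loggingEnabled();
    }

    // Hand-written mock: the fixed answers are supplied once through the
    // constructor instead of being re-declared with EasyMock.expect(...)
    // inside every test class.
    static final class MockSimpleStoreBuilder implements SimpleStoreBuilder {
        private final String storeName;
        private final boolean loggingEnabled;

        MockSimpleStoreBuilder(final String storeName, final boolean loggingEnabled) {
            this.storeName = storeName;
            this.loggingEnabled = loggingEnabled;
        }

        @Override
        public String name() {
            return storeName;
        }

        @Override
        public Map<String, String> logConfig() {
            // Mirrors the empty changelog config returned by the EasyMock stub.
            return Collections.emptyMap();
        }

        @Override
        public boolean loggingEnabled() {
            return loggingEnabled;
        }
    }

    public static void main(final String[] args) {
        final SimpleStoreBuilder builder = new MockSimpleStoreBuilder("store", false);
        System.out.println(builder.name() + " " + builder.logConfig() + " " + builder.loggingEnabled());
    }
}
```

A test that needs such a builder would construct it in one line and would no longer need the expect/replay sequence.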

After the improvements, we will replace the EasyMock StoreBuilder with our mock StoreBuilder.

Code Block
public class TopologyTest {

    private final StoreBuilder storeBuilder = EasyMock.createNiceMock(StoreBuilder.class);
    private final Topology topology = new Topology();
    private final MockStoreFactory mockStoreFactory = new MockStoreFactory<>();
    private final KeyValueStoreBuilder keyValueStoreBuilder = mockStoreFactory.createKeyValueStoreBuilder(
            "store",
            Stores.inMemoryKeyValueStore("store"),
            Serdes.Bytes(),
            Serdes.Bytes(),
            false);

...
    @Test(expected = TopologyException.class)
    public void shouldNotAllowToAddStateStoreToNonExistingProcessor() {
        topology.addStateStore(keyValueStoreBuilder, "no-such-processor");
    }
}


Public Interfaces

We add new classes to kafka-streams-test-utils under a new org.apache.kafka.streams.state package.

We will provide a MockStoreFactory to generate mock store builders:


Code Block
package org.apache.kafka.streams.internals;

public class MockStoreFactory<K, V> {

    final String storeName;
    final Serde<K> keySerde;
    final Serde<V> valueSerde;
    final Time time;
    final Boolean persistent;

    public MockStoreFactory() {

    }

    public MockKeyValueStoreBuilder createKeyValueStoreBuilder() {
        return new MockKeyValueStoreBuilder<K, V>(storeName, keySerde, valueSerde, time);
    }
}

...