...
For example, in the current streams/TopologyTest.java:
Code Block | ||
---|---|---|
| ||
public class TopologyTest { private final StoreBuilder storeBuilder = EasyMock.createNiceMock(StoreBuilder.class); private final KeyValueStoreBuilder globalStoreBuilder = EasyMock.createNiceMock(KeyValueStoreBuilder.class); private final Topology topology = new Topology(); ... @Test public void shouldFailIfSinkIsParent() { topology.addSource("source", "topic-1"); topology.addSink("sink-1", "topic-2", "source"); try { topology.addSink("sink-2", "topic-3", "sink-1"); fail("Should throw TopologyException for using sink as parent"); } catch (final TopologyException expected) { } } @Test(expected = TopologyException.class) public void shouldNotAllowToAddStateStoreToNonExistingProcessor() { mockStoreBuilder(); EasyMock.replay(storeBuilder); topology.addStateStore(storeBuilder, "no-such-processor"); } @Test public void shouldNotAllowToAddStateStoreToSource() { mockStoreBuilder(); EasyMock.replay(storeBuilder); topology.addSource("source-1", "topic-1"); try { topology.addStateStore(storeBuilder, "source-1"); fail("Should have thrown TopologyException for adding store to source node"); } catch (final TopologyException expected) { } } private void mockStoreBuilder() { EasyMock.expect(storeBuilder.name()).andReturn("store").anyTimes(); EasyMock.expect(storeBuilder.logConfig()).andReturn(Collections.emptyMap()); EasyMock.expect(storeBuilder.loggingEnabled()).andReturn(false); } } |
One of the goal is to replace the in-test vanilla mockStoreBuilder with a more general purpose mockStoreBuilder class, which simplify writing unit test and can be reuse later.
Public Interfaces
We add some new classes to the kafka-streams-test-utils
under a new org.apache.kafka.streams.state package.
We will provide a MockStoreFactory to generate Mock store builders:
After the improvements, we will replace the easyMock StoreBuilder without mock StoreBuilder.
Code Block |
---|
public class TopologyTest {
private final StoreBuilder storeBuilder = EasyMock.createNiceMock(StoreBuilder.class);
private final Topology topology = new Topology();
private final MockStoreFactory mockStoreFactory = new MockStoreFactory<>();
private final KeyValueStoreBuilder keyValueStoreBuilder = mockStoreFactory.createKeyValueStoreBuilder(
|
Code Block |
package org.apache.kafka.streams.internals; public class MockStoreFactory<K, V> { final String storeName; final Serde<K> keySerde; final Serde<V> valueSerde; final Time time; final Boolean persistent; public MockStoreFactory (final String storeName, "store", Stores.inMemoryKeyValueStore("store"), final Serde<K> keySerde, Serdes.Bytes(), Serdes.Bytes(), final Serde<V> valueSerde, false); ... @Test(expected = TopologyException.class) public void shouldNotAllowToAddStateStoreToNonExistingProcessor() { topology.addStateStore(keyValueStoreBuilder, "no-such-processor"); } } |
Public Interfaces
We add some new classes to the kafka-streams-test-utils
under a new org.apache.kafka.streams.state package.
We will provide a MockStoreFactory to generate Mock store builders:
Code Block |
---|
package org.apache.kafka.streams.internals; public class finalMockStoreFactory<K, TimeV> time) { final String this.storeName; = storeName; final Serde<K> keySerde; this.keySerdefinal =Serde<V> keySerdevalueSerde; final Time time; this.valueSerdefinal =Boolean valueSerdepersistent; public MockStoreFactory this.time = time;() { } public MockKeyValueStoreBuilder createKeyValueStoreBuilder(){ return new MockKeyValueStoreBuilder<K,V>(storeName, keySerde, valueSerde, time); } } |
...