Status
Current state: Under Discussion
Discussion thread: TODO
JIRA:
Released: target version 1.1.0
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Kafka Streams topologies can be quite complex, and it is important for users to be able to test their code, especially when they use the Processor API. At the moment, the only test helpers are a few classes in our internal unit-test package. Even if users relied on this package, there would be no guarantee of API stability, and they would have to pull in a large test artifact even if they only need a few helper classes. Thus, we should provide a public test artifact that helps users test their code.
Public Interfaces
package org.apache.kafka.streams.test;

import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.WindowStore;

public class TopologyTestDriver {
    public TopologyTestDriver(Topology topology, Properties config);
    public TopologyTestDriver(Topology topology, Properties config, Time time);
    public void close();

    // methods for actual testing
    public void process(String topicName, byte[] key, byte[] value, long timestamp); // main process method
    public void process(String topicName, byte[] key, byte[] value); // uses the Time object provided to the constructor
    public <K, V> void process(String topicName, K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer, long timestamp); // use objects + serializers
    public <K, V> void process(String topicName, K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer);

    // for null-key messages
    public void process(String topicName, byte[] value, long timestamp);
    public void process(String topicName, byte[] value);
    public <V> void process(String topicName, V value, Serializer<V> valueSerializer, long timestamp);
    public <V> void process(String topicName, V value, Serializer<V> valueSerializer);

    // single input topic topologies can omit `topicName` (repeats all methods from above without the topicName parameter)
    public void process(byte[] key, byte[] value, long timestamp);
    public void process(byte[] key, byte[] value);
    public <K, V> void process(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer, long timestamp);
    public <K, V> void process(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer);
    public void process(byte[] value, long timestamp);
    public void process(byte[] value);
    public <V> void process(V value, Serializer<V> valueSerializer, long timestamp);
    public <V> void process(V value, Serializer<V> valueSerializer);

    // see below; cf. TestRecordFactory
    public void process(TestRecord<?, ?> record);
    public void process(List<? extends TestRecord<?, ?>> records);
    public void punctuateWallClockTime();
    public void punctuateStreamTime();

    // methods for result verification
    public ProducerRecord<byte[], byte[]> readOutput(String topic);
    public <K, V> ProducerRecord<K, V> readOutput(String topic, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer);
    public StateStore getStateStore(String name);
    public <K, V> KeyValueStore<K, V> getKeyValueStore(String name);
    public <K, V> WindowStore<K, V> getWindowStore(String name);
    public <K, V> SessionStore<K, V> getSessionStore(String name);
}
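The many process(...) overloads are meant as conveniences over the single main byte[] method. The following minimal, stdlib-only sketch shows that delegation pattern. OverloadSketch and its Input type are hypothetical names. java.util.function.Function stands in for Kafka's Serializer, and LongSupplier stands in for the Time object passed to the constructor. It illustrates the design idea only and is not the driver's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.LongSupplier;

// Hypothetical sketch, not the proposed TopologyTestDriver: it only shows how the
// convenience overloads could all funnel into the one "main" byte[] process method.
// Function<T, byte[]> stands in for Kafka's Serializer<T>; LongSupplier for Time.
class OverloadSketch {
    // What the main method receives; a real driver would hand this to the topology.
    static final class Input {
        final String topic;
        final byte[] key;
        final byte[] value;
        final long timestamp;

        Input(String topic, byte[] key, byte[] value, long timestamp) {
            this.topic = topic;
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    final List<Input> processed = new ArrayList<>();
    private final LongSupplier time;
    private final String singleInputTopic; // assumed known when the topology has exactly one source topic

    OverloadSketch(LongSupplier time, String singleInputTopic) {
        this.time = time;
        this.singleInputTopic = singleInputTopic;
    }

    // Main method: every overload below ends up here.
    void process(String topicName, byte[] key, byte[] value, long timestamp) {
        processed.add(new Input(topicName, key, value, timestamp));
    }

    // No timestamp: read it from the injected time source.
    void process(String topicName, byte[] key, byte[] value) {
        process(topicName, key, value, time.getAsLong());
    }

    // Typed key/value: serialize first (a null key stays null), then delegate.
    <K, V> void process(String topicName, K key, V value,
                        Function<K, byte[]> keySerializer, Function<V, byte[]> valueSerializer,
                        long timestamp) {
        process(topicName, key == null ? null : keySerializer.apply(key),
                valueSerializer.apply(value), timestamp);
    }

    // Null-key message.
    void process(String topicName, byte[] value, long timestamp) {
        process(topicName, null, value, timestamp);
    }

    // Single input topic: the topic name is implied.
    void process(byte[] key, byte[] value, long timestamp) {
        if (singleInputTopic == null) {
            throw new IllegalStateException("topology has more than one input topic");
        }
        process(singleInputTopic, key, value, timestamp);
    }
}
```

With this layout, each convenience overload only translates its arguments. The behavior under test lives in one place, so adding an overload cannot change how records are processed.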
package org.apache.kafka.streams.test;

import org.apache.kafka.common.serialization.Serializer;

public class TestRecord<K, V> {
    public TestRecord(String topicName, K key, V value, long timestamp, Serializer<K> keySerializer, Serializer<V> valueSerializer);
    public TestRecord(String topicName, byte[] key, byte[] value, long timestamp);

    public String topicName();
    public byte[] key();
    public byte[] value();
    public long timestamp();
}
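TestRecord exposes key() and value() as byte[] even when it is built from typed objects, which suggests that the typed constructor serializes eagerly. The hypothetical TestRecordSketch below illustrates that reading under two assumptions. First, serialization happens once, in the constructor. Second, a null key or value is kept as null rather than passed to the serializer. Function<T, byte[]> again replaces Kafka's Serializer<T>, so the sketch runs without Kafka on the classpath.

```java
import java.util.function.Function;

// Hypothetical simplification of the proposed TestRecord. Function<T, byte[]> stands in
// for Kafka's Serializer<T>; the typed constructor serializes eagerly, so the accessors
// always return bytes, matching the proposed key()/value() signatures.
class TestRecordSketch {
    private final String topicName;
    private final byte[] key;
    private final byte[] value;
    private final long timestamp;

    // Typed input: serialize once here; null keys/values stay null (assumption).
    <K, V> TestRecordSketch(String topicName, K key, V value, long timestamp,
                            Function<K, byte[]> keySerializer, Function<V, byte[]> valueSerializer) {
        this(topicName,
             key == null ? null : keySerializer.apply(key),
             value == null ? null : valueSerializer.apply(value),
             timestamp);
    }

    // Raw input: bytes are stored as given.
    TestRecordSketch(String topicName, byte[] key, byte[] value, long timestamp) {
        this.topicName = topicName;
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
    }

    String topicName() { return topicName; }
    byte[] key() { return key; }
    byte[] value() { return value; }
    long timestamp() { return timestamp; }
}
```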
package org.apache.kafka.streams.test;

import java.util.List;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KeyValue;

public class TestRecordFactory<K, V> {
    // create records for a single input topic
    public TestRecordFactory(String topicName, Serializer<K> keySerializer, Serializer<V> valueSerializer, Time time);

    public TestRecord<K, V> create(K key, V value, long timestamp);
    public TestRecord<K, V> create(K key, V value);

    // null-key
    public TestRecord<K, V> create(V value, long timestamp);
    public TestRecord<K, V> create(V value);

    public TestRecord<K, V> create(KeyValue<K, V> keyValue, long timestamp);
    public TestRecord<K, V> create(KeyValue<K, V> keyValue);

    // create a list of records based on a list of key-value pairs
    public List<TestRecord<K, V>> create(List<KeyValue<K, V>> keyValues); // start timestamp from the constructor's Time object; timestamp auto-increments by 1
    public List<TestRecord<K, V>> create(List<KeyValue<K, V>> keyValues, long startTimestamp); // timestamp auto-increments by 1
    public List<TestRecord<K, V>> create(List<KeyValue<K, V>> keyValues, long startTimestamp, long advanceMs);
}
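The list-based create(...) overloads differ only in two ways: where the first timestamp comes from, and how far each following record advances. The comments above can be read as follows. The first timestamp comes from the constructor's Time object or from startTimestamp. Each later record then adds advanceMs, which defaults to 1. The sketch below encodes that reading. RecordFactorySketch and SketchRecord are hypothetical names. Map.Entry stands in for KeyValue and LongSupplier for Time, and serialization is omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.LongSupplier;

// Hypothetical record type: key, value and timestamp only (no topic, no serialization).
class SketchRecord<K, V> {
    final K key;
    final V value;
    final long timestamp;

    SketchRecord(K key, V value, long timestamp) {
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
    }
}

// Hypothetical sketch of the timestamp rules behind TestRecordFactory's list-based create(...).
class RecordFactorySketch<K, V> {
    private final LongSupplier time; // stands in for the Time object passed to the constructor

    RecordFactorySketch(LongSupplier time) {
        this.time = time;
    }

    // Start at the current time of the time source, advance by 1 per record.
    List<SketchRecord<K, V>> create(List<? extends Map.Entry<K, V>> keyValues) {
        return create(keyValues, time.getAsLong());
    }

    // Start at startTimestamp, advance by 1 per record.
    List<SketchRecord<K, V>> create(List<? extends Map.Entry<K, V>> keyValues, long startTimestamp) {
        return create(keyValues, startTimestamp, 1L);
    }

    // Record i gets startTimestamp + i * advanceMs.
    List<SketchRecord<K, V>> create(List<? extends Map.Entry<K, V>> keyValues, long startTimestamp, long advanceMs) {
        List<SketchRecord<K, V>> records = new ArrayList<>(keyValues.size());
        long timestamp = startTimestamp;
        for (Map.Entry<K, V> kv : keyValues) {
            records.add(new SketchRecord<>(kv.getKey(), kv.getValue(), timestamp));
            timestamp += advanceMs;
        }
        return records;
    }
}
```

For three pairs, create(kvs, 0L, 10L) yields timestamps 0, 10 and 20 in this sketch. create(kvs, 50L) yields 50, 51 and 52.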
package org.apache.kafka.streams.test;

import org.apache.kafka.common.utils.Time;

public class MockTime implements Time {
    /** Listener that is invoked each time <em>after</em> time was advanced or modified. */
    public interface MockTimeListener {
        void tick(long currentTimeMs);
    }

    public MockTime(); // init with System.currentTimeMillis() and System.nanoTime(); no auto-tick
    public MockTime(long autoTickMs);
    public MockTime(long autoTickMs, long currentTimeMs, long currentHighResTimeNs);

    public void addListener(final MockTimeListener listener);
    public void setAutoTickMs(long autoTickMs);
    public void setCurrentTimeMs(long newMs);

    // calling these methods triggers auto-tick
    @Override
    public long milliseconds();
    @Override
    public long nanoseconds();
    @Override
    public long hiResClockMs();

    @Override
    public void sleep(final long ms);
}
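The proposal does not spell out MockTime's auto-tick semantics. The stdlib-only sketch below makes two assumptions, loosely modeled on the MockTime class used in Kafka's own unit tests. First, every read of milliseconds() or nanoseconds() advances the clock by autoTickMs before returning. hiResClockMs() auto-ticks as well, because it derives from nanoseconds(). Second, every advance notifies the listeners after the clock has moved, whether it comes from auto-tick, sleep(), or setCurrentTimeMs(). MockTimeSketch is a hypothetical name and does not implement Kafka's Time interface.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical, stdlib-only sketch of the proposed MockTime semantics; it does not
// implement Kafka's Time interface. Assumptions: each read of milliseconds() or
// nanoseconds() first advances the clock by autoTickMs, and listeners are notified
// after every advance.
class MockTimeSketch {
    interface MockTimeListener {
        void tick(long currentTimeMs);
    }

    private final List<MockTimeListener> listeners = new ArrayList<>();
    private long autoTickMs;
    private long timeMs;
    private long highResTimeNs;

    MockTimeSketch() {
        this(0L, System.currentTimeMillis(), System.nanoTime()); // no auto-tick
    }

    MockTimeSketch(long autoTickMs, long currentTimeMs, long currentHighResTimeNs) {
        this.autoTickMs = autoTickMs;
        this.timeMs = currentTimeMs;
        this.highResTimeNs = currentHighResTimeNs;
    }

    void addListener(MockTimeListener listener) { listeners.add(listener); }

    void setAutoTickMs(long autoTickMs) { this.autoTickMs = autoTickMs; }

    // Jump to an absolute time; the high-resolution clock is shifted by the same delta (assumption).
    void setCurrentTimeMs(long newMs) {
        highResTimeNs += TimeUnit.MILLISECONDS.toNanos(newMs - timeMs);
        timeMs = newMs;
        notifyListeners();
    }

    long milliseconds() {
        maybeAutoTick();
        return timeMs;
    }

    long nanoseconds() {
        maybeAutoTick();
        return highResTimeNs;
    }

    long hiResClockMs() {
        return TimeUnit.NANOSECONDS.toMillis(nanoseconds()); // auto-ticks via nanoseconds()
    }

    // Advances both clocks immediately; never blocks.
    void sleep(long ms) {
        timeMs += ms;
        highResTimeNs += TimeUnit.MILLISECONDS.toNanos(ms);
        notifyListeners();
    }

    private void maybeAutoTick() {
        if (autoTickMs != 0) {
            sleep(autoTickMs);
        }
    }

    private void notifyListeners() {
        for (MockTimeListener listener : listeners) {
            listener.tick(timeMs);
        }
    }
}
```

In this sketch, setting autoTickMs to 0 keeps the clock frozen between explicit sleep() or setCurrentTimeMs() calls. That is usually what a deterministic test wants.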
Proposed Changes
We are adding the test helper classes described above in a new artifact, streams-test-utils, so that users can easily include it as a dependency in their build. The main class is the test driver; the others are auxiliary classes for generating test data and similar tasks.
For time-based operations, it is important to allow fine-grained control over time. Therefore, we provide a mock time class that can be injected into the driver and used to generate input data. To avoid boilerplate in tests, we provide a rich set of overloaded methods and a TestRecordFactory that simplifies generating TestRecords, so users do not have to call the verbose underlying methods. Thus, users have many degrees of freedom in how they use the test driver in their own code.
Compatibility, Deprecation, and Migration Plan
We are only adding new classes, so there are no compatibility issues.
Test Plan
We need to test all added classes with unit tests. Integration and system tests are not required.
Rejected Alternatives
None.