...
Code Block | ||
---|---|---|
| ||
package org.apache.kafka.streams.test; public class TopologyTestDriver { public TopologyTestDriver(Topology topology, Properties config); public TopologyTestDriver(Topology topology, Properties config, Time time); public void close(long initialWallClockTimeMs); // methodsevent-time foris actualautomatically testing advanced based on the publicprovided void process(String topicNameinput records, byte[] key, byte[] value, long timestamp); // main process method public void process(String topicName, byte[] key, byte[] value); and thus event-time punctuation are triggered automatically // useswall-clock constructortime providedis Timemocked object and not public <K, V> void process(String topicName, K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer, long timestamp); // use objects+serializers public <K, V> void process(String topicName, K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer); // for null-key messagesadvanced automatically; user can advance wall-clock time manually, and thus, trigger wall-clock time punctuations manually public void process(String topicName, byte[] value, long timestamp); public void process(String topicNameConsumerRecord<byte[], byte[]> value); public <V> void process(String topicName, V value, Serializer<V> valueSerializer, long timestamp) public <V> void process(String topicName, V value, Serializer<V> valueSerializer) // single input topic topologies can omit `topicName` (repeats all methods from above without topicName parameterrecord); // can trigger event-time punctuation public void process(byteList<ConsumerRecord<byte[] key, byte[] value, long timestamp>> records); // can trigger public void process(byte[] key, byte[] value);event-time punctuation public <K, V> void process(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer, long timestamp); public <K, V> void process(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer); public void process(byte[] value, long timestamp); public void process(byte[] value); public <V> void process(V value, Serializer<V> valueSerializer, long timestamp) public <V> void process(V value, Serializer<V> valueSerializer) // see below; cf. TestRecrodFactory public void process(TestRecord record); public void process(List<TestRecord> records); public void punctuateWallClockTime(); public void punctuateStreamTime(); advanceWallClockTime(long advanceMs); // can trigger wall-clock-time punctuation // methods for result verification public ProducerRecord<byte[], byte[]> readOutput(String topic); public <K, V> ProducerRecord<K, V> readOutput(String topic, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer); public StateStore getStateStore(String name); public <K, V> KeyValueStore<K, V> getKeyValueStore(String name); public <K, V> KeyValueStore<KWindowStore<K, V> getWindowStore(String name); public <K, V> KeyValueStore<KSessionStore<K, V> getSessionStore(String name); public void close(); } |
Code Block | ||
---|---|---|
| ||
package org.apache.kafka.streams.test; public class TestRecord<KConsumerRecordFactory<K, V> { public TestRecordConsumerRecordFactory(String topicName, K key, V value, long timestampdefaultTopicName, Serializer<K> keySerializer, Serializer<V> valueSerializer); public TestRecordConsumerRecordFactory(String topicNamedefaultTopicName, byte[]Serializer<K> keykeySerializer, byte[]Serializer<V> valuevalueSerializer, long timestampstartTimestampMs); public ConsumerRecordFactory(String topicName(); public byte[] key(); public byte[] value(); public long timestamp(); } | ||
Code Block | ||
| ||
package org.apache.kafka.streams.test; public class TestRecordFactory<K, V> { // create records for a single input topic defaultTopicName, Serializer<K> keySerializer, Serializer<V> valueSerializer, long startTimestampMs, long autoAdvanceMs); // no default topic name; requires to specify topic name in #create(...) public ConsumerRecordFactory(Serializer<K> keySerializer, Serializer<V> valueSerializer); public TestRecordFactoryConsumerRecordFactory(String topicName, Serializer<K> keySerializer, Serializer<V> valueSerializer, Timelong timestartTimestampMs); public ConsumerRecordFactory(Serializer<K> TestRecord<KkeySerializer, V> create(K keySerializer<V> valueSerializer, Vlong valuestartTimestampMs, long timestampautoAdvanceMs); public TestRecord<K, V> create(K key, V value);void advanceTimeMs(long advanceMs); // null-key create single records with default topic name public TestRecord<KConsumerRecord<byte[], V>byte[]> create(K key, V value, long timestamptimestampMs); public TestRecord<KConsumerRecord<byte[], V>byte[]> create(K key, V value); public TestRecord<KConsumerRecord<byte[], V>byte[]> create(KeyValue<K, V> keyValueV value, long timestamptimestampMs); public TestRecord<KConsumerRecord<byte[], V>byte[]> create(KeyValue<K, V> keyValueV value); // create list of records basewith ondefault list of k-v-pairstopic name public List<TestRecord<KList<ConsumerRecord<byte[], V>>byte[]>> create(List<KeyValue<K, V>> keyValues); // start timestamp from constructor Time object; timestamp auto increment by 1 public List<TestRecord<KList<ConsumerRecord<byte[], V>>byte[]>> create(List<KeyValue<K, V>> keyValues, long startTimestamp, long advanceMs); // timestamp auto increment by 1 public List<TestRecord<KList<ConsumerRecord<byte[], V>>byte[]>> create(List<KeyValue<K, V>> keyValues, long startTimestamp, long advanceMs); } | ||
Code Block | ||
| ||
package org.apache.kafka.streams.test; public class MockTime implements Time { // Listeneroverwrite thatdefault is invoked each time <em>after</em> time was advanced or modified.topic name public interface MockTimeListener { void tick(long currentTimeMsConsumerRecord<byte[], byte[]> create(String topicName, K key, V value, long timestampMs); } public MockTime(); // init with System.currentTimeMillis() and System.nanoTime(); no auto-tickpublic ConsumerRecord<byte[], byte[]> create(String topicName, K key, V value); public MockTime(long autoTickMs); public MockTime(long autoTickMs, long currentTimeMsConsumerRecord<byte[], byte[]> create(String topicName, V value, long currentHighResTimeNstimestampMs); public void addListener(final MockTimeListener listener) { ConsumerRecord<byte[], byte[]> create(String topicName, V value); public void setAutoTickMs(long autoTickMs); public void setCurrentTimeMs(long newMs); List<ConsumerRecord<byte[], byte[]>> create(String topicName, List<KeyValue<K, V>> keyValues); // calling those methods triggers auto-tick @Override public long milliseconds(public List<ConsumerRecord<byte[], byte[]>> create(String topicName, List<KeyValue<K, V>> keyValues, long startTimestamp, long advanceMs); @Override public long nanoseconds(); @Override public long hiResClockMs(); @Override public void sleep(final long ms); } public List<ConsumerRecord<byte[], byte[]>> create(String topicName, List<KeyValue<K, V>> keyValues, long startTimestamp); } |
Proposed Changes
We are adding the above described test helper classes in a new artifact streams-test-utils
such that people can easily include it as a dependency to their build. The main test class is the test driver while the others a auxiliary classes to generate test data etc.
For time base operations it's important to allow fine grained controll over time. Therefore, we provide a mock time class that can be ingested into the driver and can be used to generte input data. To avoid testing boiler plate code, we provide a rich amount of overload methods and a TestRecordFactory
ConsumerRecordFactory
that simplifies to generate TestRecords
instead generate ConsumeRecords<byte[],byte[]>
instead of using the verbose underlying methods. Thus, people have many degrees of freedom on how to use the test driver in their own code. ConsumerRecord
constructor.
In the initla release, we might want to mark all new classes with annotation @Unstable
or @Evolving
.
Compatibility, Deprecation, and Migration Plan
...