...
Code Block | ||||
---|---|---|---|---|
| ||||
package org.apache.kafka.streams;
@Deprecated
public class TopologyTestDriver {
...
@Deprecate public void pipeInput(ConsumerRecord<byte[], byte[]> record); // can trigger event-time punctuation
@Deprecate public void pipeInput(List<ConsumerRecord<byte[], byte[]>> records); // can trigger event-time punctuation
// methods for result verification
@Deprecate public ProducerRecord<byte[], byte[]> readOutput(String topic);
@Deprecate public <K, V> ProducerRecord<K, V> readOutput(String topic, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer);
...
} |
...
Code Block | ||||
---|---|---|---|---|
| ||||
package org.apache.kafka.streams.test; public class TestInputTopic<K, V> { //Create by TopologyTestDriver, Constructors are package private //Timestamp handling //Record timestamp can be provided when piping input or use internally tracked time initialized configured with configureTiming: //startTimestampMs the initial timestamp for generated records, if not provided uses current system time as start timestamp. //autoAdvanceMs the time increment per generated record, if not provided auto-advance is disabled. void configureTiming(final long startTimestampMs, final long autoAdvanceMs); //Advances the internally tracked time. void advanceTimeMs(final long advanceMs); //Methods to pipe single record void pipeInput(final V value); void pipeInput(final KV keyvalue, final Vlong valuetimestampMs); void pipeInput(final VK valuekey, final longV timestampMsvalue); void pipeInput(final K key, final V value, final long timestampMs); void pipeInput(final K keyClientRecord<K, final V value, final Headers headersV> record); void pipeInput(final K keyClientRecord<K, finalV> V valuerecord, final Headers headers, final long timestampMs); //Methods to pipe list of records void pipeKeyValueListpipeValueList(final List<KeyValue<K, V>> keyValuesList<V> values); void pipeValueList(final List<V> values, final long startTimestamp, final long advanceMs); void pipeKeyValueList(final List<KeyValue<K, V>> keyValues); void pipeKeyValueList(final List<KeyValue<K, V>> keyValues, final long startTimestamp, final long advanceMs); void pipeRecordList(final List<ClientRecord<K, V>> records); void pipeValueListpipeRecordList(final List<ClientRecord<K, List<V>V>> valuesrecords, final long startTimestamp, final long advanceMs); } |
...
Code Block | ||||
---|---|---|---|---|
| ||||
package org.apache.kafka.streams.test; public class TestOutputTopic<K, V> { //Create by TopologyTestDriver, Constructors are package private //Method to check queue size final long getQueueSize(); final boolean isEmpty(); //Methods to readOutput ClientRecord<K, V> readRecordV readValue(); KeyValue<K, V> readKeyValue(); V readValueClientRecord<K, V> readRecord(); //Output as collection List<V> readValuesToList(); Map<K, V> readKeyValuesToMap(); List<KeyValue<K, V>> readKeyValuesToList(); List<ClientRecord<K, V>> readRecordToList(); } |
Code Block | ||||
---|---|---|---|---|
| ||||
package org.apache.kafka.streams.test; //OutputVerifier modified to accept ClientRecord instead of ProducerRecord public class OutputVerifier { public static <K,V> void compareValue(ClientRecord<K, V> record, V expectedValue) throws AssertionError; public static <K,V> void compareValue(ClientRecord<K, V> record, ClientRecord<K, V> expectedRecord) throws AssertionError; public static <K,V> void compareKeyValue(ClientRecord<K, V> record, K expectedKey, V expectedValue) throws AssertionError; public static <K,V> void compareKeyValue(ClientRecord<K, V> record, ClientRecord<K, V> expectedRecord) throws AssertionError; public static <K,V> void compareValueTimestamp(ClientRecord<K, V> record, V expectedValue, long expectedTimestamp) throws AssertionError; public static <K,V> void compareValueTimestamp(ClientRecord<K, V> record, ClientRecord<K, V> expectedRecord) throws AssertionError; List<V> readValuesToList(); } public static <K,V> void compareKeyValueTimestamp(ClientRecord<K, V> record, K expectedKey, V expectedValue, long expectedTimestamp) throws AssertionError; public static <K,V> void compareKeyValueTimestamp(ClientRecord<K, V> record, ClientRecord<K, V> expectedRecord) throws AssertionError; } |
Code Block | ||||
---|---|---|---|---|
| ||||
package org.apache.kafka.streams.test;
@Deprecated
public class ConsumerRecordFactory<K, V> { |
Proposed Changes
This improvement adds TestInputTopic class which wraps TopologyTestDriver replaces TopologyTestDriver and ConsumerRecordFactory methods as one class to be used to write to Input Topics and TestOutputTopic class which collects TopologyTestDriver reading methods and provide typesafe read methods.
Code Block | ||||
---|---|---|---|---|
| ||||
public class SimpleTopicTest { private TopologyTestDriver testDriver; private TestInputTopic<String, String> inputTopic; private TestOutputTopic<String, String> outputTopic; @Before public void setup() { testDriver = new TopologyTestDriver(TestStream.getTopology(), TestStream.getConfig()); inputTopic = new TestInputTopic<>(testDriver, testDriver.createInputTopic(TestStream.INPUT_TOPIC, new Serdes.StringSerde(), new Serdes.StringSerde()); outputTopic = new TestOutputTopic<>(testDriver, testDriver.createOutputTopic(TestStream.OUTPUT_TOPIC, new Serdes.StringSerde(), new Serdes.StringSerdeLongSerde()); } @After public void tearDown() { testDriver.close(); } @Test public void testOneWord() { //Feed word "Hello" to inputTopic and no kafka key, timestamp is irrelevant in this case inputTopic.pipeInput("Hello"); assertThat(outputTopic.readValue()).isEqualTo("Hello"); //No more output in topic assertThat(outputTopic.readRecordisEmpty()).isNullisTrue(); } } |
- New Example utilizing new classes test added to streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountDemoTest.java
- Examples in Testing Kafka Streams https://kafka.apache.org/22/documentation/streams/developer-guide/testing.html updated to use new TopolocyTestDriver, TestInputTopic and TestOutputTopic
...
Compatibility, Deprecation, and Migration Plan
There are no compatibility issues.
This is only adding new classes. The tests utilizing directly old TopologyTestDriver can still be used. There are no compatibility issues.use deprecated methods.
Migration plan ???
Rejected Alternatives
- It was considered to add methods to to return Iterable like in KIP-451: Make TopologyTestDriver output iterable , but it seems to be redundant with these List based methods This is will replace KIP-456: Helper classes to make it simpler to write test logic with TopologyTestDriver if accepted