Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
titleTopologyTestDriver
package org.apache.kafka.streams;
 
// Excerpt of the TopologyTestDriver API listing the members marked as deprecated;
// members not relevant here are elided with "...".
@Deprecated
public class TopologyTestDriver {
	...

    // methods for piping input

    @Deprecated public void pipeInput(ConsumerRecord<byte[], byte[]> record); // can trigger event-time punctuation
    @Deprecated public void pipeInput(List<ConsumerRecord<byte[], byte[]>> records); // can trigger event-time punctuation

    // methods for result verification

    @Deprecated public ProducerRecord<byte[], byte[]> readOutput(String topic);
    @Deprecated public <K, V> ProducerRecord<K, V> readOutput(String topic, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer);

    ...
}

...

Code Block
languagejava
titleTestInputTopic
package org.apache.kafka.streams.test;

public class TestInputTopic<K, V> {
    //Create by TopologyTestDriver, Constructors are package private

    //Timestamp handling 
    //Record timestamp can be provided when piping input or use internally tracked time initialized configured with configureTiming:
    //startTimestampMs the initial timestamp for generated records, if not provided uses current system time as start timestamp.
    //autoAdvanceMs the time increment per generated record, if not provided auto-advance is disabled.

    void configureTiming(final long startTimestampMs, final long autoAdvanceMs);
    //Advances the internally tracked time.
    void advanceTimeMs(final long advanceMs);

	//Methods to pipe single record
    void pipeInput(final V value);
    void pipeInput(final KV keyvalue, final Vlong valuetimestampMs);

    void pipeInput(final VK valuekey, final longV timestampMsvalue);
    void pipeInput(final K key, final V value, final long timestampMs);

    void pipeInput(final K keyClientRecord<K, final V value, final Headers headersV> record);
    void pipeInput(final K keyClientRecord<K, finalV> V valuerecord, final Headers headers, final long timestampMs);

	//Methods to pipe list of records
    void pipeKeyValueListpipeValueList(final List<KeyValue<K, V>> keyValuesList<V> values);
    void pipeValueList(final List<V> values, final long startTimestamp, final long advanceMs);

    void pipeKeyValueList(final List<KeyValue<K, V>> keyValues);
    void pipeKeyValueList(final List<KeyValue<K, V>> keyValues, final long startTimestamp, final long advanceMs);

    void pipeRecordList(final List<ClientRecord<K, V>> records);
    void pipeValueListpipeRecordList(final List<ClientRecord<K, List<V>V>> valuesrecords, final long startTimestamp, final long advanceMs);
}

...

Code Block
languagejava
titleTestOutputTopic
package org.apache.kafka.streams.test;

public class TestOutputTopic<K, V> {
    //Create by TopologyTestDriver, Constructors are package private

	//Method to check queue size
    final long getQueueSize();
    final boolean isEmpty();

	//Methods to readOutput
    ClientRecord<K, V> readRecordV readValue();
    KeyValue<K, V> readKeyValue();
    V readValueClientRecord<K, V> readRecord();

    //Output as collection
    List<V> readValuesToList();
    Map<K, V> readKeyValuesToMap();
    List<KeyValue<K, V>> readKeyValuesToList();
    List<ClientRecord<K, V>> readRecordToList();
}


Code Block
languagejava
titleOutputVerifier
package org.apache.kafka.streams.test;
 
//OutputVerifier modified to accept ClientRecord instead of ProducerRecord
public class OutputVerifier {
 
    public static <K,V> void compareValue(ClientRecord<K, V> record, V expectedValue) throws AssertionError;
    public static <K,V> void compareValue(ClientRecord<K, V> record, ClientRecord<K, V> expectedRecord) throws AssertionError;
 
    public static <K,V> void compareKeyValue(ClientRecord<K, V> record, K expectedKey, V expectedValue) throws AssertionError;
    public static <K,V> void compareKeyValue(ClientRecord<K, V> record, ClientRecord<K, V> expectedRecord) throws AssertionError;
 
    public static <K,V> void compareValueTimestamp(ClientRecord<K, V> record, V expectedValue, long expectedTimestamp) throws AssertionError;
    public static <K,V> void compareValueTimestamp(ClientRecord<K, V> record, ClientRecord<K, V> expectedRecord) throws AssertionError;
 
   List<V> readValuesToList();
} public static <K,V> void compareKeyValueTimestamp(ClientRecord<K, V> record, K expectedKey, V expectedValue, long expectedTimestamp) throws AssertionError;
    public static <K,V> void compareKeyValueTimestamp(ClientRecord<K, V> record, ClientRecord<K, V> expectedRecord) throws AssertionError;
 
}


Code Block
languagejava
titleConsumerRecordFactory
package org.apache.kafka.streams.test;

@Deprecated
public class ConsumerRecordFactory<K, V> {


Proposed Changes

This improvement adds the TestInputTopic class, which replaces the TopologyTestDriver and ConsumerRecordFactory methods with a single class used to write to input topics, and the TestOutputTopic class, which collects the TopologyTestDriver reading methods and provides type-safe read methods.

Code Block
languagejava
titleExample
public class SimpleTopicTest {

  private TopologyTestDriver testDriver;
  private TestInputTopic<String, String> inputTopic;
  private TestOutputTopic<String, String> outputTopic;

  @Before
  public void setup() {
    testDriver = new TopologyTestDriver(TestStream.getTopology(), TestStream.getConfig());
    	inputTopic = new TestInputTopic<>(testDriver, testDriver.createInputTopic(TestStream.INPUT_TOPIC, new Serdes.StringSerde(), new Serdes.StringSerde());
    outputTopic = new TestOutputTopic<>(testDriver, testDriver.createOutputTopic(TestStream.OUTPUT_TOPIC, new Serdes.StringSerde(), new Serdes.StringSerdeLongSerde());
  }

  @After
  public void tearDown() {
      testDriver.close();
  }

  @Test
  public void testOneWord() {
    //Feed word "Hello" to inputTopic and no kafka key, timestamp is irrelevant in this case
    inputTopic.pipeInput("Hello");
    assertThat(outputTopic.readValue()).isEqualTo("Hello");
    //No more output in topic
    assertThat(outputTopic.readRecordisEmpty()).isNullisTrue();
  }
}


  • A new example test utilizing the new classes is added to streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountDemoTest.java

...

Compatibility, Deprecation, and Migration Plan

There are no compatibility issues.

This only adds new classes. Tests that directly use the old TopologyTestDriver can still use the deprecated methods.

Migration plan ???

Rejected Alternatives