Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Status

Current state: Under Discussion Discarded in favor of KIP-470: TopologyTestDriver test input and output usability improvements

Discussion thread: here 

JIRA:

Jira
serverASF JIRA
columnskey,summary,type,created,updated,due,assignee,reporter,priority,status,resolution
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyKAFKA-8233

...

Code Block
languagejava
titleTestInputTopic
package org.apache.kafka.streams.test;

public class TestInputTopic<K, V> {
    //Timestamp handling 
    //Record timestamp can be provided when piping input or use internally tracked time initialized with constructors:
    //startTimestampMs the initial timestamp for generated records, if not provided uses current system time as start timestamp.
    //autoAdvanceMs the time increment per generated record, if not provided auto-advance is disabled.

    //Constructors with serializers
    TestInputTopic(final TopologyTestDriver driver, final String topicName, final Serializer<K> keySerializer, final Serializer<V> valueSerializer);
    TestInputTopic(final TopologyTestDriver driver, final String topicName, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final long startTimestampMs);
    TestInputTopic(final TopologyTestDriver driver, final String topicName, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final long startTimestampMs, final long autoAdvanceMs);
    //Constructors with serdes
    TestInputTopic(final TopologyTestDriver driver, final String topicName, final Serde<K> keySerde, final Serde<V> valueSerde);
    TestInputTopic(final TopologyTestDriver driver, final String topicName, final Serde<K> keySerde, final Serde<V> valueSerde, final long startTimestampMs);
    TestInputTopic(final TopologyTestDriver driver, final String topicName, final Serde<K> keySerde, final Serde<V> valueSerde, final long startTimestampMs, final long autoAdvanceMs);

    //Advances the internally tracked time.
    void advanceTimeMs(final long advanceMs);

	//Methods to pipe single record
    void pipeInput(final V value);
    void pipeInput(final K key, final V value);
    void pipeInput(final V value, final long timestampMs);
    void pipeInput(final K key, final V value, final long timestampMs);
    void pipeInput(final K key, final V value, final Headers headers);
    void pipeInput(final K key, final V value, final Headers headers, final long timestampMs);

	//Methods to pipe list of records
    void pipeKeyValueList(final List<KeyValue<K, V>> keyValues);
    void pipeValueList(final List<V> values);
    void pipeKeyValueList(final List<KeyValue<K, V>> keyValues, final long startTimestamp, final long advanceMs);
    void pipeValueList(final List<V> values, final long startTimestamp, final long advanceMs);
}



Code Block
languagejava
titleTestOutputTopic
package org.apache.kafka.streams.test;

public class TestOutputTopic<K, V> {
	//Constructor with serializers
    TestOutputTopic(final TopologyTestDriver driver, final String topic, final Serde<K> keySerde, final Serde<V> valueSerde);
    //Constructor with serdes
    TestOutputTopic(final TopologyTestDriver driver, final String topic, final Deserializer<K> keyDeserializer, final Deserializer<V> valueDeserializer);

	//Methods to readOutput
    ProducerRecord<K, V> readRecord();
    KeyValue<K, V> readKeyValue();
    V readValue();

    //Output as collection
    Map<K, V> readKeyValuesToMap();
    List<KeyValue<K, V>> readKeyValuesToList();
    List<V> readValuesToList();
}

...

Code Block
languagejava
titleExample
public class SimpleTopicTest {

  private TopologyTestDriver testDriver;
  private TestInputTopic<String, String> inputTopic;
  private TestOutputTopic<String, String> outputTopic;

  @Before
  public void setup() {
    testDriver = new TopologyTestDriver(TestStream.getTopology(), TestStream.getConfig());
    inputTopic = new TestInputTopic<String, String>TestInputTopic<>(testDriver, TestStream.INPUT_TOPIC, new Serdes.StringSerde(), new Serdes.StringSerde());
    outputTopic = new TestOutputTopic<String, String>TestOutputTopic<>(testDriver, TestStream.OUTPUT_TOPIC, new Serdes.StringSerde(), new Serdes.StringSerde());
  }

  @After
  public void tearDown() {
      testDriver.close();
  }

  @Test
  public void testOneWord() {
    //Feed word "Hello" to inputTopic and no kafka key, timestamp is irrelevant in this case
    inputTopic.pipeInput("Hello");
    assertThat(outputTopic.readValue()).isEqualTo("Hello");
    //No more output in topic
    assertThat(outputTopic.readRecord()).isNull();
  }
}

...