Status

Current state: Accepted

Discussion thread: here

JIRA: KAFKA-8233

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

THIS STILL DRAFT:

This KIP is inspired by the discussion in KIP-456: Helper classes to make it simpler to write test logic with TopologyTestDriver.

Stream application code is very compact, and the test code is often a much larger code base than the application itself. The test code should therefore be easy to read and understand, which also makes it easier to maintain.

KIP-456 proposed adding an alternative way to write to input topics and read from output topics. This KIP enhances those classes and deprecates the old functionality to give test writers a clear interface.

With the old TopologyTestDriver, you need to call ConsumerRecordFactory to create the ConsumerRecord that is passed to the pipeInput method to write to a topic. Likewise, when calling readOutput to consume from a topic, you need to provide the correct Deserializers each time.

You easily end up writing helper methods in your test classes. This can be avoided by adding generic input and output topic classes that implement the needed functionality.

The logic of the old TopologyTestDriver is also confusing: you pipe ConsumerRecords to produce records to an input topic, and you receive ProducerRecords when consuming from an output topic.

Reading from a non-existent topic or from an empty queue now throws an exception instead of returning null.

Public Interfaces


Code Block
languagejava
titleTopologyTestDriver
package org.apache.kafka.streams;

public class TopologyTestDriver {
    public TopologyTestDriver(Topology topology, Properties config); // existing constructor
    @Deprecated public TopologyTestDriver(Topology topology, Properties config, long initialWallClockTimeMs);
    public TopologyTestDriver(Topology topology, Properties config, Instant initialWallClockTime);

    @Deprecated public void advanceWallClockTime(long advanceMs); // can trigger wall-clock-time punctuation
    public void advanceWallClockTime(Duration advance); // can trigger wall-clock-time punctuation

    // Deprecate old pipe and read methods
    @Deprecated public void pipeInput(ConsumerRecord<byte[], byte[]> record); // can trigger event-time punctuation
    @Deprecated public void pipeInput(List<ConsumerRecord<byte[], byte[]>> records); // can trigger event-time punctuation
    @Deprecated public ProducerRecord<byte[], byte[]> readOutput(String topic);
    @Deprecated public <K, V> ProducerRecord<K, V> readOutput(String topic, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer);

    // Methods for TestTopic object creation
    // Output topics deserialize the records produced by the topology
    public final <K, V> TestOutputTopic<K, V> createOutputTopic(final String topicName, final Deserializer<K> keyDeserializer, final Deserializer<V> valueDeserializer);
    // Input topics serialize the records piped by the test
    // Uses current system time as start timestamp. Auto-advance is disabled.
    public final <K, V> TestInputTopic<K, V> createInputTopic(final String topicName, final Serializer<K> keySerializer, final Serializer<V> valueSerializer);
    // Uses provided startTimestamp and autoAdvance duration for timestamp generation
    public final <K, V> TestInputTopic<K, V> createInputTopic(final String topicName, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final Instant startTimestamp, final Duration autoAdvance);

    ...
}



Code Block
languagejava
titleTestInputTopic
package org.apache.kafka.streams;

public class TestInputTopic<K, V> {
    //Created by TopologyTestDriver, constructors are package private

    //Timestamp handling
    //Record timestamp can be provided when piping input or use internally tracked time configured with parameters:
    //startTimestamp the initial timestamp for generated records, if not provided uses current system time as start timestamp.
    //autoAdvance the time increment per generated record, if not provided auto-advance is disabled.

    //Advances the internally tracked time.
    void advanceTime(final Duration advance);

    //Methods to pipe single record
    void pipeInput(final V value);
    void pipeInput(final K key, final V value);

    // Use provided timestamp, does not auto advance internally tracked time.
    void pipeInput(final V value, final Instant timestamp);
    void pipeInput(final K key, final V value, final Instant timestamp);

    // Method with long provided to support easier migration of old tests
    void pipeInput(final K key, final V value, final long timestampMs);

    // If record timestamp set, does not auto advance internally tracked time.
    void pipeInput(final TestRecord<K, V> record);

    //Methods to pipe list of records
    void pipeValueList(final List<V> values);
    void pipeKeyValueList(final List<KeyValue<K, V>> keyValues);

    // Use provided timestamp, does not auto advance internally tracked time.
    void pipeValueList(final List<V> values, final Instant startTimestamp, final Duration advanceMs);
    void pipeKeyValueList(final List<KeyValue<K, V>> keyValues, final Instant startTimestamp, final Duration advanceMs);

    // If record timestamp set, does not auto advance internally tracked time.
    void pipeRecordList(final List<? extends TestRecord<K, V>> records);
}
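
The timestamp rules in the comments above can be illustrated with a small stand-alone model. This is only a sketch of the described behaviour, not the actual TestInputTopic implementation. The class TrackedTimeSketch and its method names are hypothetical, and it assumes that a record piped without a timestamp first takes the tracked time and only then advances it by autoAdvance.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical stand-alone model of the timestamp rules; not part of Kafka.
final class TrackedTimeSketch {
    private Instant currentTime;         // internally tracked time
    private final Duration autoAdvance;  // Duration.ZERO means auto-advance is disabled

    TrackedTimeSketch(final Instant startTimestamp, final Duration autoAdvance) {
        this.currentTime = startTimestamp;
        this.autoAdvance = autoAdvance;
    }

    // Timestamp for a record piped without an explicit timestamp:
    // assumed to use the tracked time first, then advance it.
    Instant nextGeneratedTimestamp() {
        final Instant timestamp = currentTime;
        currentTime = currentTime.plus(autoAdvance);
        return timestamp;
    }

    // Timestamp for a record piped with an explicit timestamp:
    // the tracked time is left untouched.
    Instant useProvidedTimestamp(final Instant provided) {
        return provided;
    }

    // Mirrors advanceTime(Duration): moves the tracked time forward manually.
    void advanceTime(final Duration advance) {
        currentTime = currentTime.plus(advance);
    }

    public static void main(final String[] args) {
        final TrackedTimeSketch time =
            new TrackedTimeSketch(Instant.ofEpochMilli(0), Duration.ofMillis(10));
        System.out.println(time.nextGeneratedTimestamp().toEpochMilli()); // 0
        System.out.println(time.nextGeneratedTimestamp().toEpochMilli()); // 10
        time.useProvidedTimestamp(Instant.ofEpochMilli(500));             // tracked time stays at 20
        time.advanceTime(Duration.ofMillis(100));
        System.out.println(time.nextGeneratedTimestamp().toEpochMilli()); // 120
    }
}
```

With a fixed start timestamp and a non-zero autoAdvance, generated timestamps become deterministic, which is usually what a test of windowed or time-dependent logic needs.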


Code Block
languagejava
titleTestOutputTopic
package org.apache.kafka.streams;

public class TestOutputTopic<K, V> {
    //Create by TopologyTestDriver, Constructors are package private

	//Method to check queue size
    final long getQueueSize();
    final boolean isEmpty();

    //Methods to read output, throw NoSuchElementException if no record in queue
    V readValue();
    KeyValue<K, V> readKeyValue();
    TestRecord<K, V> readRecord();

    //Output as collection
    List<V> readValuesToList();
    List<KeyValue<K, V>> readKeyValuesToList();
    Map<K, V> readKeyValuesToMap();
    List<TestRecord<K, V>> readRecordsToList();
}
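
The read semantics listed above, where a single-record read fails on an empty queue instead of returning null, can be modelled without Kafka. The class below is a hypothetical, simplified stand-in for TestOutputTopic that works on already deserialized values. It assumes that reading to a list drains the queue, which the listing does not state explicitly.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Queue;

// Hypothetical in-memory model of the read methods; not the Kafka class.
final class OutputQueueSketch<V> {
    private final Queue<V> queue = new ArrayDeque<>();

    // Stands in for the topology producing a record to the output topic.
    void produce(final V value) {
        queue.add(value);
    }

    long getQueueSize() {
        return queue.size();
    }

    boolean isEmpty() {
        return queue.isEmpty();
    }

    // Throws NoSuchElementException on an empty queue rather than returning null.
    V readValue() {
        if (queue.isEmpty()) {
            throw new NoSuchElementException("No record in queue");
        }
        return queue.remove();
    }

    // Assumption: returns every remaining record in order and empties the queue.
    List<V> readValuesToList() {
        final List<V> values = new ArrayList<>();
        while (!queue.isEmpty()) {
            values.add(queue.remove());
        }
        return values;
    }

    public static void main(final String[] args) {
        final OutputQueueSketch<String> output = new OutputQueueSketch<>();
        output.produce("Hello");
        System.out.println(output.readValue()); // Hello
        System.out.println(output.isEmpty());   // true
    }
}
```

Under this model a test checks isEmpty() to assert that no more output exists, rather than comparing the result of a read against null.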


Code Block
languagejava
titleTestRecord
package org.apache.kafka.streams.test;

public class TestRecord<K, V> {
    //Constructors
    public TestRecord(final V value);
    public TestRecord(final K key, final V value);
    public TestRecord(final K key, final V value, final Headers headers);
    public TestRecord(final K key, final V value, final Instant recordTime);
    public TestRecord(final K key, final V value, final Headers headers, final Instant recordTime);
    public TestRecord(final K key, final V value, final Headers headers, final Long timestamp);

    //Constructors based on existing record
    public TestRecord(final ConsumerRecord<K, V> record);
    public TestRecord(final ProducerRecord<K, V> record);

    // Methods like in ProducerRecord / ConsumerRecord
    public Headers headers();
    public K key();
    public V value();
    public Long timestamp();

    // Getters
    public Headers getHeaders();
    public K getKey();
    public V getValue();
    public Instant getRecordTime();

    //Overrides
    public String toString();
    public boolean equals(Object o);
    public int hashCode();
}


Code Block
languagejava
titleOutputVerifier
package org.apache.kafka.streams.test;

//Recommended to use normal assertion library methods 
@Deprecated
public class OutputVerifier {
...


Code Block
languagejava
titleConsumerRecordFactory
package org.apache.kafka.streams.test;

//Similar functionality now in TestInputTopic
@Deprecated
public class ConsumerRecordFactory<K, V> {
...


Proposed Changes

This improvement adds a TestInputTopic class, which replaces the TopologyTestDriver and ConsumerRecordFactory methods with a single class for writing to input topics, and a TestOutputTopic class, which collects the TopologyTestDriver reading methods and provides type-safe read methods.

Code Block
languagejava
titleExample
public class SimpleTopicTest {

  private TopologyTestDriver testDriver;
  private TestInputTopic<String, String> inputTopic;
  private TestOutputTopic<String, String> outputTopic;

  @Before
  public void setup() {
    testDriver = new TopologyTestDriver(TestStream.getTopology(), TestStream.getConfig());
    // Input topics take serializers, output topics take deserializers
    inputTopic = testDriver.createInputTopic(TestStream.INPUT_TOPIC, new StringSerializer(), new StringSerializer());
    outputTopic = testDriver.createOutputTopic(TestStream.OUTPUT_TOPIC, new StringDeserializer(), new StringDeserializer());
  }

  @After
  public void tearDown() {
      testDriver.close();
  }

  @Test
  public void testOneWord() {
    //Feed word "Hello" to inputTopic and no kafka key, timestamp is irrelevant in this case
    inputTopic.pipeInput("Hello");
    assertThat(outputTopic.readValue()).isEqualTo("Hello");
    //No more output in topic
    assertThat(outputTopic.isEmpty()).isTrue();
  }
}


  • A new example test utilizing the new classes is added to streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountDemoTest.java

...

Compatibility, Deprecation, and Migration Plan

There are no compatibility issues.

This only adds new classes. Tests that use the old TopologyTestDriver directly can still use the deprecated methods.

Rejected Alternatives

  • This KIP replaces KIP-456: Helper classes to make it simpler to write test logic with TopologyTestDriver.
  • Deprecate the current TopologyTestDriver and move the new one to the test package. This would also have allowed keeping the TestInputTopic and TestOutputTopic classes in the test package instead of the very crowded streams root package.
  • Add a ClientRecord interface to the client package and modify ProducerRecord (and/or ConsumerRecord) to implement it, to be able to use OutputVerifier with both ProducerRecord and TestRecord.