Status

Current state: Accepted

Discussion thread: here

JIRA: KAFKA-8233

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

THIS IS STILL WORK IN PROGRESS.

This KIP is inspired by the discussion in KIP-456: Helper classes to make it simpler to write test logic with TopologyTestDriver.

Streams application code is very compact, while the test code is often a much larger code base than the application itself. That is why test code should be easy to read and understand, and thereby easy to maintain.

KIP-456 proposed an alternative way to write to input topics and read from output topics. This KIP enhances those classes and deprecates the old functionality to give test writers a clear interface.

...

TopologyTestDriver

```java
package org.apache.kafka.streams;

public class TopologyTestDriver {
    public TopologyTestDriver(Topology topology, Properties config); // existing constructor
    @Deprecated public TopologyTestDriver(Topology topology, Properties config, long initialWallClockTimeMs);
    public TopologyTestDriver(Topology topology, Properties config, Instant initialWallClockTime);

    // Wall-clock time is mocked and not advanced automatically; the user can advance it manually
    // and thus trigger wall-clock-time punctuations manually.
    @Deprecated public void advanceWallClockTime(long advanceMs); // can trigger wall-clock-time punctuation
    public void advanceWallClockTime(Duration advance); // can trigger wall-clock-time punctuation

    // Deprecate old pipe and read methods.
    // Event time is advanced automatically based on the provided input records,
    // so event-time punctuations are triggered automatically.
    @Deprecated public void pipeInput(ConsumerRecord<byte[], byte[]> record); // can trigger event-time punctuation
    @Deprecated public void pipeInput(List<ConsumerRecord<byte[], byte[]>> records); // can trigger event-time punctuation

    // methods for result verification
    @Deprecated public ProducerRecord<byte[], byte[]> readOutput(String topic);
    @Deprecated public <K, V> ProducerRecord<K, V> readOutput(String topic, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer);

    // methods for TestTopic object creation
    // Input topics take serializers (records are serialized before being piped into the topology);
    // output topics take deserializers (records are deserialized when read back).

    // Uses current system time as start timestamp. Auto-advance is disabled.
    public final <K, V> TestInputTopic<K, V> createInputTopic(final String topicName, final Serializer<K> keySerializer, final Serializer<V> valueSerializer);
    // Uses provided startTimestamp and autoAdvance duration for timestamp generation
    public final <K, V> TestInputTopic<K, V> createInputTopic(final String topicName, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final Instant startTimestamp, final Duration autoAdvance);

    public final <K, V> TestOutputTopic<K, V> createOutputTopic(final String topicName, final Deserializer<K> keyDeserializer, final Deserializer<V> valueDeserializer);

    ...
}
```
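The comments above distinguish event time, which the driver advances from the piped records, from a mocked wall clock that only moves when the test advances it. The following is a minimal, dependency-free sketch of such a mocked wall clock; `MockWallClockSketch` and its methods are hypothetical and are not part of Kafka or of this proposal. It fires one punctuation per elapsed interval, which may differ from how the real driver handles intervals skipped by a single large advance.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of a mocked wall clock: time never moves on its own,
// and scheduled punctuations fire only when the test calls advanceWallClockTime(...).
final class MockWallClockSketch {
    private Instant now;
    private final Duration punctuateInterval;
    private Instant nextPunctuation;
    private final List<Instant> firedPunctuations = new ArrayList<>();

    MockWallClockSketch(Instant initialWallClockTime, Duration punctuateInterval) {
        if (punctuateInterval.isZero() || punctuateInterval.isNegative()) {
            throw new IllegalArgumentException("punctuateInterval must be positive");
        }
        this.now = initialWallClockTime;
        this.punctuateInterval = punctuateInterval;
        this.nextPunctuation = initialWallClockTime.plus(punctuateInterval);
    }

    // Moves mocked time forward and records every punctuation whose scheduled time has been reached.
    void advanceWallClockTime(Duration advance) {
        now = now.plus(advance);
        while (!nextPunctuation.isAfter(now)) {
            firedPunctuations.add(nextPunctuation);
            nextPunctuation = nextPunctuation.plus(punctuateInterval);
        }
    }

    Instant now() {
        return now;
    }

    // Scheduled times of the punctuations fired so far, in firing order.
    List<Instant> firedPunctuations() {
        return firedPunctuations;
    }
}
```

For example, with a 10-second interval starting at epoch 0, advancing by 5 seconds fires nothing, and a further advance of 20 seconds fires the punctuations scheduled at 10 s and 20 s.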



TestInputTopic

```java
package org.apache.kafka.streams.test;

public class TestInputTopic<K, V> {
    // Created by TopologyTestDriver; constructors are package-private.

    // Timestamp handling
    // A record timestamp can be provided when piping input; otherwise the internally tracked time is used,
    // initialized with these parameters:
    //   startTimestamp: the initial timestamp for generated records; if not provided, the current system time is used.
    //   autoAdvance: the time increment per generated record; if not provided, auto-advance is disabled.

    // Advances the internally tracked time.
    void advanceTime(final Duration advance);

    // Methods to pipe a single record
    void pipeInput(final V value);
    void pipeInput(final K key, final V value);

    // Use the provided timestamp; does not auto-advance the internally tracked time.
    void pipeInput(final V value, final Instant timestamp);
    void pipeInput(final K key, final V value, final Instant timestamp);

    // Method taking a long, provided to support easier migration of old tests
    void pipeInput(final K key, final V value, final long timestampMs);

    // If the record timestamp is set, does not auto-advance the internally tracked time.
    void pipeInput(final TestRecord<K, V> record);

    // Methods to pipe a list of records
    void pipeValueList(final List<V> values);
    void pipeKeyValueList(final List<KeyValue<K, V>> keyValues);

    // Use the provided timestamps; does not auto-advance the internally tracked time.
    void pipeValueList(final List<V> values, final Instant startTimestamp, final Duration advance);
    void pipeKeyValueList(final List<KeyValue<K, V>> keyValues, final Instant startTimestamp, final Duration advance);

    // If a record timestamp is set, does not auto-advance the internally tracked time.
    void pipeRecordList(final List<? extends TestRecord<K, V>> records);
}
```
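The timestamp rules in the comments above can be modelled without Kafka. The following sketch assumes the behaviour exactly as those comments describe it; `InputTimestampSketch` and its methods are hypothetical names chosen for illustration, not the TestInputTopic implementation.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the described timestamp rules: records piped without a timestamp take the
// internally tracked time, which then moves forward by autoAdvance; records piped with an explicit
// timestamp keep it and leave the tracked time unchanged.
final class InputTimestampSketch {
    private Instant currentTime;
    private final Duration autoAdvance;

    // Like createInputTopic(topicName, keySerializer, valueSerializer): start at "now", no auto-advance.
    InputTimestampSketch() {
        this(Instant.now(), Duration.ZERO);
    }

    // Like createInputTopic(..., startTimestamp, autoAdvance).
    InputTimestampSketch(Instant startTimestamp, Duration autoAdvance) {
        this.currentTime = startTimestamp;
        this.autoAdvance = autoAdvance;
    }

    // Timestamp for a record piped without one, e.g. pipeInput(value).
    Instant nextGeneratedTimestamp() {
        Instant timestamp = currentTime;
        currentTime = currentTime.plus(autoAdvance);
        return timestamp;
    }

    // Timestamp for a record piped with one, e.g. pipeInput(key, value, timestamp).
    Instant explicitTimestamp(Instant timestamp) {
        return timestamp; // the tracked time is deliberately not advanced
    }

    // Corresponds to advanceTime(Duration).
    void advanceTime(Duration advance) {
        currentTime = currentTime.plus(advance);
    }

    Instant currentTime() {
        return currentTime;
    }

    // Timestamps for pipeValueList(values, startTimestamp, advance): start, start + advance, ...
    // The tracked time is not touched.
    static List<Instant> listTimestamps(int count, Instant startTimestamp, Duration advance) {
        List<Instant> timestamps = new ArrayList<>(count);
        Instant t = startTimestamp;
        for (int i = 0; i < count; i++) {
            timestamps.add(t);
            t = t.plus(advance);
        }
        return timestamps;
    }
}
```

With startTimestamp at epoch 0 and an autoAdvance of one second, three records piped without timestamps would get 0 s, 1 s and 2 s, while a record piped with an explicit timestamp in between would leave the next generated timestamp unchanged.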


TestOutputTopic

```java
package org.apache.kafka.streams.test;

public class TestOutputTopic<K, V> {
    // Created by TopologyTestDriver; constructors are package-private.

    // Methods to check the queue size
    final long getQueueSize();
    final boolean isEmpty();

    // Methods to read output; throw NoSuchElementException if no record is in the queue
    V readValue();
    KeyValue<K, V> readKeyValue();
    TestRecord<K, V> readRecord();

    // Output as a collection
    List<V> readValuesToList();
    List<KeyValue<K, V>> readKeyValuesToList();
    Map<K, V> readKeyValuesToMap();
    List<TestRecord<K, V>> readRecordsToList();
}
```
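The read methods above consume records from the output queue in order. A minimal, dependency-free sketch of those semantics follows; `OutputQueueSketch` is hypothetical, `Map.Entry` stands in for Kafka's `KeyValue`, and the rule that a later record overwrites an earlier one for the same key in `readKeyValuesToMap()` is an assumption of the sketch, not something this proposal specifies.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;

// Hypothetical sketch of the proposed TestOutputTopic read semantics (not Kafka code).
final class OutputQueueSketch<K, V> {
    private final Deque<Map.Entry<K, V>> queue = new ArrayDeque<>();

    // Stands in for the driver appending a record produced by the topology.
    void append(K key, V value) {
        queue.addLast(new SimpleEntry<>(key, value));
    }

    long getQueueSize() {
        return queue.size();
    }

    boolean isEmpty() {
        return queue.isEmpty();
    }

    // Consumes the oldest record; throws NoSuchElementException when nothing is queued.
    Map.Entry<K, V> readKeyValue() {
        Map.Entry<K, V> record = queue.pollFirst();
        if (record == null) {
            throw new NoSuchElementException("Empty topic");
        }
        return record;
    }

    V readValue() {
        return readKeyValue().getValue();
    }

    // Drains all queued values in arrival order.
    List<V> readValuesToList() {
        List<V> values = new ArrayList<>();
        while (!queue.isEmpty()) {
            values.add(readValue());
        }
        return values;
    }

    // Drains the queue into a map; in this sketch a later record for a key overwrites an earlier one.
    Map<K, V> readKeyValuesToMap() {
        Map<K, V> result = new LinkedHashMap<>();
        while (!queue.isEmpty()) {
            Map.Entry<K, V> record = readKeyValue();
            result.put(record.getKey(), record.getValue());
        }
        return result;
    }
}
```

Because every read consumes what it returns, a test can assert on a few records and then check `isEmpty()` to verify that nothing unexpected was produced.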


TestRecord

```java
package org.apache.kafka.streams.test;

public class TestRecord<K, V> {
    // Constructors
    public TestRecord(final V value);
    public TestRecord(final K key, final V value);
    public TestRecord(final K key, final V value, final Headers headers);
    public TestRecord(final K key, final V value, final Instant recordTime);
    public TestRecord(final K key, final V value, final Headers headers, final Instant recordTime);
    public TestRecord(final K key, final V value, final Headers headers, final Long timestamp);

    // Constructors based on an existing record
    public TestRecord(final ConsumerRecord<K, V> record);
    public TestRecord(final ProducerRecord<K, V> record);

    // Methods like in ProducerRecord / ConsumerRecord
    public Headers headers();
    public K key();
    public V value();
    public Long timestamp();

    // Getters
    public Headers getHeaders();
    public K getKey();
    public V getValue();
    public Instant getRecordTime();

    // Overrides
    public String toString();
    public boolean equals(Object o);
    public int hashCode();
}
```
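Because TestRecord overrides `equals()` and `hashCode()`, tests can presumably compare records with ordinary assertion library methods, as the OutputVerifier note recommends. The following simplified sketch (hypothetical `TestRecordSketch`, headers omitted) shows value-based equality and how a nullable `Long timestamp()` can be derived from an `Instant` record time; the choice of compared fields is an assumption, and this is not the proposed class itself.

```java
import java.time.Instant;
import java.util.Objects;

// Hypothetical, simplified stand-in for TestRecord (headers omitted).
final class TestRecordSketch<K, V> {
    private final K key;
    private final V value;
    private final Instant recordTime; // null when the record carries no timestamp

    TestRecordSketch(K key, V value, Instant recordTime) {
        this.key = key;
        this.value = value;
        this.recordTime = recordTime;
    }

    K key() {
        return key;
    }

    V value() {
        return value;
    }

    Instant getRecordTime() {
        return recordTime;
    }

    // Same information as getRecordTime(), as epoch milliseconds; null if no time is set.
    Long timestamp() {
        return recordTime == null ? null : recordTime.toEpochMilli();
    }

    // Value-based equality: two records are equal when key, value and record time are equal.
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof TestRecordSketch)) {
            return false;
        }
        TestRecordSketch<?, ?> other = (TestRecordSketch<?, ?>) o;
        return Objects.equals(key, other.key)
            && Objects.equals(value, other.value)
            && Objects.equals(recordTime, other.recordTime);
    }

    @Override
    public int hashCode() {
        return Objects.hash(key, value, recordTime);
    }

    @Override
    public String toString() {
        return "TestRecordSketch[key=" + key + ", value=" + value + ", recordTime=" + recordTime + "]";
    }
}
```

With value-based equality, an expected record can be built in the test and compared to the one read from the output topic in a single assertion, with `toString()` making a failure message readable.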


OutputVerifier

```java
package org.apache.kafka.streams.test;

// Recommended to use normal assertion library methods instead
@Deprecated
public class OutputVerifier {
    ...
}
```


ConsumerRecordFactory

```java
package org.apache.kafka.streams.test;

// Similar functionality is now in TestInputTopic
@Deprecated
public class ConsumerRecordFactory<K, V> {
    ...
}
```


Proposed Changes

This improvement adds a TestInputTopic class, which replaces the TopologyTestDriver and ConsumerRecordFactory methods with a single class for writing to input topics, and a TestOutputTopic class, which collects the TopologyTestDriver reading methods and provides type-safe read methods.

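Example

```java
import static org.assertj.core.api.Assertions.assertThat;

import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.test.TestInputTopic;
import org.apache.kafka.streams.test.TestOutputTopic;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class SimpleTopicTest {

    private TopologyTestDriver testDriver;
    private TestInputTopic<String, String> inputTopic;
    private TestOutputTopic<String, String> outputTopic;

    @Before
    public void setup() {
        // TestStream is the application under test; it provides the topology, config and topic names
        testDriver = new TopologyTestDriver(TestStream.getTopology(), TestStream.getConfig());
        // Input topics take serializers, output topics take deserializers
        inputTopic = testDriver.createInputTopic(TestStream.INPUT_TOPIC, new StringSerializer(), new StringSerializer());
        outputTopic = testDriver.createOutputTopic(TestStream.OUTPUT_TOPIC, new StringDeserializer(), new StringDeserializer());
    }

    @After
    public void tearDown() {
        testDriver.close();
    }

    @Test
    public void testOneWord() {
        // Feed the word "Hello" to inputTopic without a Kafka key; the timestamp is irrelevant in this case
        inputTopic.pipeInput("Hello");
        assertThat(outputTopic.readValue()).isEqualTo("Hello");
        // No more output in the topic
        assertThat(outputTopic.isEmpty()).isTrue();
    }
}
```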

...

Existing tests written against the old TopologyTestDriver methods can continue to use the deprecated methods.

Migration plan ???

Rejected Alternatives

  • This KIP replaces KIP-456: Helper classes to make it simpler to write test logic with TopologyTestDriver.
  • Deprecate the current TopologyTestDriver and move a new one to the test package. This would have made it possible to keep the TestInputTopic and TestOutputTopic classes in the test package as well, rather than in the already crowded streams root package.
  • Add a ClientRecord interface to the clients package and modify ProducerRecord (and/or ConsumerRecord) to implement it, so that OutputVerifier could be used with both ProducerRecord and TestRecord.