Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

 

Table of Contents

Status

Current state: Accepted (previously: Under Discussion) — vote thread: [VOTE] KIP-247: Add public test utils for Kafka Streams

Discussion thread: [DISCUSS] KIP-247: Add public test utils for Kafka Streams

JIRA

Jira (server: ASF JIRA, serverId: 5aa69414-a9e9-3523-82ec-879b028fb15b, key: KAFKA-3625)

...

Code Block
languagejava
// NOTE(review): This listing was exported from a Confluence "Versions Compared" page, so tokens
// from the removed and the added revision are run together on the same lines (e.g. "Timelong
// timeinitialWallClockTimeMs" merges `Time time` with `long initialWallClockTimeMs`, and
// `process(...)` is fused with `pipeInput(...)`). It is not compilable Java as shown.
// Reading only the newer tokens, TopologyTestDriver appears to declare (TODO confirm against the
// published KIP before relying on it):
//   TopologyTestDriver(Topology topology, Properties config)
//   TopologyTestDriver(Topology topology, Properties config, long initialWallClockTimeMs)
//   void pipeInput(ConsumerRecord<byte[], byte[]> record)          // can trigger event-time punctuation
//   void pipeInput(List<ConsumerRecord<byte[], byte[]>> records)   // can trigger event-time punctuation
//   void advanceWallClockTime(long advanceMs)                      // can trigger wall-clock-time punctuation
//   ProducerRecord<byte[], byte[]> readOutput(String topic)
//   <K, V> ProducerRecord<K, V> readOutput(String topic, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer)
//   Map<String, StateStore> getAllStateStores()
//   StateStore getStateStore(String name)
//   <K, V> KeyValueStore<K, V> getKeyValueStore(String name)
//   <K, V> WindowStore<K, V> getWindowStore(String name)
//   <K, V> SessionStore<K, V> getSessionStore(String name)
//   void close()
package org.apache.kafka.streams.test;

public class TopologyTestDriver {

    public TopologyTestDriver(Topology topology, Properties config); // initialized WallClockMockTimeMs with System.currentTimeMillis()

    public TopologyTestDriver(Topology topology, Properties config, Timelong timeinitialWallClockTimeMs);

    public void close();


    // methodsevent-time foris actualautomatically testing

advanced based on the publicprovided void process(String topicNameinput records, byte[] key, byte[] value, long timestamp); and thus event-time punctuation are triggered automatically
    // wall-clock time is mocked and not advanced automatically; user can advance wall-clock time manually, and thus, trigger wall-clock time punctuations manually
    public void pipeInput(ConsumerRecord<byte[],                         byte[]> record); // main process methodcan trigger event-time punctuation
    public void process(String topicName, bytepipeInput(List<ConsumerRecord<byte[] key, byte[]>> valuerecords); // can trigger event-time punctuation
    public void advanceWallClockTime(long advanceMs); // can trigger                                                       wall-clock-time punctuation

    // usesmethods constructorfor provided Time objectresult verification
 
    public <KProducerRecord<byte[], V> void process(String topicName, K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer, long timestamp); // use objects+serializersbyte[]> readOutput(String topic);
    public <K, V> ProducerRecord<K, voidV> processreadOutput(String topicNametopic, KDeserializer<K> keykeyDeserializer, V value, Serializer<K> keySerializer, Serializer<V> valueSerializerDeserializer<V> valueDeserializer);

    //public for null-key messagesMap<String, StateStore> getAllStateStores()
    public voidStateStore processgetStateStore(String topicName, byte[] value, long timestampname);
    public void process(String topicName, byte[] value<K, V> KeyValueStore<K, V> getKeyValueStore(String name);
    public <V> void process(String topicName<K, VV> valueWindowStore<K, Serializer<V> valueSerializer, long timestamp)V> getWindowStore(String name);
    public <V><K, void process(String topicNameV> SessionStore<K, V value, Serializer<V> valueSerializer)V> getSessionStore(String name);

    //public single input topic topologies can omit `topicName` (repeats all methods from above without topicName parametervoid close();
}
Code Block
languagejava
// NOTE(review): Confluence "Versions Compared" export. The removed revision's `process(...)`,
// `punctuate*`, `readOutput(...)` and `get*Store(...)` declarations are interleaved with the added
// revision's ConsumerRecordFactory constructors and `create(...)` overloads, so this is not
// compilable Java as shown. In the added revision the class body appears to continue through the
// following listings (more `create(...)` overloads), so the closing brace below likely belongs to
// the removed text only (TODO confirm).
// Reading only the newer tokens, ConsumerRecordFactory<K, V> appears to declare:
//   ConsumerRecordFactory(String defaultTopicName, Serializer<K> keySerializer, Serializer<V> valueSerializer)
//       // startTimestampMs initialized with System.currentTimeMillis(), autoAdvanceMs with zero
//   ConsumerRecordFactory(String defaultTopicName, Serializer<K>, Serializer<V>, long startTimestampMs)
//   ConsumerRecordFactory(String defaultTopicName, Serializer<K>, Serializer<V>, long startTimestampMs, long autoAdvanceMs)
//   the same three constructors without defaultTopicName (the topic must then be passed to create(...))
//   a time-advance method (apparently advanceTimeMs(long advanceMs))
//   ConsumerRecord<byte[], byte[]> create(K key, V value, long timestampMs) / create(K key, V value)
//   ConsumerRecord<byte[], byte[]> create(V value, long timestampMs) / create(V value)
package org.apache.kafka.streams.test;

public class ConsumerRecordFactory<K, V> {

    // default 
    public void process(byte[] key, byte[] value, long timestamp);
    public void process(byte[] key, byte[] value);ConsumerRecordFactory(String defaultTopicName, Serializer<K> keySerializer, Serializer<V> valueSerializer); // initialized startTimestampMs with System.currentTimeMillis() and autoAdvanceMs with zero
    public <K, V> void process(K key, V value, ConsumerRecordFactory(String defaultTopicName, Serializer<K> keySerializer, Serializer<V> valueSerializer, long timestampstartTimestampMs);
    public <KConsumerRecordFactory(String defaultTopicName, V> void process(K keySerializer<K> keySerializer, VSerializer<V> valuevalueSerializer, Serializer<K>long keySerializerstartTimestampMs, Serializer<V>long valueSerializerautoAdvanceMs);

    public// void process(byte[] value, long timestamp);
    public void process(byte[] value);no default topic name; requires to specify topic name in #create(...)
    public <V> void process(V valueConsumerRecordFactory(Serializer<K> keySerializer, Serializer<V> valueSerializer, long timestamp);
    public <V> void process(V valueConsumerRecordFactory(Serializer<K> keySerializer, Serializer<V> valueSerializer)

,    // see below; cf. TestRecrodFactorylong startTimestampMs);
    public void processConsumerRecordFactory(TestRecord record);
    public void process(List<TestRecord> recordsSerializer<K> keySerializer, Serializer<V> valueSerializer, long startTimestampMs, long autoAdvanceMs);

    public void punctuateWallClockTimeadvanceTimeMs();
    public void punctuateStreamTime(long advanceMs);


    // methods for result verification
 create single records with default topic name
    public ProducerRecord<byteConsumerRecord<byte[], byte[]> readOutputcreate(String topic);
    public <KK key, V>V ProducerRecord<Kvalue, V> readOutput(String topic, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializerlong timestampMs);

    public StateStore getStateStore(String name);
    public <K, V> KeyValueStore<K, V> getKeyValueStore(String nameConsumerRecord<byte[], byte[]> create(K key, V value);
    public <KConsumerRecord<byte[], V> KeyValueStore<Kbyte[]> create(V value, V> getWindowStore(String namelong timestampMs);
    public <KConsumerRecord<byte[], V> KeyValueStore<K, V> getSessionStore(String name);
}
Code Block
languagejava
// NOTE(review): Confluence "Versions Compared" export. In the removed revision this listing declared
// TestRecord<K, V>: constructors taking a topic name, key, value and timestamp (either as objects plus
// key/value serializers, or as byte[]), and accessors topicName(), key(), value() and timestamp().
// In the added revision the same lines appear to continue ConsumerRecordFactory's create(...)
// overloads, e.g. (TODO confirm):
//   ConsumerRecord<byte[], byte[]> create(V value)
//   List<ConsumerRecord<byte[], byte[]>> create(List<KeyValue<K, V>> keyValues)   // default topic name
//   List<ConsumerRecord<byte[], byte[]>> create(List<KeyValue<K, V>> keyValues, long startTimestamp, long advanceMs)
// The two revisions are interleaved token by token, so this is not compilable Java as shown.
package org.apache.kafka.streams.test;

public class TestRecord<K, V> {

    public TestRecord(String topicName, K key, V value, long timestamp, Serializer<K> keySerializer, Serializer<V> valueSerializerbyte[]> create(V value);


    // create list of records with default topic name
    public List<ConsumerRecord<byte[], byte[]>> create(List<KeyValue<K, V>> keyValues);
    public TestRecord(String topicNameList<ConsumerRecord<byte[], byte[]>> keycreate(List<KeyValue<K, byte[]V>> valuekeyValues, long timestamp);

    public String topicName(startTimestamp, long advanceMs);
    public byteList<ConsumerRecord<byte[] key();
    public, byte[]>> value();
    public long timestamp();
}
Code Block
languagejava
// NOTE(review): Confluence "Versions Compared" export. In the removed revision this listing declared
// TestRecordFactory<K, V>: a constructor taking a topic name, key/value serializers and a Time object,
// plus create(...) overloads returning TestRecord<K, V> or List<TestRecord<K, V>> (its comments mention
// the start timestamp coming from the constructor's Time object and timestamps auto-incrementing by 1).
// In the added revision the same lines appear to continue ConsumerRecordFactory with create(...)
// overloads that take an explicit topicName to override the default topic name, returning
// ConsumerRecord<byte[], byte[]> or List<ConsumerRecord<byte[], byte[]>> (TODO confirm).
// The revisions are interleaved token by token, so this is not compilable Java as shown.
package org.apache.kafka.streams.test;

public class TestRecordFactory<K, V> {create(List<KeyValue<K, V>> keyValues, long startTimestamp);


    // createoverwrite recordsdefault fortopic aname
 single input topic
 public ConsumerRecord<byte[],  public TestRecordFactorybyte[]> create(String topicName, Serializer<K> keySerializer, Serializer<V> valueSerializer, Time time);


    public TestRecord<K, V> create(K key, V value, long timestamptimestampMs);
    public TestRecord<KConsumerRecord<byte[], V>byte[]> create(String topicName, K key, V value);
    // null-key
    public TestRecord<KConsumerRecord<byte[], V>byte[]> create(V value, long timestamp);
    public TestRecord<K, V> create(V value);
    public TestRecord<K, V> create(KeyValue<K, V> keyValue, long timestamp)String topicName, V value, long timestampMs);
    public TestRecord<KConsumerRecord<byte[], V>byte[]> create(KeyValue<KString topicName, V>V keyValuevalue);


    // those methods createallow listto ofaccess recordsregular baseas onwell listas of k-v-pairsglobal stores
    public List<TestRecord<KList<ConsumerRecord<byte[], V>>byte[]>> create(String topicName, List<KeyValue<K, V>> keyValues); // start timestamp from constructor Time object; timestamp auto increment by 1
    public List<TestRecord<KList<ConsumerRecord<byte[], V>>byte[]>> create(String topicName, List<KeyValue<K, V>> keyValues, long startTimestamp, long advanceMs); // timestamp auto increment by 1
    public List<TestRecord<KList<ConsumerRecord<byte[], V>>byte[]>> create(String topicName, List<KeyValue<K, V>> keyValues, long startTimestamp, long advanceMs);

}
Code Block
languagejava
// NOTE(review): Confluence "Versions Compared" export. The removed revision appears to declare
// MockTime implements Time: constructors with an optional autoTickMs and explicit start times, a
// MockTimeListener callback interface (tick(long currentTimeMs)), addListener, setAutoTickMs,
// setCurrentTimeMs, and the Time methods milliseconds(), nanoseconds(), hiResClockMs() and sleep(long).
// The added revision appears to replace it with OutputVerifier, whose static methods are all declared
// `throws AssertionError` (TODO confirm):
//   compareValue(ProducerRecord<K, V> record, V expectedValue)
//   compareValue(ProducerRecord<K, V> record, ProducerRecord<K, V> expectedRecord)
//   compareKeyValue(ProducerRecord<K, V> record, K expectedKey, V expectedValue)
//   compareKeyValue(ProducerRecord<K, V> record, ProducerRecord<K, V> expectedRecord)
//   compareValueTimestamp(ProducerRecord<K, V> record, V expectedValue, long expectedTimestamp)
//   compareValueTimestamp(ProducerRecord<K, V> record, ProducerRecord<K, V> expectedRecord)
//   compareKeyValueTimestamp(ProducerRecord<K, V> record, K expectedKey, V expectedValue, long expectedTimestamp)
//   compareKeyValueTimestamp(ProducerRecord<K, V> record, ProducerRecord<K, V> expectedRecord)
// The revisions are interleaved token by token, so this is not compilable Java as shown.
package org.apache.kafka.streams.test;

public class MockTimeOutputVerifier implements Time {

    //public Listenerstatic that<K,V> is invoked each time <em>after</em> time was advanced or modified.void compareValue(ProducerRecord<K, V> record, V expectedValue) throws AssertionError;
    public interfacestatic MockTimeListener {
        void tick(long currentTimeMs);
<K,V> void compareValue(ProducerRecord<K, V> record, ProducerRecord<K, V> expectedRecord) throws AssertionError;

    }

public static <K,V> void public MockTime(); // init with System.currentTimeMillis() and System.nanoTime(); no auto-tickcompareKeyValue(ProducerRecord<K, V> record, K expectedKey, V expectedValue) throws AssertionError;
    public MockTime(long autoTickMs); 
    public MockTime(long autoTickMs, long currentTimeMs, long currentHighResTimeNs)static <K,V> void compareKeyValue(ProducerRecord<K, V> record, ProducerRecord<K, V> expectedRecord) throws AssertionError;

    public static <K,V> void addListenercompareValueTimestamp(finalProducerRecord<K, MockTimeListenerV> listener) {

    public void setAutoTickMs(long autoTickMs)record, V expectedValue, long expectedTimestamp) throws AssertionError;
    public static <K,V> void setCurrentTimeMscompareValueTimestamp(long newMs);


    // calling those methods triggers auto-tick
    @OverrideProducerRecord<K, V> record, ProducerRecord<K, V> expectedRecord) throws AssertionError;

    public long milliseconds();
    @Override
    public long nanoseconds();
    @Overridestatic <K,V> void compareKeyValueTimestamp(ProducerRecord<K, V> record, K expectedKey, V expectedValue, long expectedTimestamp) throws AssertionError;
    public long hiResClockMs();


    @Override
    public void sleep(final long ms);
}

static <K,V> void compareKeyValueTimestamp(ProducerRecord<K, V> record, ProducerRecord<K, V> expectedRecord) throws AssertionError;

}

 

Proposed Changes

We are adding the test helper classes described above in a new artifact, streams-test-utils, so that people can easily include it as a dependency in their build. The main test class is the test driver; the others are auxiliary classes to generate test data, etc.

For time-based operations, it is important to allow fine-grained control over time. Therefore, we provide a mock time class that can be injected into the driver and used to generate input data. To avoid boilerplate test code, the earlier revision of this proposal provided a rich set of overloaded methods and a TestRecordFactory that simplifies generating TestRecords instead of using the verbose underlying methods, giving people many degrees of freedom in how to use the test driver in their own code. The revised proposal instead provides a ConsumerRecordFactory that simplifies generating ConsumerRecord<byte[],byte[]> instances instead of using the verbose ConsumerRecord constructor.

In the initial release, we might want to mark all new classes with the annotation @Unstable or @Evolving.

Compatibility, Deprecation, and Migration Plan

...