Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
package org.apache.kafka.streams.test;

public class TopologyTestDriver {

    public TopologyTestDriver(Topology topology, Properties config);
    public TopologyTestDriver(Topology topology, Properties config, Time time);

    public void close(long initialWallClockTimeMs);


    // methodsevent-time foris actualautomatically testing

advanced based on the publicprovided void process(String topicNameinput records, byte[] key, byte[] value, long timestamp);                                                          // main process method
    public void process(String topicName, byte[] key, byte[] value);                                                                      and thus event-time punctuation are triggered automatically
    // useswall-clock constructortime providedis Timemocked object
and not   public <K, V> void process(String topicName, K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer, long timestamp); // use objects+serializers
    public <K, V> void process(String topicName, K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer);

    // for null-key messagesadvanced automatically; user can advance wall-clock time manually, and thus, trigger wall-clock time punctuations manually
    public void process(String topicName, byte[] value, long timestamp);
    public void process(String topicNameConsumerRecord<byte[], byte[]> value);
    public <V> void process(String topicName, V value, Serializer<V> valueSerializer, long timestamp)
    public <V> void process(String topicName, V value, Serializer<V> valueSerializer)

    // single input topic topologies can omit `topicName` (repeats all methods from above without topicName parameterrecord); // can trigger event-time punctuation
    public void process(byteList<ConsumerRecord<byte[] key, byte[] value, long timestamp>> records);
 // can trigger public void process(byte[] key, byte[] value);event-time punctuation
    public <K, V> void process(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer, long timestamp);
    public <K, V> void process(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer);
    public void process(byte[] value, long timestamp);
    public void process(byte[] value);
    public <V> void process(V value, Serializer<V> valueSerializer, long timestamp)
    public <V> void process(V value, Serializer<V> valueSerializer)

    // see below; cf. TestRecrodFactory
    public void process(TestRecord record);
    public void process(List<TestRecord> records);

    public void punctuateWallClockTime();
    public void punctuateStreamTime();


advanceWallClockTime(long advanceMs); // can trigger wall-clock-time punctuation

    // methods for result verification
 
    public ProducerRecord<byte[], byte[]> readOutput(String topic);
    public <K, V> ProducerRecord<K, V> readOutput(String topic, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer);

    public StateStore getStateStore(String name);
    public <K, V> KeyValueStore<K, V> getKeyValueStore(String name);
    public <K, V> KeyValueStore<KWindowStore<K, V> getWindowStore(String name);
    public <K, V> KeyValueStore<KSessionStore<K, V> getSessionStore(String name);

    public void close();
}
Code Block
languagejava
package org.apache.kafka.streams.test;

public class TestRecord<KConsumerRecordFactory<K, V> {

    public TestRecordConsumerRecordFactory(String topicName, K key, V value, long timestampdefaultTopicName, Serializer<K> keySerializer, Serializer<V> valueSerializer);
    public TestRecordConsumerRecordFactory(String topicNamedefaultTopicName, byte[]Serializer<K> keykeySerializer, byte[]Serializer<V> valuevalueSerializer, long timestampstartTimestampMs);

    public ConsumerRecordFactory(String topicName();
    public byte[] key();
    public byte[] value();
    public long timestamp();
}
Code Block
languagejava
package org.apache.kafka.streams.test;

public class TestRecordFactory<K, V> {

    // create records for a single input topic defaultTopicName, Serializer<K> keySerializer, Serializer<V> valueSerializer, long startTimestampMs, long autoAdvanceMs);

    // no default topic name; requires to specify topic name in #create(...)
    public ConsumerRecordFactory(Serializer<K> keySerializer, Serializer<V> valueSerializer);
    public TestRecordFactoryConsumerRecordFactory(String topicName, Serializer<K> keySerializer, Serializer<V> valueSerializer, Timelong timestartTimestampMs);


    public ConsumerRecordFactory(Serializer<K> TestRecord<KkeySerializer, V> create(K keySerializer<V> valueSerializer, Vlong valuestartTimestampMs, long timestampautoAdvanceMs);

    public TestRecord<K, V> create(K key, V value);void advanceTimeMs(long advanceMs);

    // null-key create single records with default topic name
    public TestRecord<KConsumerRecord<byte[], V>byte[]> create(K key, V value, long timestamptimestampMs);
    public TestRecord<KConsumerRecord<byte[], V>byte[]> create(K key, V value);
    public TestRecord<KConsumerRecord<byte[], V>byte[]> create(KeyValue<K, V> keyValueV value, long timestamptimestampMs);
    public TestRecord<KConsumerRecord<byte[], V>byte[]> create(KeyValue<K, V> keyValueV value);


    // create list of records basewith ondefault list of k-v-pairstopic name
    public List<TestRecord<KList<ConsumerRecord<byte[], V>>byte[]>> create(List<KeyValue<K, V>> keyValues); // start timestamp from constructor Time object; timestamp auto increment by 1
    public List<TestRecord<KList<ConsumerRecord<byte[], V>>byte[]>> create(List<KeyValue<K, V>> keyValues, long startTimestamp, long advanceMs); // timestamp auto increment by 1
    public List<TestRecord<KList<ConsumerRecord<byte[], V>>byte[]>> create(List<KeyValue<K, V>> keyValues, long startTimestamp, long advanceMs);
}
Code Block
languagejava
package org.apache.kafka.streams.test;

public class MockTime implements Time {

    // Listeneroverwrite thatdefault is invoked each time <em>after</em> time was advanced or modified.topic name
    public interface MockTimeListener {
        void tick(long currentTimeMsConsumerRecord<byte[], byte[]> create(String topicName, K key, V value, long timestampMs);
    }

    public MockTime(); // init with System.currentTimeMillis() and System.nanoTime(); no auto-tickpublic ConsumerRecord<byte[], byte[]> create(String topicName, K key, V value);
    public MockTime(long autoTickMs); 
    public MockTime(long autoTickMs, long currentTimeMsConsumerRecord<byte[], byte[]> create(String topicName, V value, long currentHighResTimeNstimestampMs);

    public void addListener(final MockTimeListener listener) { ConsumerRecord<byte[], byte[]> create(String topicName, V value);

    public void setAutoTickMs(long autoTickMs);
    public void setCurrentTimeMs(long newMs);

 List<ConsumerRecord<byte[], byte[]>> create(String topicName, List<KeyValue<K, V>> keyValues);
    // calling those methods triggers auto-tick
    @Override
    public long milliseconds(public List<ConsumerRecord<byte[], byte[]>> create(String topicName, List<KeyValue<K, V>> keyValues, long startTimestamp, long advanceMs);
    @Override
    public long nanoseconds();
    @Override
    public long hiResClockMs();


    @Override
    public void sleep(final long ms);
}

public List<ConsumerRecord<byte[], byte[]>> create(String topicName, List<KeyValue<K, V>> keyValues, long startTimestamp);

}

 

Proposed Changes

We are adding the above described test helper classes in a new artifact streams-test-utils such that people can easily include it as a dependency to their build. The main test class is the test driver while the others a auxiliary classes to generate test data etc.

For time base operations it's important to allow fine grained controll over time. Therefore, we provide a mock time class that can be ingested into the driver and can be used to generte input data. To avoid testing boiler plate code, we provide a rich amount of overload methods and a TestRecordFactory ConsumerRecordFactory that simplifies to generate TestRecords instead generate ConsumeRecords<byte[],byte[]> instead of using the verbose underlying methods. Thus, people have many degrees of freedom on how to use the test driver in their own code. ConsumerRecord constructor.

In the initla release, we might want to mark all new classes with annotation @Unstable or @Evolving.

Compatibility, Deprecation, and Migration Plan

...