Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
package org.apache.kafka.streams.test;

public class TopologyTestDriver {

    public TopologyTestDriver(Topology topology, Properties config); // initialized WallClockMockTimeMs with System.currentTimeMillis();

    public TopologyTestDriver(Topology topology, Properties config, long initialWallClockTimeMs);

    // event-time is automatically advanced based on the provided input records, and thus event-time punctuation are triggered automatically
    // wall-clock time is mocked and not advanced automatically; user can advance wall-clock time manually, and thus, trigger wall-clock time punctuations manually
    public void processpipeInput(ConsumerRecord<byte[], byte[]> record); // can trigger event-time punctuation
    public void processpipeInput(List<ConsumerRecord<byte[], byte[]>> records); // can trigger event-time punctuation
    public void advanceWallClockTime(long advanceMs); // can trigger wall-clock-time punctuation

    // methods for result verification
 
    public ProducerRecord<byte[], byte[]> readOutput(String topic);
    public <K, V> ProducerRecord<K, V> readOutput(String topic, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer);

    public StateStore getStateStore(String name);
    public <K, V> KeyValueStore<K, V> getKeyValueStore(String name);
    public <K, V> WindowStore<K, V> getWindowStore(String name);
    public <K, V> SessionStore<K, V> getSessionStore(String name);

    public void close();
}
Code Block
languagejava
package org.apache.kafka.streams.test;

public class ConsumerRecordFactory<K, V> {

    // default 
    public ConsumerRecordFactory(String defaultTopicName, Serializer<K> keySerializer, Serializer<V> valueSerializer); // initialized WallClockMockTimeMs with System.currentTimeMillis();
    public ConsumerRecordFactory(String defaultTopicName, Serializer<K> keySerializer, Serializer<V> valueSerializer, long startTimestampMs);
    public ConsumerRecordFactory(String defaultTopicName, Serializer<K> keySerializer, Serializer<V> valueSerializer, long startTimestampMs, long autoAdvanceMs);

    // no default topic name; requires to specify topic name in #create(...)
    public ConsumerRecordFactory(Serializer<K> keySerializer, Serializer<V> valueSerializer);
    public ConsumerRecordFactory(Serializer<K> keySerializer, Serializer<V> valueSerializer, long startTimestampMs);
    public ConsumerRecordFactory(Serializer<K> keySerializer, Serializer<V> valueSerializer, long startTimestampMs, long autoAdvanceMs);

    public void advanceTimeMs(long advanceMs);

    // create single records with default topic name
    public ConsumerRecord<byte[], byte[]> create(K key, V value, long timestampMs);
    public ConsumerRecord<byte[], byte[]> create(K key, V value);
    public ConsumerRecord<byte[], byte[]> create(V value, long timestampMs);
    public ConsumerRecord<byte[], byte[]> create(V value);


    // create list of records with default topic name
    public List<ConsumerRecord<byte[], byte[]>> create(List<KeyValue<K, V>> keyValues);
    public List<ConsumerRecord<byte[], byte[]>> create(List<KeyValue<K, V>> keyValues, long startTimestamp, long advanceMs);
    public List<ConsumerRecord<byte[], byte[]>> create(List<KeyValue<K, V>> keyValues, long startTimestamp);


    // overwrite default topic name
    public ConsumerRecord<byte[], byte[]> create(String topicName, K key, V value, long timestampMs);
    public ConsumerRecord<byte[], byte[]> create(String topicName, K key, V value);
    public ConsumerRecord<byte[], byte[]> create(String topicName, V value, long timestampMs);
    public ConsumerRecord<byte[], byte[]> create(String topicName, V value);

    // those methods allow to access regular as well as global stores
    public List<ConsumerRecord<byte[], byte[]>> create(String topicName, List<KeyValue<K, V>> keyValues);
    public List<ConsumerRecord<byte[], byte[]>> create(String topicName, List<KeyValue<K, V>> keyValues, long startTimestamp, long advanceMs);
    public List<ConsumerRecord<byte[], byte[]>> create(String topicName, List<KeyValue<K, V>> keyValues, long startTimestamp);

}

...