Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Added more methods taking Instant and Duration, deprecating the versions that take long

...

Code Block
language: java
title: TopologyTestDriver
package org.apache.kafka.streams;

public class TopologyTestDriver {
    ...
    public TopologyTestDriver(Topology topology, Properties config); // existing constructor
    @Deprecated public TopologyTestDriver(Topology topology, Properties config, long initialWallClockTimeMs);
    public TopologyTestDriver(Topology topology, Properties config, Instant initialWallClockTime);

    @Deprecated public void advanceWallClockTime(long advanceMs); // can trigger wall-clock-time punctuation
    public void advanceWallClockTime(Duration advance); // can trigger wall-clock-time punctuation

    // Deprecate old pipe and read methods
    @Deprecated public void pipeInput(ConsumerRecord<byte[], byte[]> record); // can trigger event-time punctuation
    @Deprecated public void pipeInput(List<ConsumerRecord<byte[], byte[]>> records); // can trigger event-time punctuation
    @Deprecated public ProducerRecord<byte[], byte[]> readOutput(String topic);
    @Deprecated public <K, V> ProducerRecord<K, V> readOutput(String topic, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer);

	// methods for TestTopic object creation
	public final <K, V> TestOutputTopic<K, V> createOutputTopic(final String topicName, final Serde<K> keySerde, final Serde<V> valueSerde);
	// Uses current system time as start timestamp. Auto-advance is disabled.
	public final <K, V> TestInputTopic<K, V> createInputTopic(final String topicName, final Serde<K> keySerde, final Serde<V> valueSerde);
    //Uses provided startTimestamp and autoAdvance duration for timestamp generation
	public final <K, V> TestInputTopic<K, V> createInputTopic(final String topicName, final Serde<K> keySerde, final Serde<V> valueSerde, final Instant startTimestamp, final Duration autoAdvance);

    ...
}

...

Code Block
language: java
title: TestInputTopic
package org.apache.kafka.streams;

public class TestInputTopic<K, V> {
    //Created by TopologyTestDriver; constructors are package-private

    //Timestamp handling 
    //A record timestamp can be provided when piping input; otherwise the internally tracked time is used, configured with these parameters:
    //startTimestamp: the initial timestamp for generated records; if not provided, the current system time is used as the start timestamp.
    //autoAdvance: the time increment per generated record; if not provided, auto-advance is disabled.

    //Advances the internally tracked time.
    void advanceTimeMs(final Duration advance);

	//Methods to pipe single record
    void pipeInput(final V value);
    void pipeInput(final K key, final V value);

    // Use provided timestamp, does not auto advance internally tracked time.
    void pipeInput(final V value, final Instant timestamp);
    void pipeInput(final K key, final V value, final Instant timestamp);

    // Overloads taking a long timestamp are provided, already deprecated, to ease migration of old tests
    @Deprecated void pipeInput(final V value, final long timestampMs);
    @Deprecated void pipeInput(final K key, final V value, final long timestampMs);

    // If record timestamp set, does not auto advance internally tracked time.
    void pipeInput(final TestRecord<K, V> record);

	//Methods to pipe list of records
    void pipeValueList(final List<V> values);
    void pipeKeyValueList(final List<KeyValue<K, V>> keyValues);

    // Use provided timestamp, does not auto advance internally tracked time.
    void pipeValueList(final List<V> values, final Instant startTimestamp, final Duration advanceMs);
    void pipeKeyValueList(final List<KeyValue<K, V>> keyValues, final Instant startTimestamp, final Duration advanceMs);

    // If record timestamp set, does not auto advance internally tracked time.
    void pipeRecordList(final List<? extends TestRecord<K, V>> records);
}

...

Code Block
language: java
title: TestRecord
package org.apache.kafka.streams.test;
public class TestRecord<K, V> {
	//Constructors
    public TestRecord(final V value);
    public TestRecord(final K key, final V value);
    public TestRecord(final K key, final V value, final Headers headers);
    public TestRecord(final K key, final V value, final Instant recordTime);
    public TestRecord(final K key, final V value, final Headers headers, final Instant recordTime);
    @Deprecated public TestRecord(final K key, final V value, final Long timestamp);
    @Deprecated public TestRecord(final K key, final V value, final Headers headers, final Long timestamp);
	
	//Constructor based on existing record
    public TestRecord(final ConsumerRecord<K, V> record);
    public TestRecord(final ProducerRecord<K, V> record);
	
	// Methods like in ProducerRecord / ConsumerRecord
    public Headers headers();
    public K key();
    public V value();
    public Long timestamp();

	// Getters
    public Headers getHeaders();
    public K getKey();
    public V getValue();
    public Instant getRecordTime();
	
	//Overrides
    public String toString();
    public boolean equals(Object o);
    public int hashCode();
}

...