Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Change class packages

...

TopologyTestDriver (new
Code Block
languagejava
titleTopologyTestDriver
package org.apache.kafka.streams;
 
@Deprecated
public class TopologyTestDriver {
	...
 public TopologyTestDriver(Topology topology, Properties config); // initialized WallClockMockTimeMs with System.currentTimeMillis()

	public TopologyTestDriver(Topology topology, Properties config, long initialWallClockTimeMs);

    //Deprecate old pipe and read methods
    @Deprecate public void pipeInput(ConsumerRecord<byte[], byte[]> record); // can trigger event-time punctuation
    @Deprecate public void pipeInput(List<ConsumerRecord<byte[], byte[]>> records); // can trigger event-time punctuation
 
    // methods for result verification
  
    @Deprecate public ProducerRecord<byte[], byte[]> readOutput(String topic);
    @Deprecate public <K, V> ProducerRecord<K, V> readOutput(String topic, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer);
 
    ...
}
Code Block
languagejava
title
)
package org.apache.kafka.streams.test;

public class TopologyTestDriver {
	public TopologyTestDriver(Topology topology, Properties config); // initialized WallClockMockTimeMs with System.currentTimeMillis()

	public TopologyTestDriver(Topology topology, Properties config, long initialWallClockTimeMs);

	// event-time is automatically advanced based on the provided input records, and thus event-time punctuation are triggered automatically
	// wall-clock time is mocked and not advanced automatically; user can advance wall-clock time manually, and thus, trigger wall-clock time punctuations manually
	public void advanceWallClockTime(long advanceMs); // can trigger wall-clock-time punctuation

	// methods for TestTopic object creation
	//TODO investigate, is there way to find out serde based on topology
	public final <K, V> TestInputTopic<K, V> createInputTopic(final String topicName, final Serde<K> keySerde, final Serde<V> valueSerde);
	public final <K, V> TestOutputTopic<K, V> createOutputTopic(final String topicName, final Serde<K> keySerde, final Serde<V> valueSerde);

	public Map<String, StateStore> getAllStateStores()
	public StateStore getStateStore(String name);
	public <K, V> KeyValueStore<K, V> getKeyValueStore(String name);
	public <K, V> WindowStore<K, V> getWindowStore(String name);
	public <K, V> SessionStore<K, V> getSessionStore(String name);

	public void close();
}

...

Code Block
languagejava
titleTestInputTopic
package org.apache.kafka.streams.test;

public class TestInputTopic<K, V> {
    //Create by TopologyTestDriver, Constructors are package private

    //Timestamp handling 
    //Record timestamp can be provided when piping input or use internally tracked time initialized configured with configureTiming:
    //startTimestampMs the initial timestamp for generated records, if not provided uses current system time as start timestamp.
    //autoAdvanceMs the time increment per generated record, if not provided auto-advance is disabled.

    void configureTiming(final long startTimestampMs, final long autoAdvanceMs);
    //Advances the internally tracked time.
    void advanceTimeMs(final long advanceMs);

	//Methods to pipe single record
    void pipeInput(final V value);
    void pipeInput(final V value, final long timestampMs);

    void pipeInput(final K key, final V value);
    void pipeInput(final K key, final V value, final long timestampMs);

    void pipeInput(final ClientRecord<K, V> record);
    void pipeInput(final ClientRecord<K, V> record, final long timestampMs);

	//Methods to pipe list of records
    void pipeValueList(final List<V> values);
    void pipeValueList(final List<V> values, final long startTimestamp, final long advanceMs);

    void pipeKeyValueList(final List<KeyValue<K, V>> keyValues);
    void pipeKeyValueList(final List<KeyValue<K, V>> keyValues, final long startTimestamp, final long advanceMs);

    void pipeRecordList(final List<ClientRecord<K, V>> records);
    void pipeRecordList(final List<ClientRecord<K, V>> records, final long startTimestamp, final long advanceMs);
}

...

Code Block
languagejava
titleTestOutputTopic
package org.apache.kafka.streams.test;

public class TestOutputTopic<K, V> {
    //Create by TopologyTestDriver, Constructors are package private

	//Method to check queue size
    final long getQueueSize();
    final boolean isEmpty();

	//Methods to readOutput
    V readValue();
    KeyValue<K, V> readKeyValue();
    ClientRecord<K, V> readRecord();

    //Output as collection
    List<V> readValuesToList();
    Map<K, V> readKeyValuesToMap();
    List<KeyValue<K, V>> readKeyValuesToList();
    List<ClientRecord<K, V>> readRecordToList();
}

...