THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||||
---|---|---|---|---|
| ||||
package org.apache.kafka.streams; @Deprecated public class TopologyTestDriver { ... public TopologyTestDriver(Topology topology, Properties config); // initialized WallClockMockTimeMs with System.currentTimeMillis() public TopologyTestDriver(Topology topology, Properties config, long initialWallClockTimeMs); //Deprecate old pipe and read methods @Deprecate public void pipeInput(ConsumerRecord<byte[], byte[]> record); // can trigger event-time punctuation @Deprecate public void pipeInput(List<ConsumerRecord<byte[], byte[]>> records); // can trigger event-time punctuation // methods for result verification @Deprecate public ProducerRecord<byte[], byte[]> readOutput(String topic); @Deprecate public <K, V> ProducerRecord<K, V> readOutput(String topic, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer); ... } | ||||
Code Block | ||||
language | java | title | TopologyTestDriver (new) | package org.apache.kafka.streams.test; public class TopologyTestDriver { public TopologyTestDriver(Topology topology, Properties config); // initialized WallClockMockTimeMs with System.currentTimeMillis() public TopologyTestDriver(Topology topology, Properties config, long initialWallClockTimeMs); // event-time is automatically advanced based on the provided input records, and thus event-time punctuation are triggered automatically // wall-clock time is mocked and not advanced automatically; user can advance wall-clock time manually, and thus, trigger wall-clock time punctuations manually public void advanceWallClockTime(long advanceMs); // can trigger wall-clock-time punctuation // methods for TestTopic object creation //TODO investigate, is there way to find out serde based on topology public final <K, V> TestInputTopic<K, V> createInputTopic(final String topicName, final Serde<K> keySerde, final Serde<V> valueSerde); public final <K, V> TestOutputTopic<K, V> createOutputTopic(final String topicName, final Serde<K> keySerde, final Serde<V> valueSerde); public Map<String, StateStore> getAllStateStores() public StateStore getStateStore(String name); public <K, V> KeyValueStore<K, V> getKeyValueStore(String name); public <K, V> WindowStore<K, V> getWindowStore(String name); public <K, V> SessionStore<K, V> getSessionStore(String name); public void close(); } |
...
Code Block | ||||
---|---|---|---|---|
| ||||
package org.apache.kafka.streams.test;
public class TestInputTopic<K, V> {
//Create by TopologyTestDriver, Constructors are package private
//Timestamp handling
//Record timestamp can be provided when piping input or use internally tracked time initialized configured with configureTiming:
//startTimestampMs the initial timestamp for generated records, if not provided uses current system time as start timestamp.
//autoAdvanceMs the time increment per generated record, if not provided auto-advance is disabled.
void configureTiming(final long startTimestampMs, final long autoAdvanceMs);
//Advances the internally tracked time.
void advanceTimeMs(final long advanceMs);
//Methods to pipe single record
void pipeInput(final V value);
void pipeInput(final V value, final long timestampMs);
void pipeInput(final K key, final V value);
void pipeInput(final K key, final V value, final long timestampMs);
void pipeInput(final ClientRecord<K, V> record);
void pipeInput(final ClientRecord<K, V> record, final long timestampMs);
//Methods to pipe list of records
void pipeValueList(final List<V> values);
void pipeValueList(final List<V> values, final long startTimestamp, final long advanceMs);
void pipeKeyValueList(final List<KeyValue<K, V>> keyValues);
void pipeKeyValueList(final List<KeyValue<K, V>> keyValues, final long startTimestamp, final long advanceMs);
void pipeRecordList(final List<ClientRecord<K, V>> records);
void pipeRecordList(final List<ClientRecord<K, V>> records, final long startTimestamp, final long advanceMs);
} |
...
Code Block | ||||
---|---|---|---|---|
| ||||
package org.apache.kafka.streams.test;
public class TestOutputTopic<K, V> {
//Create by TopologyTestDriver, Constructors are package private
//Method to check queue size
final long getQueueSize();
final boolean isEmpty();
//Methods to readOutput
V readValue();
KeyValue<K, V> readKeyValue();
ClientRecord<K, V> readRecord();
//Output as collection
List<V> readValuesToList();
Map<K, V> readKeyValuesToMap();
List<KeyValue<K, V>> readKeyValuesToList();
List<ClientRecord<K, V>> readRecordToList();
} |
...
- This is will replace KIP-456: Helper classes to make it simpler to write test logic with TopologyTestDriver if accepted
- Deprecate current TestTopologyDriver and move new to test package. This would have enabled to keep also TestInputTopic and TestOutputTopic classes in test package, not in very crowded streams root package.