THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
Table of Contents |
---|
Status
Current state: Accepted
Discussion thread: here
JIRA:
Jira | ||||||||
---|---|---|---|---|---|---|---|---|
|
...
Code Block | ||||
---|---|---|---|---|
| ||||
package org.apache.kafka.streams; public class TopologyTestDriver { public TopologyTestDriver(Topology topology, Properties config); // existing constructor @Deprecate public TopologyTestDriver(Topology topology, Properties config, long initialWallClockTimeMs); public TopologyTestDriver(Topology topology, Properties config, Instant initialWallClockTime); @Deprecate public void advanceWallClockTime(long advanceMs); // can trigger wall-clock-time punctuation public void advanceWallClockTime(Duration advance); // can trigger wall-clock-time punctuation //Deprecate old pipe and read methods @Deprecate public void pipeInput(ConsumerRecord<byte[], byte[]> record); // can trigger event-time punctuation @Deprecate public void pipeInput(List<ConsumerRecord<byte[], byte[]>> records); // can trigger event-time punctuation @Deprecate public ProducerRecord<byte[], byte[]> readOutput(String topic); @Deprecate public <K, V> ProducerRecord<K, V> readOutput(String topic, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer); // methods for TestTopic object creation public final <K, V> TestOutputTopic<K, V> createOutputTopic(final String topicName, final Serde<K>Serializer<K> keySerdekeySerializer, final Serde<V>Serializer<V> valueSerdevalueSerializer); // Uses current system time as start timestamp. Auto-advance is disabled. public final <K, V> TestInputTopic<K, V> createInputTopic(final String topicName, final Serde<K>Deserializer<K> keySerdekeyDeserializer, final Serde<V>Deserializer<V> valueSerdevalueDeserializer); //Uses provided startTimestamp and autoAdvance duration for timestamp generation public final <K, V> TestInputTopic<K, V> createInputTopic(final String topicName, final Serde<K>Deserializer<K> keySerdekeyDeserializer, final Serde<V>Deserializer<V> valueSerdevalueDeserializer, final Instant startTimestamp, final Duration autoAdvance); ... } |
...
Code Block | ||||
---|---|---|---|---|
| ||||
package org.apache.kafka.streams;
/**
 * Public API outline of a typed input topic for tests (declarations only).
 * Created by TopologyTestDriver; constructors are package-private.
 */
public class TestInputTopic<K, V> {
    // Timestamp handling:
    // A record timestamp can be provided when piping input; otherwise the internally tracked time is used,
    // configured with these parameters:
    //   startTimestamp - the initial timestamp for generated records; if not provided, the current system time is used.
    //   autoAdvance    - the time increment per generated record; if not provided, auto-advance is disabled.
    // Advances the internally tracked time.
    void advanceTime(final Duration advance);
    // Methods to pipe a single record
    void pipeInput(final V value);
    void pipeInput(final K key, final V value);
    // Use provided timestamp, does not auto advance internally tracked time.
    void pipeInput(final V value, final Instant timestamp);
    void pipeInput(final K key, final V value, final Instant timestamp);
    // Method with long provided to support easier migration of old tests
    void pipeInput(final K key, final V value, final long timestampMs);
    // If record timestamp set, does not auto advance internally tracked time.
    void pipeInput(final TestRecord<K, V> record);
    // Methods to pipe a list of records
    void pipeValueList(final List<V> values);
    void pipeKeyValueList(final List<KeyValue<K, V>> keyValues);
    // Use provided timestamp, does not auto advance internally tracked time.
    void pipeValueList(final List<V> values, final Instant startTimestamp, final Duration advanceMs);
    void pipeKeyValueList(final List<KeyValue<K, V>> keyValues, final Instant startTimestamp, final Duration advanceMs);
    // If record timestamp set, does not auto advance internally tracked time.
    void pipeRecordList(final List<? extends TestRecord<K, V>> records);
} |
...
Code Block | ||||
---|---|---|---|---|
| ||||
package org.apache.kafka.streams.test;
// API outline of TestRecord (declarations only; no method bodies are shown here).
// Judging by the signatures, a record carries a value plus an optional key, headers and a time.
public class TestRecord<K, V> {
//Constructors: every overload takes a value; key, headers and a time are added by the longer overloads.
public TestRecord(final V value);
public TestRecord(final K key, final V value);
public TestRecord(final K key, final V value, final Headers headers);
public TestRecord(final K key, final V value, final Instant recordTime);
public TestRecord(final K key, final V value, final Headers headers, final Instant recordTime);
// Time given as a boxed Long instead of an Instant.
// NOTE(review): presumably epoch milliseconds, and presumably nullable since it is boxed; confirm.
public TestRecord(final K key, final V value, final Headers headers, final Long timestamp);
//Constructor based on existing record
public TestRecord(final ConsumerRecord<K, V> record);
public TestRecord(final ProducerRecord<K, V> record);
// Methods like in ProducerRecord / ConsumerRecord
public Headers headers();
public K key();
public V value();
public Long timestamp();
// Getters (bean-style names; note getRecordTime() returns an Instant whereas timestamp() above returns a Long)
public Headers getHeaders();
public K getKey();
public V getValue();
public Instant getRecordTime();
//Overrides of java.lang.Object methods
public String toString();
public boolean equals(Object o);
public int hashCode();
} |
...
Code Block | ||||
---|---|---|---|---|
| ||||
public class SimpleTopicTest { private TopologyTestDriver testDriver; private TestInputTopic<String, String> inputTopic; private TestOutputTopic<String, String> outputTopic; @Before public void setup() { testDriver = new TopologyTestDriver(TestStream.getTopology(), TestStream.getConfig()); inputTopic = testDriver.createInputTopic(TestStream.INPUT_TOPIC, new Serdes.StringSerdeStringDeserializer(), new Serdes.StringSerdeStringDeserializer()); outputTopic = testDriver.createOutputTopic(TestStream.OUTPUT_TOPIC, new Serdes.StringSerdeStringSerializer(), new Serdes.LongSerdeLongSerializer()); } @After public void tearDown() { testDriver.close(); } @Test public void testOneWord() { //Feed word "Hello" to inputTopic and no kafka key, timestamp is irrelevant in this case inputTopic.pipeInput("Hello"); assertThat(outputTopic.readValue()).isEqualTo("Hello"); //No more output in topic assertThat(outputTopic.isEmpty()).isTrue(); } } |
...