THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||||
---|---|---|---|---|
| ||||
package org.apache.kafka.streams; public class TopologyTestDriver { public TopologyTestDriver(Topology topology, Properties config); // existing constructor @Deprecate public TopologyTestDriver(Topology topology, Properties config, long initialWallClockTimeMs); public TopologyTestDriver(Topology topology, Properties config, Instant initialWallClockTime); @Deprecate public void advanceWallClockTime(long advanceMs); // can trigger wall-clock-time punctuation public void advanceWallClockTime(Duration advance); // can trigger wall-clock-time punctuation //Deprecate old pipe and read methods @Deprecate public void pipeInput(ConsumerRecord<byte[], byte[]> record); // can trigger event-time punctuation @Deprecate public void pipeInput(List<ConsumerRecord<byte[], byte[]>> records); // can trigger event-time punctuation @Deprecate public ProducerRecord<byte[], byte[]> readOutput(String topic); @Deprecate public <K, V> ProducerRecord<K, V> readOutput(String topic, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer); // methods for TestTopic object creation public final <K, V> TestOutputTopic<K, V> createOutputTopic(final String topicName, final Serde<K>Serializer<K> keySerdekeySerializer, final Serde<V>Serializer<V> valueSerdevalueSerializer); // Uses current system time as start timestamp. Auto-advance is disabled. public final <K, V> TestInputTopic<K, V> createInputTopic(final String topicName, final Serde<K>Deserializer<K> keySerdekeyDeserializer, final Serde<V>Deserializer<V> valueSerdevalueDeserializer); //Uses provided startTimestamp and autoAdvance duration for timestamp generation public final <K, V> TestInputTopic<K, V> createInputTopic(final String topicName, final Serde<K>Deserializer<K> keySerdekeyDeserializer, final Serde<V>Deserializer<V> valueSerdevalueDeserializer, final Instant startTimestamp, final Duration autoAdvance); ... } |
...
Code Block | ||||
---|---|---|---|---|
| ||||
public class SimpleTopicTest { private TopologyTestDriver testDriver; private TestInputTopic<String, String> inputTopic; private TestOutputTopic<String, String> outputTopic; @Before public void setup() { testDriver = new TopologyTestDriver(TestStream.getTopology(), TestStream.getConfig()); inputTopic = testDriver.createInputTopic(TestStream.INPUT_TOPIC, new Serdes.StringSerdeStringDeserializer(), new Serdes.StringSerdeStringDeserializer()); outputTopic = testDriver.createOutputTopic(TestStream.OUTPUT_TOPIC, new Serdes.StringSerdeStringSerializer(), new Serdes.LongSerdeLongSerializer()); } @After public void tearDown() { testDriver.close(); } @Test public void testOneWord() { //Feed word "Hello" to inputTopic and no kafka key, timestamp is irrelevant in this case inputTopic.pipeInput("Hello"); assertThat(outputTopic.readValue()).isEqualTo("Hello"); //No more output in topic assertThat(outputTopic.isEmpty()).isTrue(); } } |
...