Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Serde replaced by Serializer / Deserializer

...

Code Block
languagejava
titleTopologyTestDriver
package org.apache.kafka.streams;

public class TopologyTestDriver {
    public TopologyTestDriver(Topology topology, Properties config); // existing constructor
    @Deprecate public TopologyTestDriver(Topology topology, Properties config, long initialWallClockTimeMs);
    public TopologyTestDriver(Topology topology, Properties config, Instant initialWallClockTime);

    @Deprecate public void advanceWallClockTime(long advanceMs); // can trigger wall-clock-time punctuation
	public void advanceWallClockTime(Duration advance); // can trigger wall-clock-time punctuation

    //Deprecate old pipe and read methods
    @Deprecate public void pipeInput(ConsumerRecord<byte[], byte[]> record); // can trigger event-time punctuation
    @Deprecate public void pipeInput(List<ConsumerRecord<byte[], byte[]>> records); // can trigger event-time punctuation
    @Deprecate public ProducerRecord<byte[], byte[]> readOutput(String topic);
    @Deprecate public <K, V> ProducerRecord<K, V> readOutput(String topic, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer);

	// methods for TestTopic object creation
	public final <K, V> TestOutputTopic<K, V> createOutputTopic(final String topicName, final Serde<K>Serializer<K> keySerdekeySerializer, final Serde<V>Serializer<V> valueSerdevalueSerializer);
	// Uses current system time as start timestamp. Auto-advance is disabled.
	public final <K, V> TestInputTopic<K, V> createInputTopic(final String topicName, final Serde<K>Deserializer<K> keySerdekeyDeserializer, final Serde<V>Deserializer<V> valueSerdevalueDeserializer);
    //Uses provided startTimestamp and autoAdvance duration for timestamp generation
	public final <K, V> TestInputTopic<K, V> createInputTopic(final String topicName, final Serde<K>Deserializer<K> keySerdekeyDeserializer, final Serde<V>Deserializer<V> valueSerdevalueDeserializer, final Instant startTimestamp, final Duration autoAdvance);

    ...
}

...

Code Block
languagejava
titleExample
/**
 * Minimal usage example for the test topic API: one String-keyed, String-valued
 * input topic and one output topic wired to the same driver.
 *
 * NOTE(review): the assertion below expects the topology under test to forward
 * each value unchanged; TestStream is not shown here, so confirm against it.
 */
public class SimpleTopicTest {

  private TopologyTestDriver testDriver;
  private TestInputTopic<String, String> inputTopic;
  private TestOutputTopic<String, String> outputTopic;

  @Before
  public void setup() {
    testDriver = new TopologyTestDriver(TestStream.getTopology(), TestStream.getConfig());
    // Input side writes records, so it needs serializers.
    inputTopic = testDriver.createInputTopic(TestStream.INPUT_TOPIC, new StringSerializer(), new StringSerializer());
    // Output side reads records, so it needs deserializers; both are String to match
    // the declared TestOutputTopic<String, String> and the String assertion below.
    outputTopic = testDriver.createOutputTopic(TestStream.OUTPUT_TOPIC, new StringDeserializer(), new StringDeserializer());
  }

  @After
  public void tearDown() {
    testDriver.close();
  }

  @Test
  public void testOneWord() {
    // Feed word "Hello" to inputTopic with no Kafka key; timestamp is irrelevant in this case
    inputTopic.pipeInput("Hello");
    assertThat(outputTopic.readValue()).isEqualTo("Hello");
    // No more output in topic
    assertThat(outputTopic.isEmpty()).isTrue();
  }
}

...