Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
language: java
title: TestOutputTopic
package org.apache.kafka.streams;

/**
 * API sketch of a typed output topic whose records are read from a queue.
 *
 * <p>Instances are created by TopologyTestDriver; the constructors are package-private,
 * so they are not listed here.
 *
 * @param <K> key type of the records read from the topic
 * @param <V> value type of the records read from the topic
 */
public class TestOutputTopic<K, V> {
    // Created by TopologyTestDriver; constructors are package-private.

    // Methods to check the queue size.
    final long getQueueSize();
    final boolean isEmpty();

    // Methods to read output one record at a time.
    // Each throws NoSuchElementException if there is no record in the queue.
    V readValue();
    KeyValue<K, V> readKeyValue();
    ClientRecord<K, V> readRecord();

    // Methods to read output as a collection.
    List<V> readValuesToList();
    Map<K, V> readKeyValuesToMap();
    List<KeyValue<K, V>> readKeyValuesToList();
    // Plural "Records", consistent with readValuesToList / readKeyValuesToList.
    List<ClientRecord<K, V>> readRecordsToList();
}


Code Block
language: java
title: OutputVerifier
package org.apache.kafka.streams.test;
 
/**
 * API sketch of OutputVerifier, modified to accept ClientRecord instead of ProducerRecord.
 *
 * <p>Each comparison comes in two overloads: one taking the expected fields individually
 * and one taking an expected ClientRecord. Every method is declared to throw
 * AssertionError (presumably when the compared fields differ; the bodies are not shown
 * here, so confirm against the implementation).
 */
public class OutputVerifier {
 
    // Compare the record's value only.
    public static <K,V> void compareValue(ClientRecord<K, V> record, V expectedValue) throws AssertionError;
    public static <K,V> void compareValue(ClientRecord<K, V> record, ClientRecord<K, V> expectedRecord) throws AssertionError;
 
    // Compare the record's key and value.
    public static <K,V> void compareKeyValue(ClientRecord<K, V> record, K expectedKey, V expectedValue) throws AssertionError;
    public static <K,V> void compareKeyValue(ClientRecord<K, V> record, ClientRecord<K, V> expectedRecord) throws AssertionError;
 
    // Compare the record's value and timestamp.
    public static <K,V> void compareValueTimestamp(ClientRecord<K, V> record, V expectedValue, long expectedTimestamp) throws AssertionError;
    public static <K,V> void compareValueTimestamp(ClientRecord<K, V> record, ClientRecord<K, V> expectedRecord) throws AssertionError;
 
    // Compare the record's key, value and timestamp.
    public static <K,V> void compareKeyValueTimestamp(ClientRecord<K, V> record, K expectedKey, V expectedValue, long expectedTimestamp) throws AssertionError;
    public static <K,V> void compareKeyValueTimestamp(ClientRecord<K, V> record, ClientRecord<K, V> expectedRecord) throws AssertionError;
 
}

...