Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
package org.apache.kafka.streams.test;

/**
 * API sketch of a factory that produces {@code ConsumerRecord<byte[], byte[]>} instances
 * (single records or lists of records) from typed keys and values.
 * Only the public signatures are listed; no method bodies are shown.
 *
 * NOTE(review): the key/value serializers handed to the constructors are presumably used to
 * convert {@code K}/{@code V} into the {@code byte[]} payload of the returned records -- confirm
 * against the implementation.
 *
 * @param <K> key type accepted by the {@code create(...)} methods
 * @param <V> value type accepted by the {@code create(...)} methods
 */
public class ConsumerRecordFactory<K, V> {

    // constructors taking a default topic name
    // the first overload initializes startTimestampMs with System.currentTimeMillis() and autoAdvanceMs with zero
    public ConsumerRecordFactory(String defaultTopicName, Serializer<K> keySerializer, Serializer<V> valueSerializer);
    public ConsumerRecordFactory(String defaultTopicName, Serializer<K> keySerializer, Serializer<V> valueSerializer, long startTimestampMs);
    public ConsumerRecordFactory(String defaultTopicName, Serializer<K> keySerializer, Serializer<V> valueSerializer, long startTimestampMs, long autoAdvanceMs);

    // constructors without a default topic name; the topic name must then be specified in #create(...)
    public ConsumerRecordFactory(Serializer<K> keySerializer, Serializer<V> valueSerializer);
    public ConsumerRecordFactory(Serializer<K> keySerializer, Serializer<V> valueSerializer, long startTimestampMs);
    public ConsumerRecordFactory(Serializer<K> keySerializer, Serializer<V> valueSerializer, long startTimestampMs, long autoAdvanceMs);

    // NOTE(review): presumably moves the factory's internally tracked timestamp forward by advanceMs -- confirm
    public void advanceTimeMs(long advanceMs);

    // create single records with default topic name
    // NOTE(review): overloads without timestampMs presumably fall back to the factory's tracked time,
    // and overloads without a key presumably create a record with no key -- confirm
    public ConsumerRecord<byte[], byte[]> create(K key, V value, long timestampMs);
    public ConsumerRecord<byte[], byte[]> create(K key, V value);
    public ConsumerRecord<byte[], byte[]> create(V value, long timestampMs);
    public ConsumerRecord<byte[], byte[]> create(V value);


    // create list of records with default topic name
    // NOTE(review): advanceMs is presumably the timestamp increment between consecutive records,
    // starting at startTimestamp -- confirm
    public List<ConsumerRecord<byte[], byte[]>> create(List<KeyValue<K, V>> keyValues);
    public List<ConsumerRecord<byte[], byte[]>> create(List<KeyValue<K, V>> keyValues, long startTimestamp, long advanceMs);
    public List<ConsumerRecord<byte[], byte[]>> create(List<KeyValue<K, V>> keyValues, long startTimestamp);


    // create single records, overwriting the default topic name
    public ConsumerRecord<byte[], byte[]> create(String topicName, K key, V value, long timestampMs);
    public ConsumerRecord<byte[], byte[]> create(String topicName, K key, V value);
    public ConsumerRecord<byte[], byte[]> create(String topicName, V value, long timestampMs);
    public ConsumerRecord<byte[], byte[]> create(String topicName, V value);

    // create list of records, overwriting the default topic name
    public List<ConsumerRecord<byte[], byte[]>> create(String topicName, List<KeyValue<K, V>> keyValues);
    public List<ConsumerRecord<byte[], byte[]>> create(String topicName, List<KeyValue<K, V>> keyValues, long startTimestamp, long advanceMs);
    public List<ConsumerRecord<byte[], byte[]>> create(String topicName, List<KeyValue<K, V>> keyValues, long startTimestamp);

}
Code Block
languagejava
package org.apache.kafka.streams.test;

/**
 * API sketch of static helpers that compare a {@code ProducerRecord} against expected values.
 * Only the public signatures are listed; no method bodies are shown.
 *
 * Every method is declared with {@code throws AssertionError}.
 * NOTE(review): presumably the error is raised when the compared parts of the record do not
 * match the expectation -- confirm against the implementation.
 */
public class OutputVerifier {

    // compare the record's value against an expected value / an expected record
    public static <K,V> void compareValue(ProducerRecord<K, V> record, V expectedValue) throws AssertionError;
    public static <K,V> void compareValue(ProducerRecord<K, V> record, ProducerRecord<K, V> expectedRecord) throws AssertionError;

    // compare the record's key and value against expected key and value / an expected record
    public static <K,V> void compareKeyValue(ProducerRecord<K, V> record, K expectedKey, V expectedValue) throws AssertionError;
    public static <K,V> void compareKeyValue(ProducerRecord<K, V> record, ProducerRecord<K, V> expectedRecord) throws AssertionError;

    // compare the record's value and timestamp
    // NOTE(review): the second overload takes both an expectedRecord and a separate expectedTimestamp;
    // if the expected record already carries a timestamp, the extra parameter looks redundant -- confirm intent
    public static <K,V> void compareValueTimestamp(ProducerRecord<K, V> record, V expectedValue, long expectedTimestamp) throws AssertionError;
    public static <K,V> void compareValueTimestamp(ProducerRecord<K, V> record, ProducerRecord<K, V> expectedRecord, long expectedTimestamp) throws AssertionError;

    // compare the record's key, value, and timestamp
    // NOTE(review): same question as above for the overload taking expectedRecord plus expectedTimestamp
    public static <K,V> void compareKeyValueTimestamp(ProducerRecord<K, V> record, K expectedKey, V expectedValue, long expectedTimestamp) throws AssertionError;
    public static <K,V> void compareKeyValueTimestamp(ProducerRecord<K, V> record, ProducerRecord<K, V> expectedRecord, long expectedTimestamp) throws AssertionError;

}

 

Proposed Changes

We are adding the above-described test helper classes in a new artifact, streams-test-utils, so that people can easily include it as a dependency in their build. The main test class is the test driver, while the others are auxiliary classes used to generate test data, verify output, etc.

...