Status

Current state: Under Discussion

Discussion thread: TODO

Released: target version 1.2.0

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

KIP-247 adds support for testing a complete topology, but authors of Processor, Transformer, and ValueTransformer implementations can benefit from writing lighter and faster unit tests.

This is possible today, but it requires writing fairly complicated mock code for the ProcessorContext. We want to simplify this task by providing a general-purpose ProcessorContext for unit testing.

Public Interface

 

package org.apache.kafka.streams;

public class MockProcessorContext implements ProcessorContext {
    public MockProcessorContext(final String applicationId, final TaskId taskId, final Properties config);
 
    @Override StreamsMetrics metrics();

    // high level metadata ====================================================
    @Override String applicationId();

    @Override TaskId taskId();

    @Override Map<String, Object> appConfigs();

    @Override Map<String, Object> appConfigsWithPrefix(String prefix);

    @Override Serde<?> keySerde();

    @Override Serde<?> valueSerde();

    @Override File stateDir();

    // record metadata ========================================================
    void setRecordMetadata(String topic, int partition, long offset, long timestamp);

    void setRecordMetadataTopic(String topic);

    void setRecordMetadataPartition(int partition);

    void setRecordMetadataOffset(long offset);

    void setRecordMetadataTimestamp(long timestamp);

    @Override String topic();

    @Override int partition();

    @Override long offset();

    @Override long timestamp();

    // mocked methods =========================================================
    @Override void register(StateStore store, boolean loggingEnabledIsDeprecatedAndIgnored, StateRestoreCallback stateRestoreCallback);

    @Override StateStore getStateStore(String name);

    @Override Cancellable schedule(long intervalMs, PunctuationType type, Punctuator callback);

    List<CapturedPunctuator> scheduledPunctuators();

    @Override void schedule(long interval);

    @Override <K, V> void forward(K key, V value);

    @Override <K, V> void forward(K key, V value, int childIndex);

    @Override <K, V> void forward(K key, V value, String childName);

    <K, V> List<KeyValue<K, V>> forwarded();

    <K, V> List<KeyValue<K, V>> forwarded(int childIndex);

    <K, V> List<KeyValue<K, V>> forwarded(String childName);

    void resetForwards();

    @Override void commit();

    boolean committed();

    void resetCommits();

    public static class CapturedPunctuator {
        private final long intervalMs;
        private final PunctuationType punctuationType;
        private final Punctuator punctuator;

        public CapturedPunctuator(final long intervalMs, final PunctuationType punctuationType, final Punctuator punctuator) {
            this.intervalMs = intervalMs;
            this.punctuationType = punctuationType;
            this.punctuator = punctuator;
        }

        public long getIntervalMs() { return intervalMs;}

        public PunctuationType getPunctuationType() { return punctuationType;}

        public Punctuator getPunctuator() { return punctuator;}
    }
}
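
To illustrate the capture-and-inspect idea behind forwarded(), committed(), resetForwards(), and resetCommits(), here is a minimal, self-contained sketch. It does not use the Kafka classes: SimpleContext, CapturingContext, and LengthProcessor are hypothetical stand-ins invented for this example, and the proposed MockProcessorContext may behave differently in detail.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the small part of ProcessorContext used below.
interface SimpleContext {
    void forward(String key, Integer value);
    void commit();
}

// Hypothetical capturing mock: records interactions instead of acting on them.
class CapturingContext implements SimpleContext {
    private final List<String> forwarded = new ArrayList<>();
    private boolean committed = false;

    @Override
    public void forward(String key, Integer value) {
        forwarded.add(key + "=" + value);
    }

    @Override
    public void commit() {
        committed = true;
    }

    // Returns a copy so that tests cannot modify the captured state by accident.
    List<String> forwarded() { return new ArrayList<>(forwarded); }

    boolean committed() { return committed; }

    void resetForwards() { forwarded.clear(); }

    void resetCommits() { committed = false; }
}

// Hypothetical processor under test: forwards the value's length, then commits.
class LengthProcessor {
    private final SimpleContext context;

    LengthProcessor(SimpleContext context) { this.context = context; }

    void process(String key, String value) {
        context.forward(key, value.length());
        context.commit();
    }
}

class CapturingContextDemo {
    public static void main(String[] args) {
        CapturingContext context = new CapturingContext();
        new LengthProcessor(context).process("a", "hello");

        System.out.println(context.forwarded()); // prints [a=5]
        System.out.println(context.committed()); // prints true

        // Reset captured state before exercising the next scenario.
        context.resetForwards();
        context.resetCommits();
        System.out.println(context.forwarded().isEmpty()); // prints true
    }
}
```

A test against the real class would presumably follow the same shape: construct the mock context, hand it to the processor, call process(...), then assert on the captured forwards and the commit flag.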

package org.apache.kafka.streams.test;

public class ConsumerRecordFactory<K, V> {

    // default 
    public ConsumerRecordFactory(String defaultTopicName, Serializer<K> keySerializer, Serializer<V> valueSerializer); // initializes startTimestampMs with System.currentTimeMillis() and autoAdvanceMs with zero
    public ConsumerRecordFactory(String defaultTopicName, Serializer<K> keySerializer, Serializer<V> valueSerializer, long startTimestampMs);
    public ConsumerRecordFactory(String defaultTopicName, Serializer<K> keySerializer, Serializer<V> valueSerializer, long startTimestampMs, long autoAdvanceMs);

    // no default topic name; the topic name must be specified in #create(...)
    public ConsumerRecordFactory(Serializer<K> keySerializer, Serializer<V> valueSerializer);
    public ConsumerRecordFactory(Serializer<K> keySerializer, Serializer<V> valueSerializer, long startTimestampMs);
    public ConsumerRecordFactory(Serializer<K> keySerializer, Serializer<V> valueSerializer, long startTimestampMs, long autoAdvanceMs);

    public void advanceTimeMs(long advanceMs);

    // create single records with default topic name
    public ConsumerRecord<byte[], byte[]> create(K key, V value, long timestampMs);
    public ConsumerRecord<byte[], byte[]> create(K key, V value);
    public ConsumerRecord<byte[], byte[]> create(V value, long timestampMs);
    public ConsumerRecord<byte[], byte[]> create(V value);


    // create list of records with default topic name
    public List<ConsumerRecord<byte[], byte[]>> create(List<KeyValue<K, V>> keyValues);
    public List<ConsumerRecord<byte[], byte[]>> create(List<KeyValue<K, V>> keyValues, long startTimestamp, long advanceMs);
    public List<ConsumerRecord<byte[], byte[]>> create(List<KeyValue<K, V>> keyValues, long startTimestamp);


    // overwrite default topic name
    public ConsumerRecord<byte[], byte[]> create(String topicName, K key, V value, long timestampMs);
    public ConsumerRecord<byte[], byte[]> create(String topicName, K key, V value);
    public ConsumerRecord<byte[], byte[]> create(String topicName, V value, long timestampMs);
    public ConsumerRecord<byte[], byte[]> create(String topicName, V value);

    // create list of records, overwriting the default topic name
    public List<ConsumerRecord<byte[], byte[]>> create(String topicName, List<KeyValue<K, V>> keyValues);
    public List<ConsumerRecord<byte[], byte[]>> create(String topicName, List<KeyValue<K, V>> keyValues, long startTimestamp, long advanceMs);
    public List<ConsumerRecord<byte[], byte[]>> create(String topicName, List<KeyValue<K, V>> keyValues, long startTimestamp);

}
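
The interplay of startTimestampMs, autoAdvanceMs, and advanceTimeMs(...) can be sketched in isolation. The class below is a hypothetical model of the timestamp bookkeeping only, not the factory itself. It assumes that each created record receives the current internal time and that the time then advances by autoAdvanceMs; the proposal does not spell out this ordering.

```java
// Hypothetical, simplified model of the factory's timestamp bookkeeping.
class TimestampSequence {
    private long timeMs;
    private final long autoAdvanceMs;

    TimestampSequence(long startTimestampMs, long autoAdvanceMs) {
        this.timeMs = startTimestampMs;
        this.autoAdvanceMs = autoAdvanceMs;
    }

    // Mirrors advanceTimeMs(long): manually move the internal clock forward.
    void advanceTimeMs(long advanceMs) {
        if (advanceMs < 0) {
            throw new IllegalArgumentException("advanceMs must not be negative");
        }
        timeMs += advanceMs;
    }

    // Assumed behavior: hand out the current time, then auto-advance.
    long nextTimestamp() {
        long timestamp = timeMs;
        timeMs += autoAdvanceMs;
        return timestamp;
    }
}

class TimestampSequenceDemo {
    public static void main(String[] args) {
        TimestampSequence sequence = new TimestampSequence(1000L, 10L);
        System.out.println(sequence.nextTimestamp()); // prints 1000
        System.out.println(sequence.nextTimestamp()); // prints 1010
        sequence.advanceTimeMs(500L);
        System.out.println(sequence.nextTimestamp()); // prints 1520
    }
}
```

With autoAdvanceMs set to zero (the default noted above), every record would get the same timestamp until advanceTimeMs(...) is called explicitly.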
package org.apache.kafka.streams.test;

public class OutputVerifier {

    public static <K,V> void compareValue(ProducerRecord<K, V> record, V expectedValue) throws AssertionError;
    public static <K,V> void compareValue(ProducerRecord<K, V> record, ProducerRecord<K, V> expectedRecord) throws AssertionError;

    public static <K,V> void compareKeyValue(ProducerRecord<K, V> record, K expectedKey, V expectedValue) throws AssertionError;
    public static <K,V> void compareKeyValue(ProducerRecord<K, V> record, ProducerRecord<K, V> expectedRecord) throws AssertionError;

    public static <K,V> void compareValueTimestamp(ProducerRecord<K, V> record, V expectedValue, long expectedTimestamp) throws AssertionError;
    public static <K,V> void compareValueTimestamp(ProducerRecord<K, V> record, ProducerRecord<K, V> expectedRecord) throws AssertionError;

    public static <K,V> void compareKeyValueTimestamp(ProducerRecord<K, V> record, K expectedKey, V expectedValue, long expectedTimestamp) throws AssertionError;
    public static <K,V> void compareKeyValueTimestamp(ProducerRecord<K, V> record, ProducerRecord<K, V> expectedRecord) throws AssertionError;

}
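
As a rough illustration of how such a verifier can work, the sketch below compares a record's fields against expected values and throws an AssertionError on mismatch. SimpleRecord and SimpleVerifier are hypothetical names used only for this example; the real OutputVerifier operates on ProducerRecord and its error messages may differ.

```java
import java.util.Objects;

// Hypothetical stand-in for ProducerRecord, holding only the compared fields.
class SimpleRecord<K, V> {
    final K key;
    final V value;
    final long timestamp;

    SimpleRecord(K key, V value, long timestamp) {
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
    }
}

// Hypothetical verifier modeled on the compareKeyValue* signatures above.
class SimpleVerifier {
    static <K, V> void compareKeyValue(SimpleRecord<K, V> record, K expectedKey, V expectedValue) {
        // Objects.equals is null-safe, so null keys and values compare cleanly.
        if (!Objects.equals(record.key, expectedKey) || !Objects.equals(record.value, expectedValue)) {
            throw new AssertionError("Expected <" + expectedKey + ", " + expectedValue
                    + "> but was <" + record.key + ", " + record.value + ">");
        }
    }

    static <K, V> void compareKeyValueTimestamp(SimpleRecord<K, V> record, K expectedKey,
                                                V expectedValue, long expectedTimestamp) {
        compareKeyValue(record, expectedKey, expectedValue);
        if (record.timestamp != expectedTimestamp) {
            throw new AssertionError("Expected timestamp " + expectedTimestamp
                    + " but was " + record.timestamp);
        }
    }
}

class SimpleVerifierDemo {
    public static void main(String[] args) {
        SimpleRecord<String, Integer> record = new SimpleRecord<>("k", 1, 42L);
        SimpleVerifier.compareKeyValueTimestamp(record, "k", 1, 42L); // passes silently
        try {
            SimpleVerifier.compareKeyValue(record, "k", 2);
        } catch (AssertionError e) {
            System.out.println(e.getMessage()); // prints Expected <k, 2> but was <k, 1>
        }
    }
}
```

Throwing AssertionError directly keeps the helper independent of any particular test framework.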

 

Proposed Changes

We are adding the test helper classes described above in a new artifact, streams-test-utils, so that people can easily include it as a dependency in their build. The main test class is the test driver, while the others are auxiliary classes for generating test data and similar tasks.

For time-based operations, it is important to allow fine-grained control over time. Therefore, we provide a mock time class that can be injected into the driver and used to generate input data. To avoid boilerplate test code, we provide a rich set of overloaded methods and a ConsumerRecordFactory that simplifies generating ConsumerRecord<byte[], byte[]> instances compared with using the verbose ConsumerRecord constructor.

In the initial release, we mark all new classes with the annotation @Evolving.

Compatibility, Deprecation, and Migration Plan

We are only adding new classes. There are no compatibility issues.

Test Plan

We need to test all added classes with unit tests. Integration or system tests are not required.

Rejected Alternatives

None.
