Status

Current state: Merged (target: 1.2: https://github.com/apache/kafka/pull/4736)

Discussion thread: [DISCUSS] KIP-267: Add Processor Unit Test Support to Kafka Streams Test Utils

JIRA: KAFKA-6473

Released: target version 1.2.0

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Code Block
languagejava
package org.apache.kafka.streams.processor;

public class MockProcessorContext implements ProcessorContext {
    public MockProcessorContext(final String applicationId, final TaskId taskId, final Properties config);
    public MockProcessorContext(final String applicationId, final TaskId taskId, final Properties config, final File stateDir);
 
    @Override StreamsMetrics metrics(); // return a StreamsMetrics instance for the processor and test code to use

    // high level metadata inherited from ProcessorContext (set in constructor) ===
 
    @Override String applicationId();

    @Override TaskId taskId();

    @Override Map<String, Object> appConfigs();

    @Override Map<String, Object> appConfigsWithPrefix(String prefix);

    @Override Serde<?> keySerde();

    @Override Serde<?> valueSerde();

    @Override File stateDir();

    // record metadata ============================================================

    // setters provided for test code to use
    void setRecordMetadata(String topic, int partition, long offset, long timestamp);

    void setRecordMetadataTopic(String topic);

    void setRecordMetadataPartition(int partition);

    void setRecordMetadataOffset(long offset);

    void setRecordMetadataTimestamp(long timestamp);

    // getters inherited from ProcessorContext
    @Override String topic();

    @Override int partition();

    @Override long offset();

    @Override long timestamp();

    // mocked methods =============================================================
 
    // StateStore setter and getter
    @Override void register(StateStore store, boolean loggingEnabledIsDeprecatedAndIgnored, StateRestoreCallback stateRestoreCallback);

    @Override StateStore getStateStore(String name);

 
    // Punctuator capture: NOTE: punctuators will not be triggered automatically. Instead, they'll be captured so test code can retrieve and trigger them if desired.
    @Override Cancellable schedule(long intervalMs, PunctuationType type, Punctuator callback);
    @Override void schedule(long interval); // throw UnsupportedOperationException
    List<CapturedPunctuator> scheduledPunctuators();

    // captures forward() data

    @Override <K, V> void forward(K key, V value);

    @Override <K, V> void forward(K key, V value, int childIndex);

    @Override <K, V> void forward(K key, V value, String childName);

    // returns captured forward data
    <K, V> List<KeyValue<K, V>> forwarded();

    <K, V> List<KeyValue<K, V>> forwarded(int childIndex);

    <K, V> List<KeyValue<K, V>> forwarded(String childName);

    // clears captured forward data
    void resetForwards();

    // captures whether commit() gets called
    @Override void commit();

    // true iff commit() has been called
    boolean committed();

    // resets whether commit() has been called
    void resetCommits();

    // structure for capturing punctuators passed in to schedule()
    public static class CapturedPunctuator {
        private final long intervalMs;
        private final PunctuationType punctuationType;
        private final Punctuator punctuator;

        public CapturedPunctuator(final long intervalMs, final PunctuationType punctuationType, final Punctuator punctuator) {
            this.intervalMs = intervalMs;
            this.punctuationType = punctuationType;
            this.punctuator = punctuator;
        }

        public long getIntervalMs() { return intervalMs;}

        public PunctuationType getPunctuationType() { return punctuationType;}

        public Punctuator getPunctuator() { return punctuator;}
    }
}

Proposed Changes

We add the above new class to the kafka-streams-test-utils artifact (introduced in 1.1.0).

In the initial release, we mark our new class with annotation @Evolving.

All the methods annotated with @Override are implementing the ProcessorContext interface. 

We add new methods for the purposes of setting some properties (like topic, partition, etc.) that the processor may want to access.
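
The setter/getter pairing can be sketched in isolation. The block below is only an illustration under stated assumptions: RecordMetadataSketch is a hypothetical stand-in, not the proposed MockProcessorContext, and it borrows just the method names setRecordMetadata, setRecordMetadataOffset, topic, partition, offset, and timestamp from the interface above.

```java
// Hypothetical stand-in (not a Kafka class): test code writes record metadata,
// and the code under test reads it back through the ProcessorContext-style getters.
final class RecordMetadataSketch {
    private String topic;
    private int partition;
    private long offset;
    private long timestamp;

    // Sets all four fields at once, as a test would before calling process().
    void setRecordMetadata(final String topic, final int partition, final long offset, final long timestamp) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.timestamp = timestamp;
    }

    // Updates a single field and leaves the others untouched.
    void setRecordMetadataOffset(final long offset) {
        this.offset = offset;
    }

    String topic() { return topic; }

    int partition() { return partition; }

    long offset() { return offset; }

    long timestamp() { return timestamp; }
}
```

A test would typically set the metadata once, call the processor, then advance only the offset or timestamp before the next call.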

We also capture data that the processor may put into the context, like k/v pairs in calls to forward(), but also state stores that get registered, punctuators that get scheduled, and whether commit() gets called.

We provide a mechanism to reset the commit and forwarded data captures to make it easy to make assertions about the behavior of the processor after various calls to Processor#process().
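
To make the capture-and-reset idea concrete, here is a minimal self-contained sketch. It does not use the Kafka classes: CapturingContext and Pair are hypothetical stand-ins for MockProcessorContext and KeyValue, and only the method names forward, commit, forwarded, committed, resetForwards, and resetCommits mirror the proposal. The real implementation may well differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical stand-in for KeyValue; not the Kafka class.
final class Pair {
    final Object key;
    final Object value;

    Pair(final Object key, final Object value) {
        this.key = key;
        this.value = value;
    }

    @Override
    public boolean equals(final Object o) {
        if (!(o instanceof Pair)) return false;
        final Pair other = (Pair) o;
        return Objects.equals(key, other.key) && Objects.equals(value, other.value);
    }

    @Override
    public int hashCode() {
        return Objects.hash(key, value);
    }
}

// Hypothetical stand-in for the mock context: it records calls instead of acting on them.
final class CapturingContext {
    private final List<Pair> forwards = new ArrayList<>();
    private boolean committed = false;

    // Called by the code under test; the pair is stored rather than sent downstream.
    void forward(final Object key, final Object value) {
        forwards.add(new Pair(key, value));
    }

    // Called by the code under test; only the fact that it happened is remembered.
    void commit() {
        committed = true;
    }

    // Returns a copy so that a later reset does not alter lists the test already holds.
    List<Pair> forwarded() {
        return new ArrayList<>(forwards);
    }

    boolean committed() {
        return committed;
    }

    void resetForwards() {
        forwards.clear();
    }

    void resetCommits() {
        committed = false;
    }
}
```

Resetting between calls lets each assertion index from zero, rather than tracking how many pairs earlier calls produced.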

There is one method we plan not to implement:

  • void schedule(long interval): it's not possible to implement it without providing a handle on the processor to the context. This is a little tricky, since the mocked ProcessorContext may be used to test not just a Processor, but also a Transformer or a ValueTransformer. Since this method is deprecated in the interface, we feel it's reasonable to update client code to use Punctuators instead. This method will throw an UnsupportedOperationException.

Also note that we won't automatically trigger scheduled punctuators. Instead, we'll just capture them so test code can retrieve and trigger them if desired. This keeps the mock truly a "mock" and not a "driver", which maintains the simplicity of the code and enables users to write linear and sensible tests. If folks want to have their punctuators triggered automatically, they can use the TopologyTestDriver, which does that. This is a potential sharp edge, so I'm planning to make sure it's well documented in both the javadoc and html docs.
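
The difference between capturing and driving can be shown with a small self-contained sketch. Everything in it is an assumption made for illustration: PunctuatorCapture and Captured are hypothetical stand-ins for the mock context and CapturedPunctuator, java.util.function.LongConsumer stands in for Punctuator, and the PunctuationType argument is omitted for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

// Hypothetical stand-in for the mock context's punctuator capture; not the Kafka classes.
final class PunctuatorCapture {

    // Mirrors the shape of CapturedPunctuator, with LongConsumer standing in for Punctuator.
    static final class Captured {
        final long intervalMs;
        final LongConsumer punctuator;

        Captured(final long intervalMs, final LongConsumer punctuator) {
            this.intervalMs = intervalMs;
            this.punctuator = punctuator;
        }
    }

    private final List<Captured> captured = new ArrayList<>();

    // Records the callback; nothing in this class ever fires it.
    void schedule(final long intervalMs, final LongConsumer punctuator) {
        captured.add(new Captured(intervalMs, punctuator));
    }

    // Test code retrieves the captured callbacks and invokes them explicitly.
    List<Captured> scheduledPunctuators() {
        return new ArrayList<>(captured);
    }
}
```

In a test, the code under test would call schedule during initialization, and the test would then call scheduledPunctuators().get(0).punctuator.accept(timestamp) at the exact point where it wants the punctuation to happen.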

 

Example Usage

This mocked ProcessorContext would enable unit tests like the following:

Code Block
languagejava
// ===========================
// Initialize the test harness
 
final Properties config = new Properties();
final MockProcessorContext context = new MockProcessorContext("test-app", new TaskId(0, 0), config);
final KeyValueStore<String, Long> myStore = ...;
context.register(myStore, false, null);
final Processor<String, Long> processor = new MyProcessor();
processor.init(context);

// ==============================
// Verify some processor behavior
processor.process("a", 1L);
processor.process("a", 2L);
final KeyValue<Object, Object> capture1 = context.forwarded().get(0);
Assert.assertEquals(new KeyValue<>("a", 1L), capture1);
final KeyValue<Object, Object> capture2 = context.forwarded().get(1);
Assert.assertEquals(new KeyValue<>("a", 2L), capture2);

Assert.assertTrue(context.committed());

context.resetForwards();
context.resetCommits();

processor.process("a", 3L);
final KeyValue<Object, Object> capture3 = context.forwarded().get(0);
Assert.assertEquals(new KeyValue<>("a", 3L), capture3);
Assert.assertFalse(context.committed());
 
// ===============================
// Verify some Punctuator behavior
context.scheduledPunctuators().get(0).getPunctuator().punctuate(0L);
Assert.assertNull(myStore.get("a")); // assuming the Processor registers a Punctuator that clears this key

 

Compatibility, Deprecation, and Migration Plan

We are only adding a new class. There are no compatibility issues.

...