Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
package org.apache.kafka.streams;

public class MockProcessorContext extends ProcessorContext {
    public MockProcessorContext(final String applicationId, final TaskId taskId, final Properties config);
    public MockProcessorContext(final String applicationId, final TaskId taskId, final Properties config, final File stateDir);
 
    @Override StreamsMetrics metrics(); // return a StreamsMetrics instance for the processor and test code to use

    // high level metadata inherited from ProcessorContext (set in constructor) ===
 
    @Override String applicationId();
    @Override TaskId taskId();
    @Override Map<String, Object> appConfigs();
    @Override Map<String, Object> appConfigsWithPrefix(String prefix);
    @Override Serde<?> keySerde();
    @Override Serde<?> valueSerde();

 
    @Override File stateDir(); // throws an UnsupportedOperationException

    // record metadata ============================================================

    // setters provided for test code to use
    void setRecordMetadata(String topic, int partition, long offset, long timestamp);
    void setRecordMetadataTopic(String topic);
    void setRecordMetadataPartition(int partition);
    void setRecordMetadataOffset(long offset);
    void setRecordMetadataTimestamp(long timestamp);

    // getters inherited from ProcessorContext
    @Override String topic();
    @Override int partition();
    @Override long offset();
    @Override long timestamp();

    // mocked methods =============================================================
 
    // StateStore setter and getter
    @Override void register(StateStore store, boolean loggingEnabledIsDeprecatedAndIgnored, StateRestoreCallback stateRestoreCallback);
    @Override StateStore getStateStore(String name);

 
    // Punctuator capture
    @Override Cancellable schedule(long intervalMs, PunctuationType type, Punctuator callback);
    @Override void schedule(long interval); // throw UnsupportedOperationException
    List<CapturedPunctuator> scheduledPunctuators();

    // captures forward() data
    @Override <K, V> void forward(K key, V value);
    @Override <K, V> void forward(K key, V value, int childIndex);
    @Override <K, V> void forward(K key, V value, String childName);

    // returns captured forward data
    <K, V> List<KeyValue<K, V>> forwarded();
    <K, V> List<KeyValue<K, V>> forwarded(int childIndex);
    <K, V> List<KeyValue<K, V>> forwarded(String childName);

    // clears captured forward data
    void resetForwards();

    // captures whether commit() gets called
    @Override void commit();

    // true iff commit() has been called
    boolean committed();

    // resets whether commit() has been called
    void resetCommits();

	// structure for capturing punctuators in to schedule()
    public static class CapturedPunctuator {
        public CapturedPunctuator(final long intervalMs, final PunctuationType punctuationType, final Punctuator punctuator) {}

        public long getIntervalMs();
        public PunctuationType getPunctuationType();
        public Punctuator getPunctuator();
    }
}


Proposed Changes

We will add the above test utility new class to the "kafka-streams-test-utils" artifact (introduced in 1.1.0).

In the initial release, we mark all our new classes with annotation @Evolvingclass with annotation @Evolving.

All the methods annotated with @Override are implementing the ProcessorContext interface. 

We add new methods for the purposes of setting some properties (like topic, partition, etc.) that the processor may want to access.

We also capture data that the processor may put into the context, like k/v pairs in calls to forward(), but also state stores that get registered, punctuators that get scheduled, and whether commit() gets called.

We provide a mechanism to reset the commit and forwarded data captures to make it easy to make assertions about the behavior of the processor after various calls to Processor#process().

There is one method we plan not to implement:

  • void schedule(long interval): it's not possible to implement it without providing a handle on the processor to the context. This is a little tricky, since the mocked ProcessorContext may be used to test not just a Processor, but also a Transformer or a ValueTransformer. Since this method is deprecated in the interface, we feel it's reasonable to update client code to use Punctuators instead.

This method will throw an UnsupportedOperationException.

Example Usage

This mocked ProcessorContext would enable unit tests like the following:

Code Block
languagejava
final Properties config = new Properties();
final MockProcessorContext context = new MockProcessorContext("test-app", new TaskId(0, 0), config);
final Processor<String, Long> processor = new MyProcessor();
processor.init(context);

processor.process("a", 1L);
processor.process("a", 2L);
final KeyValue<Object, Object> capture1 = context.forwarded().get(0);
Assert.assertEquals(new KeyValue<>("a", 1L), capture1);
final KeyValue<Object, Object> capture2 = context.forwarded().get(1);
Assert.assertEquals(new KeyValue<>("a", 2L), capture2);

Assert.assertTrue(context.committed());

context.resetForwards();
context.resetCommits();

processor.process("a", 3L);
final KeyValue<Object, Object> capture3 = context.forwarded().get(0);
Assert.assertEquals(new KeyValue<>("a", 3L), capture3);
Assert.assertFalse(context.committed());

 

Compatibility, Deprecation, and Migration Plan

We are only adding a new classesclass. There are no compatiblity issues.

...