You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 3 Next »

Status

Current state: Draft

Discussion thread: TODO

JIRA Unable to render Jira issues macro, execution error.

Released: target version 1.2.0

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

KIP-247 adds support for testing a complete topology, but authors of Processor, Transformer, and ValueTransformer implementations can benefit from writing lighter and faster unit tests.

This isn't impossible today, but it requires writing some fairly complicated mock code for the ProcessorContext. We want to simplify this task by providing a general purpose ProcessorContext for unit testing.

Public Interface

 

package org.apache.kafka.streams;

public class MockProcessorContext extends ProcessorContext {
    public MockProcessorContext(final String applicationId, final TaskId taskId, final Properties config);
 
    @Override StreamsMetrics metrics(); // return a StreamsMetrics instance for the processor and test code to use

    // high level metadata inherited from ProcessorContext (set in constructor) ===
 
    @Override String applicationId();
    @Override TaskId taskId();
    @Override Map<String, Object> appConfigs();
    @Override Map<String, Object> appConfigsWithPrefix(String prefix);
    @Override Serde<?> keySerde();
    @Override Serde<?> valueSerde();

 
    @Override File stateDir(); // throws an UnsupportedOperationException

    // record metadata ============================================================

    // setters provided for test code to use
    void setRecordMetadata(String topic, int partition, long offset, long timestamp);
    void setRecordMetadataTopic(String topic);
    void setRecordMetadataPartition(int partition);
    void setRecordMetadataOffset(long offset);
    void setRecordMetadataTimestamp(long timestamp);

    // getters inherited from ProcessorContext
    @Override String topic();
    @Override int partition();
    @Override long offset();
    @Override long timestamp();

    // mocked methods =============================================================
 
    // StateStore setter and getter
    @Override void register(StateStore store, boolean loggingEnabledIsDeprecatedAndIgnored, StateRestoreCallback stateRestoreCallback);
    @Override StateStore getStateStore(String name);

 
    // Punctuator capture
    @Override Cancellable schedule(long intervalMs, PunctuationType type, Punctuator callback);
    @Override void schedule(long interval); // throw UnsupportedOperationException
    List<CapturedPunctuator> scheduledPunctuators();

    // captures forward() data
    @Override <K, V> void forward(K key, V value);
    @Override <K, V> void forward(K key, V value, int childIndex);
    @Override <K, V> void forward(K key, V value, String childName);

    // returns captured forward data
    <K, V> List<KeyValue<K, V>> forwarded();
    <K, V> List<KeyValue<K, V>> forwarded(int childIndex);
    <K, V> List<KeyValue<K, V>> forwarded(String childName);

    // clears captured forward data
    void resetForwards();

    // captures whether commit() gets called
    @Override void commit();

    // true iff commit() has been called
    boolean committed();

    // resets whether commit() has been called
    void resetCommits();

	// structure for capturing punctuators in to schedule()
    public static class CapturedPunctuator {
        public CapturedPunctuator(final long intervalMs, final PunctuationType punctuationType, final Punctuator punctuator) {}

        public long getIntervalMs();
        public PunctuationType getPunctuationType();
        public Punctuator getPunctuator();
    }
}


Proposed Changes

We will add the above test utility to the "kafka-streams-test-utils" artifact.

In the initial release, we mark all new classes with annotation @Evolving

Compatibility, Deprecation, and Migration Plan

We are only adding new classes. There are no compatiblity issues.

Test Plan

We need to test all added classes with unit tests. Integration or system test are not required.

Rejected Alternatives

None.

  • No labels