Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
package org.apache.kafka.streams;

public class MockProcessorContext extends ProcessorContext {
    public MockProcessorContext(final String applicationId, final TaskId taskId, final Properties config);
 
    @Override StreamsMetrics metrics(); // return a StreamsMetrics instance for the processor and test code to use

    // high level metadata inherited from ProcessorContext (set in constructor) ====================================================
 
    @Override String applicationId();

    @Override TaskId taskId();

    @Override Map<String, Object> appConfigs();

    @Override Map<String, Object> appConfigsWithPrefix(String prefix);

    @Override Serde<?> keySerde();

    @Override Serde<?> valueSerde();

 
    @Override File stateDir(); // throws an UnsupportedOperationException

    // record metadata ============================================================

    // setters provided for test code to use
    void setRecordMetadata(String topic, int partition, long offset, long timestamp);

    void setRecordMetadataTopic(String topic);

    void setRecordMetadataPartition(int partition);

    void setRecordMetadataOffset(long offset);

    void setRecordMetadataTimestamp(long timestamp);

    // getters inherited from ProcessorContext
    @Override String topic();

    @Override int partition();

    @Override long offset();

    @Override long timestamp();

    // mocked methods =============================================================
 
    // StateStore setter and getter
    @Override void register(StateStore store, boolean loggingEnabledIsDeprecatedAndIgnored, StateRestoreCallback stateRestoreCallback);

    @Override StateStore getStateStore(String name);

 
    // Punctuator capture
    @Override Cancellable schedule(long intervalMs, PunctuationType type, Punctuator callback);
    @Override void schedule(long interval); // throw UnsupportedOperationException
    List<CapturedPunctuator> scheduledPunctuators();

    @Override// voidcaptures scheduleforward(long interval);
) data
    @Override <K, V> void forward(K key, V value);

    @Override <K, V> void forward(K key, V value, int childIndex);

    @Override <K, V> void forward(K key, V value, String childName);

    // returns captured forward data
    <K, V> List<KeyValue<K, V>> forwarded();

    <K, V> List<KeyValue<K, V>> forwarded(int childIndex);

    <K, V> List<KeyValue<K, V>> forwarded(String childName);

    // clears captured forward data
    void resetForwards();

    @Override void// captures whether commit();

 gets called
    @Override booleanvoid committedcommit();

    // true voidiff resetCommitscommit();

 has been called
 public static class CapturedPunctuator {boolean committed();

    // resets whether commit() privatehas final long intervalMs;been called
    void resetCommits();

	// structure for privatecapturing punctuators finalin PunctuationType punctuationType;to schedule()
    public static class  private final Punctuator punctuator;

CapturedPunctuator {
        public CapturedPunctuator(final long intervalMs, final PunctuationType punctuationType, final Punctuator punctuator) {
            this.intervalMs = intervalMs;
            this.punctuationType = punctuationType;
            this.punctuator = punctuator;
        }

        public long getIntervalMs() { return intervalMs;}

        public PunctuationType getPunctuationType() { return punctuationType;}

        public Punctuator getPunctuator() { return punctuator;}
    }
}


Proposed ChangesTODO

We will add the above test utility to the "kafka-streams-test-utils" artifact.

In the initial release, we mark all new classes with annotation @Evolving

Compatibility, Deprecation, and Migration Plan

...