THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||
---|---|---|
| ||
package org.apache.kafka.streams; public class MockProcessorContext extends ProcessorContext { public MockProcessorContext(final String applicationId, final TaskId taskId, final Properties config); @Override StreamsMetrics metrics(); // return a StreamsMetrics instance for the processor and test code to use // high level metadata inherited from ProcessorContext (set in constructor) ==================================================== @Override String applicationId(); @Override TaskId taskId(); @Override Map<String, Object> appConfigs(); @Override Map<String, Object> appConfigsWithPrefix(String prefix); @Override Serde<?> keySerde(); @Override Serde<?> valueSerde(); @Override File stateDir(); // throws an UnsupportedOperationException // record metadata ============================================================ // setters provided for test code to use void setRecordMetadata(String topic, int partition, long offset, long timestamp); void setRecordMetadataTopic(String topic); void setRecordMetadataPartition(int partition); void setRecordMetadataOffset(long offset); void setRecordMetadataTimestamp(long timestamp); // getters inherited from ProcessorContext @Override String topic(); @Override int partition(); @Override long offset(); @Override long timestamp(); // mocked methods ============================================================= // StateStore setter and getter @Override void register(StateStore store, boolean loggingEnabledIsDeprecatedAndIgnored, StateRestoreCallback stateRestoreCallback); @Override StateStore getStateStore(String name); // Punctuator capture @Override Cancellable schedule(long intervalMs, PunctuationType type, Punctuator callback); @Override void schedule(long interval); // throw UnsupportedOperationException List<CapturedPunctuator> scheduledPunctuators(); @Override// voidcaptures scheduleforward(long interval); ) data @Override <K, V> void forward(K key, V value); @Override <K, V> void forward(K key, V value, int childIndex); @Override <K, V> void forward(K key, V value, String childName); // returns captured forward data <K, V> List<KeyValue<K, V>> forwarded(); <K, V> List<KeyValue<K, V>> forwarded(int childIndex); <K, V> List<KeyValue<K, V>> forwarded(String childName); // clears captured forward data void resetForwards(); @Override void// captures whether commit(); gets called @Override booleanvoid committedcommit(); // true voidiff resetCommitscommit(); has been called public static class CapturedPunctuator {boolean committed(); // resets whether commit() privatehas final long intervalMs;been called void resetCommits(); // structure for privatecapturing punctuators finalin PunctuationType punctuationType;to schedule() public static class private final Punctuator punctuator; CapturedPunctuator { public CapturedPunctuator(final long intervalMs, final PunctuationType punctuationType, final Punctuator punctuator) { this.intervalMs = intervalMs; this.punctuationType = punctuationType; this.punctuator = punctuator; } public long getIntervalMs() { return intervalMs;} public PunctuationType getPunctuationType() { return punctuationType;} public Punctuator getPunctuator() { return punctuator;} } } |
Proposed ChangesTODO
We will add the above test utility to the "kafka-streams-test-utils" artifact.
In the initial release, we mark all new classes with annotation @Evolving
Compatibility, Deprecation, and Migration Plan
...