Table of Contents |
---|
Status
Current state: Under Discussion Draft
Discussion thread: TODO
JIRA:
Jira | ||||||||
---|---|---|---|---|---|---|---|---|
|
...
Code Block | ||
---|---|---|
| ||
package org.apache.kafka.streams; public class MockProcessorContext extends ProcessorContext { public MockProcessorContext(final String applicationId, final TaskId taskId, final Properties config); @Override StreamsMetrics metrics(); // high level metadata ==================================================== @Override String applicationId(); @Override TaskId taskId(); @Override Map<String, Object> appConfigs(); @Override Map<String, Object> appConfigsWithPrefix(String prefix); @Override Serde<?> keySerde(); @Override Serde<?> valueSerde(); @Override File stateDir(); // record metadata ======================================================== void setRecordMetadata(String topic, int partition, long offset, long timestamp); void setRecordMetadataTopic(String topic); void setRecordMetadataPartition(int partition); void setRecordMetadataOffset(long offset); void setRecordMetadataTimestamp(long timestamp); @Override String topic(); @Override int partition(); @Override long offset(); @Override long timestamp(); // mocked methods ========================================================= @Override void register(StateStore store, boolean loggingEnabledIsDeprecatedAndIgnored, StateRestoreCallback stateRestoreCallback); @Override StateStore getStateStore(String name); @Override Cancellable schedule(long intervalMs, PunctuationType type, Punctuator callback); List<CapturedPunctuator> scheduledPunctuators(); @Override void schedule(long interval); @Override <K, V> void forward(K key, V value); @Override <K, V> void forward(K key, V value, int childIndex); @Override <K, V> void forward(K key, V value, String childName); <K, V> List<KeyValue<K, V>> forwarded(); <K, V> List<KeyValue<K, V>> forwarded(int childIndex); <K, V> List<KeyValue<K, V>> forwarded(String childName); void resetForwards(); @Override void commit(); boolean committed(); void resetCommits(); public static class CapturedPunctuator { private final long intervalMs; private final PunctuationType punctuationType; private final Punctuator punctuator; public CapturedPunctuator(final long intervalMs, final PunctuationType punctuationType, final Punctuator punctuator) { this.intervalMs = intervalMs; this.punctuationType = punctuationType; this.punctuator = punctuator; } public long getIntervalMs() { return intervalMs;} public PunctuationType getPunctuationType() { return punctuationType;} public Punctuator getPunctuator() { return punctuator;} } } | ||
| ||
package org.apache.kafka.streams.test;
public class ConsumerRecordFactory<K, V> {
// default
public ConsumerRecordFactory(String defaultTopicName, Serializer<K> keySerializer, Serializer<V> valueSerializer); // initialized startTimestampMs with System.currentTimeMillis() and autoAdvanceMs with zero
public ConsumerRecordFactory(String defaultTopicName, Serializer<K> keySerializer, Serializer<V> valueSerializer, long startTimestampMs);
public ConsumerRecordFactory(String defaultTopicName, Serializer<K> keySerializer, Serializer<V> valueSerializer, long startTimestampMs, long autoAdvanceMs);
// no default topic name; requires to specify topic name in #create(...)
public ConsumerRecordFactory(Serializer<K> keySerializer, Serializer<V> valueSerializer);
public ConsumerRecordFactory(Serializer<K> keySerializer, Serializer<V> valueSerializer, long startTimestampMs);
public ConsumerRecordFactory(Serializer<K> keySerializer, Serializer<V> valueSerializer, long startTimestampMs, long autoAdvanceMs);
public void advanceTimeMs(long advanceMs);
// create single records with default topic name
public ConsumerRecord<byte[], byte[]> create(K key, V value, long timestampMs);
public ConsumerRecord<byte[], byte[]> create(K key, V value);
public ConsumerRecord<byte[], byte[]> create(V value, long timestampMs);
public ConsumerRecord<byte[], byte[]> create(V value);
// create list of records with default topic name
public List<ConsumerRecord<byte[], byte[]>> create(List<KeyValue<K, V>> keyValues);
public List<ConsumerRecord<byte[], byte[]>> create(List<KeyValue<K, V>> keyValues, long startTimestamp, long advanceMs);
public List<ConsumerRecord<byte[], byte[]>> create(List<KeyValue<K, V>> keyValues, long startTimestamp);
// overwrite default topic name
public ConsumerRecord<byte[], byte[]> create(String topicName, K key, V value, long timestampMs);
public ConsumerRecord<byte[], byte[]> create(String topicName, K key, V value);
public ConsumerRecord<byte[], byte[]> create(String topicName, V value, long timestampMs);
public ConsumerRecord<byte[], byte[]> create(String topicName, V value);
// those methods allow to access regular as well as global stores
public List<ConsumerRecord<byte[], byte[]>> create(String topicName, List<KeyValue<K, V>> keyValues);
public List<ConsumerRecord<byte[], byte[]>> create(String topicName, List<KeyValue<K, V>> keyValues, long startTimestamp, long advanceMs);
public List<ConsumerRecord<byte[], byte[]>> create(String topicName, List<KeyValue<K, V>> keyValues, long startTimestamp);
} |
Code Block | ||
---|---|---|
| ||
package org.apache.kafka.streams.test;
public class OutputVerifier {
public static <K,V> void compareValue(ProducerRecord<K, V> record, V expectedValue) throws AssertionError;
public static <K,V> void compareValue(ProducerRecord<K, V> record, ProducerRecord<K, V> expectedRecord) throws AssertionError;
public static <K,V> void compareKeyValue(ProducerRecord<K, V> record, K expectedKey, V expectedValue) throws AssertionError;
public static <K,V> void compareKeyValue(ProducerRecord<K, V> record, ProducerRecord<K, V> expectedRecord) throws AssertionError;
public static <K,V> void compareValueTimestamp(ProducerRecord<K, V> record, V expectedValue, long expectedTimestamp) throws AssertionError;
public static <K,V> void compareValueTimestamp(ProducerRecord<K, V> record, ProducerRecord<K, V> expectedRecord) throws AssertionError;
public static <K,V> void compareKeyValueTimestamp(ProducerRecord<K, V> record, K expectedKey, V expectedValue, long expectedTimestamp) throws AssertionError;
public static <K,V> void compareKeyValueTimestamp(ProducerRecord<K, V> record, ProducerRecord<K, V> expectedRecord) throws AssertionError;
} |
Proposed Changes
We are adding the above described test helper classes in a new artifact streams-test-utils
such that people can easily include it as a dependency to their build. The main test class is the test driver while the others a auxiliary classes to generate test data etc.
For time base operations it's important to allow fine grained controll over time. Therefore, we provide a mock time class that can be ingested into the driver and can be used to generte input data. To avoid testing boiler plate code, we provide a rich amount of overload methods and a ConsumerRecordFactory
that simplifies to generate ConsumeRecords<byte[],byte[]>
instead of using the verbose ConsumerRecord
constructor.
Proposed Changes
TODOIn the initla release, we mark all new classes with annotation @Evolving
.
Compatibility, Deprecation, and Migration Plan
...