...
This proposal adds mocks for the state stores used in Streams unit testing. We'd like mocks for the different types of state stores (key-value, window, session) that record put / get calls, so that DSL operator unit tests can check them against the expected number of calls. These mocks will be a convenience for developers writing unit tests for Kafka Streams and other related modules.
Public Interfaces
These will be internal classes, so no public API/interface.
Proposed Changes
We add some new classes to kafka-streams-test-utils under a new org.apache.kafka.streams.state package.
We will provide a MockStoreFactory to generate Mock store builders:
...
```java
package org.apache.kafka.streams.state;

import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;

import java.util.LinkedList;
import java.util.List;

public class MockKeyValueStore extends InMemoryKeyValueStore {
    private final boolean persistent;

    public boolean initialized = false;
    public boolean flushed = false;

    // Every call is recorded as a (key, value) pair, in call order.
    public final List<KeyValue<Bytes, byte[]>> capturedPutCalls = new LinkedList<>();
    public final List<KeyValue<Bytes, byte[]>> capturedGetCalls = new LinkedList<>();
    public final List<KeyValue<Bytes, byte[]>> capturedDeleteCalls = new LinkedList<>();

    public MockKeyValueStore(final String name, final boolean persistent) {
        super(name);
        this.persistent = persistent;
    }

    @Override
    public void flush() {
        flushed = true;
        super.flush();
    }

    @Override
    public boolean persistent() {
        return persistent;
    }

    @Override
    public void put(final Bytes key, final byte[] value) {
        super.put(key, value);
        capturedPutCalls.add(new KeyValue<>(key, value));
    }

    @Override
    public byte[] get(final Bytes key) {
        final byte[] value = super.get(key);
        capturedGetCalls.add(new KeyValue<>(key, value));
        return value;
    }

    @Override
    public byte[] delete(final Bytes key) {
        final byte[] value = super.delete(key);
        capturedDeleteCalls.add(new KeyValue<>(key, value));
        return value;
    }

    @Override
    public byte[] putIfAbsent(final Bytes key, final byte[] value) {
        // get() records the lookup in capturedGetCalls.
        final byte[] originalValue = get(key);
        if (originalValue == null) {
            // put() already records the call, so it is not captured again here.
            put(key, value);
        }
        return originalValue;
    }

    @Override
    public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
        for (final KeyValue<Bytes, byte[]> entry : entries) {
            // put() already records each entry, so it is not captured again here.
            put(entry.key, entry.value);
        }
    }
}
```
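The capture-then-delegate pattern behind MockKeyValueStore can be illustrated without any Kafka dependencies. The following is a minimal sketch, not the proposed implementation: `SimpleStore` and `RecordingStore` are hypothetical names introduced only for this example, and the store is simplified to `String` keys and values. It shows the one rule worth keeping in mind when extending the mocks: a composite operation built on the recording `put` / `get` should rely on their recording rather than capture the call a second time.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a real in-memory store; not a Kafka class.
class SimpleStore {
    private final Map<String, String> data = new HashMap<>();

    public void put(final String key, final String value) {
        data.put(key, value);
    }

    public String get(final String key) {
        return data.get(key);
    }
}

// Hypothetical mock: delegates to the real store and records each call.
class RecordingStore extends SimpleStore {
    public final List<String> capturedPutKeys = new ArrayList<>();
    public final List<String> capturedGetKeys = new ArrayList<>();

    @Override
    public void put(final String key, final String value) {
        super.put(key, value);
        capturedPutKeys.add(key);
    }

    @Override
    public String get(final String key) {
        capturedGetKeys.add(key);
        return super.get(key);
    }

    // Composite operation: built on the recording get/put above,
    // so it adds nothing to the captured lists itself.
    public String putIfAbsent(final String key, final String value) {
        final String original = get(key);
        if (original == null) {
            put(key, value);
        }
        return original;
    }
}
```

In a test, an operator under test would be handed the `RecordingStore`, and the assertions would then compare `capturedPutKeys.size()` and `capturedGetKeys.size()` with the number of calls the operator is expected to make.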
Compatibility, Deprecation, and Migration Plan
...