...

This proposal adds mocks for the state stores used in Streams unit tests. We'd like mocks for the different types of state stores (key-value, window, and session) that record put / get calls, so that DSL operator unit tests can check the number of calls against what they expect. These mocks will make it easier for developers to write unit tests for Kafka Streams and related modules.

Public Interfaces

These will be internal classes, so there is no change to any public API or interface.

Proposed Changes

We add new classes to the kafka-streams-test-utils module under a new org.apache.kafka.streams.state package.

We will provide a MockStoreFactory to generate mock store builders:

...

Code Block
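import java.util.LinkedList;
import java.util.List;

import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;

// In-memory key-value store that records the calls made to it, for use in unit tests.
public class MockKeyValueStore extends InMemoryKeyValueStore {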
    private final boolean persistent;

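    // Lifecycle flags that tests can inspect.
    // Note: nothing in this class sets `initialized` yet; an init() override would be needed.
    public boolean initialized = false;
    public boolean flushed = false;

    // One entry per recorded call, in call order.
    public final List<KeyValue<Bytes, byte[]>> capturedPutCalls = new LinkedList<>();
    public final List<KeyValue<Bytes, byte[]>> capturedGetCalls = new LinkedList<>();
    public final List<KeyValue<Bytes, byte[]>> capturedDeleteCalls = new LinkedList<>();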

    public MockKeyValueStore(final String name,
                             final boolean persistent) {
        super(name);
        this.persistent = persistent;
    }

    @Override
    public void flush() {
        flushed = true;
        super.flush();
    }

    @Override
    public boolean persistent() {
        return persistent;
    }

    @Override
    public void put(final Bytes key, final byte[] value) {
        super.put(key, value);
        capturedPutCalls.add(new KeyValue<>(key, value));
    }

    @Override
    public byte[] get(final Bytes key) {
        final byte[] value = super.get(key);
        // Record the key together with the value that was returned (null if absent).
        capturedGetCalls.add(new KeyValue<>(key, value));
        return value;
    }

    @Override
    public byte[] delete(final Bytes key) {
        final byte[] value = super.delete(key);
        // Record the key together with the value that was removed (null if absent).
        capturedDeleteCalls.add(new KeyValue<>(key, value));
        return value;
    }

    @Override
    public byte[] putIfAbsent(final Bytes key, final byte[] value) {
        // get() and put() are the overrides above, so they already record these calls;
        // appending to capturedPutCalls here as well would count the put twice.
        final byte[] originalValue = get(key);
        if (originalValue == null) {
            put(key, value);
        }
        return originalValue;
    }

    @Override
    public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
        for (final KeyValue<Bytes, byte[]> entry : entries) {
            // put() is the override above and already records the entry once.
            put(entry.key, entry.value);
        }
    }
}
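
To illustrate how a test might read the captured calls, here is a minimal, dependency-free sketch of the same recording pattern. SimpleStore, RecordingStore, and RecordingStoreDemo are hypothetical stand-ins invented for this example (they are not Kafka classes), and String keys and values replace Bytes and byte[] to keep it short. The sketch also shows a point worth checking in MockKeyValueStore: when a composite operation such as putIfAbsent is built on get and put, the overridden methods already record each call once.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for an in-memory store; not a Kafka class.
class SimpleStore {
    private final Map<String, String> data = new HashMap<>();

    public void put(final String key, final String value) {
        data.put(key, value);
    }

    public String get(final String key) {
        return data.get(key);
    }

    // Composite operation built on get() and put(); both dispatch to subclass overrides.
    public String putIfAbsent(final String key, final String value) {
        final String original = get(key);
        if (original == null) {
            put(key, value);
        }
        return original;
    }
}

// Records the key of every put and get, mirroring the capture lists of MockKeyValueStore.
class RecordingStore extends SimpleStore {
    public final List<String> capturedPutCalls = new ArrayList<>();
    public final List<String> capturedGetCalls = new ArrayList<>();

    @Override
    public void put(final String key, final String value) {
        super.put(key, value);
        capturedPutCalls.add(key);
    }

    @Override
    public String get(final String key) {
        final String value = super.get(key);
        capturedGetCalls.add(key);
        return value;
    }
    // putIfAbsent is inherited: it records through the get() and put() overrides.
}

class RecordingStoreDemo {
    public static void main(final String[] args) {
        final RecordingStore store = new RecordingStore();
        store.putIfAbsent("a", "1"); // key absent: one get, one put
        store.putIfAbsent("a", "2"); // key present: one get, no put
        System.out.println(store.capturedPutCalls.size()); // prints 1
        System.out.println(store.capturedGetCalls.size()); // prints 2
    }
}
```

A DSL operator test could assert on the sizes or contents of these lists in the same way after driving records through the operator under test.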


Compatibility, Deprecation, and Migration Plan

...