You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 11 Next »

This page is meant as a template for writing a KIP. To create a KIP choose Tools->Copy on this page and modify with your content and replace the heading with the next KIP number and a description of your issue. Replace anything in italics with your own description.

Status

Current state"Under Discussion"

Discussion thread: here 

JIRA: KAFKA-6460

Motivation

This is for adding mocks for state stores used in Streams unit testing. We'd like to use mocks for different types of state stores: KV, window, session - that can be used to record the number of expected put / get calls used in the DSL operator unit testing. These will provide conveniency for developers when they are writing unit test for kafka stream and other related modules.

Public Interfaces

These will be internal classes, so no public API/interface.

Proposed Changes

Describe the new thing you want to do in appropriate detail. This may be fairly extensive and have large subsections of its own. Or it may be a few sentences. Use judgement based on the scope of the change.

I will use the KeyValueStore as an example. Window and Session will have a similar structure.

We will provide a MockStoreFactory to generate Mock store builders:


package org.apache.kafka.streams.internals;

public class MockStoreFactory<K, V> {

    final String storeName;
    final Serde<K> keySerde;
    final Serde<V> valueSerde;
    final Time time;
    final Boolean persistent;

    public MockStoreFactory (final String storeName,
                             final Serde<K> keySerde,
                             final Serde<V> valueSerde,
                             final Time time,
                             final Boolean persistent) {

        this.storeName = storeName;
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
        this.time = time;
        this.persistent = persistent;
    }

    public MockKeyValueStoreBuilder createKeyValueStoreBuilder(){
        return new MockKeyValueStoreBuilder<K,V>(storeName, keySerde, valueSerde, time, persistent);
    }
}

Each Store builder will have a build method:

package org.apache.kafka.streams.internals;

public class MockKeyValueStoreBuilder<K, V>  extends AbstractStoreBuilder<K, V, StateStore> {

    final Boolean persistent;

    public MockKeyValueStoreBuilder(final String storeName,
                                    final Serde<K> keySerde,
                                    final Serde<V> valueSerde,
                                    final Time time,
                                    final Boolean persistent) {
        super(storeName, keySerde, valueSerde, time);
        this.persistent = persistent;
    }

    @Override
    public KeyValueStore build() {
        return new MockKeyValueStore(name, persistent);
    }
}


Then in the Store, we will be a wrapper around a InMemoryStore, and capture all the get/put calls (excluding Iterators)

public class MockKeyValueStore extends InMemoryKeyValueStore {
    private final boolean persistent;

    public boolean initialized = false;
    public boolean flushed = false;

    public final List<KeyValue> capturedPutCalls = new LinkedList<>();
    public final List<KeyValue> capturedGetCalls = new LinkedList<>();
    public final List<KeyValue> capturedDeleteCalls = new LinkedList<>();

    public MockKeyValueStore(final String name,
                             final boolean persistent) {
        super(name);
        this.persistent = persistent;
    }

    @Override
    public void flush() {
        flushed = true;
        super.flush();
    }

    @Override
    public boolean persistent() {
        return persistent;
    }

    @Override
    public void put(final Bytes key, final byte[] value) {
        super.put(key, value);
        capturedPutCalls.add(new KeyValue(key, value));
    }

    @Override
    public byte[] get(final Bytes key) {
        byte[] value = super.get(key);
        capturedGetCalls.add(new KeyValue(key, value));
        return value;
    }

    @Override
    public byte[] delete(final Bytes key) {
        byte[] value = super.delete(key);
        capturedDeleteCalls.add(new KeyValue(key, value));
        return value;
    }

    @Override
    public byte[] putIfAbsent(final Bytes key, final byte[] value) {
        final byte[] originalValue = get(key);
        if (originalValue == null) {
            put(key, value);
            capturedPutCalls.add(new KeyValue(key, value));
        }
        return originalValue;
    }

    @Override
    public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
        for (final KeyValue<Bytes, byte[]> entry : entries) {
            put(entry.key, entry.value);
            capturedPutCalls.add(entry);
        }
    }
}


Compatibility, Deprecation, and Migration Plan

Until we are refactoring with these new MockStateStores, there shouldn't be any compatibility issues. But the next phase should be refactoring, including:

1) Remove and refactor redundant MockStores (i.e. org.apache.kafka.test.MockKeyValueStore)

2) Examine the current tests (i.e. org.apache.kafka.streams.TopologyTest) and refactor with the new MockStateStores.

Rejected Alternatives

1) Rebuilding a MockStateStores vs extending an InMemoryStore

  • If we are rebuilding the functionality of a store from scratch in memory, it will basically act like an InMemoryStore.

2) Track all calls in a total order.

  • Now I haven't get a feature request to track calls within a total order, so now we track the order separately. If there is a need, we can definitely adjust it.



  • No labels