THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block |
---|
package org.apache.kafka.streams.internals;
/**
 * Factory that creates mock store builders and keeps track of every builder it has
 * created, keyed by store name.
 *
 * @param <K> key type; used as the type argument of the key {@code Serde} passed to the factory methods
 * @param <V> value type; used as the type argument of the value {@code Serde} passed to the factory methods
 */
public class MockStoreFactory<K, V> {
// Builders created so far, keyed by store name. A LinkedHashMap is used, so iteration
// follows insertion order. The field is public and the map is mutable.
public final Map<String, StoreBuilder> stateStores = new LinkedHashMap<>();
/** Creates a factory with no registered store builders. */
public MockStoreFactory () {
}
/**
 * Creates a {@code MockKeyValueStoreBuilder}, registers it in {@link #stateStores} under the
 * supplier's name, and returns it.
 *
 * @param keyValueBytesStoreSupplier supplier whose {@code name()} becomes the store name and
 *                                   which is handed on to the mock builder
 * @param keySerde                   key serde handed on to the mock builder
 * @param valueSerde                 value serde handed on to the mock builder
 * @param persistent                 flag handed on to the mock builder
 * @return the builder that was just registered, cast to {@code KeyValueStoreBuilder}
 */
public KeyValueStoreBuilder createKeyValueStoreBuilder(KeyValueBytesStoreSupplier keyValueBytesStoreSupplier,
final Serde<K> keySerde,
final Serde<V> valueSerde,
boolean persistent){
String storeName = keyValueBytesStoreSupplier.name();
// Map.put replaces any builder previously registered under the same name.
stateStores.put(storeName, new MockKeyValueStoreBuilder<>(storeName, keyValueBytesStoreSupplier, keySerde, valueSerde, persistent));
// NOTE(review): this cast only succeeds if MockKeyValueStoreBuilder is a subtype of
// KeyValueStoreBuilder. The MockKeyValueStoreBuilder sample in this document extends
// AbstractStoreBuilder instead; if that is accurate, this line would throw
// ClassCastException at runtime - confirm the intended return type.
return (KeyValueStoreBuilder)stateStores.get(storeName);
}
/**
 * Creates a window store builder. The body is elided ({@code ...}) in this document, so
 * the behavior cannot be described from here.
 *
 * NOTE(review): the first parameter is a {@code KeyValueBytesStoreSupplier}; presumably a
 * window-specific supplier type was intended - confirm.
 */
public WindowStoreBuilder createWindowStoreBuilder(KeyValueBytesStoreSupplier keyValueBytesStoreSupplier,
final Serde<K> keySerde,
final Serde<V> valueSerde,
final Time time){
...
}
/**
 * Creates a session store builder. The body is elided ({@code ...}) in this document, so
 * the behavior cannot be described from here.
 *
 * NOTE(review): the first parameter is a {@code KeyValueBytesStoreSupplier}; presumably a
 * session-specific supplier type was intended - confirm.
 */
public SessionStoreBuilder createSessionStoreBuilder(KeyValueBytesStoreSupplier keyValueBytesStoreSupplier,
final Serde<K> keySerde,
final Serde<V> valueSerde,
final Time time){
...
}
/**
 * Looks up a previously registered store builder.
 *
 * @param storeName name the builder was registered under
 * @return the builder stored in {@link #stateStores} for that name, or {@code null} if the
 *         map has no entry for it
 */
public StoreBuilder getStore(String storeName) {
return stateStores.get(storeName);
}
} |
...
Code Block |
---|
package org.apache.kafka.streams.internals;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.MockTime;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.internals.AbstractStoreBuilder;

/**
 * Store builder that produces {@code MockKeyValueStore} instances backed by the store
 * obtained from a {@link KeyValueBytesStoreSupplier}.
 *
 * @param <K> key type of the store
 * @param <V> value type of the store
 */
public class MockKeyValueStoreBuilder<K, V> extends AbstractStoreBuilder<K, V, KeyValueStore> {

    // Passed through unchanged to every MockKeyValueStore created by build().
    private final boolean persistent;
    // Source of the underlying store wrapped by each MockKeyValueStore.
    private final KeyValueBytesStoreSupplier storeSupplier;

    /**
     * Creates a builder for a mock key-value store.
     *
     * @param storeName     name passed to the superclass constructor
     * @param storeSupplier supplier whose {@code get()} result is wrapped by {@link #build()}
     * @param keySerde      key serde passed to the superclass constructor
     * @param valueSerde    value serde passed to the superclass constructor
     * @param persistent    flag forwarded to each {@code MockKeyValueStore}
     */
    public MockKeyValueStoreBuilder(final String storeName,
                                    final KeyValueBytesStoreSupplier storeSupplier,
                                    final Serde<K> keySerde,
                                    final Serde<V> valueSerde,
                                    final boolean persistent) {
        // The original text read "storeNamestoreSupplier.name()", which fuses two alternative
        // expressions into one undefined identifier. The explicit storeName argument is used
        // so that the caller-supplied name is honored and the parameter is not left unused.
        super(storeName, keySerde, valueSerde, new MockTime(0));
        this.persistent = persistent;
        this.storeSupplier = storeSupplier;
    }

    /**
     * Builds a new {@code MockKeyValueStore} around a store freshly obtained from the supplier.
     *
     * @return a new mock store constructed from {@code name}, {@code storeSupplier.get()} and
     *         the {@code persistent} flag
     */
    @Override
    public KeyValueStore build() {
        // `name` is not declared in this class; presumably it is the field inherited from
        // AbstractStoreBuilder that holds the name given to super(...) - confirm.
        return new MockKeyValueStore<>(name, storeSupplier.get(), persistent);
    }
} |
...