Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
titleKafkaStreams.java
collapsetrue
 public <T> T store(final String storeName, final QueryableStoreType<T> queryableStoreType) {
 return store(storeName, queryableStoreType, new StoreQueryParams(null, false));
 }   


public <T> T store(final String storeName, final QueryableStoreType<T> queryableStoreType, final StoreQueryParams storeQueryParams) {
validateIsRunningOrRebalancing();
return queryableStoreProvider.getStore(storeName, queryableStoreType, storeQueryParams);
}



Code Block
languagejava
titleQueryableStoreProvider.java
collapsetrue
 public <T> T getStore(final String storeName, final QueryableStoreType<T> queryableStoreType, final StoreQueryParams storeQueryParams) {
        final List<T> globalStore = globalStoreProvider.stores(storeName, queryableStoreType);
        if (!globalStore.isEmpty()) {
            return queryableStoreType.create(globalStoreProvider, storeName);
        }
        final List<T> allStores = new ArrayList<>();
        for (final StreamThreadStateStoreProvider storeProvider : storeProviders) {
            allStores.addAll(storeProvider.stores(storeName, queryableStoreType, storeQueryParams));
        }
        if (allStores.isEmpty()) {
            throw new InvalidStateStoreException("The state store, " + storeName + ", may have migrated to another instance.");
        }
        return queryableStoreType.create(
            new WrappingStoreProvider(storeProviders, storeQueryParams),
            storeName
        );
 }

Proposed Changes:

  • Add a new public class StoreQueryParams.java to set options for what kind of stores a user wants.
  • Create a taskId from the combination of store name and partition provided by the user.
  • In StreamThreadStateStoreProvider.java return only the stores for the task requested by the user and also check the condition to return only running stores or standby/recovering stores as well.

...