...

    public <T> T store(final String storeName, final QueryableStoreType<T> queryableStoreType) {
        return store(storeName, queryableStoreType, new StoreQueryParams(null, false));
    }

    public <T> T store(final String storeName,
                       final QueryableStoreType<T> queryableStoreType,
                       final StoreQueryParams storeQueryParams) {
        validateIsRunningOrRebalancing();
        return queryableStoreProvider.getStore(storeName, queryableStoreType, storeQueryParams);
    }

    public <T> T getStore(final String storeName,
                          final QueryableStoreType<T> queryableStoreType,
                          final StoreQueryParams storeQueryParams) {
        final List<T> globalStore = globalStoreProvider.stores(storeName, queryableStoreType);
        if (!globalStore.isEmpty()) {
            return queryableStoreType.create(globalStoreProvider, storeName);
        }
        final List<T> allStores = new ArrayList<>();
        for (final StreamThreadStateStoreProvider storeProvider : storeProviders) {
            allStores.addAll(storeProvider.stores(storeName, queryableStoreType, storeQueryParams));
        }
        if (allStores.isEmpty()) {
            throw new InvalidStateStoreException("The state store, " + storeName + ", may have migrated to another instance.");
        }
        return queryableStoreType.create(
            new WrappingStoreProvider(storeProviders, storeQueryParams),
            storeName
        );
    }

Proposed Changes:
- Add a new public class, StoreQueryParams.java, through which the user specifies which stores a query should cover.
- Derive the taskId from the store name and the partition supplied by the user.
- In StreamThreadStateStoreProvider.java, return only the stores that belong to the requested task, and, depending on the options set by the user, return either only the stores of running tasks or the stores of standby/recovering tasks as well.
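
The filtering described in the list above can be illustrated with a minimal, self-contained sketch. It is not the Kafka Streams implementation: `StoreQuerySketch`, `QueryParams`, `HostedStore`, and `select` are hypothetical names invented for this example, and the sketch assumes that a `null` partition means "all partitions" and that a boolean flag controls whether standby/recovering stores are included.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model; none of these types are real Kafka Streams classes.
final class StoreQuerySketch {

    // Stand-in for the proposed StoreQueryParams (assumption: null partition = all partitions).
    static final class QueryParams {
        final Integer partition;
        final boolean includeStaleStores;

        QueryParams(final Integer partition, final boolean includeStaleStores) {
            this.partition = partition;
            this.includeStaleStores = includeStaleStores;
        }
    }

    // Stand-in for one store partition hosted by a task on this instance.
    static final class HostedStore {
        final String storeName;
        final int partition;
        final boolean active; // false = standby or still recovering

        HostedStore(final String storeName, final int partition, final boolean active) {
            this.storeName = storeName;
            this.partition = partition;
            this.active = active;
        }
    }

    // Keeps only the stores that match the name, the requested partition (if any),
    // and the running-only vs. standby/recovering condition.
    static List<HostedStore> select(final List<HostedStore> hosted,
                                    final String storeName,
                                    final QueryParams params) {
        final List<HostedStore> result = new ArrayList<>();
        for (final HostedStore store : hosted) {
            if (!store.storeName.equals(storeName)) {
                continue; // different store
            }
            if (params.partition != null && params.partition != store.partition) {
                continue; // not the task (store name + partition) the user asked for
            }
            if (!params.includeStaleStores && !store.active) {
                continue; // user asked for running stores only
            }
            result.add(store);
        }
        return result;
    }
}
```

With the defaults used by the two-argument `store(...)` overload above, `new QueryParams(null, false)` in this sketch selects every running store with the given name and skips standby/recovering ones.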
...