THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/** * Get a facade wrapping the local {@link StateStore} instances with the provided {@code storeName} if the Store's * type is accepted by the provided {@link QueryableStoreType#accepts(StateStore) queryableStoreType}. * The returned object can be used to query the {@link StateStore} instances. * * Only permits queries on active replicas of the store (no standbys or restoring replicas). * See {@link KafkaStreams#store(String, QueryableStoreType, StoreQueryParams)} * for the option to set {@code StoreQueryParams.withAllPartitionAndStaleStoresEnabled or StoreQueryParams.withPartitionAndStaleStoresEnabled(final Integer partition)} and trade off consistency in favor of availability. * * @param storeName name of the store to find * @param queryableStoreType accept only stores that are accepted by {@link QueryableStoreType#accepts(StateStore)} * @param <T> return type * @return A facade wrapping the local {@link StateStore} instances * @throws InvalidStateStoreException if Kafka Streams is (re-)initializing or a store with {@code storeName} and * {@code queryableStoreType} doesn't exist */ public <T> T store(final String storeName, final QueryableStoreType<T> queryableStoreType) { return store(new StoreQueryParams<T>(storeName, queryableStoreType, StoreQueryParams.withAllPartitionAndStaleStoresDisabled()); } /** * Get a facade wrapping the local {@link StateStore} instances with the provided {@link StoreQueryParams}. * StoreQueryParams need required parameters to be set, which are {@code storeName} if the Store'sand if * type is accepted by the provided {@link QueryableStoreType#accepts(StateStore) queryableStoreType}. * The optional parameters to the StoreQueryParams include {@code partition} and {@code includeStaleStores}. * The returned object can be used to query the {@link StateStore} instances. * * @param storeNamestoreQueryParams If storeQueryParams.withPartition(int partition) is used, it allow namequeries ofon the storespecific to find * @param queryableStoreType accept only stores that are accepted by {@link QueryableStoreType#accepts(StateStore)} partition irrespective if it is a standby * * @param storeQueryParams If StoreQueryParams.withAllPartitionAndStaleStoresDisabled() is used, it only permit queries onor thea activerestoring replicas in foraddition allto theactive partitionsones. * available on the local instance, and only if the * If storeQueryParams.withIncludeStaleStores() is used, it allow queries on standbys and restoring replicas in addition to active ones for all the local partitions on the instance. * task for that partition is running. I.e., the state store is not a standby replica, If * StoreQueryParams.withIncludeStaleStores().withPartition(int partition), it allow queries on the specific partition irrespective if it is a standby * and it is not restoring from the changelog. * or a restoring replicas in addition to active ones.. * If StoreQueryParams.withPartitionAndStaleStoresEnabled(final Integer partition) is used, it only permit queries on the specific provided active replicas By default, if *just storeQueryParams is used, it returns all the local partitions for the store which are in running state. * @param <T> for the partition provided in the parameter, and only if the * task for that partition is running. I.e., the state store is not a standby replica, * and it is not restoring from the changelog. * If StoreQueryParams.withAllPartitionAndStaleStoresEnabled(), allow queries on standbys and restoring replicas in addition to active ones. * If StoreQueryParams.withPartitionAndStaleStoresEnabled(final Integer partition), allow queries on the specific partition irrespective if it is a standby * or a restoring replicas in addition to active ones. * @param <T> return type * @return A facade wrapping the local {@link StateStore} instances * @throws InvalidStateStoreException if Kafka Streams is (re-)initializing or a store with {@code storeName} andreturn type * {@code@return queryableStoreType} doesn't exist */ public <T> T store(final String storeName, A facade wrapping the local {@link StateStore} instances * @throws InvalidStateStoreException if Kafka Streams is (re-)initializing or a store with final{@code QueryableStoreType<T>storeName} queryableStoreType,and * {@code queryableStoreType} doesn't exist */ public <T> T store(final StoreQueryParamsStoreQueryParams<T> storeQueryParams) { validateIsRunningOrRebalancing(); return queryableStoreProvider.getStore(storeName, queryableStoreType, storeQueryParams); } |
Proposed Changes:
- Add a new public class StoreQueryParams.java to set options for what kind of stores a user wants.
- Create a taskId from the combination of store name and partition provided by the user.
- In StreamThreadStateStoreProvider.java return only the stores for the task requested by the user and also check the condition to return only running stores or standby/recovering stores as well.
...