Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
titleKafkaStreams.java
collapsetrue
   @Deprecated
 	public <T> T store(final String storeName, final QueryableStoreType<T> queryableStoreType) {
        return store(StoreQueryParams.fromNameAndType(storeName, queryableStoreType));
    }     

    /**
     * Get a facade wrapping the local {@link StateStore} instances with the provided {@link StoreQueryParams}.
     * StoreQueryParams need required parameters to be set, which are {@code storeName} and if
     * type is accepted by the provided {@link QueryableStoreType#accepts(StateStore) queryableStoreType}.
     * The optional parameters to the StoreQueryParams include {@code partition} and {@code includeStaleStores}.
     * The returned object can be used to query the {@link StateStore} instances.
     *
     * @param storeQueryParams    If StoreQueryParams.fromNameAndType(storeName, queryableStoreType).withPartition(int partition) is used, it allow queries on the specific partition irrespective if it is a standby
     *                            or a restoring replicas in addition to active ones.
     *                            If StoreQueryParams.fromNameAndType(storeName, queryableStoreType).withIncludeStaleStores() is used, it allow queries on standbys and restoring replicas in addition to active ones for all the local partitions on the instance.
     *                            If StoreQueryParams.fromNameAndType(storeName, queryableStoreType).withIncludeStaleStores().withPartition(int partition), it allow queries on the specific partition irrespective if it is a standby
     *                            or a restoring replicas in addition to active ones..
     *                            By default, if just storeQueryParams is used, it returns all the local partitions for the store which are in running state.
     * @param <T>                 return type
     * @return A facade wrapping the local {@link StateStore} instances
     * @throws InvalidStateStoreException if Kafka Streams is (re-)initializing or a store with {@code storeName} and
     * {@code queryableStoreType} doesn't exist
     */
    public <T> T store(final StoreQueryParams<T> storeQueryParams) {
        validateIsRunningOrRebalancing();
        return queryableStoreProvider.getStore(storeQueryParams);
    }


Proposed Changes:

  • Add a new public class StoreQueryParams.java to set options for what kind of stores a user wants.
  • Create a taskId from the combination of store name and partition provided by the user.
  • In StreamThreadStateStoreProvider.java return only the stores for the task requested by the user and also check the condition to return only running stores or standby/recovering stores as well.

...