Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
titleKafkaStreams.java
collapsetrue
         /**
     * Get a facade wrapping the local {@link StateStore} instances with the provided {@code storeName} if the Store's
     * type is accepted by the provided {@link QueryableStoreType#accepts(StateStore) queryableStoreType}.
     * The returned object can be used to query the {@link StateStore} instances.
     *
     * Only permits queries on active replicas of the store (no standbys or restoring replicas).
     * See {@link KafkaStreams#store(String, QueryableStoreType, StoreQueryParams)}
     * for the option to set {@code StoreQueryParams.withAllPartitionAndStaleStoresEnabled or StoreQueryParams.withPartitionAndStaleStoresEnabled(final Integer partition)} and trade off consistency in favor of availability.
     *
     * @param storeName           name of the store to find
     * @param queryableStoreType  accept only stores that are accepted by {@link QueryableStoreType#accepts(StateStore)}
     * @param <T>                 return type
     * @return A facade wrapping the local {@link StateStore} instances
     * @throws InvalidStateStoreException if Kafka Streams is (re-)initializing or a store with {@code storeName} and
     * {@code queryableStoreType} doesn't exist
     */
    public <T> T store(final String storeName, final QueryableStoreType<T> queryableStoreType) {
        return store(new StoreQueryParams<T>(storeName, queryableStoreType, StoreQueryParams.withAllPartitionAndStaleStoresDisabled());
    }

    /**
     * Get a facade wrapping the local {@link StateStore} instances with the provided {@link StoreQueryParams}.
     * StoreQueryParams need required parameters to be set, which are {@code storeName} if the Store'sand if
     * type is accepted by the provided {@link QueryableStoreType#accepts(StateStore) queryableStoreType}.
     * The optional parameters to the StoreQueryParams include {@code partition} and {@code includeStaleStores}.
     * The returned object can be used to query the {@link StateStore} instances.
     *
     * @param storeNamestoreQueryParams    If storeQueryParams.withPartition(int partition) is used, it allow namequeries ofon the storespecific to find
     * @param queryableStoreType  accept only stores that are accepted by {@link QueryableStoreType#accepts(StateStore)}
partition irrespective if it is a standby
     *              * @param storeQueryParams    If StoreQueryParams.withAllPartitionAndStaleStoresDisabled() is used, it only permit queries onor thea activerestoring replicas in foraddition allto theactive partitionsones.
     *                            available on the local instance, and only if the
     *            If storeQueryParams.withIncludeStaleStores() is used, it allow queries on standbys and restoring replicas in addition to active ones for all the local partitions on the instance.
     *           task for that partition is running. I.e., the state store is not a standby replica,
   If  *                StoreQueryParams.withIncludeStaleStores().withPartition(int partition), it allow queries on the specific partition irrespective if it is a standby
     *            and it is not restoring from the changelog.
     *    or a restoring replicas in addition to active ones..
     *           If StoreQueryParams.withPartitionAndStaleStoresEnabled(final Integer partition) is used, it only permit queries on the specific provided active replicas
  By default, if *just storeQueryParams is used, it returns all the local partitions for the store which are in running state.
     * @param <T>    for the partition provided in the parameter, and only if the
     *                            task for that partition is running. I.e., the state store is not a standby replica,
     *                            and it is not restoring from the changelog.
     *                            If StoreQueryParams.withAllPartitionAndStaleStoresEnabled(), allow queries on standbys and restoring replicas in addition to active ones.
     *                            If StoreQueryParams.withPartitionAndStaleStoresEnabled(final Integer partition), allow queries on the specific partition irrespective if it is a standby
     *                            or a restoring replicas in addition to active ones.
     * @param <T>                 return type
     * @return A facade wrapping the local {@link StateStore} instances
     * @throws InvalidStateStoreException if Kafka Streams is (re-)initializing or a store with {@code storeName} andreturn type
     * {@code@return queryableStoreType} doesn't exist
     */
    public <T> T store(final String storeName,
      A facade wrapping the local {@link StateStore} instances
     * @throws InvalidStateStoreException if Kafka Streams is (re-)initializing or a store with final{@code QueryableStoreType<T>storeName} queryableStoreType,and
     * {@code queryableStoreType} doesn't exist
     */
    public <T> T   store(final StoreQueryParamsStoreQueryParams<T> storeQueryParams) {
        validateIsRunningOrRebalancing();
        return queryableStoreProvider.getStore(storeName, queryableStoreType, storeQueryParams);
    }



Proposed Changes:

  • Add a new public class StoreQueryParams.java to set options for what kind of stores a user wants.
  • Create a taskId from the combination of store name and partition provided by the user.
  • In StreamThreadStateStoreProvider.java return only the stores for the task requested by the user and also check the condition to return only running stores or standby/recovering stores as well.

...