Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
language: java
title: KafkaStreams.java
collapse: true
     /**
     * Look up the local {@link StateStore} instances named {@code storeName} and return a facade over them,
     * provided the store's type passes {@link QueryableStoreType#accepts(StateStore) queryableStoreType}.
     * Callers use the returned facade to query the underlying {@link StateStore} instances.
     * <p>
     * This overload builds a {@link StoreQueryParams} from just {@code storeName} and {@code queryableStoreType}
     * and delegates to {@link KafkaStreams#store(StoreQueryParams)}. Queries are therefore limited to active
     * replicas of the store; standbys and restoring replicas are not served.
     * <p>
     * To trade consistency for availability, call {@link KafkaStreams#store(StoreQueryParams)} directly with
     * the partition and stale-store options of {@link StoreQueryParams} configured.
     *
     * @param storeName           name of the store to look up
     * @param queryableStoreType  only stores accepted by {@link QueryableStoreType#accepts(StateStore)} are returned
     * @param <T>                 type of the returned facade
     * @return a facade wrapping the local {@link StateStore} instances
     * @throws InvalidStateStoreException if Kafka Streams is (re-)initializing, or if no store matching
     * {@code storeName} and {@code queryableStoreType} exists
     */
    public <T> T store(final String storeName, final QueryableStoreType<T> queryableStoreType) {
        // Only the two required settings are supplied; every optional setting keeps its default.
        final StoreQueryParams<T> defaultParams = new StoreQueryParams<>(storeName, queryableStoreType);
        return store(defaultParams);
    }

    /**
     * Return a facade over the local {@link StateStore} instances selected by the given {@link StoreQueryParams}.
     * The facade can then be used to query those {@link StateStore} instances.
     * <p>
     * {@code storeQueryParams} must carry the required settings {@code storeName} and {@code queryableStoreType};
     * the store's type has to be accepted by {@link QueryableStoreType#accepts(StateStore) queryableStoreType}.
     * The optional settings are {@code partition} and {@code includeStaleStores}.
     *
     * @param storeQueryParams    selects which local stores are exposed:
     *                            <ul>
     *                            <li>with no optional setting (the default), all local partitions of the store
     *                            that are in running state are returned;</li>
     *                            <li>with {@code storeQueryParams.withPartition(int partition)}, queries are allowed
     *                            on that specific partition regardless of whether it is a standby or a restoring
     *                            replica, in addition to active ones;</li>
     *                            <li>with {@code storeQueryParams.withIncludeStaleStores()}, queries are allowed on
     *                            standbys and restoring replicas in addition to active ones, for all local
     *                            partitions on the instance;</li>
     *                            <li>with {@code StoreQueryParams.withIncludeStaleStores().withPartition(int partition)},
     *                            queries are allowed on that specific partition regardless of whether it is a
     *                            standby or a restoring replica, in addition to active ones.</li>
     *                            </ul>
     * @param <T>                 type of the returned facade
     * @return a facade wrapping the local {@link StateStore} instances
     * @throws InvalidStateStoreException if Kafka Streams is (re-)initializing, or if no store exists for the
     * {@code storeName} and {@code queryableStoreType} given in {@code storeQueryParams}
     */
    public <T> T store(final StoreQueryParams<T> storeQueryParams) {
        // The instance-state check runs before any store lookup is attempted.
        validateIsRunningOrRebalancing();
        final T storeFacade = queryableStoreProvider.getStore(storeQueryParams);
        return storeFacade;
    }

...