...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/**
* Get a facade wrapping the local {@link StateStore} instances with the provided {@code storeName} if the Store's
* type is accepted by the provided {@link QueryableStoreType#accepts(StateStore) queryableStoreType}.
* The returned object can be used to query the {@link StateStore} instances.
*
* Only permits queries on active replicas of the store (no standbys or restoring replicas).
* See {@link KafkaStreams#store(StoreQueryParams)}
* for the option to set {@code StoreQueryParams.withAllPartitionAndStaleStoresEnabled} or
* {@code StoreQueryParams.withPartitionAndStaleStoresEnabled(final Integer partition)} and trade off consistency in favor of availability.
*
* @param storeName name of the store to find
* @param queryableStoreType accept only stores that are accepted by {@link QueryableStoreType#accepts(StateStore)}
* @param <T> return type
* @return A facade wrapping the local {@link StateStore} instances
* @throws InvalidStateStoreException if Kafka Streams is (re-)initializing or a store with {@code storeName} and
* {@code queryableStoreType} doesn't exist
*/
public <T> T store(final String storeName, final QueryableStoreType<T> queryableStoreType) {
    return store(new StoreQueryParams<T>(storeName, queryableStoreType));
}
/**
* Get a facade wrapping the local {@link StateStore} instances with the provided {@link StoreQueryParams}.
* {@link StoreQueryParams} requires {@code storeName} and {@code queryableStoreType} to be set; a store is returned only if its
* type is accepted by the provided {@link QueryableStoreType#accepts(StateStore) queryableStoreType}.
* The optional parameters of {@link StoreQueryParams} are {@code partition} and {@code includeStaleStores}.
* The returned object can be used to query the {@link StateStore} instances.
*
* @param storeQueryParams If {@code storeQueryParams.withPartition(int partition)} is used, queries are allowed on the specified partition
* regardless of whether it is an active, standby, or restoring replica.
* If {@code storeQueryParams.withIncludeStaleStores()} is used, queries are allowed on standby and restoring replicas in addition to active ones, for all local partitions on the instance.
* If {@code storeQueryParams.withIncludeStaleStores().withPartition(int partition)} is used, queries are allowed on the specified partition
* regardless of whether it is an active, standby, or restoring replica.
* By default, if neither option is set, all local partitions of the store that are in the running state are returned.
* @param <T> return type
* @return A facade wrapping the local {@link StateStore} instances
* @throws InvalidStateStoreException if Kafka Streams is (re-)initializing or a store with {@code storeName} and
* {@code queryableStoreType} doesn't exist
*/
public <T> T store(final StoreQueryParams<T> storeQueryParams) {
    validateIsRunningOrRebalancing();
    return queryableStoreProvider.getStore(storeQueryParams);
}
|
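To make the effect of the optional parameters concrete, the following is a minimal, self-contained sketch of the replica selection described in the Javadoc above, taken as written. It does not use Kafka Streams: `Replica`, `Params`, and `select` are hypothetical stand-ins introduced only for illustration, and the real `StoreQueryParams` and `queryableStoreProvider` may behave differently.

```java
import java.util.ArrayList;
import java.util.List;

public class StoreQueryParamsSketch {

    /** Hypothetical stand-in for one local replica of a store partition. */
    static final class Replica {
        final int partition;
        final boolean active; // false models a standby or restoring replica

        Replica(int partition, boolean active) {
            this.partition = partition;
            this.active = active;
        }
    }

    /** Simplified, hypothetical model of StoreQueryParams; not the Kafka Streams class. */
    static final class Params {
        final String storeName;
        final Integer partition;          // null means "all local partitions"
        final boolean includeStaleStores;

        Params(String storeName) {
            this(storeName, null, false);
        }

        private Params(String storeName, Integer partition, boolean includeStaleStores) {
            this.storeName = storeName;
            this.partition = partition;
            this.includeStaleStores = includeStaleStores;
        }

        Params withPartition(int partition) {
            return new Params(storeName, partition, includeStaleStores);
        }

        Params withIncludeStaleStores() {
            return new Params(storeName, partition, true);
        }
    }

    /** Selects the local replicas a query may touch, following the Javadoc above as written. */
    static List<Replica> select(List<Replica> local, Params params) {
        List<Replica> selected = new ArrayList<>();
        for (Replica replica : local) {
            boolean partitionMatches =
                params.partition == null || params.partition.intValue() == replica.partition;
            // Per the Javadoc, naming a partition also permits standby/restoring replicas.
            boolean staleAllowed = params.includeStaleStores || params.partition != null;
            if (partitionMatches && (replica.active || staleAllowed)) {
                selected.add(replica);
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        List<Replica> local = List.of(
            new Replica(0, true), new Replica(1, false), new Replica(2, true));

        System.out.println(select(local, new Params("counts")).size());                          // 2
        System.out.println(select(local, new Params("counts").withIncludeStaleStores()).size()); // 3
        System.out.println(select(local, new Params("counts").withPartition(1)).size());         // 1
    }
}
```

With one standby replica (partition 1) among three local replicas, the default parameters select only the two active replicas, `withIncludeStaleStores()` selects all three, and `withPartition(1)` selects the standby alone.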
...