...

Whenever a call is made to get a particular key from a Kafka Streams instance, it currently returns a queryable store that wraps a list of the stores for all the running tasks and, with KIP-535 (Allow state stores to serve stale reads during rebalance), the restoring and standby (replica) tasks on the instance, as returned by StreamThreadStateStoreProvider#stores(). This list of stores is then passed to CompositeReadOnlyKeyValueStore#get(), which looks into each store one by one. With the changes that went in as part of KIP-535, we know which partition a key belongs to, so we should be able to fetch the store for that particular partition and look up the key in that store only. This would reduce lookup latency for applications that host multiple partitions on a single instance and do not have Bloom filters enabled internally for RocksDB.
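To make the cost difference concrete, the sketch below models one instance hosting several partitions of a key-value store. It is a hypothetical simulation, not Kafka Streams code: each partition's store is a plain HashMap, the probe counter is only there to expose how many stores a lookup touches, and the partitioner is a simple hash of the key (an assumption; Kafka's default partitioner hashes the serialized key with murmur2). The first lookup mirrors the current one-store-at-a-time scan, the second the proposed partition-specific lookup.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of one instance hosting several partitions of a key-value store.
final class PartitionLookupSketch {

    // One map per locally hosted partition, indexed by partition number.
    private final List<Map<String, String>> storesByPartition = new ArrayList<>();
    // Number of stores probed by the most recent lookup.
    int probes;

    PartitionLookupSketch(final int numPartitions) {
        for (int p = 0; p < numPartitions; p++) {
            storesByPartition.add(new HashMap<>());
        }
    }

    // Assumption: a simple hash partitioner stands in for the real one.
    int partitionFor(final String key) {
        return Math.floorMod(key.hashCode(), storesByPartition.size());
    }

    void put(final String key, final String value) {
        storesByPartition.get(partitionFor(key)).put(key, value);
    }

    // Current behavior: probe every local store in turn until the key is found.
    String getFromAllStores(final String key) {
        probes = 0;
        for (final Map<String, String> store : storesByPartition) {
            probes++;
            final String value = store.get(key);
            if (value != null) {
                return value;
            }
        }
        return null;
    }

    // Proposed behavior: the partition is already known, so exactly one store is probed.
    String getFromPartition(final String key, final int partition) {
        probes = 1;
        return storesByPartition.get(partition).get(key);
    }

    public static void main(final String[] args) {
        final PartitionLookupSketch instance = new PartitionLookupSketch(8);
        for (int i = 0; i < 100; i++) {
            instance.put("key-" + i, "value-" + i);
        }
        final String key = "key-42";
        System.out.println(instance.getFromAllStores(key) + " after " + instance.probes + " probe(s)");
        System.out.println(instance.getFromPartition(key, instance.partitionFor(key))
                + " after " + instance.probes + " probe(s)");
    }
}
```

A key that is absent forces the scan to probe every local store, which is the case where a per-store Bloom filter would otherwise help.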

While adding the new functionality to query a particular partition of a store, it makes sense to combine all the input parameters to KafkaStreams#store() into a new public class, StoreQueryParams, for ease of use. This requires changes to the public API, namely KafkaStreams.

Public Interfaces:

Add a new class, StoreQueryParams.java, that lets the user tell the QueryableStoreProvider layer what kind of stores it wants. It currently includes whether the user is okay with serving stale data and, if the user already knows it, the partition of the store to query. Since the store name and the partition form a unique combination, a taskId can be generated from this information to return the store for that particular task.
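The store-name-plus-partition to task mapping can be sketched as follows. This assumes that a store name identifies the single sub-topology that owns the store and that a task is identified by (sub-topology, partition); TaskKey and TaskIdResolver are hypothetical names, not Kafka Streams classes, and the store-to-sub-topology map would in practice come from the topology rather than from manual registration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for a task identifier: (sub-topology id, partition).
final class TaskKey {
    final int subtopology;
    final int partition;

    TaskKey(final int subtopology, final int partition) {
        this.subtopology = subtopology;
        this.partition = partition;
    }

    @Override
    public boolean equals(final Object o) {
        if (!(o instanceof TaskKey)) {
            return false;
        }
        final TaskKey other = (TaskKey) o;
        return subtopology == other.subtopology && partition == other.partition;
    }

    @Override
    public int hashCode() {
        return Objects.hash(subtopology, partition);
    }

    @Override
    public String toString() {
        return subtopology + "_" + partition;
    }
}

// Hypothetical resolver: store name + partition -> task.
final class TaskIdResolver {
    // Assumption: maps each store name to the sub-topology that owns it.
    private final Map<String, Integer> subtopologyByStore = new HashMap<>();

    void registerStore(final String storeName, final int subtopology) {
        subtopologyByStore.put(storeName, subtopology);
    }

    TaskKey taskFor(final String storeName, final int partition) {
        final Integer subtopology = subtopologyByStore.get(storeName);
        if (subtopology == null) {
            throw new IllegalArgumentException("Unknown store: " + storeName);
        }
        return new TaskKey(subtopology, partition);
    }
}
```

With the task identified, the provider only needs to look up the store registered for that one task instead of collecting the stores of every local task.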

...

StoreQueryParams.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.streams;

import org.apache.kafka.streams.state.QueryableStoreType;

import java.util.Objects;

/**
 * Represents all the query options that a user can provide to state what kind of stores it is expecting.
 * The options are whether the user wants to enable or disable stale stores, and whether it knows the specific
 * partition that it wants to fetch. If no partition is provided, the default behavior is to fetch the stores
 * for all the partitions available on the instance for the given store name.
 * For point queries, the partition can be populated from KeyQueryMetadata.
 */
public class StoreQueryParams<T> {

    private Integer partition;              // null means "all local partitions"
    private boolean includeStaleStores;     // false by default: running active stores only
    private final String storeName;
    private final QueryableStoreType<T> queryableStoreType;

    public StoreQueryParams(final String storeName, final QueryableStoreType<T> queryableStoreType) {
        this.storeName = storeName;
        this.queryableStoreType = queryableStoreType;
    }

    /**
     * Set the partition to query. If not set (or set to null), all local partitions are queried.
     */
    public StoreQueryParams<T> withPartition(final Integer partition) {
        this.partition = partition;
        return this;
    }

    /**
     * Set whether standby and recovering stores may be queried in addition to running stores.
     */
    public StoreQueryParams<T> withIncludeStaleStores(final boolean includeStaleStores) {
        this.includeStaleStores = includeStaleStores;
        return this;
    }

    /**
     * Get the partition to be used to fetch the list of queryable stores from QueryableStoreProvider.
     *
     * @return an Integer partition, or null if all local partitions are requested
     */
    public Integer getPartition() {
        return partition;
    }

    /**
     * Get the flag includeStaleStores. If true, include standbys and recovering stores along with running stores.
     *
     * @return boolean includeStaleStores
     */
    public boolean includeStaleStores() {
        return includeStaleStores;
    }

    /**
     * Get the store name for which the key is queried by the user.
     *
     * @return String storeName
     */
    public String getStoreName() {
        return storeName;
    }

    /**
     * Get the queryable store type for which the key is queried by the user.
     *
     * @return QueryableStoreType queryableStoreType
     */
    public QueryableStoreType<T> getQueryableStoreType() {
        return queryableStoreType;
    }

    /**
     * Get whether the store query params are fetching all partitions or a single partition.
     *
     * @return true if all partitions are requested, false if a specific partition is requested
     */
    public boolean getAllLocalPartitions() {
        return partition == null;
    }

    @Override
    public boolean equals(final Object obj) {
        if (!(obj instanceof StoreQueryParams)) {
            return false;
        }
        final StoreQueryParams<?> storeQueryParams = (StoreQueryParams<?>) obj;
        return Objects.equals(storeQueryParams.partition, partition)
                && storeQueryParams.includeStaleStores == includeStaleStores
                && Objects.equals(storeQueryParams.storeName, storeName)
                && Objects.equals(storeQueryParams.queryableStoreType, queryableStoreType);
    }

    @Override
    public String toString() {
        return "StoreQueryParams {" +
                "partition=" + partition +
                ", includeStaleStores=" + includeStaleStores +
                ", storeName=" + storeName +
                ", queryableStoreType=" + queryableStoreType +
                '}';
    }

    @Override
    public int hashCode() {
        return Objects.hash(partition, includeStaleStores, storeName, queryableStoreType);
    }
}

KafkaStreams.java

    /**
     * Get a facade wrapping the local {@link StateStore} instances with the provided {@code storeName} if the Store's
     * type is accepted by the provided {@link QueryableStoreType#accepts(StateStore) queryableStoreType}.
     * The returned object can be used to query the {@link StateStore} instances.
     *
     * Only permits queries on active replicas of the store (no standbys or restoring replicas).
     * See {@link KafkaStreams#store(String, QueryableStoreType, StoreQueryParams)} for the option to enable stale
     * stores via {@code StoreQueryParams#withIncludeStaleStores(true)}, for all local partitions or, combined with
     * {@code StoreQueryParams#withPartition(Integer)}, for a single partition, and trade off consistency in favor
     * of availability.
     *
     * @param storeName           name of the store to find
     * @param queryableStoreType  accept only stores that are accepted by {@link QueryableStoreType#accepts(StateStore)}
     * @param <T>                 return type
     * @return A facade wrapping the local {@link StateStore} instances
     * @throws InvalidStateStoreException if Kafka Streams is (re-)initializing or a store with {@code storeName} and
     * {@code queryableStoreType} doesn't exist
     */
    public <T> T store(final String storeName, final QueryableStoreType<T> queryableStoreType) {
        // Defaults: all local partitions (partition == null) and stale stores disabled.
        return store(storeName, queryableStoreType, new StoreQueryParams<>(storeName, queryableStoreType));
    }

    /**
     * Get a facade wrapping the local {@link StateStore} instances with the provided {@code storeName} if the Store's
     * type is accepted by the provided {@link QueryableStoreType#accepts(StateStore) queryableStoreType}.
     * The returned object can be used to query the {@link StateStore} instances.
     *
     * @param storeName           name of the store to find
     * @param queryableStoreType  accept only stores that are accepted by {@link QueryableStoreType#accepts(StateStore)}
     * @param storeQueryParams    if no partition is set and stale stores are disabled, only permits queries on the
     *                            active replicas of all the partitions available on the local instance, and only if
     *                            the task for that partition is running, i.e., the state store is not a standby
     *                            replica and it is not restoring from the changelog;
     *                            if a partition is set and stale stores are disabled, only permits queries on the
     *                            active replica of that partition, under the same running-task condition;
     *                            if no partition is set and stale stores are enabled, also allows queries on standby
     *                            and restoring replicas of all local partitions;
     *                            if a partition is set and stale stores are enabled, allows queries on that partition
     *                            whether it is an active, a standby, or a restoring replica
     * @param <T>                 return type
     * @return A facade wrapping the local {@link StateStore} instances
     * @throws InvalidStateStoreException if Kafka Streams is (re-)initializing or a store with {@code storeName} and
     * {@code queryableStoreType} doesn't exist
     */
    public <T> T store(final String storeName,
                       final QueryableStoreType<T> queryableStoreType,
                       final StoreQueryParams<T> storeQueryParams) {
        validateIsRunningOrRebalancing();
        return queryableStoreProvider.getStore(storeName, queryableStoreType, storeQueryParams);
    }

QueryableStoreProvider.java

    /**
     * Get a composite object wrapping the instances of the {@link StateStore} with the provided
     * storeName and {@link QueryableStoreType}.
     *
     * @param storeName          name of the store
     * @param queryableStoreType accept stores passing {@link QueryableStoreType#accepts(StateStore)}
     * @param storeQueryParams   if stale stores are enabled (includeStaleStores is true), include standbys and
     *                           recovering stores; if stale stores are disabled (includeStaleStores is false), only
     *                           include running actives; if partition is null, fetch all local partitions on the
     *                           instance; if partition is set, fetch that specific partition only
     * @param <T>                The expected type of the returned store
     * @return A composite object that wraps the store instances.
     */
    public <T> T getStore(final String storeName,
                          final QueryableStoreType<T> queryableStoreType,
                          final StoreQueryParams<T> storeQueryParams) {
        final List<T> globalStore = globalStoreProvider.stores(storeName, queryableStoreType);
        if (!globalStore.isEmpty()) {
            return queryableStoreType.create(globalStoreProvider, storeName);
        }
        final List<T> allStores = new ArrayList<>();
        for (final StreamThreadStateStoreProvider storeProvider : storeProviders) {
            allStores.addAll(storeProvider.stores(storeName, queryableStoreType, storeQueryParams));
        }
        if (allStores.isEmpty()) {
            throw new InvalidStateStoreException("The state store, " + storeName + ", may have migrated to another instance.");
        }
        return queryableStoreType.create(
            new WrappingStoreProvider(storeProviders, storeQueryParams),
            storeName
        );
    }

StreamThreadStateStoreProvider.java

    @SuppressWarnings("unchecked")
    public <T> List<T> stores(final String storeName,
                              final QueryableStoreType<T> queryableStoreType,
                              final StoreQueryParams<T> storeQueryParams) {
        // implementation elided
    }



Proposed Changes:

  • Add a new public class, StoreQueryParams.java, that lets users specify what kind of stores they want.
  • Create a taskId from the combination of the store name and the partition provided by the user.
  • In StreamThreadStateStoreProvider.java, return only the stores for the task requested by the user, and check whether to return only running stores or standby/recovering stores as well.
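The filtering step in StreamThreadStateStoreProvider can be sketched as a predicate over the local store replicas. This is a hypothetical model rather than the actual provider: TaskState, LocalStore, and StoreSelector are illustrative names, and a replica is reduced to its partition and the state of the task that owns it. A null partition selects all local partitions, and includeStaleStores admits standby and restoring replicas alongside running ones.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical task states relevant to interactive queries.
enum TaskState { RUNNING, RESTORING, STANDBY }

// Hypothetical view of one local store replica.
final class LocalStore {
    final int partition;
    final TaskState state;

    LocalStore(final int partition, final TaskState state) {
        this.partition = partition;
        this.state = state;
    }
}

final class StoreSelector {
    // Returns the local stores a query may use, given the requested partition and staleness option.
    static List<LocalStore> select(final List<LocalStore> local,
                                   final Integer partition,
                                   final boolean includeStaleStores) {
        final List<LocalStore> result = new ArrayList<>();
        for (final LocalStore store : local) {
            // partition == null means "all local partitions"; otherwise only the requested one.
            final boolean partitionMatches = partition == null || store.partition == partition;
            // Without stale stores, only replicas of running (active, not restoring) tasks qualify.
            final boolean stateAllowed = includeStaleStores || store.state == TaskState.RUNNING;
            if (partitionMatches && stateAllowed) {
                result.add(store);
            }
        }
        return result;
    }
}
```

The four combinations of (partition set or not) × (stale stores enabled or not) correspond to the four cases listed for the storeQueryParams parameter of KafkaStreams#store.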

...