Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
titleStoreQueryParams.java
collapsetrue
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.streams;

import org.apache.kafka.streams.state.QueryableStoreType;

import java.util.Objects;

/**
 * Represents all the query options that a user can provide to state what kind of stores it is expecting. The options would be whether a user would want to enable/disable stale stores* or whether it knows the list of partitions that it specifically wants to fetch. If this information is not provided the default behavior is to fetch the stores for all the partitions available on that instance* for that particular store name.
 * It contains a partition, which for a point queries can be populated from the  KeyQueryMetadata.
 */
public class StoreQueryParams<T> {

    private Integer partition;
    private boolean includeStaleStores;
    private final String storeName;
    private final QueryableStoreType<T> queryableStoreType;

    private StoreQueryParams(final String storeName, final QueryableStoreType<T>  queryableStoreType) {
        this.storeName = storeName;
        this.queryableStoreType = queryableStoreType;
    }

    public static <T> StoreQueryParams<T> fromNameAndType(final String storeName, final QueryableStoreType<T>  queryableStoreType) {
        return new StoreQueryParams(storeName, queryableStoreType);
    }

    /**
     * Get the partition to be used to fetch list of Queryable store from QueryableStoreProvider.
     * If the function returns null, it would mean that no specific partition has been requested so all the local partitions
     * for the store will be returned.
     *
     * @return Integer partition
     */
    public Integer getPartition() {
        return partition;
    }

    /**
     * Get the flag includeStaleStores. If true, include standbys and recovering stores along with running stores.
     *
     * @return boolean includeStaleStores
     */
    public boolean isIncludeStaleStores() {
        return includeStaleStores;
    }

    /**
     * Get the {@link StoreQueryParams} with stale(standby, restoring) stores added via fetching the stores.
     *
     * @param partition   The specific integer partition to be fetched from the stores list by using {@link StoreQueryParams}.
     *
     * @return String storeName
     */
    public StoreQueryParams<T> withPartition(final Integer partition) {
package org.apache.kafka.streams;


/**
 * Represents all the query options that a user can provide to state what kind of stores it is expecting. The options would be whether a user would want to enable/disable stale stores* or whether it knows the list of partitions that it specifically wants to fetch. If this information is not provided the default behavior is to fetch the stores for all the partitions available on that instance* for that particular store name.
 * It contains a partition, which for a point queries can be populated from the  KeyQueryMetadata.
 */
public class StoreQueryParams<T> {

    private Integer partition;
    private boolean includeStaleStores;
    private final String storeName;
    private final QueryableStoreType<T> queryableStoreType;

    private StoreQueryParams(final String storeName, final QueryableStoreType<T>  queryableStoreType) {
        this.storeName = storeName;
        this.partitionqueryableStoreType = partitionqueryableStoreType;
    }

    public static return<T> this;
    }

    /**StoreQueryParams<T> fromNameAndType(final String storeName, final QueryableStoreType<T>  queryableStoreType) {
     *  Get thereturn {@linknew StoreQueryParams} with stale(standbystoreName, restoringqueryableStoreType);
 stores added via fetching the stores. }

     /**
     * @return String storeName
* Get the partition to */
be used to fetch publiclist StoreQueryParams<T>of withIncludeStaleStores() {
  Queryable store from QueryableStoreProvider.
     * this.includeStaleStoresIf =the true;
function returns null, it would mean that no returnspecific this;
partition has been requested }

so all the local /**partitions
     * Getfor the store namewill for which key is queried by the userbe returned.
     *
     * @return StringInteger storeNamepartition
     */
    public StringInteger getStoreNamegetPartition() {
        return storeNamepartition;
    }

    /**
     * Get the flag includeStaleStores. queryableIf storetrue, typeinclude forstandbys whichand keyrecovering isstores queriedalong bywith therunning userstores.
     *
     * @return QueryableStoreTypeboolean queryableStoreTypeincludeStaleStores
     */
    public QueryableStoreType<T>boolean getQueryableStoreTypeincludeStaleStores() {
        return queryableStoreTypeincludeStaleStores;
    }

    @Override
    public boolean equals(final Object obj) {/**
     * Get the {@link StoreQueryParams} with stale(standby, restoring) stores added via fetching the stores.
     *
     * @param partition if (!(obj instanceofThe StoreQueryParams)) {
            return false;specific integer partition to be fetched from the stores list by using {@link StoreQueryParams}.
        }*
     * @return String finalstoreName
 StoreQueryParams storeQueryParams = (StoreQueryParams) obj; */
    public StoreQueryParams<T> withPartition(final  return Objects.equals(storeQueryParams.partition, partition)
Integer partition) {
        this.partition = partition;
       && Objects.equals(storeQueryParams.includeStaleStores, includeStaleStores) return this;
    }

    /**
     * Get  && Objects.equals(storeQueryParams.storeName, storeName)
      the {@link StoreQueryParams} with stale(standby, restoring) stores added via fetching the stores.
        *
  && Objects.equals(storeQueryParams.queryableStoreType, queryableStoreType);
 * @return String }storeName

     @Override*/
    public StringStoreQueryParams<T> toStringwithIncludeStaleStores() {
        this.includeStaleStores = true;
        return this;
  "StoreQueryParams {" +}

    /**
     * Get the store name for which "partition=" + partition +
  key is queried by the user.
     *
     * @return String storeName
 ", includeStaleStores=" + includeStaleStores +*/
    public String storeName() {
        return ", storeName=" + storeName +storeName;
    }

     /**
     * Get the queryable store type ", queryableStoreType=" + queryableStoreType +
   for which key is queried by the user.
     *
     * @return QueryableStoreType '}';queryableStoreType
    }

    @Override*/
    public intQueryableStoreType<T> hashCodequeryableStoreType() {
        return Objects.hash(partition, includeStaleStores, storeName,return queryableStoreType);
    }
}



  • Deprecating Changing the KafkaStreams#store(final String storeName, final QueryableStoreType<T> queryableStoreType, final boolean includeStaleStores) in favour of the funtion mentioned below as this one hasn't been released yet.


Code Block
languagejava
titleKafkaStreams.java
collapsetrue
     

    /**
     * Get a facade wrapping the local {@link StateStore} instances with the provided {@link StoreQueryParams}.
     * StoreQueryParams need required parameters to be set, which are {@code storeName} and if
     * type is accepted by the provided {@link QueryableStoreType#accepts(StateStore) queryableStoreType}.
     * The optional parameters to the StoreQueryParams include {@code partition} and {@code includeStaleStores}.
     * The returned object can be used to query the {@link StateStore} instances.
     *
     * @param storeQueryParams    If StoreQueryParams.fromNameAndType(storeName, queryableStoreType).withPartition(int partition) is used, it allow queries on the specific partition irrespective if it is a standby
     *                            or a restoring replicas in addition to active ones.
     *                            If StoreQueryParams.fromNameAndType(storeName, queryableStoreType).withIncludeStaleStores() is used, it allow queries on standbys and restoring replicas in addition to active ones for all the local partitions on the instance.
     *                            If StoreQueryParams.fromNameAndType(storeName, queryableStoreType).withIncludeStaleStores().withPartition(int partition), it allow queries on the specific partition irrespective if it is a standby
     *                            or a restoring replicas in addition to active ones..
     *                            By default, if just storeQueryParams is used, it returns all the local partitions for the store which are in running state.
     * @param <T>                 return type
     * @return A facade wrapping the local {@link StateStore} instances
     * @throws InvalidStateStoreException if Kafka Streams is (re-)initializing or a store with {@code storeName} and
     * {@code queryableStoreType} doesn't exist
     */
    public <T> T store(final StoreQueryParams<T> storeQueryParams) {
        validateIsRunningOrRebalancing();
        return queryableStoreProvider.getStore(storeQueryParams);
    }

...

  • KafkaStreams#store(final String storeName, final QueryableStoreType<T> queryableStoreType, final boolean includeStaleStores) will be changed to the one mentioned in the Public Interfaces changes. Since the mentioned function is not released yet in any version, no deprecation is required.
  • Deprecating store(final String storeName, final QueryableStoreType<T> queryableStoreType) method in favour of  public <T> T store(final StoreQueryParams<T> storeQueryParams) as both store name and queryableStoreType have been added to StoreQueryParams. 

Rejected Alternatives:

  • Overload the QueryableStoreProvider#getStore() and StreamThreadStateStoreProvider#stores() with new parameters to pass a list of partitions along with the currently passed flag includeStaleStores.