Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
titleStoreQueryParams.java
collapsetrue
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.streams;

import java.util.Objects;

/**
 * Query options a caller supplies to state what kind of stores it expects when looking up a queryable state store.
 * <p>
 * Two things can be chosen: whether stale stores (standbys and recovering stores) may be returned along with
 * running stores, and whether stores are fetched for one specific partition or for all the partitions available
 * on the instance for that store name. For point queries the partition can be populated from
 * {@code KeyQueryMetadata}.
 * <p>
 * Instances are immutable and are created through the static factory methods below.
 */
public class StoreQueryParams {

    // null means "all partitions available on the local instance"
    private final Integer partition;
    private final boolean includeStaleStores;

    /**
     * Create params that fetch a single partition and exclude stale stores.
     *
     * @param partition the partition to fetch
     * @return params with the given partition and {@code includeStaleStores == false}
     */
    public static final StoreQueryParams withPartitionAndStaleStoresDisabled(final Integer partition) {
        return new StoreQueryParams(partition, false);
    }

    /**
     * Create params that fetch a single partition and include stale stores.
     *
     * @param partition the partition to fetch
     * @return params with the given partition and {@code includeStaleStores == true}
     */
    public static final StoreQueryParams withPartitionAndStaleStoresEnabled(final Integer partition) {
        return new StoreQueryParams(partition, true);
    }

    /**
     * Create params that fetch all local partitions and exclude stale stores.
     *
     * @return params with no specific partition and {@code includeStaleStores == false}
     */
    public static final StoreQueryParams withAllPartitionAndStaleStoresDisabled() {
        return new StoreQueryParams(null, false);
    }

    /**
     * Create params that fetch all local partitions and include stale stores.
     *
     * @return params with no specific partition and {@code includeStaleStores == true}
     */
    public static final StoreQueryParams withAllPartitionAndStaleStoresEnabled() {
        return new StoreQueryParams(null, true);
    }

    private StoreQueryParams(final Integer partition, final boolean includeStaleStores) {
        this.partition = partition;
        this.includeStaleStores = includeStaleStores;
    }

    /**
     * Get the partition to be used to fetch list of Queryable store from QueryableStoreProvider.
     *
     * @return an Integer partition, or {@code null} when all local partitions are requested
     */
    public Integer getPartition() {
        return partition;
    }

    /**
     * Get the flag includeStaleStores. If true, include standbys and recovering stores along with running stores.
     *
     * @return boolean includeStaleStores
     */
    public boolean includeStaleStores() {
        return includeStaleStores;
    }

    /**
     * Get whether the store query params are fetching all partitions or a single partition.
     *
     * @return {@code true} if all partitions are requested, {@code false} if a specific partition is requested
     */
    public boolean getAllLocalPartitions() {
        return partition == null;
    }

    @Override
    public boolean equals(final Object obj) {
        if (!(obj instanceof StoreQueryParams)) {
            return false;
        }
        final StoreQueryParams storeQueryParams = (StoreQueryParams) obj;
        return Objects.equals(storeQueryParams.partition, partition)
            && storeQueryParams.includeStaleStores == includeStaleStores;
    }

    @Override
    public String toString() {
        return "StoreQueryParams {" +
            "partition=" + partition +
            ", includeStaleStores=" + includeStaleStores +
            '}';
    }

    @Override
    public int hashCode() {
        return Objects.hash(partition, includeStaleStores);
    }
}


Code Block
languagejava
titleKafkaStreams.java
collapsetrue
     /**
     * Get a facade wrapping the local {@link StateStore} instances with the provided {@code storeName} if the Store's
     * type is accepted by the provided {@link QueryableStoreType#accepts(StateStore) queryableStoreType}.
     * The returned object can be used to query the {@link StateStore} instances.
     *
     * Only permits queries on active replicas of the store (no standbys or restoring replicas).
     * See {@link KafkaStreams#store(String, QueryableStoreType, StoreQueryParams)}
     * for the option to pass {@code StoreQueryParams.withAllPartitionAndStaleStoresEnabled()} or
     * {@code StoreQueryParams.withPartitionAndStaleStoresEnabled(partition)} and trade off consistency
     * in favor of availability.
     *
     * @param storeName           name of the store to find
     * @param queryableStoreType  accept only stores that are accepted by {@link QueryableStoreType#accepts(StateStore)}
     * @param <T>                 return type
     * @return A facade wrapping the local {@link StateStore} instances
     * @throws InvalidStateStoreException if Kafka Streams is (re-)initializing or a store with {@code storeName} and
     * {@code queryableStoreType} doesn't exist
     */
    public <T> T store(final String storeName, final QueryableStoreType<T> queryableStoreType) {
        // Delegate with "all local partitions, stale stores excluded".
        return store(storeName, queryableStoreType, StoreQueryParams.withAllPartitionAndStaleStoresDisabled());
    }

    /**
     * Get a facade wrapping the local {@link StateStore} instances with the provided {@code storeName} if the Store's
     * type is accepted by the provided {@link QueryableStoreType#accepts(StateStore) queryableStoreType}.
     * The returned object can be used to query the {@link StateStore} instances.
     *
     * @param storeName           name of the store to find
     * @param queryableStoreType  accept only stores that are accepted by {@link QueryableStoreType#accepts(StateStore)}
     * @param storeQueryParams    If {@code StoreQueryParams.withAllPartitionAndStaleStoresDisabled()} is used, only permits
     *                            queries on the active replicas for all the partitions available on the local instance,
     *                            and only if the task for that partition is running. I.e., the state store is not a
     *                            standby replica, and it is not restoring from the changelog.
     *                            If {@code StoreQueryParams.withPartitionAndStaleStoresDisabled(partition)} is used, only
     *                            permits queries on the active replica for the partition provided in the parameter,
     *                            and only if the task for that partition is running. I.e., the state store is not a
     *                            standby replica, and it is not restoring from the changelog.
     *                            If {@code StoreQueryParams.withAllPartitionAndStaleStoresEnabled()} is used, allows
     *                            queries on standbys and restoring replicas in addition to active ones.
     *                            If {@code StoreQueryParams.withPartitionAndStaleStoresEnabled(partition)} is used, allows
     *                            queries on the specific partition irrespective of whether it is a standby or a
     *                            restoring replica, in addition to active ones.
     * @param <T>                 return type
     * @return A facade wrapping the local {@link StateStore} instances
     * @throws InvalidStateStoreException if Kafka Streams is (re-)initializing or a store with {@code storeName} and
     * {@code queryableStoreType} doesn't exist
     */
    public <T> T store(final String storeName,
                       final QueryableStoreType<T> queryableStoreType,
                       final StoreQueryParams storeQueryParams) {
        // Validate the instance state before delegating the lookup to the store provider.
        validateIsRunningOrRebalancing();
        return queryableStoreProvider.getStore(storeName, queryableStoreType, storeQueryParams);
    }



Code Block
languagejava
titleQueryableStoreProvider.java
collapsetrue
  /**
     * Build a composite object over the {@link StateStore} instances that match the provided
     * storeName and {@link QueryableStoreType}.
     *
     * @param storeName          name of the store
     * @param queryableStoreType accept stores passing {@link QueryableStoreType#accepts(StateStore)}
     * @param storeQueryParams   if includeStaleStores is true, include standbys and recovering stores;
     *                           if includeStaleStores is false, only include running actives;
     *                           if partition is null, fetch all local partitions on the instance;
     *                           if partition is set, fetch that specific partition.
     * @param <T>                The expected type of the returned store
     * @return A composite object that wraps the store instances.
     * @throws InvalidStateStoreException if neither the global provider nor any thread provider returns a store
     */
    public <T> T getStore(final String storeName,
                          final QueryableStoreType<T> queryableStoreType,
                          final StoreQueryParams storeQueryParams) {
        // A non-empty answer from the global provider short-circuits the per-thread lookup.
        if (!globalStoreProvider.stores(storeName, queryableStoreType).isEmpty()) {
            return queryableStoreType.create(globalStoreProvider, storeName);
        }

        // Collect matches from every thread provider; the list is only used to detect "nothing found".
        final List<T> localStores = new ArrayList<>();
        for (final StreamThreadStateStoreProvider provider : storeProviders) {
            final List<T> fromThread = provider.stores(storeName, queryableStoreType, storeQueryParams);
            localStores.addAll(fromThread);
        }
        if (localStores.isEmpty()) {
            throw new InvalidStateStoreException("The state store, " + storeName + ", may have migrated to another instance.");
        }

        final WrappingStoreProvider wrappingProvider = new WrappingStoreProvider(storeProviders, storeQueryParams);
        return queryableStoreType.create(wrappingProvider, storeName);
    }


Code Block
languagejava
titleStreamThreadStateStoreProvider.java
collapsetrue
package org.apache.kafka.streams.state.internals
Code Block
languagejava
titleStreamThreadStateStoreProvider.java
collapsetrue
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.streams.state.internals;

import org.apache.kafka.streams.StoreQueryParams;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.TimestampedWindowStore;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class StreamThreadStateStoreProvider {

    private final StreamThread streamThread;
    private final InternalTopologyBuilder internalTopologyBuilder;

    public StreamThreadStateStoreProvider(final StreamThread streamThread, final InternalTopologyBuilder internalTopologyBuilder) {
        this.streamThread = streamThread;
        this.internalTopologyBuilder = internalTopologyBuilder;
    }

    @SuppressWarnings("unchecked")
    public <T> List<T> stores(final String storeName,
                              final QueryableStoreType<T> queryableStoreType,
                              final StoreQueryParams storeQueryParams) {

        final TaskId keyTaskId = createKeyTaskId(storeName, storeQueryParams.getPartition());
        if (streamThread.state() == StreamThread.State.DEAD) {
            return Collections.emptyList();
        }
        final StreamThread.State state = streamThread.state();
        if (storeQueryParams.getIncludeStaleStoresincludeStaleStores() ? state.isAlive() : state == StreamThread.State.RUNNING) {
            final Map<TaskId, ? extends Task> tasks = storeQueryParams.getIncludeStaleStoresincludeStaleStores() ? streamThread.allTasks() : streamThread.activeTasks();
            final List<T> stores = new ArrayList<>();
            for (final Task streamTask : tasks.values()) {
                if (keyTaskId != null && !keyTaskId.equals(streamTask.id())) {
                    continue;
                }
                final StateStore store = streamTask.getStore(storeName);
                if (store != null && queryableStoreType.accepts(store)) {
                    if (!store.isOpen()) {
                        throw new InvalidStateStoreException(
                                "Cannot get state store " + storeName + " for task " + streamTask +
         +
                                " because the store is not open. " +
                                        "The state store may have migrated to another instances.");
                    }
                    if (store instanceof TimestampedKeyValueStore && queryableStoreType instanceof QueryableStoreTypes.KeyValueStoreType) {
                        stores.add((T) new ReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStore<Object, Object>) store));
                    } else if (store instanceof TimestampedWindowStore && queryableStoreType instanceof QueryableStoreTypes.WindowStoreType) {
                        stores.add((T) new ReadOnlyWindowStoreFacade<>((TimestampedWindowStore<Object, Object>Object>) store));
                    } else {
                        stores.add((T) store));
                    } else {
                }
            }
            stores.add((T) store)return stores;
        } else {
          }
                }  throw new InvalidStateStoreException("Cannot get state store " + storeName + " because the stream thread is " +
            }
              return stores;
        } else {
            throw new InvalidStateStoreException("Cannot get state store+ ", +not storeNameRUNNING" +
 " because the stream thread is " +
                    state + ", not RUNNING" +
                    (storeQueryParams.getIncludeStaleStoresincludeStaleStores() ? " or REBALANCING" : ""));
        }
    }

    private TaskId createKeyTaskId(final String storeName, final Integer partition) {
        if (partition == null) {
            return null;
        }
        final List<String> sourceTopics = internalTopologyBuilder.stateStoreNameToSourceTopics().get(storeName);
        final Set<String> sourceTopicsSet = sourceTopics.stream().collect(Collectors.toSet());
        final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = internalTopologyBuilder.topicGroups();
        for (final Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> topicGroup : topicGroups.entrySet()) {
            if (topicGroup.getValue().sourceTopics.containsAll(sourceTopicsSet)) {
                return new TaskId(topicGroup.getKey(), partition.intValue());
            }
        }
        throw new InvalidStateStoreException("Cannot get state store " + storeName + " because the requested partition " + partition + "is" +
                                                "not available on this instance");
    }
}


Proposed Changes:

  • Add a new public class StoreQueryParams.java to set options for what kind of stores a user wants.
  • Create a taskId from the combination of store name and partition provided by the user as shown in StreamThreadStateStoreProvider#createKeyTaskId() above.
  • In StreamThreadStateStoreProvider.java, return only the stores for the task requested by the user, and use the includeStaleStores option to decide whether to return only running stores or standby/recovering stores as well.

...