...

JIRA: KAFKA-9445 (https://issues.apache.org/jira/browse/KAFKA-9445)

Discussion:

...

  https://www.mail-archive.com/dev@kafka.apache.org/msg104287.html

Motivation:

Whenever a call is made to get a particular key from a Kafka Streams instance, it currently returns a queryable store wrapper that contains a list of the stores for all the running and restoring/standby replicas (with KIP-535: Allow state stores to serve stale reads during rebalance) on the instance, obtained via StreamThreadStateStoreProvider#stores(). This list of stores is then handed to CompositeReadOnlyKeyValueStore#get(), which looks into each store one by one. With the changes that went in as part of KIP-535, we have access to which partition a key belongs to, so we should also have the capability to fetch the store for that particular partition and look for the key only in that partition's store. This would be a good latency improvement for applications that host multiple partitions on a single instance and don't have bloom filters enabled internally for RocksDB.

When serving queries (like `get(key)` or `range(from, to)`), the wrapper actually iterates over all the underlying stores and issues the same query on each one. This is quite inefficient, and more importantly, it disallows some capabilities that KIP-535 intended to provide.

KIP-535 introduced two discovery mechanisms so that users could implement a query routing layer: the ability to find out which partition a specific key belongs to, and the ability to find out the locations and freshness of each replica of each partition of a store. Further, it introduced one key mechanism for a resilient query-fetch layer: the ability to serve queries from hot-standby replicas, not just from running active ones.

What is implicit is that the query routing layer would select an instance from which to fetch each partition of a store that the query spans, and then fan out to execute sub-queries against each such partition on the selected instances. However, the current store() API disallows this last step. Callers are only able to query all partitions on the local instance, not one specific partition.

Here's an example of how this is a drawback:

Imagine we have a cluster with two instances (A and B) and a store S with two partitions (0 and 1). Imagine further that store S is configured with one active and one standby replica; say, instance A hosts (0-active and 1-standby) and instance B hosts (1-active and 0-standby). Now, suppose the query routing layer wants to query only the standby replicas (so as not to compete with active processing). This is currently impossible. What would happen instead is that both instance A and B would return results from both partition 0 and 1, and the query router would have to de-duplicate the results. Moreover, it would not achieve the objective of avoiding competition with active processing.

To fill this gap, this KIP proposes to allow querying a specific partition of a store, while still preserving the ability to query all local partitions. This would also reduce the latency of querying a particular key from an instance, as the key is fetched only from the specific store partition it belongs to, which is very helpful for instances hosting many partitions.

Public Interfaces:

  • Adding a new class StoreQueryParameters to provide user options to the QueryableStoreProvider layer to understand what kind of stores a user wants. It would currently include whether a user is okay with serving stale data and whether the user already knows the partition of the store they are looking at. Since store name and partition form a unique combination, a taskId can be generated from this information to return the store for that particular task.


Code Block
languagejava
titleStoreQueryParameters.java
collapsetrue
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.streams;

/**
 * Represents all the query options that a user can provide to state what kind of store it is expecting.
 * The options are whether the user wants to enable serving stale data (from standby or restoring stores)
 * and whether the user already knows the partition of the store that it specifically wants to fetch.
 * If this information is not provided, the default behavior is to fetch the stores for all the partitions
 * available on that instance for that particular store name.
 * It contains the partition, which for point queries can be populated from the KeyQueryMetadata.
 */
public class StoreQueryParameters<T> {

    public static <T> StoreQueryParameters<T> fromNameAndType(final String storeName, final QueryableStoreType<T> queryableStoreType);

    public StoreQueryParameters<T> withPartition(final Integer partition);

    public StoreQueryParameters<T> enableStaleStores();

    public Integer partition();

    public boolean staleStoresEnabled();

    public String storeName();

    public QueryableStoreType<T> queryableStoreType();
}
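
For illustration, here is a minimal sketch of how a caller could build these parameters for a point query, assuming KIP-535's KafkaStreams#queryMetadataForKey() and KeyQueryMetadata#getPartition() APIs and a running KafkaStreams instance named streams; the store name, serdes, and variable names are illustrative, not part of this proposal.

Code Block
languagejava
titleBuilding StoreQueryParameters (illustrative sketch)
// Discover which partition of the (hypothetical) store "word-counts" holds the given key (KIP-535 API).
final KeyQueryMetadata keyMetadata =
        streams.queryMetadataForKey("word-counts", key, Serdes.String().serializer());

// Restrict the lookup to that single partition and allow standby/restoring replicas to serve it.
final StoreQueryParameters<ReadOnlyKeyValueStore<String, Long>> storeQueryParameters =
        StoreQueryParameters
                .fromNameAndType("word-counts", QueryableStoreTypes.<String, Long>keyValueStore())
                .withPartition(keyMetadata.getPartition())
                .enableStaleStores();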



  • Changing the KafkaStreams#store(final String storeName, final QueryableStoreType<T> queryableStoreType, final boolean staleStores) method in favour of the new store(StoreQueryParameters) overload shown in the KafkaStreams.java block below, as this one hasn't been released yet.


Code Block
languagejava
titleStreamThreadStateStoreProvider.java
collapsetrue
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.streams.state.internals;

import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.TimestampedWindowStore;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class StreamThreadStateStoreProvider {

    private final StreamThread streamThread;
    private final InternalTopologyBuilder internalTopologyBuilder;

    public StreamThreadStateStoreProvider(final StreamThread streamThread, final InternalTopologyBuilder internalTopologyBuilder) {
        this.streamThread = streamThread;
        this.internalTopologyBuilder = internalTopologyBuilder;
    }

    @SuppressWarnings("unchecked")
    public <T> List<T> stores(final String storeName,
                              final QueryableStoreType<T> queryableStoreType,
                              final StoreQueryParameters<T> storeQueryParameters) {

        // Resolve the task that hosts the requested partition; null means no specific partition was requested.
        final TaskId keyTaskId = createKeyTaskId(storeName, storeQueryParameters.partition());
        if (streamThread.state() == StreamThread.State.DEAD) {
            return Collections.emptyList();
        }
        final StreamThread.State state = streamThread.state();
        if (storeQueryParameters.staleStoresEnabled() ? state.isAlive() : state == StreamThread.State.RUNNING) {
            // With stale stores enabled, standby and restoring tasks may also serve reads; otherwise only running active tasks qualify.
            final Map<TaskId, ? extends Task> tasks = storeQueryParameters.staleStoresEnabled() ? streamThread.allTasks() : streamThread.activeTasks();
            final List<T> stores = new ArrayList<>();
            for (final Task streamTask : tasks.values()) {
                if (keyTaskId != null && !keyTaskId.equals(streamTask.id())) {
                    continue;
                }
                final StateStore store = streamTask.getStore(storeName);
                if (store != null && queryableStoreType.accepts(store)) {
                    if (!store.isOpen()) {
                        throw new InvalidStateStoreException(
                                "Cannot get state store " + storeName + " for task " + streamTask +
                                        " because the store is not open. " +
                                        "The state store may have migrated to another instances.");
                    }
                    if (store instanceof TimestampedKeyValueStore && queryableStoreType instanceof QueryableStoreTypes.KeyValueStoreType) {
                        stores.add((T) new ReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStore<Object, Object>) store));
                    } else if (store instanceof TimestampedWindowStore && queryableStoreType instanceof QueryableStoreTypes.WindowStoreType) {
                        stores.add((T) new ReadOnlyWindowStoreFacade<>((TimestampedWindowStore<Object, Object>) store));
                    } else {
                        stores.add((T) store);
                    }
                }
            }
            return stores;
        } else {
            throw new InvalidStateStoreException("Cannot get state store " + storeName + " because the stream thread is " +
                    state + ", not RUNNING" +
                    (storeQueryParameters.staleStoresEnabled() ? " or REBALANCING" : ""));
        }
    }

    private TaskId createKeyTaskId(final String storeName, final Integer partition) {
        if (partition == null) {
            return null;
        }
        final List<String> sourceTopics = internalTopologyBuilder.stateStoreNameToSourceTopics().get(storeName);
        final Set<String> sourceTopicsSet = sourceTopics.stream().collect(Collectors.toSet());
        final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = internalTopologyBuilder.topicGroups();
        for (final Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> topicGroup : topicGroups.entrySet()) {
            if (topicGroup.getValue().sourceTopics.containsAll(sourceTopicsSet)) {
                return new TaskId(topicGroup.getKey(), partition.intValue());
            }
        }
        throw new InvalidStateStoreException("Cannot get state store " + storeName + " because the requested partition " + partition + "is" +
                "not available on this instance");
    }
}

Code Block
languagejava
titleKafkaStreams.java
collapsetrue
public class KafkaStreams {
  @Deprecated
  public <T> T store(final String storeName, final QueryableStoreType<T> queryableStoreType);

  // remove (was added via KIP-535 and was never released)
  public <T> T store(final String storeName, final QueryableStoreType<T> queryableStoreType, final boolean staleStores);

  // newly added
  public <T> T store(final StoreQueryParameters<T> storeQueryParameters);
}
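
Continuing the sketch above (same assumptions and illustrative names), the new overload would then serve the lookup from just the one store partition that owns the key, instead of iterating over every partition hosted on the instance:

Code Block
languagejava
titleQuerying a single partition (illustrative sketch)
// Returns a queryable view backed only by the store partition that owns the key.
final ReadOnlyKeyValueStore<String, Long> store = streams.store(storeQueryParameters);

// The get() no longer has to be tried against every local partition's store.
final Long count = store.get(key);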


Proposed Changes:

  • Add a new public class StoreQueryParameters to set options for what kind of stores a user wants.
  • Create a taskId from the combination of store name and partition provided by the user as shown in StreamThreadStateStoreProvider#createKeyTaskId() above.
  • In StreamThreadStateStoreProvider.java, return only the stores for the task requested by the user, and also check the condition to decide whether only running stores or standby/recovering stores should be returned as well.

...

Compatibility, Deprecation, and Migration Plan:

  • KafkaStreams#store(final String storeName, final QueryableStoreType<T> queryableStoreType, final boolean includeStaleStores) will be replaced by the new overload mentioned in the Public Interfaces changes. Since this function has not yet been released in any version, no deprecation is required.
  • Deprecating the store(final String storeName, final QueryableStoreType<T> queryableStoreType) method in favour of public <T> T store(final StoreQueryParameters<T> storeQueryParameters), as both the store name and the queryableStoreType have been added to StoreQueryParameters (see the migration sketch below). As a part of KIP-535, the functions QueryableStoreProvider#getStore() and StreamThreadStateStoreProvider#stores() have already been changed, so we will overwrite those functions once more before shipping both features together in 2.5.0.
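
As a hedged sketch of what the migration would look like for existing callers (store name and types are illustrative), the information previously passed as separate arguments simply moves into StoreQueryParameters; omitting withPartition() and enableStaleStores() preserves today's behavior of querying all running local partitions:

Code Block
languagejava
titleMigration sketch (illustrative)
// Before (to be deprecated):
final ReadOnlyKeyValueStore<String, Long> oldStyle =
        streams.store("word-counts", QueryableStoreTypes.<String, Long>keyValueStore());

// After: the same information is carried by StoreQueryParameters.
final ReadOnlyKeyValueStore<String, Long> newStyle =
        streams.store(StoreQueryParameters.fromNameAndType("word-counts", QueryableStoreTypes.<String, Long>keyValueStore()));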

Rejected Alternatives:

  • Overload the QueryableStoreProvider#getStore() and StreamThreadStateStoreProvider#stores() with new parameters to pass a list of partitions along with the currently passed flag includeStaleStores.