Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.


JIRA:  
Jira
serverASF JIRA
columnskey,summary,type,created,updated,due,assignee,reporter,priority,status,resolution
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyKAFKA-9445

Discussion: 

Motivation:

Whenever a call is made to get a particular key from a Kafka Streams instance, currently it returns a Queryable store that contains a list of the stores for all the running and restoring/replica(with KIP-535: Allow state stores to serve stale reads during rebalance) on the instance via StreamThreadStateStoreProvider#stores(). This list of stores is then provided to CompositeReadOnlyKeyValueStore#get() which looks into each store one by one. With the changes that went in as a part of KIP-535 since we have access to the information that a key belongs to which partition, we should have the capability to fetch store for that particular partition and look for the key in store for that partition only. It would be a good improvement for improving latencies for applications that contain multiple partitions on a single instance and don't have bloom filters enabled internally for RocksDB.


Public Interfaces:

Adding new Class StoreQueryParams.java to provide user options to the QueryableStoreProvider layer to understand what kind of stores a user wants. It would currently include whether a user is okay with serving stale data and if user already knows what is the partition of the store a user is looking at. Since store name and partition would be a unique combination, a taskId can be generated from this information to return the store for that particular task.

...

Code Block
languagejava
titleStoreQueryParams.java
collapsetrue
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams;


/**
* Represents all the query options that a user can provide to state what kind of stores it is expecting. The options would be whether a user would want to enable/disable stale stores* or whether it knows the list of partitions that it specifically wants to fetch. If this information is not provided the default behavior is to fetch the stores for all the partitions available on that instance* for that particular store name.
* It contains a list of partitions, which for a point queries can be populated from the  KeyQueryMetadata.
*/
public class StoreQueryParams {


private final List<Integer> partitions;


private final boolean includeStaleStores;


public StoreQueryParams(final List<Integer> partitions, final boolean includeStaleStores) {
this.partitions = partitions;
this.includeStaleStores = includeStaleStores;
}


/**
* Get the list of partitions to be used to fetch list of Queryable store from QueryableStoreProvider. 
*
* @return list of partitions
*/
public List<Integer> getPartitions() {
return partitions;
}

/**
* Get the flag includeStaleStores. If true, include standbys and recovering stores along with running stores
*
* @return includeStaleStores
*/
public boolean getIncludeStaleStores() {
return includeStaleStores;
}


@Override
public boolean equals(final Object obj) {
if (!(obj instanceof StoreQueryParams)) {
return false;
}
final StoreQueryParams storeQueryParams = (KeyQueryMetadata) obj;
return Objects.equals(storeQueryParams.partitions, partitions)
&& Objects.equals(storeQueryParams.includeStaleStores, includeStaleStores);
}


@Override
public String toString() {
return "StoreQueryParams {" +
"partitions=" + partitions +
", includeStaleStores=" + includeStaleStores +
'}';
}


@Override
public int hashCode() {
return Objects.hash(partitions, includeStaleStores);
}
}





Proposed Changes:

  • Add a new public class StoreQueryParams.java to set options for what kind of stores a user wants.
  • Create a taskId from the combination of store name and partition provided by the user.
  • In StreamThreadStateStoreProvider.java return only the stores for the task requested by the user along with the check to return only running store or standby and recovering stores as well.


Compatibility, Deprecation, and Migration Plan:

  • No deprecation required as this KIP changes the definition of QueryableStoreProvider#getStore() and StreamThreadStateStoreProvider#stores() and users have to adapt to the new functions.


Rejected Alternatives:

  • Overload the QueryableStoreProvider#getStore() and StreamThreadStateStoreProvider#stores() with new parameters to pass a list of partitions along with the currently passed flag includeStaleStores.