...

Public Interfaces:

  • Add a new class, StoreQueryParameters, that carries user options down to the QueryableStoreProvider layer so that it knows which stores the user wants. It currently covers two options: whether the user accepts stale data, and the partition of the store to query, if the user already knows it. Because a store name and a partition form a unique combination, a taskId can be derived from them to return the store for that particular task.

...

Code Block (Java): StoreQueryParameters.java
package org.apache.kafka.streams;

// Represents all the query options a user can provide to state what kind of stores they expect
public class StoreQueryParameters<T> {

    public static <T> StoreQueryParameters<T> fromNameAndType(final String storeName, final QueryableStoreType<T> queryableStoreType);

    public StoreQueryParameters<T> withPartition(final Integer partition);

    public StoreQueryParameters<T> enableStaleStores();

    public Integer partition();

    public boolean staleStoresEnabled();

    public String storeName();

    public QueryableStoreType<T> queryableStoreType();
}
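
The listing above only gives signatures. As a rough illustration of how such an options class could behave, here is a minimal sketch, assuming an immutable design in which withPartition and enableStaleStores each return a new instance. The names StoreQueryParametersSketch and StoreTypeSketch are hypothetical stand-ins (the latter replaces QueryableStoreType so that the example compiles without Kafka on the classpath). The choice of null for "no partition requested" is also an assumption, not something this page specifies.

```java
import java.util.Objects;

// Stand-in for Kafka's QueryableStoreType<T>; reduced to a marker interface for this sketch.
interface StoreTypeSketch<T> { }

// Hypothetical implementation: every "with/enable" call returns a new immutable instance.
final class StoreQueryParametersSketch<T> {
    private final String storeName;
    private final StoreTypeSketch<T> queryableStoreType;
    private final Integer partition;      // null: no specific partition requested (assumption)
    private final boolean staleStores;    // false: stale (standby/restoring) stores are not wanted

    private StoreQueryParametersSketch(final String storeName,
                                       final StoreTypeSketch<T> queryableStoreType,
                                       final Integer partition,
                                       final boolean staleStores) {
        this.storeName = Objects.requireNonNull(storeName, "storeName");
        this.queryableStoreType = Objects.requireNonNull(queryableStoreType, "queryableStoreType");
        this.partition = partition;
        this.staleStores = staleStores;
    }

    static <T> StoreQueryParametersSketch<T> fromNameAndType(final String storeName,
                                                             final StoreTypeSketch<T> queryableStoreType) {
        return new StoreQueryParametersSketch<>(storeName, queryableStoreType, null, false);
    }

    StoreQueryParametersSketch<T> withPartition(final Integer partition) {
        return new StoreQueryParametersSketch<>(storeName, queryableStoreType, partition, staleStores);
    }

    StoreQueryParametersSketch<T> enableStaleStores() {
        return new StoreQueryParametersSketch<>(storeName, queryableStoreType, partition, true);
    }

    Integer partition() { return partition; }

    boolean staleStoresEnabled() { return staleStores; }

    String storeName() { return storeName; }

    StoreTypeSketch<T> queryableStoreType() { return queryableStoreType; }
}
```

With this shape, a caller can chain options, for example fromNameAndType("counts", type).withPartition(2).enableStaleStores(), while the original instance is left unchanged.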

...

Code Block (Java): KafkaStreams.java
public class KafkaStreams {
  @Deprecated
  public <T> T store(final String storeName, final QueryableStoreType<T> queryableStoreType);

  // remove (was added via KIP-535 and was never released)
  public <T> T store(final String storeName, final QueryableStoreType<T> queryableStoreType, final boolean staleStores);

  // newly added
  public <T> T store(final StoreQueryParameters<T> storeQueryParameters);
}


Proposed Changes:

  • Add a new public class StoreQueryParameters to set options for what kind of stores a user wants.
  • Create a taskId from the combination of store name and partition provided by the user.
  • In StreamThreadStateStoreProvider.java, return only the stores for the task requested by the user, and check whether to return only running stores or standby/recovering stores as well.
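
The lookup described in the list above might be sketched as follows. This is an illustration under stated assumptions, not the actual StreamThreadStateStoreProvider code. The TaskStoreLookupSketch class, its Task type, the TaskState values, and the map from store name to sub-topology id are all hypothetical. The "<subtopology>_<partition>" task id format is also an assumption made for the example, and stores are represented as plain strings to keep it self-contained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class TaskStoreLookupSketch {
    enum TaskState { RUNNING, RESTORING, STANDBY }

    // Hypothetical view of a task: its id, its state, and its stores by name.
    static final class Task {
        final String taskId;
        final TaskState state;
        final Map<String, String> storesByName;

        Task(final String taskId, final TaskState state, final Map<String, String> storesByName) {
            this.taskId = taskId;
            this.state = state;
            this.storesByName = storesByName;
        }
    }

    // Derives a task id from the store name and partition (assumed format: "<subtopology>_<partition>").
    static String taskIdFor(final Map<String, Integer> subtopologyByStore,
                            final String storeName,
                            final int partition) {
        final Integer subtopology = subtopologyByStore.get(storeName);
        if (subtopology == null) {
            throw new IllegalArgumentException("Unknown store: " + storeName);
        }
        return subtopology + "_" + partition;
    }

    // Returns the matching stores; a null partition means "all partitions".
    static List<String> stores(final List<Task> tasks,
                               final Map<String, Integer> subtopologyByStore,
                               final String storeName,
                               final Integer partition,
                               final boolean staleStoresEnabled) {
        final String wantedTaskId =
            partition == null ? null : taskIdFor(subtopologyByStore, storeName, partition);
        final List<String> result = new ArrayList<>();
        for (final Task task : tasks) {
            if (wantedTaskId != null && !wantedTaskId.equals(task.taskId)) {
                continue; // not the task the user asked for
            }
            if (task.state != TaskState.RUNNING && !staleStoresEnabled) {
                continue; // standby/restoring stores are served only when stale reads are allowed
            }
            final String store = task.storesByName.get(storeName);
            if (store != null) {
                result.add(store);
            }
        }
        return result;
    }
}
```

In this sketch, a request for partition 1 of a store held only by a standby task returns nothing unless stale stores are enabled.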

...

  • KafkaStreams#store(final String storeName, final QueryableStoreType<T> queryableStoreType, final boolean includeStaleStores) will be replaced by the store method that takes StoreQueryParameters, as listed under Public Interfaces. Since the replaced method has not been released in any version, no deprecation is required.
  • Deprecate the store(final String storeName, final QueryableStoreType<T> queryableStoreType) method in favour of public <T> T store(final StoreQueryParameters<T> storeQueryParameters), since both the store name and queryableStoreType are now part of StoreQueryParameters.

Rejected Alternatives:

  • Overload the QueryableStoreProvider#getStore() and StreamThreadStateStoreProvider#stores() with new parameters to pass a list of partitions along with the currently passed flag includeStaleStores.