THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Public Interfaces:
- Adding new class
StoreQueryParams
StoreQueryParameters to provide user options to theQueryableStoreProvider
layer to understand what kind of stores a user wants. It would currently include whether a user is okay with serving stale data and if user already knows what is the partition of the store a user is looking at. Since store name and partition would be a unique combination, a taskId can be generated from this information to return the store for that particular task.
...
Code Block | ||||
---|---|---|---|---|
| ||||
package org.apache.kafka.streams; // Represents all the query options that a user can provide to state what kind of stores it is expecting public class StoreQueryParams<T>StoreQueryParameters<T> { public static <T> StoreQueryParams<T>StoreQueryParameters<T> fromNameAndType(final String storeName, final QueryableStoreType<T> queryableStoreType); public StoreQueryParams<T>StoreQueryParameters<T> withPartition(final Integer partition); public StoreQueryParams<T>StoreQueryParameters<T> enableStaleStores(); public Integer partition(); public boolean staleStoresEnabled(); public String storeName(); public QueryableStoreType<T> queryableStoreType(); } |
...
Code Block | ||||
---|---|---|---|---|
| ||||
public class KafkaStreams { @Deprecated public <T> T store(final String storeName, final QueryableStoreType<T> queryableStoreType); // remove (was added via KIP-535 and was never released) public <T> T store(final String storeName, final QueryableStoreType<T> queryableStoreType, final boolean staleStores); // newly added public <T> T store(final StoreQueryParams<T>StoreQueryParameters<T> storeQueryParamsstoreQueryParameters); } |
Proposed Changes:
- Add a new public class
StoreQueryParams
classStoreQueryParameters
to set options for what kind of stores a user wants. - Create a taskId from the combination of store name and partition provided by the user.
- In
StreamThreadStateStoreProvider.java
return only the stores for the task requested by the user and also check the condition to return only running stores or standby/recovering stores as well.
...
- KafkaStreams#store(final String storeName, final QueryableStoreType<T> queryableStoreType, final boolean includeStaleStores) will be changed to the one mentioned in the Public Interfaces changes. Since the mentioned function is not released yet in any version, no deprecation is required.
- Deprecating store(final String storeName, final QueryableStoreType<T> queryableStoreType) method in favour of public <T> T store(final StoreQueryParams<T> storeQueryParamsStoreQueryParameters<T> storeQueryParameters) as both store name and queryableStoreType have been added to StoreQueryParamsto StoreQueryParameters.
Rejected Alternatives:
- Overload the QueryableStoreProvider#getStore() and StreamThreadStateStoreProvider#stores() with new parameters to pass a list of partitions along with the currently passed flag includeStaleStores.