...

StreamThreadStateStoreProvider.java
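package org.apache.kafka.streams.state.internals;

import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.TimestampedWindowStore;
// StoreQueryParams is the class proposed by this KIP; import it from the package it is finally placed in.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
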
public class StreamThreadStateStoreProvider {

    private final StreamThread streamThread;
    private final InternalTopologyBuilder internalTopologyBuilder;

    public StreamThreadStateStoreProvider(final StreamThread streamThread, final InternalTopologyBuilder internalTopologyBuilder) {
        this.streamThread = streamThread;
        this.internalTopologyBuilder = internalTopologyBuilder;
    }

    @SuppressWarnings("unchecked")
    public <T> List<T> stores(final String storeName,
                              final QueryableStoreType<T> queryableStoreType,
                              final StoreQueryParams storeQueryParams) {

        final TaskId keyTaskId = createKeyTaskId(storeName, storeQueryParams.getPartition());
        if (streamThread.state() == StreamThread.State.DEAD) {
            return Collections.emptyList();
        }
        final StreamThread.State state = streamThread.state();
        if (storeQueryParams.includeStaleStores() ? state.isAlive() : state == StreamThread.State.RUNNING) {
            final Map<TaskId, ? extends Task> tasks = storeQueryParams.includeStaleStores() ? streamThread.allTasks() : streamThread.activeTasks();
            final List<T> stores = new ArrayList<>();
            for (final Task streamTask : tasks.values()) {
                if (keyTaskId != null && !keyTaskId.equals(streamTask.id())) {
                    continue;
                }
                final StateStore store = streamTask.getStore(storeName);
                if (store != null && queryableStoreType.accepts(store)) {
                    if (!store.isOpen()) {
                        throw new InvalidStateStoreException(
                            "Cannot get state store " + storeName + " for task " + streamTask +
                                " because the store is not open. " +
                                "The state store may have migrated to another instance.");
                    }
                    if (store instanceof TimestampedKeyValueStore && queryableStoreType instanceof QueryableStoreTypes.KeyValueStoreType) {
                        stores.add((T) new ReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStore<Object, Object>) store));
                    } else if (store instanceof TimestampedWindowStore && queryableStoreType instanceof QueryableStoreTypes.WindowStoreType) {
                        stores.add((T) new ReadOnlyWindowStoreFacade<>((TimestampedWindowStore<Object, Object>) store));
                    } else {
                        stores.add((T) store);
                    }
                }
            }
            return stores;
        } else {
            throw new InvalidStateStoreException("Cannot get state store " + storeName + " because the stream thread is " +
                                                     state + ", not RUNNING" +
                                                     (storeQueryParams.includeStaleStores() ? " or REBALANCING" : ""));
        }
    }

    private TaskId createKeyTaskId(final String storeName, final Integer partition) {
        if (partition == null) {
            return null;
        }
        final List<String> sourceTopics = internalTopologyBuilder.stateStoreNameToSourceTopics().get(storeName);
        final Set<String> sourceTopicsSet = sourceTopics.stream().collect(Collectors.toSet());
        final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = internalTopologyBuilder.topicGroups();
        for (final Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> topicGroup : topicGroups.entrySet()) {
            if (topicGroup.getValue().sourceTopics.containsAll(sourceTopicsSet)) {
                return new TaskId(topicGroup.getKey(), partition.intValue());
            }
        }
        throw new InvalidStateStoreException("Cannot get state store " + storeName + " because the requested partition " + partition + " is " +
                            "not available on this instance");
    }
}


Proposed Changes:

  • Add a new public class, StoreQueryParams, through which the user specifies which stores to query.
  • Create a TaskId from the store name and the partition provided by the user, as shown in StreamThreadStateStoreProvider#createKeyTaskId() above.
  • In StreamThreadStateStoreProvider.java, return only the stores of the task the user requested, and check includeStaleStores() to decide whether to return only running (active) stores or standby/recovering stores as well.
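
The selection rule described in the list above can be sketched without any Kafka classes. This is only an illustration under stated assumptions, not the proposed implementation: QueryParams is a hypothetical stand-in for StoreQueryParams that models just the two accessors used in the code block above, stores are plain strings, and task ids are written as "<topicGroupId>_<partition>" for readability.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class StoreSelectionSketch {

    /** Hypothetical stand-in for StoreQueryParams; only getPartition() and includeStaleStores() are modelled. */
    static final class QueryParams {
        private final Integer partition;          // null means "do not filter by partition"
        private final boolean includeStaleStores; // true also admits standby/recovering stores

        QueryParams(final Integer partition, final boolean includeStaleStores) {
            this.partition = partition;
            this.includeStaleStores = includeStaleStores;
        }

        Integer getPartition() {
            return partition;
        }

        boolean includeStaleStores() {
            return includeStaleStores;
        }
    }

    /** Builds the key task id as "<topicGroupId>_<partition>", or null when no partition was requested. */
    static String keyTaskId(final int topicGroupId, final Integer partition) {
        return partition == null ? null : topicGroupId + "_" + partition;
    }

    /** Returns the stores that match the query; both maps go from task id to a store label. */
    static List<String> select(final Map<String, String> activeStores,
                               final Map<String, String> standbyStores,
                               final int topicGroupId,
                               final QueryParams params) {
        // Start from active tasks; add standby tasks only when stale stores are acceptable.
        final Map<String, String> candidates = new LinkedHashMap<>(activeStores);
        if (params.includeStaleStores()) {
            candidates.putAll(standbyStores);
        }
        final String wanted = keyTaskId(topicGroupId, params.getPartition());
        final List<String> result = new ArrayList<>();
        for (final Map.Entry<String, String> entry : candidates.entrySet()) {
            // Skip every task except the requested one when a partition was given.
            if (wanted != null && !wanted.equals(entry.getKey())) {
                continue;
            }
            result.add(entry.getValue());
        }
        return result;
    }

    public static void main(final String[] args) {
        final Map<String, String> active = new LinkedHashMap<>();
        active.put("0_0", "active-0");
        active.put("0_1", "active-1");
        final Map<String, String> standby = new LinkedHashMap<>();
        standby.put("0_2", "standby-2");

        System.out.println(select(active, standby, 0, new QueryParams(1, false)));
        System.out.println(select(active, standby, 0, new QueryParams(2, true)));
        System.out.println(select(active, standby, 0, new QueryParams(null, true)));
    }
}
```

In this sketch, asking for partition 2 with includeStaleStores set to false yields an empty list, because the only task holding that partition is a standby; setting the flag to true makes the standby store eligible.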

...

Compatibility, Deprecation, and Migration Plan:

  • As part of KIP-535, QueryableStoreProvider#getStore() and StreamThreadStateStoreProvider#stores() were already changed and KafkaStreams#store() was overloaded. These functions will be changed once more, to take StoreQueryParams instead of the includeStaleStores flag, before both features ship together in 2.5.0, so no deprecation is required.

Rejected Alternatives:

  • Overload the QueryableStoreProvider#getStore() and StreamThreadStateStoreProvider#stores() with new parameters to pass a list of partitions along with the currently passed flag includeStaleStores.