Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Expand
titleCall trace 7: ReadOnlySessionStore#fetch(from, to)
  • CompositeReadOnlySessionStore#fetch(from, to) (v)
    • WrappingStoreProvider#stores() (v)
      • StreamThreadStateStoreProvider#stores() (v)
    • return new DelegatingPeekingKeyValueIterator<>()
  • DelegatingPeekingKeyValueIterator#hasNext() (v)
    • CompositeKeyValueIterator#hasNext()
      • NextIteratorFunction#apply(store)
        • MeteredSessionStore#fetch(from, to)
          • MeteredSessionStore#findSession(from, to, 0, t)
            • CachingSessionStore#findSession(from, to, 0, t)
              • CachingSessionStore#validateStoreOpen()
                AbstractStateStore#validateStoreOpen() (v)
                • ChangeLoggingSessionBytesStore#isOpen()
                  • RocksDBSessionStore#isOpen()
                    AbstractStateStore#isOpen()
                    • RocksDBSegmentedBytesStore#isOpen()
                    • return
                  • return
                • return
              • ChangeLoggingSessionBytesStore#findSessions()
                • RocksDBSessionStore#findSessions()
                  • RocksDBSegmentedBytesStore#fetch()
                    • SessionKeySchema#segmentsToSearch()
                      • Segments#segments() (v)
                        • RocksDBStore#isOpen()
                      • return new SegmentIterator()
                    • return new WrappedSessionStoreIterator()
                  • return
                • return
              • return new MergedSortedCacheSessionStoreIterator()
            • return new MeteredWindowedKeyValueIterator()
          • return
        • return
      • MeteredWindowedKeyValueIterator#hasNext()
        • MergedSortedCacheSessionStoreIterator#hasNext()
          AbstractMergedSortedCacheStoreIterator#hasNext()
          • FilteredCacheIterator#hasNext()
          • WrappedSessionStoreIterator#hasNext()
            • SegmentIterator#hasNext()
              • Segment.range(from, to)
                RocksDBStore.range(from, to)
                • RocksDBStore.validateStoreOpen() (v)
                • return new RocksDBRangeIterator()
              • return
            • return
          • return
        • return
      • return
    • CompositeKeyValueIterator#next()
    • return
  • DelegatingPeekingKeyValueIterator#next()
    • return keyvalue

 

 

 

Changed Classes

Code Block
languagejava
/**
 * Describes a kind of queryable store and knows how to build the object of
 * type {@code T} that is handed back to the caller.
 *
 * @param <T> the type of store produced by {@link #create}
 */
public interface QueryableStoreType<T> {

    /**
     * Builds the queryable store of type {@code T} for the given store name.
     *
     * @param streams       the KafkaStreams instance (NOTE(review): presumably passed so the
     *                      created store can consult the instance state; confirm)
     * @param storeProvider the provider used to look up the underlying state stores
     * @param storeName     the name of the state store to expose
     * @return the store of type {@code T}
     */
    T create(
            final KafkaStreams streams,
            final StateStoreProvider storeProvider,
            final String storeName);
}
Code Block
languagejava
titleKafkaStreams#store()
collapsetrue
/**
 * Returns the queryable store registered under {@code storeName}.
 * {@code validateIsRunning()} is invoked before the lookup is delegated to
 * {@code queryableStoreProvider}.
 *
 * @param storeName          name of the state store to look up
 * @param queryableStoreType the type of store the caller expects
 * @param <T>                the store type returned to the caller
 * @return whatever {@code queryableStoreProvider.getStore} returns for these arguments
 */
public <T> T store(final String storeName, final QueryableStoreType<T> queryableStoreType) {
    // Must run before the lookup; the check itself lives in validateIsRunning().
    validateIsRunning();
    final T queryable = queryableStoreProvider.getStore(storeName, queryableStoreType);
    return queryable;
}
Code Block
languagejava
titleQueryableStoreProvider#getStore()
collapsetrue
/**
 * Resolves the queryable store named {@code storeName}.
 *
 * <p>The global store provider is consulted first; if it yields any store, the
 * result is built on top of that provider alone. Otherwise every provider in
 * {@code storeProviders} is asked, and the result is built on top of all of them.
 *
 * @param storeName          name of the state store to look up
 * @param queryableStoreType the type of store the caller expects
 * @param <T>                the store type returned to the caller
 * @return the store created by {@code queryableStoreType}
 * @throws InvalidStateStoreException if no provider returns a matching store
 */
public <T> T getStore(final String storeName, final QueryableStoreType<T> queryableStoreType) {
    final List<T> fromGlobal = globalStoreProvider.stores(storeName, queryableStoreType);
    if (!fromGlobal.isEmpty()) {
        final List<StateStoreProvider> globalOnly =
                Collections.<StateStoreProvider>singletonList(globalStoreProvider);
        return queryableStoreType.create(new WrappingStoreProvider(globalOnly), storeName);
    }

    // The collected stores are only used to decide whether anything matched at all.
    final List<T> found = new ArrayList<>();
    for (final StateStoreProvider candidate : storeProviders) {
        final List<T> perProvider = candidate.stores(storeName, queryableStoreType);
        found.addAll(perProvider);
    }

    if (!found.isEmpty()) {
        return queryableStoreType.create(new WrappingStoreProvider(storeProviders), storeName);
    }
    throw new InvalidStateStoreException("the state store, " + storeName + ", may have migrated to another instance.");
}
Code Block
languagejava
titleGlobalStateStoreProvider#stores()
collapsetrue
/**
 * Looks up the global state store registered under {@code storeName}.
 *
 * @param storeName          name of the state store to look up
 * @param queryableStoreType the type of store the caller expects
 * @param <T>                the store type returned to the caller
 * @return an empty list when no store has that name or the type does not accept it;
 *         otherwise a single-element list holding the store
 * @throws InvalidStateStoreException if the matching store is not open
 */
public <T> List<T> stores(final String storeName, final QueryableStoreType<T> queryableStoreType) {
    final StateStore candidate = globalStateStores.get(storeName);
    if (candidate == null) {
        return Collections.emptyList();
    }
    if (!queryableStoreType.accepts(candidate)) {
        return Collections.emptyList();
    }
    if (candidate.isOpen()) {
        // Unchecked: the accepts() check above is what vouches for the element type.
        return (List<T>) Collections.singletonList(candidate);
    }
    throw new InvalidStateStoreException("the state store, " + storeName + ", is not open.");
}
Code Block
languagejava
titleStreamThreadStateStoreProvider#stores()
collapsetrue
 public <T> List<T> stores(final String storeName, final QueryableStoreType<T> queryableStoreType) {
    if (streamThread.state() == StreamThread.State.DEAD) {
        return Collections.emptyList();
    }
    if (!streamThread.isRunningAndNotRebalancing()) {
        throw new InvalidStateStoreException("Cannot get state store " + storeName + " because the stream thread is " +
                streamThread.state() + ", not RUNNING");
    }
    final List<T> stores = new ArrayList<>();
    for (Task streamTask : streamThread.tasks().values()) {
        final StateStore store = streamTask.getStore(storeName);
        if (store != null && queryableStoreType.accepts(store)) {
            if (!store.isOpen()) {
                throw new InvalidStateStoreException("Cannot get state store " + storeName + " for task " + streamTask +
                        " because the store is not open. The state store may have migrated to another instances.");
            }
            stores.add((T) store);
        }
    }
    return stores;
}
// TODO: 
// When the user calls one of the following methods, we should check the KafkaStreams state according to the following rules when an InvalidStateStoreException is thrown:
// * If state is RUNNING or REBALANCING
//   * StateStoreClosedException: should be wrapped to StateStoreRetriableException
//   * StateStoreMigratedException: should not be wrapped, directly thrown
// * If state is PENDING_SHUTDOWN or ERROR or NOT_RUNNING: wrap InvalidStateStoreException to StateStoreFailException
//
 
// Signature-only sketch (method body omitted): lists the KafkaStreams entry point
// that the TODO rule above applies to.
public class KafkaStreams {
    // Returns the queryable store registered under storeName for the requested store type.
    public <T> T store(final String storeName, final QueryableStoreType<T> queryableStoreType);
}
 
// Signature-only sketch (method bodies omitted): the read-only key-value query
// methods that the TODO rule above applies to.
public class CompositeReadOnlyKeyValueStore<K, V> implements ReadOnlyKeyValueStore<K, V> {
    // Point lookup by key.
    public V get(final K key);
    // Iterator over the key range given by from and to.
    public KeyValueIterator<K, V> range(final K from, final K to);
    // Iterator over all entries.
    public KeyValueIterator<K, V> all();
    // Approximate entry count.
    public long approximateNumEntries();
}
 
// Signature-only sketch (method bodies omitted): the read-only session-store query
// methods that the TODO rule above applies to.
public class CompositeReadOnlySessionStore<K, V> implements ReadOnlySessionStore<K, V> {
    // Fetch for a single key; results are keyed by Windowed<K>.
    public KeyValueIterator<Windowed<K>, V> fetch(final K key);
    // Fetch for the key range given by from and to; results are keyed by Windowed<K>.
    public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to);
}
 
// Signature-only sketch (method bodies omitted): the read-only window-store query
// methods that the TODO rule above applies to.
public class CompositeReadOnlyWindowStore<K, V> implements ReadOnlyWindowStore<K, V> {
    // Fetch for a single key, bounded by timeFrom and timeTo.
    public WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long timeTo);
    // Fetch for the key range from..to, bounded by timeFrom and timeTo.
    public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final long timeFrom, final long timeTo);
    // Iterator over all entries.
    public KeyValueIterator<Windowed<K>, V> all();
    // Iterator over all entries, bounded by timeFrom and timeTo.
    public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom, final long timeTo);
}
 
// Signature-only sketch (method bodies omitted): the iterator methods that the
// TODO rule above applies to.
class DelegatingPeekingKeyValueIterator<K, V> implements KeyValueIterator<K, V>, PeekingKeyValueIterator<K, V> {
    public synchronized boolean hasNext();
    public synchronized KeyValue<K, V> next();
    // NOTE(review): unlike hasNext()/next(), peekNext() is not declared synchronized; confirm this is intentional.
    public KeyValue<K, V> peekNext();
}
 
Code Block
languagejava
public interface QueryableStoreType<T> {
    // TODO: pass stream instance parameter
    T create(final KafkaStreams streams, final StateStoreProvider storeProvider, final String storeName)
Code Block
languagejava
titleWrappingStoreProvider#stores()
collapsetrue
/**
 * Gathers the stores named {@code storeName} from every wrapped provider.
 *
 * @param storeName name of the state store to look up
 * @param type      the type of store the caller expects
 * @param <T>       the store type returned to the caller
 * @return all stores returned by the wrapped providers; never empty
 * @throws InvalidStateStoreException if no provider returns a store
 */
public <T> List<T> stores(final String storeName, QueryableStoreType<T> type) {
    final List<T> collected = new ArrayList<>();
    for (final StateStoreProvider each : storeProviders) {
        collected.addAll(each.stores(storeName, type));
    }
    if (!collected.isEmpty()) {
        return collected;
    }
    throw new InvalidStateStoreException("the state store, " + storeName + ", may have migrated to another instance.");
}
Code Block
languagejava
titleCompositeReadOnlyKeyValueStoreQueryableStoreProvider#getStore()
collapsetrue
public <T> VT getgetStore(final K key) {
String storeName, final QueryableStoreType<T> queryableStoreType) {
    final List<T> globalStore = globalStoreProvider.stores(storeName, queryableStoreType);
    if Objects(!globalStore.requireNonNullisEmpty(key)); {
     final List<ReadOnlyKeyValueStore<K, V>> stores = storeProvider.stores(storeName, storeType);
    for (ReadOnlyKeyValueStore<K, V> store : stores) {return queryableStoreType.create(new WrappingStoreProvider(Collections.<StateStoreProvider>singletonList(globalStoreProvider)), storeName);
    }
    final List<T> allStores = new ArrayList<>();
    for (StateStoreProvider storeProvider : trystoreProviders) {
            final V result = store.get(keyallStores.addAll(storeProvider.stores(storeName, queryableStoreType));
    }
    if (allStores.isEmpty()) {
  if (result != null) {
  // TODO: Replace with StateStoreMigratedException
        throw new return result;
            }InvalidStateStoreException("the state store, " + storeName + ", may have migrated to another instance.");
    }
    } catch (InvalidStateStoreException e) {
 return queryableStoreType.create(
           throw new InvalidStateStoreException("State store is not available anymore and may have been migrated to another instance; please re-discover its location from the state metadata.");WrappingStoreProvider(storeProviders),
            storeName);
}
Code Block
languagejava
titleGlobalStateStoreProvider#stores()
collapsetrue
public <T> List<T> stores(final String storeName, final QueryableStoreType<T> queryableStoreType) {
    final StateStore store  }

    }
    return null;
}
 
public KeyValueIterator<K, V> range(final K from, final K to) {
    Objects.requireNonNull(from);
    Objects.requireNonNull(to);
    final NextIteratorFunction<K, V, ReadOnlyKeyValueStore<K, V>> nextIteratorFunction = new NextIteratorFunction<K, V, ReadOnlyKeyValueStore<K, V>>() {= globalStateStores.get(storeName);
    if (store == null || !queryableStoreType.accepts(store)) {
        return Collections.emptyList();
    }
    if (!store.isOpen()) {
        // TODO: Replace with StateStoreClosedException
        @Override
throw new InvalidStateStoreException("the state store, " + storeName public+ KeyValueIterator<K", V> apply(final ReadOnlyKeyValueStore<K, V> store) {is not open.");
    }
    return (List<T>) Collections.singletonList(store);
}
Code Block
languagejava
titleStreamThreadStateStoreProvider#stores()
collapsetrue
 public <T> List<T> stores(final String storeName, final QueryableStoreType<T> tryqueryableStoreType) {
    if (streamThread.state() == StreamThread.State.DEAD) {
         return storeCollections.rangeemptyList(from, to);
    }
    if (!streamThread.isRunningAndNotRebalancing()) {
  } catch (InvalidStateStoreException e) {
  // TODO: Replace with  StateStoreMigratedException
         throw new InvalidStateStoreException("State storeCannot get state store " + storeName + " because the stream thread is not" available+
 anymore and may have been migrated to another instance; please re-discover its location from the streamThread.state metadata.() + ", not RUNNING");
    }
    final List<T> stores  }= new ArrayList<>();
    for (Task streamTask :  }streamThread.tasks().values()) {
    };
    final List<ReadOnlyKeyValueStore<K, V>> storesStateStore store = storeProviderstreamTask.storesgetStore(storeName, storeType);
       return newif DelegatingPeekingKeyValueIterator<>(storeName, new CompositeKeyValueIterator<>(stores.iterator(), nextIteratorFunction));
}

public KeyValueIterator<K, V> all()(store != null && queryableStoreType.accepts(store)) {
            if (!store.isOpen()) {
    final NextIteratorFunction<K, V, ReadOnlyKeyValueStore<K, V>> nextIteratorFunction = new NextIteratorFunction<K, V, ReadOnlyKeyValueStore<K, V>>() {
     // TODO: Replace with StateStoreClosedException
    @Override
        public KeyValueIterator<K, V> apply(final ReadOnlyKeyValueStore<K,throw V> store) {
            try {new InvalidStateStoreException("Cannot get state store " + storeName + " for task " + streamTask +
                return store.all();
       " because the store is }not catchopen. (InvalidStateStoreExceptionThe e)state {
store may have migrated to another instances.");
          throw new InvalidStateStoreException("State store is not available anymore and may have been migrated to another instance; please re-discover its location from the state metadata.");
            }
        }
    } }
            stores.add((T) store);
        }
    }
    return stores;
}
Code Block
languagejava
titleWrappingStoreProvider#stores()
collapsetrue
public <T> List<T> stores(final String storeName, QueryableStoreType<T> type) {
    final List<T> allStores = new ArrayList<>();
    final List<ReadOnlyKeyValueStore<K, V>> stores = storeProvider.stores(storeName, storeType);
for (StateStoreProvider provider : storeProviders) {
      return new DelegatingPeekingKeyValueIterator<>(storeName, new CompositeKeyValueIterator<>(stores.iterator(), nextIteratorFunction));
}
 
public long approximateNumEntries() {
final List<T> stores =
       final List<ReadOnlyKeyValueStore<K, V>> stores = storeProviderprovider.stores(storeName, storeTypetype);
    long total = 0;
    for (ReadOnlyKeyValueStore<K, V> store : stores) {allStores.addAll(stores);
    }
    total += store.approximateNumEntries();if (allStores.isEmpty()) {
        if (total < 0) {// Replace with StateStoreMigratedException
        throw new InvalidStateStoreException("the  return Long.MAX_VALUE;
        }state store, " + storeName + ", may have migrated to another instance.");
    }
    return totalallStores;
}
Code Block
languagejava
titleAbstractStateStore#validateStoreOpen()
collapsetrue
void validateStoreOpen() {
    if (!innerState.isOpen()) {)) {
        // TODO: Replace with StateStoreClosedException
        throw new InvalidStateStoreException("Store " + innerState.name() + " is currently closed.");
    }
}
Code Block
languagejava
titleRocksDBStore#validateStoreOpen()
collapsetrue
/**
 * Verifies that this store is open, based on the {@code open} flag.
 *
 * @throws InvalidStateStoreException if the store is not open
 */
private void validateStoreOpen() {
    if (open) {
        return;
    }
    // TODO: Replace with StateStoreClosedException
    throw new InvalidStateStoreException("Store " + this.name + " is currently closed");
}
Code Block
languagejava
titleRocksDBIterator#hasNext()
collapsetrue
/**
 * Reports whether the underlying iterator {@code iter} is still valid.
 *
 * @return the result of {@code iter.isValid()}
 * @throws InvalidStateStoreException if this iterator is no longer open
 */
public synchronized boolean hasNext() {
    if (open) {
        return iter.isValid();
    }
    // TODO: Replace with StateStoreClosedException
    throw new InvalidStateStoreException(String.format("RocksDB store %s has closed", storeName));
}
Code Block
languagejava
titleDelegatingPeekingKeyValueIterator#hasNext()
collapsetrue
/**
 * Reports whether another element is available, pulling one element ahead from
 * {@code underlying} into the {@code next} field when nothing is buffered yet.
 *
 * @return true if an element is already buffered or one could be pulled from
 *         {@code underlying}; false if {@code underlying} has no more elements
 * @throws InvalidStateStoreException if this iterator is no longer open
 */
public synchronized boolean hasNext() {
    if (!open) {
        // TODO: Replace with StateStoreClosedException
        throw new InvalidStateStoreException(String.format("Store %s has closed", storeName));
    }

    // An element buffered by an earlier call is still pending.
    if (next != null) {
        return true;
    }

    final boolean more = underlying.hasNext();
    if (more) {
        next = underlying.next();
    }
    return more;
}

...