Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: update for call trace 1,2

...

Call Trace 1: KafkaStreams#store()

Expand
titleExpand...
  • KafkaStreams#store()

    ...

      • QueryableStoreProvider#getStore()

      ...

          • GlobalStateStore#stores()
          • StreamThreadStateStroeProvider#stores()


      Call Trace 2: CompositeReadOnlyKeyValueStore#get()

      Expand
      titleExpand...
      • CompositeReadOnlyKeyValueStore#get()
        • WrappingStoreProvider#stores()
          • StreamThreadStateStoreProvider#stores()
        • MeteredKeyValueBytesStore#get()
          • InnerMeteredKeyValueStore#get()
            • CachingKeyValueStore#get()
              • WrappedStateStore.AbstractStateStore#validateStoreOpen()
                • RocksDBStore#isOpen()
            • CachingKeyValueStore#getInternal()
              • ChangeLoggingKeyValueBytesStore#get()
                • RocksDBStore#get()
                  • RocksDBStore#validateStoreOpen()
                • RocksDBStore#getInternal()

       

       

       

      Code Block
      languagejava
      titleGlobalStateStoreProvider#storesKafkaStreams#store()
      collapsetrue
      public <T> List<T>T storesstore(final String storeName, final QueryableStoreType<T> queryableStoreType) {
          validateIsRunning();
          try {
          final  StateStore store =return globalStateStoresqueryableStoreProvider.getgetStore(storeName, queryableStoreType);
          } catch (InvalidStateStoreException e) {
              if (store state==State.RUNNING null || !queryableStoreType.accepts(store)state==State.REBALANCING) {
              return Collections.emptyList();
          if (e instanceof StateStoreClosedException)
               }
             throw ifnew StateStoreRetryableException(!store.isOpen()) {
      e);
                  else // Before:
      e instanceof StateStoreMigratedException
                //      throw new InvalidStateStoreException("the state store, " + storeName + ", is not open.");
       e;
              } else {
                  //  throw new StateStoreClosedException("the state store, " + storeName + ", is not open."state==State.PENDING_SHUTDOWN || state==State.ERROR || state==State.NOT_RUNNING
                  throw new StateStoreFailException(e);
              }
          return (List<T>) Collections.singletonList(store);}
      }
      Code Block
      languagejava
      titleStreamThreadStateStoreProvider#storesQueryableStoreProvider#getStore()
      collapsetrue
      public <T> List<T>T storesgetStore(final String storeName, final QueryableStoreType<T> queryableStoreType) {
          if (streamThread.state() == StreamThread.State.DEADfinal List<T> globalStore = globalStoreProvider.stores(storeName, queryableStoreType);
          if (!globalStore.isEmpty()) {
              return queryableStoreType.create(streams,
                      new WrappingStoreProvider(Collections.emptyList(<StateStoreProvider>singletonList(globalStoreProvider)),
                      storeName);
          }
           if (!streamThread.isRunningAndNotRebalancing()) {final List<T> allStores = new ArrayList<>();
          for (StateStoreProvider storeProvider : storeProviders) {
              allStores.addAll(storeProvider.stores(storeName, queryableStoreType));
          }
          // Before: if (allStores.isEmpty()) {
              //   throw new InvalidStateStoreException("the state store, " + storeName + ", may have migrated to another instance.");
              throw new StateStoreMigratedException("the state store, " + storeName + ", may have migrated to another instance.");
          }
           final List<T> stores =return queryableStoreType.create(streams,
                  new ArrayList<>WrappingStoreProvider(storeProviders);,
          for (Task streamTask : streamThread.tasks().values()) {
                      storeName);
      }
      Code Block
      languagejava
      titleGlobalStateStoreProvider#stores()
      collapsetrue
      public <T> List<T> stores(final String storeName, final QueryableStoreType<T> queryableStoreType) {
          final StateStore store = streamTaskglobalStateStores.getStoreget(storeName);
              if (store !== null &&|| !queryableStoreType.accepts(store)) {
              return Collections.emptyList();
          }
          if (!store.isOpen()) {
                      // Before:
                      //   throw new InvalidStateStoreException("the state store, " + storeName + ", mayis have migrated to another instancenot open.");
                      throw new StateStoreMigratedExceptionStateStoreClosedException("the state store, " + storeName + ", mayis have migrated to another instancenot open.");
                  }
          return        stores.add((T) store);
              }
          }
          return stores(List<T>) Collections.singletonList(store);
      }
      Code Block
      languagejava
      titleKafkaStreams#storeStreamThreadStateStoreProvider#stores()
      collapsetrue
      public <T> TList<T> storestores(final String storeName, final QueryableStoreType<T> queryableStoreType) {
          if validateIsRunning(streamThread.state();
       ==   try StreamThread.State.DEAD) {
              return queryableStoreProviderCollections.getStoreemptyList(storeName, queryableStoreType);
          }
         catch if (InvalidStateStoreException e!streamThread.isRunningAndNotRebalancing()) {
              if (state==State.RUNNING || state==State.REBALANCING) {
      // Before: 
              //   throw new InvalidStateStoreException("the state ifstore, (e" instanceof+ StateStoreClosedException)
      storeName + ", may have migrated to another instance.");
              throw new StateStoreRetryableException(e);
                  else // e instanceof StateStoreMigratedException
                      throw e;StateStoreMigratedException("the state store, " + storeName + ", may have migrated to another instance.");
          }
          final List<T> stores = new ArrayList<>();
          for (Task streamTask : streamThread.tasks().values()) {
              }final elseStateStore {
      store = streamTask.getStore(storeName);
              if (store // state==State.PENDING_SHUTDOWN || state==State.ERROR || state==State.NOT_RUNNING!= null && queryableStoreType.accepts(store)) {
                  throw new StateStoreFailException(e);if (!store.isOpen()) {
              }
              }
      }

       

      Call Trace 2: CompositeReadOnlyKeyValueStore#get()

      ...

      // Before:
               

      ...

       

      ...

        

      ...

       

      ...

         // 

      ...

       

      ...

       throw new InvalidStateStoreException("the state store, 

      ...

      " + storeName + ", may have migrated to another instance.");
                

      ...

       

      ...

           throw 

      ...

      new 

      ...

      StateStoreMigratedException("the state store, " + storeName + ", may have migrated to another instance.");
                

      ...

       

      ...

       }
                 

      ...

       stores.add((T) store);
              }
          }
        

      ...

        return stores;
      }
      Code Block
      languagejava
      titleWrappingStoreProvider#stores()
      collapsetrue
          public <T> List<T> stores(final String storeName, QueryableStoreType<T> type) {
              final List<T> allStores = new ArrayList<>();
              for (StateStoreProvider provider : storeProviders) {
                  final List<T> stores =
                      provider.stores(storeName, type);
                  allStores.addAll(stores);
              }
              if (allStores.isEmpty()) {
                  // Before: throw new InvalidStateStoreException("the state store, " + storeName + ", may have migrated to another instance.");
                  throw new StateStoreMigratedException("the state store, " + storeName + ", may have migrated to another instance.");
              }
              return allStores;
          }

      ...