THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||
---|---|---|
| ||
// Thrown by the listings on this page when a store reports that it is not open
// (isOpen() is false / the open flag is unset).
public class StateStoreClosedException extends InvalidStateStoreException
// Thrown by the listings on this page when a store cannot be served from this
// instance and "may have migrated to another instance": no provider returned it,
// the stream thread is not running-and-not-rebalancing, or a task's store is closed.
public class StateStoreMigratedException extends InvalidStateStoreException
// Wraps a StateStoreClosedException when the streams state is RUNNING or REBALANCING
// at the time the failure is observed (see store(...) and get(...)).
public class StateStoreRetryableException extends InvalidStateStoreException
// Wraps an InvalidStateStoreException when the streams state is not RUNNING or
// REBALANCING in store(...), or is PENDING_SHUTDOWN, ERROR or NOT_RUNNING in get(...).
public class StateStoreFailException extends InvalidStateStoreException |
Call Trace
Expand |
---|
...
|
...
|
...
Expand | ||
---|---|---|
| ||
|
Expand | ||
---|---|---|
|
...
| ||
Expand | ||
---|---|---|
| ||
|
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/**
 * Returns the queryable store named {@code storeName}, obtained from
 * {@code queryableStoreProvider} after {@code validateIsRunning()} has been called.
 *
 * <p>An {@link InvalidStateStoreException} raised by the lookup is translated
 * according to the current {@code state}:
 * <ul>
 *   <li>RUNNING or REBALANCING and the failure is a {@link StateStoreClosedException}:
 *       wrapped in a {@link StateStoreRetryableException};</li>
 *   <li>RUNNING or REBALANCING and any other failure: rethrown unchanged;</li>
 *   <li>any other state: wrapped in a {@link StateStoreFailException}.</li>
 * </ul>
 *
 * @param storeName          name of the store to look up
 * @param queryableStoreType type descriptor handed to the store provider
 * @return whatever {@code queryableStoreProvider.getStore} returns
 */
public <T> T store(final String storeName, final QueryableStoreType<T> queryableStoreType) {
    validateIsRunning();
    try {
        return queryableStoreProvider.getStore(storeName, queryableStoreType);
    } catch (final InvalidStateStoreException cause) {
        final boolean instanceIsActive = state == State.RUNNING || state == State.REBALANCING;
        if (!instanceIsActive) {
            // Expected here: state==State.PENDING_SHUTDOWN || state==State.ERROR || state==State.NOT_RUNNING
            throw new StateStoreFailException(cause);
        }
        if (cause instanceof StateStoreClosedException) {
            throw new StateStoreRetryableException(cause);
        }
        // Remaining case is expected to be a StateStoreMigratedException; pass it through as-is.
        throw cause;
    }
} |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/**
 * Builds a queryable view of the store named {@code storeName}.
 *
 * <p>The global store provider is asked first; if it returns any store, the result is
 * created on top of that provider alone. Otherwise every entry of {@code storeProviders}
 * is asked, and the result is created on top of all of them.
 *
 * @param storeName          name of the store to look up
 * @param queryableStoreType type descriptor used both to find and to create the store view
 * @return the object produced by {@code queryableStoreType.create}
 * @throws StateStoreMigratedException if neither the global provider nor any entry of
 *         {@code storeProviders} returns a store
 */
public <T> T getStore(final String storeName, final QueryableStoreType<T> queryableStoreType) {
    final List<T> fromGlobalProvider = globalStoreProvider.stores(storeName, queryableStoreType);
    if (fromGlobalProvider.isEmpty()) {
        // Nothing global: collect from the regular providers only to learn whether any exist.
        final List<T> fromLocalProviders = new ArrayList<>();
        for (final StateStoreProvider provider : storeProviders) {
            fromLocalProviders.addAll(provider.stores(storeName, queryableStoreType));
        }
        if (fromLocalProviders.isEmpty()) {
            // Before this change: InvalidStateStoreException with the same message.
            throw new StateStoreMigratedException("the state store, " + storeName + ", may have migrated to another instance.");
        }
        return queryableStoreType.create(streams, new WrappingStoreProvider(storeProviders), storeName);
    }

    final List<StateStoreProvider> globalOnly =
        Collections.<StateStoreProvider>singletonList(globalStoreProvider);
    return queryableStoreType.create(streams, new WrappingStoreProvider(globalOnly), storeName);
} |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/**
 * Looks up {@code storeName} in {@code globalStateStores}.
 *
 * @param storeName          name of the store to look up
 * @param queryableStoreType type descriptor that must accept the store
 * @return an empty list when no such store is registered or the type does not accept it;
 *         otherwise a single-element list holding the store
 * @throws StateStoreClosedException if the store exists, is accepted, but is not open
 */
public <T> List<T> stores(final String storeName, final QueryableStoreType<T> queryableStoreType) {
    final StateStore globalStore = globalStateStores.get(storeName);
    final boolean usable = globalStore != null && queryableStoreType.accepts(globalStore);
    if (!usable) {
        return Collections.emptyList();
    }
    if (globalStore.isOpen()) {
        return (List<T>) Collections.singletonList(globalStore);
    }
    // Before this change: InvalidStateStoreException with the same message.
    throw new StateStoreClosedException("the state store, " + storeName + ", is not open.");
} |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/**
 * Collects, from every task of {@code streamThread}, the store named {@code storeName}
 * that {@code queryableStoreType} accepts.
 *
 * @param storeName          name of the store to look up in each task
 * @param queryableStoreType type descriptor that must accept a store for it to be returned
 * @return an empty list when the thread state is DEAD; otherwise the accepted stores
 *         (possibly none)
 * @throws StateStoreMigratedException if the thread is not DEAD but also not
 *         running-and-not-rebalancing, or if an accepted store is not open
 */
public <T> List<T> stores(final String storeName, final QueryableStoreType<T> queryableStoreType) {
    if (streamThread.state() == StreamThread.State.DEAD) {
        return Collections.emptyList();
    }
    if (!streamThread.isRunningAndNotRebalancing()) {
        // Before this change: InvalidStateStoreException with the same message.
        throw new StateStoreMigratedException("the state store, " + storeName + ", may have migrated to another instance.");
    }

    final List<T> matching = new ArrayList<>();
    for (final Task task : streamThread.tasks().values()) {
        final StateStore candidate = task.getStore(storeName);
        if (candidate == null || !queryableStoreType.accepts(candidate)) {
            continue;
        }
        if (!candidate.isOpen()) {
            // Before this change: InvalidStateStoreException with the same message.
            throw new StateStoreMigratedException("the state store, " + storeName + ", may have migrated to another instance.");
        }
        matching.add((T) candidate);
    }
    return matching;
} |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/**
 * Concatenates the stores returned by every entry of {@code storeProviders}.
 *
 * @param storeName name of the store to look up in each provider
 * @param type      type descriptor forwarded to each provider
 * @return all stores returned by the providers, in provider order; never empty
 * @throws StateStoreMigratedException if no provider returns any store
 */
public <T> List<T> stores(final String storeName, QueryableStoreType<T> type) {
    final List<T> collected = new ArrayList<>();
    for (final StateStoreProvider delegate : storeProviders) {
        collected.addAll(delegate.stores(storeName, type));
    }
    if (!collected.isEmpty()) {
        return collected;
    }
    // Before this change: InvalidStateStoreException with the same message.
    throw new StateStoreMigratedException("the state store, " + storeName + ", may have migrated to another instance.");
} |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/**
 * Verifies that {@code innerState} is open.
 *
 * @throws StateStoreClosedException if {@code innerState.isOpen()} is false
 */
void validateStoreOpen() {
    if (innerState.isOpen()) {
        return;
    }
    // Before this change: InvalidStateStoreException with the same message.
    throw new StateStoreClosedException("Store " + innerState.name() + " is currently closed.");
} |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/**
 * Verifies that this store's {@code open} flag is set.
 *
 * @throws StateStoreClosedException if {@code open} is false
 */
private void validateStoreOpen() {
    if (open) {
        return;
    }
    // Before this change: InvalidStateStoreException with the same message.
    throw new StateStoreClosedException("Store " + this.name + " is currently closed");
} |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/**
 * Returns the first non-null value for {@code key} among the stores that
 * {@code storeProvider} returns for {@code storeName}, or {@code null} if none has one.
 *
 * <p>Failure translation, in the order it is applied:
 * <ol>
 *   <li>A {@link StateStoreClosedException} from the provider lookup becomes a
 *       {@link StateStoreRetryableException} when the streams state is RUNNING or
 *       REBALANCING; otherwise it is rethrown.</li>
 *   <li>A {@link StateStoreClosedException} from an individual store's {@code get} becomes a
 *       {@link StateStoreMigratedException}.</li>
 *   <li>Any {@link InvalidStateStoreException} leaving the steps above (including the ones
 *       they throw) is wrapped in a {@link StateStoreFailException} when the streams state
 *       is PENDING_SHUTDOWN, ERROR or NOT_RUNNING; otherwise it is rethrown.</li>
 * </ol>
 *
 * @param key the key to look up; must not be null
 * @return the first non-null value found, or {@code null}
 * @throws NullPointerException if {@code key} is null
 */
public V get(final K key) {
    Objects.requireNonNull(key);
    final List<ReadOnlyKeyValueStore<K, V>> stores;
    try {
        try {
            stores = storeProvider.stores(storeName, storeType);
        } catch (final StateStoreClosedException closedOnLookup) {
            final boolean instanceIsActive = streams.state() == KafkaStreams.State.RUNNING
                || streams.state() == KafkaStreams.State.REBALANCING;
            if (instanceIsActive) {
                throw new StateStoreRetryableException(closedOnLookup);
            }
            throw closedOnLookup;
        }

        for (final ReadOnlyKeyValueStore<K, V> candidate : stores) {
            try {
                final V value = candidate.get(key);
                if (value != null) {
                    return value;
                }
            } catch (final StateStoreClosedException closedOnRead) {
                // Before this change: InvalidStateStoreException with the same message.
                throw new StateStoreMigratedException("State store is not available anymore and may have been migrated to another instance; please re-discover its location from the state metadata.");
            }
        }
    } catch (final InvalidStateStoreException failure) {
        final boolean instanceIsStopped = streams.state() == KafkaStreams.State.PENDING_SHUTDOWN
            || streams.state() == KafkaStreams.State.ERROR
            || streams.state() == KafkaStreams.State.NOT_RUNNING;
        if (instanceIsStopped) {
            throw new StateStoreFailException(failure);
        }
        throw failure;
    }
    return null;
} |
Expand | ||
---|---|---|
| ||
|
...