...
    DataStream<String> input = ...

    // User's HBaseAsyncFunc doesn't change.
    AsyncFunction<String, Tuple2<String, String>> asyncFunc = new HBaseAsyncFunc...

    // Create an async retry strategy via utility class or a user defined strategy.
    AsyncRetryStrategy asyncRetryStrategy =
        new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder(3, 100L) // maxAttempts=3, fixedDelay=100ms
            .retryIfResult(RetryPredicates.EMPTY_RESULT_PREDICATE)
            .retryIfException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
            .build();

    // ordered output mode
    AsyncDataStream.orderedWaitWithRetry(input, asyncFunc, timeout, TimeUnit.MILLISECONDS, 100, asyncRetryStrategy);

    // or use unordered output mode
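To make the intent of the retry strategy in the snippet above more concrete, here is a minimal plain-Java sketch of a fixed-delay retry loop driven by a result predicate and an exception predicate. It is not Flink's implementation: the class `FixedDelayRetrySketch` and the method `callWithRetry` are hypothetical names, the loop is synchronous rather than asynchronous, and treating `maxAttempts` as the total number of calls (including the first one) is an assumption made only for this illustration.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.function.Predicate;

// Hypothetical illustration only; not part of Flink.
public class FixedDelayRetrySketch {

    /**
     * Calls the lookup repeatedly, sleeping fixedDelayMillis between calls,
     * until neither predicate asks for a retry or maxAttempts calls were made.
     * Assumption: maxAttempts counts every call, including the first one.
     */
    public static <T> Collection<T> callWithRetry(
            Callable<Collection<T>> lookup,
            int maxAttempts,
            long fixedDelayMillis,
            Predicate<Collection<T>> retryIfResult,
            Predicate<Throwable> retryIfException) throws Exception {
        for (int attempt = 1; ; attempt++) {
            try {
                Collection<T> result = lookup.call();
                // Return when attempts are used up or the result is acceptable.
                if (attempt >= maxAttempts || !retryIfResult.test(result)) {
                    return result;
                }
            } catch (Exception e) {
                // Rethrow when attempts are used up or the exception is not retryable.
                if (attempt >= maxAttempts || !retryIfException.test(e)) {
                    throw e;
                }
            }
            Thread.sleep(fixedDelayMillis);
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Simulated lookup: empty on the first two calls, one row on the third.
        Callable<Collection<String>> lookup =
                () -> ++calls[0] < 3 ? Collections.<String>emptyList() : List.of("row");

        Collection<String> out =
                callWithRetry(lookup, 3, 100L, Collection::isEmpty, e -> true);

        System.out.println(out + " after " + calls[0] + " calls"); // prints "[row] after 3 calls"
    }
}
```

In this sketch, `Collection::isEmpty` plays the role of an empty-result predicate and `e -> true` retries on any exception, mirroring the two predicates configured on the builder above.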
...