THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||||
---|---|---|---|---|
| ||||
interface SourceReader { void start() throws IOException; CompletableFuture<?> available() throws IOException; ReaderStatus emitNext(SourceOutput<E> output) throws IOException; void addSplits(List<SplitT> splits) throws IOException; List<SplitT> snapshotState(); } |
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
final BlockingQueue<Runnable> mailbox = new LinkedBlockingQueue<>(); final SourceReader<T> reader = ...; final Runnable readerLoop = new Runnable() { public void run() { while (true) { ReaderStatus status = reader.emitNext(output); if (status == MORE_AVAILABLE) { if (mailbox.isEmpty()) { continue; } else { addReaderLoopToMailbox(); break; } } else if (status == NOTHING_AVAILABLE) { reader.available().thenAccept((ignored) -> addReaderLoopToMailbox()); break; } else if (status == END_OF_SPLIT_DATA) { break; } } }; void addReaderLoopToMailbox() { mailbox.add(readerLoop); } void triggerCheckpoint(CheckpointOptions options) { mailbox.add(new CheckpointAction(options)); } void taskMainLoop() { while (taskAlive) { mailbox.take().run(); } } } |
...