Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
titleSourceReader reading methods
interface SourceReader {

    void start() throws IOException;

	    CompletableFuture<?> available() throws IOException;

	
    ReaderStatus emitNext(SourceOutput<E> output) throws IOException;


	    void addSplits(List<SplitT> splits) throws IOException;


	    List<SplitT> snapshotState();
}

...

Code Block
languagejava
titlereader loop pseudo code
linenumberstrue
final BlockingQueue<Runnable> mailbox = new LinkedBlockingQueue<>();


final SourceReader<T> reader = ...;


final Runnable readerLoop = new Runnable() {


    public void run() {
        while (true) {
            ReaderStatus status = reader.emitNext(output);
            if (status == MORE_AVAILABLE) {
                if (mailbox.isEmpty()) {
                    continue;
                }
                else {
                    addReaderLoopToMailbox();
                    break;
                }
            }
            else if (status == NOTHING_AVAILABLE) {
                reader.available().thenAccept((ignored) -> addReaderLoopToMailbox());
                break;
            }
            else if (status == END_OF_SPLIT_DATA) {
                break;
            }
        }
    };


    void addReaderLoopToMailbox() {
        mailbox.add(readerLoop);
    }


    void triggerCheckpoint(CheckpointOptions options) {
        mailbox.add(new CheckpointAction(options));
    }


    void taskMainLoop() {
        while (taskAlive) {
            mailbox.take().run();
        }
    }
}

...