...

Code Block (Java): The IterationBody API
/** The iteration body specifies the sub-graph inside the iteration. */
public interface IterationBody {

	/** This annotation marks the function that builds the subgraph. */
	@Target(ElementType.METHOD)
	@Retention(RetentionPolicy.RUNTIME)
	public @interface IterationFunction {}

	/** 
	* This annotation marks a parameter of the iteration function as an input to the subgraph. 
	* The input is the union of the initial inputs bound to the iteration and the corresponding 
	* feedback. 
	*/
	@Target(ElementType.PARAMETER)
	@Retention(RetentionPolicy.RUNTIME)
	public @interface IterationInput {
		String value();
	}
}

/** An example usage of the iteration body with two inputs. */
new IterationBody() {
	@IterationFunction
	UnboundedIterationDeclaration iterate(
		@IterationInput("first") DataStream<Integer> first,
		@IterationInput("second") DataStream<?> second
	) {
		DataStream<Integer> feedBack1 = ...;
		DataStream<String> output1 = ...;
		return new UnboundedIterationDeclaration.Builder()
			.withFeedback("first", feedBack1)
			.withOutput("output1", output1)
			.build();
	}
}
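
The annotations above only carry names, so something has to read them back when the subgraph is built. The following is a minimal sketch of one way that lookup could work with plain Java reflection. It is an assumption for illustration, not the proposed implementation: `IterationBodyReflectionSketch`, `ExampleBody` and `inputNames` are hypothetical names, the annotations are local stand-ins for the ones declared in `IterationBody`, and `Object` stands in for `DataStream` so that the example has no dependencies.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.util.ArrayList;
import java.util.List;

class IterationBodyReflectionSketch {

    /** Local stand-in for IterationBody.IterationFunction. */
    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    @interface IterationFunction {}

    /** Local stand-in for IterationBody.IterationInput. */
    @Target(ElementType.PARAMETER)
    @Retention(RetentionPolicy.RUNTIME)
    @interface IterationInput {
        String value();
    }

    /** Hypothetical body; Object stands in for DataStream and for the declaration type. */
    static class ExampleBody {
        @IterationFunction
        Object iterate(
                @IterationInput("first") Object first,
                @IterationInput("second") Object second) {
            return null;
        }
    }

    /** Returns the input names declared on the @IterationFunction method, in parameter order. */
    static List<String> inputNames(Class<?> bodyClass) {
        for (Method method : bodyClass.getDeclaredMethods()) {
            if (!method.isAnnotationPresent(IterationFunction.class)) {
                continue;
            }
            List<String> names = new ArrayList<>();
            for (Parameter parameter : method.getParameters()) {
                IterationInput input = parameter.getAnnotation(IterationInput.class);
                if (input == null) {
                    throw new IllegalArgumentException(
                            "Parameter without @IterationInput on " + method.getName());
                }
                names.add(input.value());
            }
            return names;
        }
        throw new IllegalArgumentException(
                "No @IterationFunction method on " + bodyClass.getName());
    }

    public static void main(String[] args) {
        System.out.println(inputNames(ExampleBody.class));
    }
}
```

Both annotations need `RetentionPolicy.RUNTIME`, as declared in the API above, for this kind of lookup to see them.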

...

Code Block (Java): Unbounded Iteration API
/** Builder for the unbounded iteration. */
public class UnboundedIteration {

	/** Set the body of the iteration. */
	UnboundedIteration withBody(IterationBody body) {...}
	
	/** Bind the initial input with the specific name. */
	UnboundedIteration bindInput(String name, DataStream<?> input) {...}

	/** Generates and adds the subgraph corresponding to the iteration. */
	ResultStreams build() {...}
}

/** The expected return type of the iteration function, which specifies the feedbacks and outputs. */
public class UnboundedIterationDeclaration {
	
	public static class Builder {

		/** 
		* Specify the feedback corresponding to the specific name. The feedback is
		* unioned with the initial input of the same name and provided to the iteration
		* body. 
		*/
		public Builder withFeedback(String name, DataStream<?> feedback) {...}

		/** Specify one output with the specific name. */
		public Builder withOutput(String name, DataStream<?> output) {...}
		
		/** Generate the Declaration. */
		UnboundedIterationDeclaration build() {...}
	}
}

/** The map of the output streams of an iteration. */
public class ResultStreams {
	
	/** Gets the DataStream with the specific name. */
	public <T> DataStream<T> getStream(String name) {...}

}

To avoid reading more data from the inputs while too much data accumulates inside the iteration, the iteration processes the feedback data first whenever both input data and feedback data are available.

For termination detection, the iteration continues until both of the following hold:

  1. All the inputs are terminated.
  2. There are no records left inside the iteration subgraph.

Then the iteration terminates.
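
The feedback-first rule and the two termination conditions can be illustrated with a single-threaded toy model. This is only a sketch under simplifying assumptions, not how the runtime would implement it: `FeedbackFirstLoop`, `run` and `step` are hypothetical names, and in-memory queues stand in for the input and feedback channels.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

/** Toy, single-threaded model of the feedback-first rule and the termination rule. */
class FeedbackFirstLoop {

    /**
     * Runs the loop. The step function stands in for the iteration body: it returns a
     * record to feed back, or Optional.empty() when the record leaves the iteration.
     * Returns the records in the order in which they were processed.
     */
    static List<Integer> run(List<Integer> input, Function<Integer, Optional<Integer>> step) {
        Deque<Integer> inputs = new ArrayDeque<>(input);
        Deque<Integer> feedback = new ArrayDeque<>();
        List<Integer> processed = new ArrayList<>();

        // Terminate only when all inputs are finished AND no record is left in the loop.
        while (!inputs.isEmpty() || !feedback.isEmpty()) {
            // Feedback takes priority so that records do not pile up inside the iteration.
            Integer record = !feedback.isEmpty() ? feedback.poll() : inputs.poll();
            processed.add(record);
            step.apply(record).ifPresent(feedback::add);
        }
        return processed;
    }

    public static void main(String[] args) {
        // Each record is halved and fed back until it reaches 1.
        List<Integer> order =
                run(Arrays.asList(4, 3), r -> r > 1 ? Optional.of(r / 2) : Optional.<Integer>empty());
        System.out.println(order); // prints [4, 2, 1, 3, 1]
    }
}
```

In the example the second input record, 3, is read only after every record derived from 4 has left the loop, which is the back-pressure effect the feedback-first rule aims for.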

Bounded Iteration

As mentioned in the motivation, the existing DataSet iteration API uses "per-round" semantics: it views the iteration as a repeated execution of the same DAG. Under the hood it automatically merges the inputs and feedbacks, replays the inputs that have no feedbacks, and keeps the operators inside the iteration alive for only one round. This may hurt performance for algorithms that could cache this data in a more efficient way.

To avoid this issue, similar to the unbounded iteration, by default we use the "per-iteration" semantics: 

  1. Operators inside the iteration live until the whole iteration is finished.
  2. We do not automatically merge the inputs and feedbacks. Instead, we union the original inputs and the feedbacks so that users can decide how to merge them.
  3. We do not replay the inputs without feedbacks. Users can decide how to cache them more efficiently.
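
To make the first and third points concrete, here is a minimal sketch of an operator that lives for the whole iteration and caches an input that is delivered only once. It is a plain-Java illustration under stated assumptions: `CachingSumOperator`, `onData` and `onFeedback` are hypothetical names and are not part of the proposed API.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy operator that lives for the whole iteration and caches its non-replayed input. */
class CachingSumOperator {

    // Survives across rounds because the operator is not re-created for each round.
    private final List<Integer> cachedData = new ArrayList<>();

    /** Called once per data record; the data input is delivered once and never replayed. */
    void onData(int value) {
        cachedData.add(value);
    }

    /** Called once per round with the fed-back scale; reuses the cached data. */
    int onFeedback(int scale) {
        int sum = 0;
        for (int value : cachedData) {
            sum += value * scale;
        }
        return sum;
    }

    public static void main(String[] args) {
        CachingSumOperator operator = new CachingSumOperator();
        operator.onData(1);
        operator.onData(2);
        operator.onData(3);
        for (int round = 1; round <= 3; round++) {
            System.out.println("round " + round + ": " + operator.onFeedback(round));
        }
    }
}
```

Under per-round semantics the three data records would have to be replayed into a fresh operator in every round; here the user code decides how to keep them.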

...

Code Block (Java): Bounded Iteration API
/** Builder for the bounded iteration. */
public class BoundedIteration {
	
	/** Set the body of the iteration. */
	BoundedIteration withBody(IterationBody body) {...}

	/** Bind the initial input with the specific name. */
	BoundedIteration bindInput(String name, DataStream<?> input) {...}
	
	/** Generates and adds the subgraph corresponding to the iteration.  */
	ResultStreams build() {...}
}

/** The expected return type of the iteration function, which specifies the feedbacks, outputs and termination conditions. */
public class BoundedIterationDeclaration {
	
	public static class Builder {
		
		/** 
		* Specify the feedback corresponding to the specific name. The feedback is
		* unioned with the initial input of the same name and provided to the iteration
		* body. 
		*/
		public Builder withFeedback(String name, DataStream<?> feedback) {...}

		/** Specify one output with the specific name. */
		public Builder withOutput(String name, DataStream<?> output) {...}
		
		/** Specify the termination condition of the iteration. */
		<U> Builder until(TerminationCondition terminationCondition) { ... }
		
		/** Generate the Declaration. */
		BoundedIterationDeclaration build() {...}
	}
}

/** 
* The termination condition judges if iteration should stop based on the round number
* or the records of a given data stream in one round. 
* 
* Since there might be asynchronous iterations in which multiple rounds are executed in parallel,
* the condition is evaluated at the end of each round. If it evaluates to true for one
* round, the iteration discards the feedback records of the following rounds, but
* the records already emitted are not withdrawn. 
*/
public class TerminationCondition {
	
	/**
	* The records of the given DataStream will be collected in each
	* round to be used in judging whether the loop should terminate.
	*/
	@Nullable DataStream<?> refStream;

	/**
	* A user-defined function that is evaluated at the end of each round.
	*/
	Function<Context, Boolean> isConverged;

	TerminationCondition(DataStream<?> refStream, ...) {...}

	interface Context {
		
		/** The round number. */
		int[] getRound();
		
		/** The list of records of the referred stream. */
		<T> List<T> getStreamRecords();
	}
}

/** The progress tracking interface for UDF / Operator */
public interface BoundedIterationProgressListener<T> {
	
	/** Sets a tool to be used to query the round number of the current record. */
	default void setCurrentRecordRoundsQuerier(Supplier<int[]> querier) {}
	
	/** Notified at the end of each round. */
	void onRoundEnd(int[] round, Context context, Collector<T> collector);
	
	/** Notified at the end of the whole iteration. */
	default void onIterationEnd(int[] rounds, Context context) {}

    public interface Context {
		
		<X> void output(OutputTag<X> outputTag, X value);
		
		Long timestamp();

		TimerService timerService();
	}
}

/** The utility methods to support per-round semantics. */
public class BoundedIterationPerRoundUtils {
	/** The builder that creates the subgraph that is executed with the per-round semantics. */
	public interface EachRound {
	
		Map<String, DataStream<?>> executeInEachRound();

	}
	
	/** Creates a subgraph inside the iteration that executes with per-round semantics. */
	static ResultStreams forEachRound(EachRound eachRoundBuilder);

	/** A cache for a DataStream. For each round it replaces the DataStream with the records received in this round and outputs all these records. */
	static <T> ProcessFunction bulkCache(DataStream<T> inputStream);

	/** A cache for a KeyedStream. For each round it updates the KeyedStream with the records received in this round and outputs the updated records. */
	static <K, T> ProcessFunction deltaCache(KeyedStream<K, T> inputStream);
}
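
As an illustration of the `isConverged` function held by `TerminationCondition`, the sketch below stops the iteration either at a given round or once all residuals collected from the referred stream are small. It rests on several assumptions that the listing above does not settle: `ConvergenceSketch`, `untilRoundOrTolerance` and `contextOf` are hypothetical names, `Context` is a local stand-in that mirrors the two methods of `TerminationCondition.Context`, the last element of `getRound()` is taken to be the round of the innermost iteration, and a round with no records is treated as not converged.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

class ConvergenceSketch {

    /** Local stand-in mirroring TerminationCondition.Context. */
    interface Context {
        int[] getRound();

        <T> List<T> getStreamRecords();
    }

    /**
     * Converges once the innermost round number reaches lastRound, or earlier when
     * every residual collected in the round is below the tolerance.
     */
    static Function<Context, Boolean> untilRoundOrTolerance(int lastRound, double tolerance) {
        return ctx -> {
            int[] round = ctx.getRound();
            // Assumption: the last element is the round of the innermost iteration.
            if (round[round.length - 1] >= lastRound) {
                return true;
            }
            List<Double> residuals = ctx.getStreamRecords();
            // Assumption: a round without any record is not treated as converged.
            return !residuals.isEmpty()
                    && residuals.stream().allMatch(r -> Math.abs(r) < tolerance);
        };
    }

    /** Builds a fixed Context for demonstration purposes. */
    static Context contextOf(int round, List<Double> records) {
        return new Context() {
            @Override
            public int[] getRound() {
                return new int[] {round};
            }

            @Override
            @SuppressWarnings("unchecked")
            public <T> List<T> getStreamRecords() {
                return (List<T>) records;
            }
        };
    }

    public static void main(String[] args) {
        Function<Context, Boolean> isConverged = untilRoundOrTolerance(10, 0.01);
        System.out.println(isConverged.apply(contextOf(2, Arrays.asList(0.5, 0.001))));
        System.out.println(isConverged.apply(contextOf(2, Arrays.asList(0.001, -0.002))));
    }
}
```

A function of this shape would be passed to `TerminationCondition` together with the stream that carries the residuals, and the resulting condition would be handed to `until(...)` on the declaration builder.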

...