Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Here we use a specialized reduce function that that handles the boundary of the round. However, outside of the iteration we might already be able to do the reduce with windowAllI().reduce(), to reuse the operators outside of the iteration, we could use the forEachRound utility method. 

Code Block
languagejava
 			DataStreamList resultStreams = 
            IterationUtils.iterateBoundedStreamsUntilTermination(
				DataStreamList.of(initParameters), 
				ReplayableDataStreamList.of(noReplay(dataset)), 
				IterationConfig.newBuilder().setOperatorRoundMode(ALL_ROUND).build();
				(variableStreams, dataStreams) -> {
                DataStream<double[]> parameterUpdates = variableStreams.get(0);
                	DataStream<Tuple2<double[], Double>> dataset = dataStreams.get(0);

                	SingleOutputStreamOperator<double[]> parameters = parameterUpdates.process(new ParametersCacheFunction());
                	DataStream<double[]> modelUpdate = parameters.setParallelism(1)
                   		.broadcast()
	                    .connect(dataset)
    	                .coProcess(new TrainFunction())
        	            .setParallelism(10)
					
			     	DataStream<double[]> reduced = PerRoundGraphBuilder.forEachRound(DataStreamList.of(modelUpdate), streams -> {
						return streams.<double[]>get(0).windowAll().reduce((x, y) -> ArrayUtils.add(x, y));
					}).<double[]>get(0);
	
                	return new IterationBodyResult(DataStreamList.of(modelUpdate), DataStreamList.of(parameters.getSideOut(FINAL_MODEL_OUTPUT_TAG)));
            	});

This would be very helpful for complex scenarios that we could reuse the ability of the the current datastream and table operators. 



If instead we want to do asynchronous training, we would need to do the following change:

...