...
Before introducing the specific changes, let's first look at what the simplest job(increase every record by one) developed with the new API looks like:
Code Block | ||||
---|---|---|---|---|
| ||||
// create environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// create a stream from source
env.fromSource(someSource)
// map every element x to x + 1. This is just to show the API as comprehensively as possible. In fact, we can use lambda expressions instead.
.process(new SingleStreamProcessFunction<Integer, Integer>() {
@Override
public void processRecord(
Integer x,
Collector<Integer> output)
throws Exception {
output.collect(x + 1);
}
})
// If the sink does not support concurrent writes, we can force the stream to one partition
.global()
// sink the stream to some sink
.toSink(someSink);
// execute the job
env.execute()// create environment |
It can be seen that in addition to the three core concepts mentioned earlier, we also need some additional work: such as creating and executing the job, and adding source and sinkIt can be seen that in addition to the three core concepts mentioned earlier, we also need some additional work: such as creating and executing the job, and adding source and sink.
ExecutionEnvironment
ExecutionEnvironment is the start and stop point of user application. It provides methods to create and execute job.
...