Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
titleSink.InitContext
@PublicEvolving
public interface Sink<InputT> extends Serializable {
    ... current methods
  
    @PublicEvolving
    public interface InitContext {
        ... current methods           

        boolean isObjectReuseEnabled();

        <IN> TypeSerializer<IN> createInputSerializer();

        <IN> TypeSerializer<IN> createSerializer(TypeInformation<IN> inType);          
      
        JobID getJobId();    
    }
}

...

Code Block
languagejava
titleSinkWriterOperator.InitContextImpl
class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<CommittableMessage<CommT>>
        implements OneInputStreamOperator<InputT, CommittableMessage<CommT>>, BoundedOneInput {


	private static class InitContextImpl implements Sink.InitContext {
    	... current methods
  
    	private final StreamConfig operatorConfig;

    	public InitContextImpl(
        	        StreamingRuntimeContext runtimeContext,
        	        ProcessingTimeService processingTimeService,
                	MailboxExecutor mailboxExecutor,
                	SinkWriterMetricGroup metricGroup,
                	StreamConfig operatorConfig,
                	@Nullable Long restoredCheckpointId) {
            this.runtimeContext = checkNotNull(runtimeContext);
            this.mailboxExecutor = checkNotNull(mailboxExecutor);
            this.processingTimeService = checkNotNull(processingTimeService);
            this.metricGroup = checkNotNull(metricGroup);
            this.restoredCheckpointId = restoredCheckpointId;
            this.operatorConfig = operatorConfig;
		}

		@Override
    	public boolean isObjectReuseEnabled() {
        	return runtimeContext.getExecutionConfig().isObjectReuseEnabled();
   	 	}

    	@Override
    	public <IN> TypeSerializer<IN> createInputSerializer() {
        	return operatorConfig
                	.<IN>getTypeSerializerIn(0, runtimeContext.getUserCodeClassLoader())
                	.duplicate();
    	}

    	@Override
    	public <IN> TypeSerializer<IN> createSerializer(TypeInformation<IN> inType) {
        	return inType.createSerializer(runtimeContext.getExecutionConfig());
    	}

    	@Override
    	public JobID getJobId() {
        	return runtimeContext.getJobId();
    	}
	}
}

...