THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||||
---|---|---|---|---|
| ||||
@PublicEvolving public interface Sink<InputT> extends Serializable { ... current methods @PublicEvolving public interface InitContext { ... current methods /** Returns whether object reuse is enabled; the SinkWriterOperator implementation reads this flag from the execution config. */ boolean isObjectReuseEnabled(); /** Creates a serializer for the sink's input; the SinkWriterOperator implementation returns a duplicate of the serializer configured for operator input 0. NOTE(review): IN is presumably expected to match InputT, but the signature does not enforce it. */ <IN> TypeSerializer<IN> createInputSerializer(); /** Creates a serializer for the given type information; the SinkWriterOperator implementation builds it with the execution config. */ <IN> TypeSerializer<IN> createSerializer(TypeInformation<IN> inType); /** Returns the job id; the SinkWriterOperator implementation takes it from the runtime context. */ JobID getJobId(); } } |
...
Code Block | ||||
---|---|---|---|---|
| ||||
/** Excerpt of the sink writer operator; members elided with "... current methods" are unchanged. */ class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<CommittableMessage<CommT>> implements OneInputStreamOperator<InputT, CommittableMessage<CommT>>, BoundedOneInput { /** Sink.InitContext implementation backed by the runtime context and the operator's StreamConfig. */ private static class InitContextImpl implements Sink.InitContext { ... current methods /** Operator configuration; createInputSerializer() reads the serializer of input 0 from it. */ private final StreamConfig operatorConfig; /** Stores the collaborators; every argument except the nullable restoredCheckpointId must be non-null. */ public InitContextImpl( StreamingRuntimeContext runtimeContext, ProcessingTimeService processingTimeService, MailboxExecutor mailboxExecutor, SinkWriterMetricGroup metricGroup, StreamConfig operatorConfig, @Nullable Long restoredCheckpointId) { this.runtimeContext = checkNotNull(runtimeContext); this.mailboxExecutor = checkNotNull(mailboxExecutor); this.processingTimeService = checkNotNull(processingTimeService); this.metricGroup = checkNotNull(metricGroup); this.restoredCheckpointId = restoredCheckpointId; /* Fail fast like the other required arguments: operatorConfig is dereferenced unconditionally in createInputSerializer(). */ this.operatorConfig = checkNotNull(operatorConfig); } /** Returns the object-reuse flag of the execution config. */ @Override public boolean isObjectReuseEnabled() { return runtimeContext.getExecutionConfig().isObjectReuseEnabled(); } /** Returns a duplicate of the serializer configured for input 0, resolved with the user-code class loader. */ @Override public <IN> TypeSerializer<IN> createInputSerializer() { return operatorConfig .<IN>getTypeSerializerIn(0, runtimeContext.getUserCodeClassLoader()) .duplicate(); } /** Creates a serializer for inType using the execution config. */ @Override public <IN> TypeSerializer<IN> createSerializer(TypeInformation<IN> inType) { return inType.createSerializer(runtimeContext.getExecutionConfig()); } /** Returns the job id reported by the runtime context. */ @Override public JobID getJobId() { return runtimeContext.getJobId(); } } } |
...