Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
The goal of this FLIP is to add fields to Sink#InitContext that expose the ExecutionConfig, or at least the input TypeSerializer, ExecutionConfig#isObjectReuseEnabled, and the JobID.
This is necessary to migrate the current JdbcSink to SinkV2 (FLIP-239 -> FLINK-25421), and more specifically to be able to reuse the current code:
- the JdbcOutputFormat which today uses RuntimeContext to get the ExecutionConfig#isObjectReuseEnabled.
- the SemanticXidGenerator (for Xa-Sink) needs JobID to create the Xid.
- the use of TypeInformation.createSerializer.
Other connectors would need this as well.
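To illustrate why the JobID matters for the XA sink, here is a minimal sketch of deriving a global transaction id from the job id, subtask index, and checkpoint id. The class `XidSketch`, its method name, and the byte layout are assumptions made for illustration; this is not the connector's SemanticXidGenerator code.

```java
import java.nio.ByteBuffer;

/** Hypothetical helper, for illustration only; not part of Flink or the JDBC connector. */
final class XidSketch {

    private XidSketch() {}

    /**
     * Concatenates the job id bytes, the subtask index, and the checkpoint id so that
     * transactions started by different jobs or subtasks get different ids.
     */
    static byte[] globalTransactionId(byte[] jobIdBytes, int subtaskIndex, long checkpointId) {
        return ByteBuffer.allocate(jobIdBytes.length + Integer.BYTES + Long.BYTES)
                .put(jobIdBytes)
                .putInt(subtaskIndex)
                .putLong(checkpointId)
                .array();
    }
}
```

Without access to the job id, two jobs writing to the same database with the same subtask index and checkpoint id would produce identical ids under this layout, which is why the sink needs the JobID from the context.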
Public Interfaces
- We want to extend Sink#InitContext to provide a TypeSerializer, ObjectReuse and JobID.
Proposed Changes
We propose to expose the ExecutionConfig so that all of its settings are available. Alternatively, if we want to expose only the required configuration, we could expose just isObjectReuseEnabled.
```java
@PublicEvolving
public interface Sink<InputT> extends Serializable {
    // ... current methods

    @PublicEvolving
    interface InitContext {
        // ... current methods

        /** Returns whether object reuse is enabled in the job's ExecutionConfig. */
        boolean isObjectReuseEnabled();

        /** Creates a serializer for the sink's input type. */
        <IN> TypeSerializer<IN> createInputSerializer();

        /** Returns the ID of the currently executing job. */
        JobID getJobId();
    }
}
```
```java
class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<CommittableMessage<CommT>>
        implements OneInputStreamOperator<InputT, CommittableMessage<CommT>>, BoundedOneInput {

    private static class InitContextImpl implements Sink.InitContext {
        // ... current methods

        private final StreamConfig operatorConfig;

        public InitContextImpl(
                StreamingRuntimeContext runtimeContext,
                ProcessingTimeService processingTimeService,
                MailboxExecutor mailboxExecutor,
                SinkWriterMetricGroup metricGroup,
                StreamConfig operatorConfig,
                @Nullable Long restoredCheckpointId) {
            this.runtimeContext = checkNotNull(runtimeContext);
            this.mailboxExecutor = checkNotNull(mailboxExecutor);
            this.processingTimeService = checkNotNull(processingTimeService);
            this.metricGroup = checkNotNull(metricGroup);
            this.restoredCheckpointId = restoredCheckpointId;
            this.operatorConfig = operatorConfig;
        }

        @Override
        public boolean isObjectReuseEnabled() {
            return runtimeContext.getExecutionConfig().isObjectReuseEnabled();
        }

        @Override
        public <IN> TypeSerializer<IN> createInputSerializer() {
            // Duplicate so that each caller gets its own serializer instance.
            return operatorConfig
                    .<IN>getTypeSerializerIn(0, runtimeContext.getUserCodeClassLoader())
                    .duplicate();
        }

        @Override
        public JobID getJobId() {
            return runtimeContext.getJobId();
        }
    }
}
```
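As a rough illustration of how a writer might consume the new context methods, the sketch below buffers records for a batched write and copies them only when object reuse is enabled. `InitContextSketch`, `createInputCopier`, and `BufferingWriterSketch` are hypothetical stand-ins so that the example compiles without Flink on the classpath. In a real sink the copy would presumably go through the TypeSerializer returned by createInputSerializer().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

/** Hypothetical stand-in for the proposed InitContext additions; not the Flink interface. */
interface InitContextSketch<T> {
    boolean isObjectReuseEnabled();

    /** Stands in for obtaining a TypeSerializer and calling its copy method. */
    UnaryOperator<T> createInputCopier();
}

/** Buffers records for a later batch write, copying them only if the runtime reuses objects. */
final class BufferingWriterSketch<T> {
    private final List<T> buffer = new ArrayList<>();
    private final UnaryOperator<T> copier;

    BufferingWriterSketch(InitContextSketch<T> context) {
        if (context.isObjectReuseEnabled()) {
            // The runtime may mutate the instance after write() returns, so keep a copy.
            this.copier = context.createInputCopier();
        } else {
            // Each record is a fresh instance, so buffering the reference is safe.
            this.copier = UnaryOperator.identity();
        }
    }

    void write(T record) {
        buffer.add(copier.apply(record));
    }

    List<T> buffered() {
        return buffer;
    }
}
```

Deciding once in the constructor keeps the per-record path free of branching, and it avoids the cost of copying when object reuse is disabled.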
Compatibility, Deprecation, and Migration Plan
- The change only adds new methods to Sink#InitContext, so it has no impact on existing code.
Test Plan
These simple API changes will be covered by extending unit and integration tests.
Rejected Alternatives
- None for the moment