Page properties

Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".

Discussion thread: here
Vote thread: here
JIRA: FLINK-XXXX (server: ASF JIRA)


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


The goal of this FLIP is to extend Sink#InitContext with methods that expose the ExecutionConfig (or at least the TypeSerializer of the input and ExecutionConfig#isObjectReuseEnabled) and the JobID.

This is necessary to migrate the current JdbcSink to SinkV2 (FLIP-239 -> FLINK-25421), and more specifically to reuse the following parts of the current code:

  • the JdbcOutputFormat, which today uses RuntimeContext to read ExecutionConfig#isObjectReuseEnabled.
  • the SemanticXidGenerator (for the XA sink), which needs the JobID to create the Xid.
  • the use of TypeInformation.createSerializer.

Other connectors need this as well; for example, KafkaTableSink also uses isObjectReuseEnabled.
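To illustrate why a buffering sink needs both the object-reuse flag and a serializer, here is a minimal, self-contained sketch. The types `InitContextSketch`, `BufferingWriterSketch`, and `ObjectReuseDemo` are hypothetical stand-ins, not Flink classes, and `createInputCopier()` models only the copy capability of a TypeSerializer. The underlying assumption is that, with object reuse enabled, upstream may pass the same mutable instance to the writer repeatedly, so every record must be copied before it is buffered.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for the proposed additions to Sink.InitContext (NOT the Flink interface).
interface InitContextSketch<T> {
    boolean isObjectReuseEnabled();

    // Stand-in for createInputSerializer(): only the "copy a record" capability is modelled.
    UnaryOperator<T> createInputCopier();
}

// Hypothetical writer that buffers records before flushing them as a batch.
final class BufferingWriterSketch<T> {
    private final List<T> buffer = new ArrayList<>();
    private final UnaryOperator<T> copier;

    BufferingWriterSketch(InitContextSketch<T> context) {
        // Copy only when upstream may mutate and re-send the same instance.
        this.copier = context.isObjectReuseEnabled()
                ? context.createInputCopier()
                : UnaryOperator.identity();
    }

    void write(T record) {
        buffer.add(copier.apply(record));
    }

    List<T> buffered() {
        return buffer;
    }
}

final class ObjectReuseDemo {
    // Simulates an upstream operator that reuses one mutable record instance.
    static List<String> simulateReusedUpstream() {
        InitContextSketch<StringBuilder> context = new InitContextSketch<StringBuilder>() {
            @Override
            public boolean isObjectReuseEnabled() {
                return true;
            }

            @Override
            public UnaryOperator<StringBuilder> createInputCopier() {
                return sb -> new StringBuilder(sb);
            }
        };
        BufferingWriterSketch<StringBuilder> writer = new BufferingWriterSketch<>(context);

        StringBuilder reused = new StringBuilder("a");
        writer.write(reused);
        reused.setLength(0); // upstream overwrites the same instance
        reused.append("b");
        writer.write(reused);

        List<String> result = new ArrayList<>();
        for (StringBuilder sb : writer.buffered()) {
            result.add(sb.toString());
        }
        return result;
    }
}
```

Without the copy step, both buffered entries in this sketch would point at the same instance and the first record would be lost. That is the situation the object-reuse flag lets a writer detect.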

Public Interfaces

  • We want to extend Sink#InitContext to provide a TypeSerializer for the input, the object-reuse setting from the ExecutionConfig, and the JobID.

Proposed Changes

We propose to expose the ExecutionConfig so that all of its settings are available. If we prefer to expose only the required configuration, we could instead expose just isObjectReuseEnabled; the code below shows this narrower variant.

Code Block
public interface Sink<InputT> extends Serializable {
    ... current methods

    public interface InitContext {
        ... current methods

        boolean isObjectReuseEnabled();

        <IN> TypeSerializer<IN> createInputSerializer();

        JobID getJobId();
    }
}

Code Block
class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<CommittableMessage<CommT>>
        implements OneInputStreamOperator<InputT, CommittableMessage<CommT>>, BoundedOneInput {

    private static class InitContextImpl implements Sink.InitContext {
        ... current methods
        private final StreamConfig operatorConfig;

        public InitContextImpl(
                StreamingRuntimeContext runtimeContext,
                ProcessingTimeService processingTimeService,
                MailboxExecutor mailboxExecutor,
                SinkWriterMetricGroup metricGroup,
                StreamConfig operatorConfig,
                @Nullable Long restoredCheckpointId) {
            this.runtimeContext = checkNotNull(runtimeContext);
            this.mailboxExecutor = checkNotNull(mailboxExecutor);
            this.processingTimeService = checkNotNull(processingTimeService);
            this.metricGroup = checkNotNull(metricGroup);
            this.restoredCheckpointId = restoredCheckpointId;
            this.operatorConfig = operatorConfig;
        }

        /**
         * Reads the object-reuse flag from the {@link org.apache.flink.api.common.ExecutionConfig}
         * for the currently executing job.
         */
        public boolean isObjectReuseEnabled() {
            return runtimeContext.getExecutionConfig().isObjectReuseEnabled();
        }

        public <IN> TypeSerializer<IN> createInputSerializer() {
            return operatorConfig
                    .<IN>getTypeSerializerIn(0, runtimeContext.getUserCodeClassLoader());
        }

        /**
         * The ID of the current job. Note that Job ID can change in particular upon manual restart.
         * The returned ID should NOT be used for any job management tasks.
         */
        public JobID getJobId() {
            return runtimeContext.getJobId();
        }
    }
}
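As a rough illustration of why a transactional sink wants the job ID, the sketch below derives a global transaction ID from a job ID, a subtask index, and a checkpoint ID. The class `XidSketch`, the 16-byte job ID representation, and the byte layout are all assumptions made for this example; they are not taken from SemanticXidGenerator or any Flink API.

```java
import java.nio.ByteBuffer;

// Hypothetical helper (not a Flink class): builds a transaction identifier that is
// unique per job, per parallel subtask, and per checkpoint.
final class XidSketch {
    private static final int JOB_ID_LENGTH = 16; // assumed size of the job ID in bytes

    static byte[] globalTransactionId(byte[] jobIdBytes, int subtaskIndex, long checkpointId) {
        if (jobIdBytes.length != JOB_ID_LENGTH) {
            throw new IllegalArgumentException("expected a 16-byte job ID");
        }
        // Illustrative layout: [job ID (16 bytes) | subtask index (4 bytes) | checkpoint ID (8 bytes)]
        return ByteBuffer.allocate(JOB_ID_LENGTH + Integer.BYTES + Long.BYTES)
                .put(jobIdBytes)
                .putInt(subtaskIndex)
                .putLong(checkpointId)
                .array();
    }
}
```

Including the job ID keeps identifiers from two jobs that write to the same database apart, which is consistent with the note that the ID may change after a manual restart and must not be used for job management.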

Compatibility, Deprecation, and Migration Plan

  • The change only adds methods to Sink#InitContext, so existing code is not affected.

Test Plan

These simple API changes will be covered by extending unit and integration tests.

Rejected Alternatives


  • None for the moment