Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".

Discussion threadhttps://lists.apache.org/thread/wb3myhqsdz81h08ygwx057mkn1hc3s8f
Vote threadhttps://lists.apache.org/thread/yckr5c3vbzpm1563k1nkrr4x9f9rxc7x
JIRA

Unable to render Jira issues macro, execution error.

Release1.18.0

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The goal of this FLIP is to add additional fields to Sink#InitContext to expose TypeSerializer from input, ExecutionConfig#isObjectReuseEnabled and JobID.

This is necessary to migrate the current JdbcSink to the SinkV2 (FLIP-239 -> FLINK-25421) more specifically in order to be able to reuse the current code:

  • the JdbcOutputFormat which today uses RuntimeContext to get the ExecutionConfig#isObjectReuseEnabled. 
  • the SemanticXidGenerator (for Xa-Sink) needs JobID to create the Xid.
  • the use of TypeInformation.createSerializer.


This would be necessary to another connectors as well.

Public Interfaces

  • We want to extend the Sink#InitContext to provide a TypeSerializer, ObjectReuse and JobID.

Proposed Changes


Sink.InitContext
@PublicEvolving
public interface Sink<InputT> extends Serializable {
    ... current methods
  
    @PublicEvolving
    public interface InitContext {
        ... current methods           

        boolean isObjectReuseEnabled();

        <IN> TypeSerializer<IN> createInputSerializer();   
      
        JobID getJobId();    
    }
}


SinkWriterOperator.InitContextImpl
class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<CommittableMessage<CommT>>
        implements OneInputStreamOperator<InputT, CommittableMessage<CommT>>, BoundedOneInput {


	private static class InitContextImpl implements Sink.InitContext {
    	... current methods
  
    	private final StreamConfig operatorConfig;

    	public InitContextImpl(
        	        StreamingRuntimeContext runtimeContext,
        	        ProcessingTimeService processingTimeService,
                	MailboxExecutor mailboxExecutor,
                	SinkWriterMetricGroup metricGroup,
                	StreamConfig operatorConfig,
                	@Nullable Long restoredCheckpointId) {
            this.runtimeContext = checkNotNull(runtimeContext);
            this.mailboxExecutor = checkNotNull(mailboxExecutor);
            this.processingTimeService = checkNotNull(processingTimeService);
            this.metricGroup = checkNotNull(metricGroup);
            this.restoredCheckpointId = restoredCheckpointId;
            this.operatorConfig = operatorConfig;
		}

		@Override
    	public boolean isObjectReuseEnabled() {
        	return runtimeContext.getExecutionConfig().isObjectReuseEnabled();
   	 	}

    	@Override
    	public <IN> TypeSerializer<IN> createInputSerializer() {
        	return operatorConfig
                	.<IN>getTypeSerializerIn(0, runtimeContext.getUserCodeClassLoader())
                	.duplicate();
    	}

    	@Override
    	public JobID getJobId() {
        	return runtimeContext.getJobId();
    	}
	}
}

Compatibility, Deprecation, and Migration Plan

  • The change only one additional method so no impact on existing code.

Test Plan

These simple API changes will be covered by extending unit and integration tests.

Rejected Alternatives

  • None for the moment