Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Page properties

Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".

here (<- link to list.html?dev@flink.apache.org)here (<- link to issuesjira/browse/FLINK-XXXX)
Discussion threadhereVote threadhttps://lists.apache.org/thread/wb3myhqsdz81h08ygwx057mkn1hc3s8f
Vote threadJIRAhttps://lists.apache.org/thread/yckr5c3vbzpm1563k1nkrr4x9f9rxc7x
JIRA

Jira
serverASF JIRA
columnIdsissuekey,summary,issuetype,created,updated,duedate,assignee,reporter,priority,status,resolution
columnskey,summary,type,created,updated,due,assignee,reporter,priority,status,resolution
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyFLINK-32376

Release1.18.0ReleaseTBD


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The goal of this FLIP is to add additional fields to Sink#InitContext to expose ExecutionConfig (or at least the TypeSerializer from input, ExecutionConfig#isObjectReuseEnabled ) and JobID.

This is necessary to migrate the current JdbcSink to the SinkV2 (FLIP-239 -> FLINK-25421) more specifically in order to be able to reuse the current code:

...

This would be necessary to another connectors as well.

Public Interfaces

  • We want to extend the Sink#InitContext to provide and  ReadableExecutionConfiga TypeSerializer, ObjectReuse and JobID.

Proposed Changes

...


Code Block
languagejava
titleReadableExecutionConfigSink.InitContext
@PublicEvolving
public interface ReadableExecutionConfigSink<InputT> extends Serializable {
    ... current methods
  
    @PublicEvolving
    public interface InitContext {
        ... current methods /**  Returns  whether  object  reuse  has

 been enabled or disabled.*/
    boolean isObjectReuseEnabled();

	/** Another is/get methods needed, we should add all is/get methods from ExecutionConfig **/
    ...        <IN> TypeSerializer<IN> createInputSerializer();   
      
        JobID getJobId();    
    }
}


Code Block
languagejava
titleExecutionConfig
@Public
public class ExecutionConfig implements ReadableExecutionConfig, Archiveable<ArchivedExecutionConfig> {

	/** Nothing to be done as methods should be the same (maybe put @Override on all), its only needed to implements the new interface **/
}
Code Block
languagejava
titleTypeInformation
@Public
public abstract class TypeInformation<T> implements Serializable {
   /**
     * Creates a serializer for the type. 
	 * The serializer may use the ReadableExecutionConfig for parameterization.
     *
     * @param config The config used to parameterize the serializer.
     * @return A serializer for this type.
     */
    @PublicEvolving
    public abstract TypeSerializer<T> createSerializer(ReadableExecutionConfig config);
}
Code Block
languagejava
titleInitContext
@PublicEvolving
public interface Sink<InputT> extends Serializable {
    ... current methods
  
    @PublicEvolving
    public interface InitContext {
        ... current methods
 
        /**          
         * Returns the {@link ReadableExecutionConfig} for the currently executing job.
         */
        ReadableExecutionConfig getReadableExecutionConfig();
 
        /**
         * The ID of the current job. Note that Job ID can change in particular upon manual restart.
         * The returned ID should NOT be used for any job management tasks.
         */
        JobID getJobId();    
    }
}

  

SinkWriterOperator.InitContextImpl
class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<CommittableMessage<CommT>>
        implements OneInputStreamOperator<InputT, CommittableMessage<CommT>>, BoundedOneInput {


	private static class InitContextImpl implements Sink.InitContext {
    	... current methods
  
    	private final StreamConfig operatorConfig;

    	public InitContextImpl(
        	        StreamingRuntimeContext runtimeContext,
        	        ProcessingTimeService processingTimeService,
                	MailboxExecutor mailboxExecutor,
                	SinkWriterMetricGroup metricGroup,
                	StreamConfig operatorConfig,
                	@Nullable Long restoredCheckpointId) {
            this.runtimeContext = checkNotNull(runtimeContext);
            this.mailboxExecutor = checkNotNull(mailboxExecutor);
            this.processingTimeService = checkNotNull(processingTimeService);
            this.metricGroup = checkNotNull(metricGroup);
            this.restoredCheckpointId = restoredCheckpointId;
            this.operatorConfig = operatorConfig;
		}

		@Override
    	public boolean isObjectReuseEnabled() {
        	return runtimeContext.getExecutionConfig().isObjectReuseEnabled();
   	 	}

    	@Override
    	public <IN> TypeSerializer<IN> createInputSerializer() {
        	return operatorConfig
                	.<IN>getTypeSerializerIn(0, runtimeContext.getUserCodeClassLoader())
                	.duplicate();
    	}

    	@Override
    	public JobID getJobId() {
        	return runtimeContext.getJobId();
    	}
	}
}

Compatibility, Deprecation, and Migration Plan

  • The change only one additional method so no impact on existing code.

Test Plan

These simple API changes will be covered by extending unit and integration tests.

Rejected Alternatives

  • Expose ExecutionConfig was reject to avoid modifications
  • Extending ExecutionConfig was reject to avoid modificationsNone for the moment