...

  • We want to extend Sink#InitContext to provide the ExecutionConfig, a TypeSerializer, the object-reuse setting, and the JobID.

Proposed Changes

...

In this option we only expose the ExecutionConfig and the JobID.


Code Block
languagejava
titleSink.InitContext
@PublicEvolving
public interface Sink<InputT> extends Serializable {
    ... current methods
  
    @PublicEvolving
    public interface InitContext {
        ... current methods
  
        /**
         * Returns the {@link ExecutionConfig} for the currently executing job.
         */
        ExecutionConfig getExecutionConfig();

        /** Returns whether object reuse has been enabled or disabled. */
        boolean isObjectReuseEnabled();

        <IN> TypeSerializer<IN> createInputSerializer();

        <IN> TypeSerializer<IN> createSerializer(TypeInformation<IN> inType);

        /**
         * The ID of the current job. Note that the Job ID can change, in particular upon manual restart.
         * The returned ID should NOT be used for any job management tasks.
         */
        JobID getJobId();
    }
}
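
To illustrate why a sink might want `isObjectReuseEnabled()` together with an input serializer, here is a minimal sketch of a buffering writer that copies each element only when object reuse is on. The names `ObjectReuseSketch`, `InitContextLike`, `createInputCopier` and `BufferingWriter` are hypothetical stand-ins, not Flink APIs; in a real sink the copy would presumably go through `TypeSerializer#copy` on the serializer returned by `createInputSerializer()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

/** Hypothetical stand-ins for illustration only; these are not the real Flink types. */
final class ObjectReuseSketch {

    /** Minimal stand-in for the two proposed InitContext methods. */
    interface InitContextLike<T> {
        boolean isObjectReuseEnabled();

        /** Assumption: stands in for createInputSerializer() followed by TypeSerializer#copy. */
        UnaryOperator<T> createInputCopier();
    }

    /** Buffers elements and copies them only if the runtime may reuse input objects. */
    static final class BufferingWriter<T> {
        private final List<T> buffer = new ArrayList<>();
        private final UnaryOperator<T> copier;

        BufferingWriter(InitContextLike<T> context) {
            // Without object reuse the caller hands over a fresh object each time,
            // so keeping the reference is safe and no copy is needed.
            this.copier = context.isObjectReuseEnabled()
                    ? context.createInputCopier()
                    : UnaryOperator.identity();
        }

        void write(T element) {
            buffer.add(copier.apply(element));
        }

        List<T> buffered() {
            return buffer;
        }
    }
}
```

With object reuse enabled, mutating the input object after `write` does not affect what was buffered, because the writer stored a copy.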

Option 2: Expose ReadableExecutionConfig on InitContext

...

With this option, we have to change all existing implementations of TypeInformation (there are currently 72).


Code Block
languagejava
titleReadableExecutionConfig
@PublicEvolving
public interface ReadableExecutionConfig extends Serializable {

    /** Returns whether object reuse has been enabled or disabled. */
    boolean isObjectReuseEnabled();

    /** Other is/get methods are needed as well; we should add all is/get methods from ExecutionConfig. */
    ...
}

Code Block
languagejava
titleExecutionConfig
@Public
public class ExecutionConfig implements ReadableExecutionConfig, Archiveable<ArchivedExecutionConfig> {

    /** Nothing to be done, as the methods should be the same (maybe put @Override on all of them); the class only needs to implement the new interface. */
}

Code Block
languagejava
titleTypeInformation
@Public
public abstract class TypeInformation<T> implements Serializable {
    /**
     * Creates a serializer for the type.
     * The serializer may use the ReadableExecutionConfig for parameterization.
     *
     * @param config The config used to parameterize the serializer.
     * @return A serializer for this type.
     */
    @PublicEvolving
    public abstract TypeSerializer<T> createSerializer(ReadableExecutionConfig config);
}

Code Block
languagejava
titleInitContext
@PublicEvolving
public interface Sink<InputT> extends Serializable {
    ... current methods

    @PublicEvolving
    public interface InitContext {
        ... current methods

        /**
         * Returns the {@link ReadableExecutionConfig} for the currently executing job.
         */
        ReadableExecutionConfig getReadableExecutionConfig();

        /**
         * The ID of the current job. Note that the Job ID can change, in particular upon manual restart.
         * The returned ID should NOT be used for any job management tasks.
         */
        JobID getJobId();
    }
}

Code Block
languagejava
titleSinkWriterOperator.InitContextImpl
private static class InitContextImpl implements Sink.InitContext {
    ... current methods

    private final StreamConfig operatorConfig;

    public InitContextImpl(
            StreamingRuntimeContext runtimeContext,
            ProcessingTimeService processingTimeService,
            MailboxExecutor mailboxExecutor,
            SinkWriterMetricGroup metricGroup,
            StreamConfig operatorConfig,
            @Nullable Long restoredCheckpointId) {
        this.runtimeContext = checkNotNull(runtimeContext);
        this.mailboxExecutor = checkNotNull(mailboxExecutor);
        this.processingTimeService = checkNotNull(processingTimeService);
        this.metricGroup = checkNotNull(metricGroup);
        this.restoredCheckpointId = restoredCheckpointId;
        this.operatorConfig = operatorConfig;
    }

    @Override
    public boolean isObjectReuseEnabled() {
        return runtimeContext.getExecutionConfig().isObjectReuseEnabled();
    }

    @Override
    public <IN> TypeSerializer<IN> createInputSerializer() {
        return operatorConfig
                .<IN>getTypeSerializerIn(0, runtimeContext.getUserCodeClassLoader())
                .duplicate();
    }

    @Override
    public <IN> TypeSerializer<IN> createSerializer(TypeInformation<IN> inType) {
        return inType.createSerializer(runtimeContext.getExecutionConfig());
    }

    @Override
    public JobID getJobId() {
        return runtimeContext.getJobId();
    }
}
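
The idea behind Option 2's ReadableExecutionConfig, a read-only view that the mutable config class implements, can be sketched in isolation as follows. `ReadableConfigSketch`, `ReadableConfig`, `MutableConfig` and `describe` are hypothetical names chosen for this illustration and only mirror the shape of the proposal; they are not the Flink classes.

```java
/** Hypothetical stand-ins that mirror the shape of Option 2; not the real Flink classes. */
final class ReadableConfigSketch {

    /** Read-only view: exposes only is/get methods. */
    interface ReadableConfig {
        boolean isObjectReuseEnabled();
    }

    /** Mutable config that additionally implements the read-only view. */
    static final class MutableConfig implements ReadableConfig {
        private boolean objectReuse;

        MutableConfig enableObjectReuse() {
            objectReuse = true;
            return this;
        }

        @Override
        public boolean isObjectReuseEnabled() {
            return objectReuse;
        }
    }

    /** A consumer typed against the view can read settings but sees no setters. */
    static String describe(ReadableConfig config) {
        return config.isObjectReuseEnabled() ? "object reuse on" : "object reuse off";
    }
}
```

Note that in this sketch the view is enforced only by the static type: a caller could still downcast to the mutable class, so it guards against accidental rather than deliberate modification.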

Compatibility, Deprecation, and Migration Plan

...