Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  • the JdbcOutputFormat which today uses RuntimeContext to get the ExecutionConfig#isObjectReuseEnabled. 
  • the SemanticXidGenerator (for Xa-Sink) needs JobID to create the Xid.
  • the use of TypeInformation.createSerializer.


This would be necessary to another connectors as well, for example KafkaTableSink uses isObjectReuseEnabled too.

Public Interfaces

  • We want to extend the Sink#InitContext to provide the ExecutionConfig and  ReadableExecutionConfig and JobID.

Proposed Changes

We propose to expose the ExecutionConfig, so that all configs on this will be available, but we could only expose the isObjectReuse if we want to expose only the required configuration.create a new interface ReadableExecutionConfig, making ExecutionConfig extending this interface, and adding a new method to TypeInformation 

Code Block
languagejava
titleInitContextReadableExecutionConfig
@PublicEvolving
public interface Sink<InputT>ReadableExecutionConfig extends Serializable {

    /** Returns whether object reuse has been enabled or disabled.*/
    boolean isObjectReuseEnabled();

	/** Another is/get methods needed, we should add all is/get methods from ExecutionConfig **/
    ...
}


Code Block
languagejava
titleExecutionConfig
@Public
public class ExecutionConfig implements ReadableExecutionConfig, Archiveable<ArchivedExecutionConfig> {

	/** Nothing to be done as methods should be the same (maybe put @Override on all), its only needed to implements the new interface
}


Code Block
languagejava
titleTypeInformation
@Public
public abstract class TypeInformation<T> implements Serializable {
   /**
     * Creates a serializer for the type. 
	 * The serializer may use the ReadableExecutionConfig for parameterization.
     *
     * @param config The config used to parameterize the serializer.
     * @return A serializer for this type... current methods
 
    @PublicEvolving
    public interface InitContext {
        ... current methods

        /**           
         * Returns the {@link org.apache.flink.api.common.ExecutionConfig} for the currently executing job.
         */
        ExecutionConfig getExecutionConfig();

        /**
         * The ID of the current job. Note that Job ID can change in particular upon manual restart.
         * The returned ID should NOT be used for any job management tasks.
     */
    */@PublicEvolving
    public abstract TypeSerializer<T>  JobID getJobId();     
	}
}  createSerializer(ReadableExecutionConfig config);
}

Compatibility, Deprecation, and Migration Plan

...

These simple API changes will be covered by extending unit and integration tests.

Rejected Alternatives

...

  • Expose ExecutionConfig was reject to avoid modifications
  • Extending ExecutionConfig was reject to avoid modifications