...
- We want to extend the
Sink#InitContext
to provide a ExecutionConfig TypeSerializer, ObjectReuse and JobID.
Proposed Changes
...
In this option we only expose the ExecutionConfig and the JobID.
Code Block |
---|
language | java |
---|
title | Sink.InitContext |
---|
|
@PublicEvolving
public interface Sink<InputT> extends Serializable {
... current methods
@PublicEvolving
public interface InitContext {
... current methods
/**
* Returns the {@link ExecutionConfig} for the currently executing job.
*/
ExecutionConfig getExecutionConfig
boolean isObjectReuseEnabled();
/**
* The ID of the current job. Note that Job ID can change in particular upon manual restart.
* The returned ID should NOT be used for any job management tasks.
*/
JobID
<IN> TypeSerializer<IN> createInputSerializer();
<IN> TypeSerializer<IN> createSerializer(TypeInformation<IN> inType);
JobID getJobId();
}
} |
Option 2: Expose ReadableExecutionConfig on InitContext
...
With this option we have to change all the implementations of TypeInformation that exists (current exists 72 implementations)
Code Block |
---|
language | java |
---|
title | ReadableExecutionConfig | SinkWriterOperator.InitContextImpl |
---|
|
private static class InitContextImpl implements Sink.InitContext {
... current methods
private final StreamConfig operatorConfig;
public InitContextImpl(@PublicEvolving
public interface ReadableExecutionConfig extends Serializable {
/** Returns whether object reuse has been enabled or disabled.*/
boolean isObjectReuseEnabled();
/** Another is/get methods needed, we should add all is/get methods from ExecutionConfig **/
...
} |
Code Block |
---|
language | java |
---|
title | ExecutionConfig |
---|
|
@Public
public class ExecutionConfig implements ReadableExecutionConfig, Archiveable<ArchivedExecutionConfig> {
/** Nothing to be done as methods should be the same (maybe put @Override on all), its only needed to implements the new interface **/
} |
Code Block |
---|
language | java |
---|
title | TypeInformation |
---|
|
@Public
public abstract class TypeInformation<T> implements Serializable {
/**
* Creates a serializer for the type.
* The serializer may use the ReadableExecutionConfig for parameterization.
*
* @param config The config used to parameterize the serializer.
* @return A serializer for this type.
*/
@PublicEvolving
public abstract TypeSerializer<T> createSerializer(ReadableExecutionConfig config);
} |
Code Block |
---|
language | java |
---|
title | InitContext |
---|
|
@PublicEvolving
public interface Sink<InputT> extends Serializable {
... current methods
@PublicEvolving
public interface InitContext {
... current methods
/**
* Returns the {@link ReadableExecutionConfig} for the currently executing job.
*/
ReadableExecutionConfig getReadableExecutionConfig();
/**
* The ID of the current job. Note that Job ID can change in particular upon manual restart.
* The returned ID should NOT be used for any job management tasks.
*/
JobID getJobId();
StreamingRuntimeContext runtimeContext,
ProcessingTimeService processingTimeService,
MailboxExecutor mailboxExecutor,
SinkWriterMetricGroup metricGroup,
StreamConfig operatorConfig,
@Nullable Long restoredCheckpointId) {
this.runtimeContext = checkNotNull(runtimeContext);
this.mailboxExecutor = checkNotNull(mailboxExecutor);
this.processingTimeService = checkNotNull(processingTimeService);
this.metricGroup = checkNotNull(metricGroup);
this.restoredCheckpointId = restoredCheckpointId;
this.operatorConfig = operatorConfig;
}
@Override
public boolean isObjectReuseEnabled() {
return runtimeContext.getExecutionConfig().isObjectReuseEnabled();
}
@Override
public <IN> TypeSerializer<IN> createInputSerializer() {
return operatorConfig
.<IN>getTypeSerializerIn(0, runtimeContext.getUserCodeClassLoader())
.duplicate();
}
@Override
public <IN> TypeSerializer<IN> createSerializer(TypeInformation<IN> inType) {
return inType.createSerializer(runtimeContext.getExecutionConfig());
}
@Override
public JobID getJobId() {
return runtimeContext.getJobId();
}
} |
Compatibility, Deprecation, and Migration Plan
...