...
Page properties |
---|
Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected". |
---|
Discussion thread | https://lists.apache.org/thread/wb3myhqsdz81h08ygwx057mkn1hc3s8f |
---|
Vote thread | https://lists.apache.org/thread/yckr5c3vbzpm1563k1nkrr4x9f9rxc7x |
---|
JIRA | FLINK-32376 |
---|
Release | 1.18.0 |
---|
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
The goal of this FLIP is to add methods to Sink#InitContext that expose the ExecutionConfig (or at least the input TypeSerializer and ExecutionConfig#isObjectReuseEnabled) and the JobID.
This is needed to migrate the current JdbcSink to SinkV2 (FLIP-239, FLINK-25421), more specifically to be able to reuse the existing code:
...
Other connectors would need this as well.
Public Interfaces
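- We want to extend Sink#InitContext to provide the execution configuration (as an ExecutionConfig or a ReadableExecutionConfig) or, at minimum, the input TypeSerializer and the object-reuse flag, together with the JobID.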
Proposed Changes
Option 1: Expose ExecutionConfig directly on InitContext
In this option we only expose the ExecutionConfig and the JobID.
```java
// Sink.InitContext
@PublicEvolving
public interface Sink<InputT> extends Serializable {
    // ... current methods

    @PublicEvolving
    public interface InitContext {
        // ... current methods

        /** Returns the {@link ExecutionConfig} for the currently executing job. */
        ExecutionConfig getExecutionConfig();

        /**
         * The ID of the current job. Note that the job ID can change, in particular upon manual
         * restart. The returned ID should NOT be used for any job management tasks.
         */
        JobID getJobId();
    }
}
```
Option 2: Expose ReadableExecutionConfig on InitContext
...
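With this option, every existing implementation of TypeInformation has to change (there are currently 72 implementations), because TypeInformation#createSerializer would take a ReadableExecutionConfig instead of an ExecutionConfig.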
```java
// ReadableExecutionConfig
@PublicEvolving
public interface ReadableExecutionConfig extends Serializable {
    /** Returns whether object reuse has been enabled or disabled. */
    boolean isObjectReuseEnabled();

    // The remaining is/get methods of ExecutionConfig would be added here as well.
    // ...
}
```
```java
// ExecutionConfig
@Public
public class ExecutionConfig implements ReadableExecutionConfig, Archiveable<ArchivedExecutionConfig> {
    // Nothing else changes: the existing is/get methods already match the new interface
    // (optionally annotated with @Override); the class only has to implement it.
}
```
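Option 2 relies on a read-only view: an interface that declares only getters and is implemented by the mutable configuration class. The following is a minimal, self-contained sketch of that pattern, not Flink code. `ExecutionSettingsView` and `ExecutionSettings` are hypothetical stand-ins for ReadableExecutionConfig and ExecutionConfig, and only two settings are modelled.

```java
// Hypothetical stand-ins, not Flink classes: ExecutionSettingsView plays the role of
// ReadableExecutionConfig and ExecutionSettings plays the role of ExecutionConfig.
import java.io.Serializable;

interface ExecutionSettingsView extends Serializable {
    /** Returns whether object reuse has been enabled or disabled. */
    boolean isObjectReuseEnabled();

    int getParallelism();
}

class ExecutionSettings implements ExecutionSettingsView {
    private static final long serialVersionUID = 1L;

    private boolean objectReuse = false;
    private int parallelism = 1;

    // Mutators live only on the concrete class, so code typed against the view cannot call them.
    public ExecutionSettings enableObjectReuse() {
        this.objectReuse = true;
        return this;
    }

    public ExecutionSettings setParallelism(int parallelism) {
        this.parallelism = parallelism;
        return this;
    }

    // The getters already exist with matching signatures; @Override only documents the link.
    @Override
    public boolean isObjectReuseEnabled() {
        return objectReuse;
    }

    @Override
    public int getParallelism() {
        return parallelism;
    }
}

public class ReadOnlyViewSketch {
    /** Receives only the view: it can read settings but has no setter to call. */
    static String describe(ExecutionSettingsView view) {
        return "objectReuse=" + view.isObjectReuseEnabled() + ", parallelism=" + view.getParallelism();
    }

    public static void main(String[] args) {
        ExecutionSettings settings = new ExecutionSettings().enableObjectReuse().setParallelism(4);
        // The concrete config is passed where the view is expected: no adapter, no copy.
        System.out.println(describe(settings)); // objectReuse=true, parallelism=4
    }
}
```

The view narrows what callers can do at compile time; it is not a hard guarantee, since a caller could still downcast to the concrete class.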
```java
// TypeInformation
@Public
public abstract class TypeInformation<T> implements Serializable {
    /**
     * Creates a serializer for the type. The serializer may use the ReadableExecutionConfig for
     * parameterization.
     *
     * @param config The config used to parameterize the serializer.
     * @return A serializer for this type.
     */
    @PublicEvolving
    public abstract TypeSerializer<T> createSerializer(ReadableExecutionConfig config);
}
```
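The sketch below shows what the changed createSerializer signature means on both sides, using hypothetical stand-ins rather than Flink classes: `ConfigView` for ReadableExecutionConfig, `FullConfig` for ExecutionConfig, `TypeInfo` for TypeInformation and `Serializer` for TypeSerializer (reduced to a copy method). The generic-types flag is an assumed example of a setting a serializer factory might consult.

```java
// Hypothetical stand-ins, not Flink classes: ConfigView ~ ReadableExecutionConfig,
// FullConfig ~ ExecutionConfig, TypeInfo ~ TypeInformation, Serializer ~ TypeSerializer.
import java.util.ArrayList;
import java.util.List;

interface ConfigView {
    boolean hasGenericTypesDisabled();
}

class FullConfig implements ConfigView {
    private boolean genericTypesDisabled;

    FullConfig disableGenericTypes() {
        genericTypesDisabled = true;
        return this;
    }

    @Override
    public boolean hasGenericTypesDisabled() {
        return genericTypesDisabled;
    }
}

interface Serializer<T> {
    T copy(T from);
}

abstract class TypeInfo<T> {
    /** New-style factory: implementations receive only the read-only view. */
    abstract Serializer<T> createSerializer(ConfigView config);
}

/** Serializer copies the list itself; the (immutable) String elements are shared. */
class StringListTypeInfo extends TypeInfo<List<String>> {
    @Override
    Serializer<List<String>> createSerializer(ConfigView config) {
        return from -> new ArrayList<>(from);
    }
}

/** A generic fallback that refuses to build a serializer when the config forbids it. */
class GenericFallbackTypeInfo extends TypeInfo<Object> {
    @Override
    Serializer<Object> createSerializer(ConfigView config) {
        if (config.hasGenericTypesDisabled()) {
            throw new UnsupportedOperationException("generic types are disabled");
        }
        return from -> from; // placeholder: a real serializer would deep-copy
    }
}

public class CreateSerializerSketch {
    public static void main(String[] args) {
        FullConfig config = new FullConfig();
        // Callers holding the full config pass it unchanged: FullConfig is a ConfigView.
        Serializer<List<String>> serializer = new StringListTypeInfo().createSerializer(config);
        List<String> original = new ArrayList<>(List.of("a", "b"));
        List<String> copy = serializer.copy(original);
        original.add("c");
        System.out.println(copy); // [a, b]

        config.disableGenericTypes();
        try {
            new GenericFallbackTypeInfo().createSerializer(config);
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Under this sketch, call sites that pass an ExecutionConfig would still compile, because ExecutionConfig implements ReadableExecutionConfig; it is the overriding implementations whose parameter type has to change.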
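```java
// Sink.InitContext
@PublicEvolving
public interface Sink<InputT> extends Serializable {
    // ... current methods

    @PublicEvolving
    public interface InitContext {
        // ... current methods

        /** Returns the {@link ReadableExecutionConfig} for the currently executing job. */
        ReadableExecutionConfig getReadableExecutionConfig();

        /**
         * The ID of the current job. Note that the job ID can change, in particular upon manual
         * restart. The returned ID should NOT be used for any job management tasks.
         */
        JobID getJobId();
    }
}
```
A narrower variant exposes only what the JdbcSink needs, that is the input TypeSerializer and the object-reuse flag, together with the JobID:
```java
// Sink.InitContext (narrower variant)
@PublicEvolving
public interface InitContext {
    // ... current methods

    /** Returns whether object reuse has been enabled or disabled. */
    boolean isObjectReuseEnabled();

    /** Creates a serializer for the sink's input type. */
    <IN> TypeSerializer<IN> createInputSerializer();

    /** The ID of the current job; same contract as getJobId() above. */
    JobID getJobId();
}
```
SinkWriterOperator.InitContextImpl implements this variant as follows:
```java
// SinkWriterOperator.InitContextImpl
class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<CommittableMessage<CommT>>
        implements OneInputStreamOperator<InputT, CommittableMessage<CommT>>, BoundedOneInput {

    // ... current methods

    private static class InitContextImpl implements Sink.InitContext {
        // ... current methods

        private final StreamConfig operatorConfig;

        public InitContextImpl(
                StreamingRuntimeContext runtimeContext,
                ProcessingTimeService processingTimeService,
                MailboxExecutor mailboxExecutor,
                SinkWriterMetricGroup metricGroup,
                StreamConfig operatorConfig,
                @Nullable Long restoredCheckpointId) {
            this.runtimeContext = checkNotNull(runtimeContext);
            this.mailboxExecutor = checkNotNull(mailboxExecutor);
            this.processingTimeService = checkNotNull(processingTimeService);
            this.metricGroup = checkNotNull(metricGroup);
            this.restoredCheckpointId = restoredCheckpointId;
            this.operatorConfig = operatorConfig;
        }

        @Override
        public boolean isObjectReuseEnabled() {
            return runtimeContext.getExecutionConfig().isObjectReuseEnabled();
        }

        @Override
        public <IN> TypeSerializer<IN> createInputSerializer() {
            // duplicate() gives the caller its own instance, since serializers may be stateful.
            return operatorConfig
                    .<IN>getTypeSerializerIn(0, runtimeContext.getUserCodeClassLoader())
                    .duplicate();
        }

        @Override
        public JobID getJobId() {
            return runtimeContext.getJobId();
        }
    }
}
```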
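The following sketch illustrates, under simplifying assumptions, why a writer that buffers records (presumably the situation of the JdbcSink, which batches writes) needs both isObjectReuseEnabled() and createInputSerializer(): with object reuse enabled, the runtime may hand the writer the same mutable instance for successive records, so buffered records must be copied. `WriterContext`, `createInputCopier()`, `BufferingWriter` and `MutableRecord` are hypothetical; the serializer is reduced to its copy operation.

```java
// Hypothetical stand-ins, not Flink classes: WriterContext models the proposed
// isObjectReuseEnabled()/createInputSerializer() pair, with the serializer reduced
// to a copy function (UnaryOperator) for brevity.
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

interface WriterContext<IN> {
    boolean isObjectReuseEnabled();

    /** Stand-in for createInputSerializer(): only the copy operation is modelled. */
    UnaryOperator<IN> createInputCopier();
}

/** A writer that buffers records and hands them out in one batch on flush. */
class BufferingWriter<IN> {
    private final List<IN> buffer = new ArrayList<>();
    private final UnaryOperator<IN> copier;

    BufferingWriter(WriterContext<IN> context) {
        // Copy only when the runtime may pass the same mutable instance again.
        this.copier = context.isObjectReuseEnabled()
                ? context.createInputCopier()
                : UnaryOperator.identity();
    }

    void write(IN element) {
        buffer.add(copier.apply(element));
    }

    List<IN> flush() {
        List<IN> batch = new ArrayList<>(buffer);
        buffer.clear();
        return batch;
    }
}

final class MutableRecord {
    int value;

    MutableRecord(int value) {
        this.value = value;
    }

    @Override
    public String toString() {
        return Integer.toString(value);
    }
}

public class ObjectReuseSketch {
    /** Feeds values 1..3 to a writer; with reuse, one instance is overwritten for each element. */
    static List<MutableRecord> run(boolean reuse, UnaryOperator<MutableRecord> copier) {
        WriterContext<MutableRecord> context = new WriterContext<>() {
            @Override
            public boolean isObjectReuseEnabled() {
                return reuse;
            }

            @Override
            public UnaryOperator<MutableRecord> createInputCopier() {
                return copier;
            }
        };
        BufferingWriter<MutableRecord> writer = new BufferingWriter<>(context);
        MutableRecord reused = new MutableRecord(0);
        for (int i = 1; i <= 3; i++) {
            MutableRecord element = reuse ? reused : new MutableRecord(0);
            element.value = i;
            writer.write(element);
        }
        return writer.flush();
    }

    public static void main(String[] args) {
        // Reuse enabled and the writer copies: every value survives.
        System.out.println(run(true, r -> new MutableRecord(r.value))); // [1, 2, 3]
        // Reuse enabled but no real copy: every buffered entry is the same object.
        System.out.println(run(true, UnaryOperator.identity())); // [3, 3, 3]
    }
}
```

Copying only when object reuse is enabled avoids a per-record copy in the default case, where the writer already receives distinct objects.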
Compatibility, Deprecation, and Migration Plan
- The change only adds methods to Sink#InitContext, so existing code is not affected.
Test Plan
These simple API changes will be covered by extending unit and integration tests.
Rejected Alternatives