...
Users may configure only the options they need.
Public Interface
To implement the features above, we propose adding a wrapper class for the watermark parameters:
Code Block | ||
---|---|---|
| ||
import java.io.Serializable;
import java.time.Duration;

/** Wrapper for the configurable watermark parameters. */
public class WatermarkParams implements Serializable {
    private static final long serialVersionUID = 1L;

    private WatermarkEmitStrategy emitStrategy;
    private int emitOnEventGap;
    private String alignGroupName;
    private Duration alignMaxDrift;
    private Duration alignUpdateInterval;
    private long sourceIdleTimeout;
} |
To describe the watermark emit strategy, we need to add an enumeration class:
Code Block | ||
---|---|---|
| ||
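public enum WatermarkEmitStrategy {
    ON_EVENT("on-event"),
    ON_PERIODIC("on-periodic");

    private final String alias; // string form of the strategy; field name is illustrative
    WatermarkEmitStrategy(String alias) { this.alias = alias; }
} |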
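To make the difference between the two strategies concrete, here is a minimal, self-contained sketch. It is not the proposed implementation: `SketchGenerator`, `onEvent`, and `onPeriodicEmit` are hypothetical names, and the sketch assumes that `emitOnEventGap` means "emit one watermark every N events", which the field name suggests but this section does not define.

```java
import java.util.ArrayList;
import java.util.List;

public class EmitStrategySketch {

    // Mirrors the two constants of the proposed enum; the string alias is not needed here.
    enum WatermarkEmitStrategy { ON_EVENT, ON_PERIODIC }

    /** Hypothetical generator: tracks the max timestamp seen and decides when to emit it. */
    static class SketchGenerator {
        private final WatermarkEmitStrategy strategy;
        private final int emitOnEventGap; // assumed meaning: emit once every N events
        private long maxTimestamp = Long.MIN_VALUE;
        private int eventsSinceEmit = 0;
        final List<Long> emitted = new ArrayList<>();

        SketchGenerator(WatermarkEmitStrategy strategy, int emitOnEventGap) {
            this.strategy = strategy;
            this.emitOnEventGap = emitOnEventGap;
        }

        /** Called for every record; emits only under ON_EVENT, once the gap is reached. */
        void onEvent(long timestamp) {
            maxTimestamp = Math.max(maxTimestamp, timestamp);
            if (strategy == WatermarkEmitStrategy.ON_EVENT
                    && ++eventsSinceEmit >= emitOnEventGap) {
                emitted.add(maxTimestamp);
                eventsSinceEmit = 0;
            }
        }

        /** Called by a timer; emits only under ON_PERIODIC, and only after the first event. */
        void onPeriodicEmit() {
            if (strategy == WatermarkEmitStrategy.ON_PERIODIC
                    && maxTimestamp != Long.MIN_VALUE) {
                emitted.add(maxTimestamp);
            }
        }
    }

    public static void main(String[] args) {
        SketchGenerator onEvent = new SketchGenerator(WatermarkEmitStrategy.ON_EVENT, 2);
        for (long ts : new long[] {10, 5, 30, 20}) {
            onEvent.onEvent(ts);
        }
        System.out.println(onEvent.emitted); // prints [10, 30]
    }
}
```

With a gap of 2, a watermark is emitted after the second and fourth events. Under `ON_PERIODIC` the same events emit nothing until the timer callback fires.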
The largest change is likely to the constructor of WatermarkPushDownSpec, which must accept the params wrapper class:
Code Block | ||
---|---|---|
| ||
@JsonCreator
public WatermarkPushDownSpec(
        @JsonProperty(FIELD_NAME_WATERMARK_EXPR) RexNode watermarkExpr,
        @JsonProperty(FIELD_NAME_IDLE_TIMEOUT_MILLIS) long idleTimeoutMillis,
        @JsonProperty(FIELD_NAME_PRODUCED_TYPE) RowType producedType,
        @JsonProperty(FIELD_NAME_WATERMARK_PARAMS) WatermarkParams watermarkParams) {
    super(producedType);
    this.watermarkExpr = checkNotNull(watermarkExpr);
    this.idleTimeoutMillis = idleTimeoutMillis;
    this.watermarkParams = watermarkParams;
} |
The constructor of GeneratedWatermarkGeneratorSupplier changes in the same way:
Code Block | ||
---|---|---|
| ||
public GeneratedWatermarkGeneratorSupplier(
        GeneratedWatermarkGenerator generatedWatermarkGenerator,
        WatermarkParams watermarkParams) {
    this.generatedWatermarkGenerator = generatedWatermarkGenerator;
    this.watermarkParams = watermarkParams;
} |
Compiled Plan
After these features are implemented, the following content will be added to the WatermarkPushDown module in the JSON plan:
Code Block |
---|
"watermarkParams":{
"emitStrategy" : "ON_PERIODIC",
"emitOnEventGap" : 1,
"alignGroupName" : null,
"alignMaxDrift" : "PT0S",
"alignUpdateInterval" : "PT1S",
"sourceIdleTimeout" : -1
} |
This addition does not cause compatibility issues: CompilePlan and ExecutePlan remain backward compatible. We will add unit tests to validate the compatibility.
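As a reading aid for the plan fragment, the values `"PT0S"` and `"PT1S"` are ISO-8601 durations, which `java.time.Duration.parse` accepts. The sketch below also shows one plausible way a plan compiled before this change, which has no `watermarkParams` entry, could still be executed: fall back to default values when the entry is missing. This is an assumption for illustration, not the confirmed mechanism. `Params`, `DEFAULTS`, and `orDefaults` are hypothetical names, and treating the values from the fragment as defaults is likewise assumed.

```java
import java.time.Duration;

public class PlanDefaultsSketch {

    /** Hypothetical stand-in for the proposed WatermarkParams (subset of its fields). */
    static final class Params {
        final String emitStrategy;
        final Duration alignMaxDrift;
        final Duration alignUpdateInterval;
        final long sourceIdleTimeout;

        Params(String emitStrategy, Duration alignMaxDrift,
               Duration alignUpdateInterval, long sourceIdleTimeout) {
            this.emitStrategy = emitStrategy;
            this.alignMaxDrift = alignMaxDrift;
            this.alignUpdateInterval = alignUpdateInterval;
            this.sourceIdleTimeout = sourceIdleTimeout;
        }
    }

    // Values copied from the JSON fragment; assumed here to be the defaults.
    static final Params DEFAULTS =
            new Params("ON_PERIODIC", Duration.parse("PT0S"), Duration.parse("PT1S"), -1L);

    /** Returns the params read from the plan, or the defaults if the plan had none. */
    static Params orDefaults(Params fromPlan) {
        return fromPlan != null ? fromPlan : DEFAULTS;
    }

    public static void main(String[] args) {
        // An older plan carries no watermarkParams entry, modeled here as null.
        Params resolved = orDefaults(null);
        System.out.println(resolved.alignUpdateInterval.toMillis()); // prints 1000
    }
}
```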
Migration Plan and Compatibility
...