Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Of course, users can choose some of the items to configure according to their needs.

Public Interface

In order to implement the above features, it is expected to add a wrapper class for the watermark parameters, like this:

Code Block
languagejava
public class WatermarkParams implements Serializable {
    private static final long serialVersionUID = 1L;

    private WatermarkEmitStrategy emitStrategy;
    private int emitOnEventGap;
    private String alignGroupName;
    private Duration alignMaxDrift;
    private Duration alignUpdateInterval;
    private long sourceIdleTimeout;
}

To describe the watermark launch strategy, we need to add an enumeration class:

Code Block
languagejava
public enum WatermarkEmitStrategy {
    ON_EVENT("on-event"),
    ON_PERIODIC("on-periodic")
}

The biggest change probably is that we need to modify the constructor of WatermarkPushDownSpec to use the params wrapper class:

Code Block
languagejava
@JsonCreator
public WatermarkPushDownSpec(
    @JsonProperty(FIELD_NAME_WATERMARK_EXPR) RexNode watermarkExpr,
    @JsonProperty(FIELD_NAME_IDLE_TIMEOUT_MILLIS) long idleTimeoutMillis,
    @JsonProperty(FIELD_NAME_PRODUCED_TYPE) RowType producedType,
    @JsonProperty(FIELD_NAME_WATERMARK_PARAMS) WatermarkParams watermarkParams) {
        super(producedType);
        this.watermarkExpr = checkNotNull(watermarkExpr);
        this.idleTimeoutMillis = idleTimeoutMillis;
        this.watermarkParams = watermarkParams;
}

Same as GeneratedWatermarkGeneratorSupplier :

Code Block
languagejava
public GeneratedWatermarkGeneratorSupplier(
    GeneratedWatermarkGenerator generatedWatermarkGenerator,
    WatermarkParams watermarkParams) {
        this.generatedWatermarkGenerator = generatedWatermarkGenerator;
        this.watermarkParams = watermarkParams;
}


Compiled Plan

After implementing the above features, The following contents will be added to the WatermarkPushDown module in the json plan:

Code Block
"watermarkParams":{
    "emitStrategy" : "ON_PERIODIC",
    "emitOnEventGap" : 1,
    "alignGroupName" : null,
    "alignMaxDrift" : "PT0S",
    "alignUpdateInterval" : "PT1S",
    "sourceIdleTimeout" : -1
}

But this doesn't cause compatibility issues, CompilePlan and ExecutePlan are backward compatible. We will add unit tests to validate the compatibility.

Migration Plan and Compatibility

...