Discussion threadhttps://lists.apache.org/thread/t4or2q5z9g0vkpppp454llnybhnz3nzh
Vote thread
JIRA

Unable to render Jira issues macro, execution error.

Release1.11

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, expressions in the Table API can be defined in two ways. Either via the implicit Scala DSL or via custom strings:

// Java API
Table revenue = orders
  .filter("cCountry === 'FRANCE'")
  .groupBy("cID, cName")
  .select("cID, cName, revenue.sum AS revSum");

// Scala API
val revenue = orders
  .filter('cCountry === "FRANCE")
  .groupBy('cID, 'cName)
  .select('cID, 'cName, 'revenue.sum AS 'revSum)


The Java version of the Table API was always treated as a third class citizen. The custom strings are parsed by an ExpressionParser implemented using Scala parser combinators.

In the past the string-based expression parser was/is:

  • multiple times out of sync with the Scala DSL
  • buggy because it was not properly tested
  • it blows up the API as every method must accept both representations like select(String) and select(Expression)
  • confusing for users because it looks like SQL but is actually only SQL-like
  • does not provide JavaDocs or further help in case of parse errors within the IDE, a user can only consult the online documentation
  • negatively impacts the development time due to missing auto completion, highlighting, error-checking
  • does not give an easy way to migrate to the new type system (cast(f0 as DECIMAL) vs cast(f0 as DECIMAL(10,3))

A new built-in function always needed to be added at different positions which made contributions overly complicated and difficult to be reviewed.

The expression parser would need to be ported to Java according to the new module design that clearly separates API from table planner and runtime (FLIP-32).

And a very important fact: Scala symbols are deprecated and might be removed soon!
https://contributors.scala-lang.org/t/proposal-to-deprecate-and-remove-symbol-literals/2953

Public Interfaces

  • org.apache.flink.table.api.Expressions

  • org.apache.flink.table.api.scala package object

  • org.apache.flink.table.expressions.ApiExpression

  • org.apache.flink.table.expressions.ExpressionOperations

Proposed Changes

Instead of using plain strings in the future, we suggest to add a full programmatic Java DSL.

The Scala implicit DSL will just expose and extend the Java DSL. For Scala and Java DSL this means that many functions only need to be defined once. A new function for the API need just be added in one class and is properly documented for both Scala and Java users with consistent functionality.

Also Scala users can use this new DSL and prepare for the removal of symbols.

We suggest the following structure:

  • `o.a.f.t.api.Expressions` contains static entry points. All methods there return `o.a.f.t.expressions.ApiExpression`.
  • `ApiExpression` lists all following operations that are currently listed in `o.a.f.t.api.ImplicitExpressionOperations`
  • Java DSL accept raw java.lang.Object as paremeters and return ApiExpression
  • Scala DSL accept Expressions as parameters and return Expression, non-expression objects are accepted via implicit conversions (as it is now)
  • we suggest introducing a scala string interpolation for creating a column reference via $"..." The reason being creating symbols via ' is deprecated. Moreover currently there is no way to reference columns with a space in the name

Example

import static org.apache.flink.table.api.Expressions.*;

// Java API

Table revenue = orders

  .filter($("cCountry").isEqual("FRANCE"))

  .groupBy($("cID"), $("cName"))

  .select($("cID"), $("cName"), $("revenue').sum().as("revSum"));

// Scala API

val revenue = orders

  .filter($"cCountry" === "FRANCE")

  .groupBy($"cID", $"cName")

  .select($"cID", $"cName", $"revenue".sum as "revSum")

Details

We hope using raw java objects in the table API will make it more user friendly. The API will accept both expressions and objects such as int, long, String etc which will be converted to valueLiterals:

  • when expession is used in an API → it's an identity transformation
  • when anything other than expression is used, it will be converted to valueLiteral


Common suffix operations

class public abstract class BaseExpressions<InT, OutT>

public abstract class org.apache.flink.table.api.internal.BaseExpressions<InT, OutT> {
  public OutT as(java.lang.String, java.lang.String...);
  public OutT and(InT);
  public OutT or(InT);
  public OutT isGreater(InT);
  public OutT isGreaterOrEqual(InT);
  public OutT isLess(InT);
  public OutT isLessOrEqual(InT);
  public OutT isEqual(InT);
  public OutT isNotEqual(InT);
  public OutT plus(InT);
  public OutT minus(InT);
  public OutT dividedBy(InT);
  public OutT multipliedBy(InT);
  public OutT between(InT, InT);
  public OutT notBetween(InT, InT);
  public OutT isNull();
  public OutT isNotNull();
  public OutT isTrue();
  public OutT isFalse();
  public OutT isNotTrue();
  public OutT isNotFalse();
  public OutT distinct();
  public OutT sum();
  public OutT sum0();
  public OutT min();
  public OutT max();
  public OutT count();
  public OutT avg();
  public OutT stddevPop();
  public OutT stddevSamp();
  public OutT varPop();
  public OutT varSamp();
  public OutT collect();
  public OutT cast(org.apache.flink.table.types.DataType);
  public OutT cast(org.apache.flink.api.common.typeinfo.TypeInformation<?>);
  public OutT asc();
  public OutT desc();
  public final OutT in(InT...);
  public OutT in(org.apache.flink.table.api.Table);
  public OutT start();
  public OutT end();
  public OutT mod(InT);
  public OutT exp();
  public OutT log10();
  public OutT log2();
  public OutT ln();
  public OutT log();
  public OutT log(InT);
  public OutT power(InT);
  public OutT cosh();
  public OutT sqrt();
  public OutT abs();
  public OutT floor();
  public OutT sinh();
  public OutT ceil();
  public OutT sin();
  public OutT cos();
  public OutT tan();
  public OutT cot();
  public OutT asin();
  public OutT acos();
  public OutT atan();
  public OutT tanh();
  public OutT degrees();
  public OutT radians();
  public OutT sign();
  public OutT round(InT);
  public OutT bin();
  public OutT hex();
  public OutT truncate(InT);
  public OutT truncate();
  public OutT substring(InT, InT);
  public OutT substring(InT);
  public OutT trimLeading();
  public OutT trimLeading(InT);
  public OutT trimTrailing();
  public OutT trimTrailing(InT);
  public OutT trim();
  public OutT trim(InT);
  public OutT replace(InT, InT);
  public OutT charLength();
  public OutT upperCase();
  public OutT lowerCase();
  public OutT initCap();
  public OutT like(InT);
  public OutT similar(InT);
  public OutT position(InT);
  public OutT lpad(InT, InT);
  public OutT rpad(InT, InT);
  public OutT over(InT);
  public OutT overlay(InT, InT);
  public OutT overlay(InT, InT, InT);
  public OutT regexpReplace(InT, InT);
  public OutT regexpExtract(InT, InT);
  public OutT regexpExtract(InT);
  public OutT fromBase64();
  public OutT toBase64();
  public OutT ltrim();
  public OutT rtrim();
  public OutT repeat(InT);
  public OutT toDate();
  public OutT toTime();
  public OutT toTimestamp();
  public OutT extract(org.apache.flink.table.expressions.TimeIntervalUnit);
  public OutT floor(org.apache.flink.table.expressions.TimeIntervalUnit);
  public OutT ceil(org.apache.flink.table.expressions.TimeIntervalUnit);
  public OutT get(java.lang.String);
  public OutT get(int);
  public OutT flatten();
  public OutT at(InT);
  public OutT cardinality();
  public OutT element();
  public OutT rowtime();
  public OutT proctime();
  public OutT md5();
  public OutT sha1();
  public OutT sha224();
  public OutT sha256();
  public OutT sha384();
  public OutT sha512();
  public OutT sha2(InT);
}

Java DSL:

class ApiExpression will extend from BaseExpressions. This way we will expose the suffix operators to the Java DSL:

public final class Expressions {

	....

	/**
	 * Java API class that gives access to expressions operations.
	 */
	public static final class ApiExpression extends BaseExpressions<Object, ApiExpression> implements Expression {
		....
	}
}


We introduce a new main API class for Java users. We will recommend a star import of:

import static org.apache.flink.table.api.Expressions.*;

Static entry points:

All methods are static and return "ApiExpression".

The following table list new method names. We aim to reduce the name of the most frequently used methods ($() for references, and lit() for literals).


public final class org.apache.flink.table.api.Expressions {
  public static org.apache.flink.table.api.Expressions$ApiExpression $(java.lang.String);
  public static org.apache.flink.table.api.Expressions$ApiExpression lit(java.lang.Object);
  public static org.apache.flink.table.api.Expressions$ApiExpression lit(java.lang.Object, org.apache.flink.table.types.DataType); //for value with explicit type
  public static org.apache.flink.table.api.Expressions$ApiExpression range(java.lang.String, java.lang.String); // for column operations instead of `to`
  public static org.apache.flink.table.api.Expressions$ApiExpression range(int, int); // for column operations instead of `to`
  public static org.apache.flink.table.api.Expressions$ApiExpression and(java.lang.Object, java.lang.Object);
  public static org.apache.flink.table.api.Expressions$ApiExpression or(java.lang.Object, java.lang.Object);
  public static org.apache.flink.table.api.Expressions$ApiExpression currentDate();
  public static org.apache.flink.table.api.Expressions$ApiExpression currentTime();
  public static org.apache.flink.table.api.Expressions$ApiExpression currentTimestamp();
  public static org.apache.flink.table.api.Expressions$ApiExpression localTime();
  public static org.apache.flink.table.api.Expressions$ApiExpression localTimestamp();
  public static org.apache.flink.table.api.Expressions$ApiExpression temporalOverlaps(java.lang.Object, java.lang.Object, java.lang.Object, java.lang.Object);
  public static org.apache.flink.table.api.Expressions$ApiExpression dateFormat(java.lang.Object, java.lang.Object);
  public static org.apache.flink.table.api.Expressions$ApiExpression timestampDiff(org.apache.flink.table.expressions.TimePointUnit, java.lang.Object, java.lang.Object);
  public static org.apache.flink.table.api.Expressions$ApiExpression array(java.lang.Object, java.lang.Object...);
  public static org.apache.flink.table.api.Expressions$ApiExpression row(java.lang.Object, java.lang.Object...);
  public static org.apache.flink.table.api.Expressions$ApiExpression map(java.lang.Object, java.lang.Object, java.lang.Object...); // ctor for a value of MAP type
  public static org.apache.flink.table.api.Expressions$ApiExpression rowInterval(java.lang.Long);
  public static org.apache.flink.table.api.Expressions$ApiExpression interval(java.time.Duration);
  public static org.apache.flink.table.api.Expressions$ApiExpression interval(java.time.Period);
  public static org.apache.flink.table.api.Expressions$ApiExpression interval(long, org.apache.flink.table.api.DataTypes$Resolution); for representing row intervals or time intervals following the SQL syntax like `12 23:00:00.999999999' DAY(2) TO SECOND(9)` 
  public static org.apache.flink.table.api.Expressions$ApiExpression interval(java.lang.String, org.apache.flink.table.api.DataTypes$Resolution, org.apache.flink.table.api.DataTypes$Resolution); for representing row intervals or time intervals following the SQL syntax like `12 23:00:00.999999999' DAY(2) TO SECOND(9)` 
  public static org.apache.flink.table.api.Expressions$ApiExpression pi();
  public static org.apache.flink.table.api.Expressions$ApiExpression e();
  public static org.apache.flink.table.api.Expressions$ApiExpression rand();
  public static org.apache.flink.table.api.Expressions$ApiExpression rand(java.lang.Object);
  public static org.apache.flink.table.api.Expressions$ApiExpression randInteger(java.lang.Object);
  public static org.apache.flink.table.api.Expressions$ApiExpression randInteger(java.lang.Object, java.lang.Object);
  public static org.apache.flink.table.api.Expressions$ApiExpression concat(java.lang.Object, java.lang.Object...);
  public static org.apache.flink.table.api.Expressions$ApiExpression atan2(java.lang.Object, java.lang.Object);
  public static org.apache.flink.table.api.Expressions$ApiExpression concatWs(java.lang.Object, java.lang.Object, java.lang.Object...);
  public static org.apache.flink.table.api.Expressions$ApiExpression uuid();
  public static org.apache.flink.table.api.Expressions$ApiExpression nullOf(org.apache.flink.table.types.DataType);
  public static org.apache.flink.table.api.Expressions$ApiExpression nullOf(org.apache.flink.api.common.typeinfo.TypeInformation<?>);
  public static org.apache.flink.table.api.Expressions$ApiExpression log(java.lang.Object);
  public static org.apache.flink.table.api.Expressions$ApiExpression log(java.lang.Object, java.lang.Object);
  public static org.apache.flink.table.api.Expressions$ApiExpression ifThenElse(java.lang.Object, java.lang.Object, java.lang.Object);
  public static org.apache.flink.table.api.Expressions$ApiExpression withColumns(java.lang.Object, java.lang.Object...);
  public static org.apache.flink.table.api.Expressions$ApiExpression withoutColumns(java.lang.Object, java.lang.Object...);
  public static org.apache.flink.table.api.Expressions$ApiExpression call(java.lang.String, java.lang.Object...);
  public static org.apache.flink.table.api.Expressions$ApiExpression call(org.apache.flink.table.functions.FunctionDefinition, java.lang.Object...);

  public static final org.apache.flink.table.api.Expressions$ApiExpression UNBOUNDED_ROW;
  public static final org.apache.flink.table.api.Expressions$ApiExpression UNBOUNDED_RANGE;
  public static final org.apache.flink.table.api.Expressions$ApiExpression CURRENT_ROW;
  public static final org.apache.flink.table.api.Expressions$ApiExpression CURRENT_RANGE;
}


Scala DSL

Scala's ImplicitExpressionOperations will extend ExpressionOperations. ImplicitExpressionOperations will still return Expression.

trait ImplicitExpressionOperations extends BaseExpressions[Expression, Expression] {
...
}


Static entry points:

Unfortunately there is no way to inherit static methods from java in scala. Therefore we will keep all static entry points in ImplicitExpressionConversions as they are. Moreover we will add an implicit conversion from ApiExpression to Expression so that it can be mixed (nevertheless we should make it very explicit that mixing those two apis is highly discouraged)

Scala specific methods:

Some methods have Scala-specific names and would be added to ExpressionOperations. The following table lists new method names. We aim to reduce the name of the most frequently used methods

Scala Implicit Expression

org.apache.flink.table.api.ExpressionOperations

&&

and(Expression)

||

or(Expression)

>

isGreater(Expression)

>=

isGreaterOrEqual(Expression)

<

isLess(Expression)

<=

isLessOrEqual(Expression)

===

isEqual(Expression)

!==

isNotEqual(Expression)

+

plus(Expression)

-

minus(Expression)

/

dividedBy(Expression)

*

times(Expression)

%

mod(Expression)

as(Symbol, Symbol*)

as(String, String...)

UserDefinedAggregateFunctionCall#distinct

distinct(Expression...)

to(Expression)

not supported

unary +

not supported

String interpolation for columns in Scala DSL

As explained in the motivation, scala's syntax of using ' to create symbol will become deprecated in scala 2.13. Moreover it is not possible to use spaces with that syntax. We should introduce an alternative way of creating a column reference. Using a '$' will make it consistent with java api

  implicit class FieldExpression(val sc: StringContext) extends AnyVal {
    def $(args: Any*): Expression = unresolvedRef(sc.s(args: _*))
  }


Compatibility, Deprecation, and Migration Plan

Many methods in Table API that take a string will be deprecated.

Deprecations:

  • discourage usage of ' in scala API for creating  column references
  • deprecate concat_ws in favour of concatWs

Implementation Plan

  1. Implement the API for Java DSL (Majority of the implementation can be seen here: https://github.com/dawidwys/flink/tree/javaExpressionDsl2 )
  2. Add automatic consistency checks
    For keeping the Scala package object and Java Expressions API in sync
  3. Update existing tests
    For verifying the implementation and add a parser test for ensuring backwards compatibility
  4. Deprecate old String-based API
  5. Update documentation
  6. Implement API for Python
    At the beginning, we could think about simplifying the Python API by only supporting the core expression mentioned above and access the remaining expressions via the generic `call()` function.

Test Plan

Existing tests are updated to verify feature 

Rejected Alternatives

Having a DSL like the one mentioned above is the only way in the Java programming language to fix the issues mentioned in the motivation section.