
Status

Current state: Under Discussion

Discussion thread: https://lists.apache.org/thread.html/eb5e7b0579e5f1da1e9bf1ab4e4b86dba737946f0261d94d8c30521e@%3Cdev.flink.apache.org%3E

JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, expressions in the Table API can be defined in two ways. Either via the implicit Scala DSL or via custom strings:

// Java API
Table revenue = orders
  .filter("cCountry === 'FRANCE'")
  .groupBy("cID, cName")
  .select("cID, cName, revenue.sum AS revSum");

// Scala API
val revenue = orders
  .filter('cCountry === "FRANCE")
  .groupBy('cID, 'cName)
  .select('cID, 'cName, 'revenue.sum AS 'revSum)


The Java version of the Table API was always treated as a third-class citizen. The custom strings are parsed by an ExpressionParser implemented using Scala parser combinators.

In the past, the string-based expression parser was/is:

  • repeatedly out of sync with the Scala DSL
  • buggy because it was not properly tested
  • bloating the API, as every method must accept both representations, e.g. select(String) and select(Expression)
  • confusing for users because it looks like SQL but is actually only SQL-like
  • unable to provide JavaDocs or further help in case of parse errors within the IDE; a user can only consult the online documentation
  • a drag on development time due to missing auto-completion, highlighting, and error checking

Additionally, every new built-in function needed to be added at several different positions, which made contributions overly complicated and difficult to review.

The expression parser would also need to be ported to Java to follow the new module design that clearly separates the API from the table planner and runtime (FLIP-32).

And a very important fact: Scala symbols are deprecated and might be removed soon!
https://contributors.scala-lang.org/t/proposal-to-deprecate-and-remove-symbol-literals/2953

Public Interfaces

  • org.apache.flink.table.api.Expressions

  • org.apache.flink.table.api Scala package object

  • org.apache.flink.table.expressions.ApiExpression

  • org.apache.flink.table.expressions.ExpressionOperations

Proposed Changes

Instead of using plain strings, we suggest adding a fully programmatic Java DSL.

The Scala implicit DSL will simply expose and extend the Java DSL. For both DSLs, this means that many functions only need to be defined once. A new API function needs to be added in just one class and is properly documented for both Scala and Java users with consistent functionality.

Scala users can also use this new DSL and thus prepare for the removal of symbol literals.

We suggest the following structure:

  • `o.a.f.t.api.Expressions` contains the static entry points. All methods there return `o.a.f.t.expressions.ApiExpression`.
  • `ApiExpression` exposes all operations currently listed in `o.a.f.t.api.ImplicitExpressionOperations`. All methods there again return `ApiExpression`.
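The two-level structure above can be sketched as follows. This is a hypothetical, heavily simplified mock of the proposed classes (not Flink's actual implementation): static entry points produce a fluent wrapper, and every instance operation again returns the wrapper, so calls can be chained without any string parsing.

```java
import java.util.Objects;

// Minimal sketch of the proposed layering, using simplified stand-ins for
// Flink's Expression/ApiExpression types (hypothetical, not Flink code).
public class ExpressionsSketch {

    // Stand-in for o.a.f.t.api.Expressions: static entry points.
    public static ApiExpression $(String name) {
        return new ApiExpression("$(" + name + ")");
    }

    public static ApiExpression lit(Object value) {
        return new ApiExpression("lit(" + value + ")");
    }

    // Stand-in for o.a.f.t.expressions.ApiExpression: every operation
    // returns ApiExpression again, enabling fluent chaining.
    public static final class ApiExpression {
        private final String summary;

        private ApiExpression(String summary) {
            this.summary = Objects.requireNonNull(summary);
        }

        public ApiExpression isEqual(ApiExpression other) {
            return new ApiExpression(summary + ".isEqual(" + other.summary + ")");
        }

        public ApiExpression sum() {
            return new ApiExpression(summary + ".sum()");
        }

        public ApiExpression as(String alias) {
            return new ApiExpression(summary + ".as(" + alias + ")");
        }

        public String asSummaryString() {
            return summary;
        }
    }

    public static void main(String[] args) {
        // With a star import of the entry points, this reads close to SQL.
        ApiExpression filter = $("cCountry").isEqual(lit("FRANCE"));
        System.out.println(filter.asSummaryString());
        System.out.println($("revenue").sum().as("revSum").asSummaryString());
    }
}
```

Because every method is discoverable through the IDE and documented once on the wrapper class, this design addresses the auto-completion and documentation problems of the string-based parser.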

Example

import static org.apache.flink.table.api.Expressions.*;

// Java API

Table revenue = orders
  .filter($("cCountry").isEqual(lit("FRANCE")))
  .groupBy($("cID"), $("cName"))
  .select($("cID"), $("cName"), $("revenue").sum().as("revSum"));

// Scala API

val revenue = orders
  .filter('cCountry === "FRANCE")
  .groupBy('cID, 'cName)
  .select('cID, 'cName, 'revenue.sum AS 'revSum)

Details

We introduce a new main API class for Java users. We will recommend a star import of:

import static org.apache.flink.table.api.Expressions.*;

All methods are static and return "ApiExpression".

The following table lists the new method names. We aim to keep the names of the most frequently used methods short ($() for references, lit() for literals).

org.apache.flink.table.api.Expressions

  • $(String) for references
  • lit(java.lang.Object) for values
  • lit(java.lang.Object, DataType) for values with an explicit type
  • call(String, Expression...)
  • call(FunctionDefinition, Expression...)
  • array(Expression, Expression...)
  • row(Expression, Expression...)
  • map(Expression, Expression, Expression...)
  • nullOf(DataType)
  • rowInterval(Long), interval(int, DataTypes.Resolution), interval(String, DataTypes.Resolution, DataTypes.Resolution) for representing row intervals or time intervals following the SQL syntax, e.g. `'12' DAY(2)` or `'12 23:00:00.999999999' DAY(2) TO SECOND(9)`
  • ifThenElse(Expression, Expression, Expression)
  • withColumns(Expression, Expression...)
  • withoutColumns(Expression, Expression...)
  • range(Integer, Integer), range(String, String) for column operations instead of `to`
  • and(Expression, Expression...)
  • or(Expression, Expression...)
  • not(Expression)
  • minus(Expression)
  • UNBOUNDED_ROW
  • UNBOUNDED_RANGE
  • CURRENT_ROW
  • CURRENT_RANGE

<<End of core expressions>>

  • localTime()
  • localTimestamp()
  • pi()
  • e()
  • uuid()
  • temporalOverlaps(Expression, Expression, Expression, Expression)
  • timestampDiff(TimePointUnit, Expression, Expression)
  • rand()
  • rand(Expression)
  • randInteger(Expression)
  • randInteger(Expression, Expression)
  • concat(Expression, Expression...)
  • atan2(Expression, Expression)
  • concat_ws(Expression, Expression, Expression...)
  • log(Expression)
  • log(Expression, Expression)

The Scala package object for `org.apache.flink.table.api` will be updated to support the above expressions as well.

`ApiExpression` is an API class that lists all available expression operations:

import java.util.List;

public class ApiExpression extends ExpressionOperations implements Expression {

	private final Expression originalExpression;

	public ApiExpression(Expression originalExpression) {
		this.originalExpression = originalExpression;
	}

	@Override
	public String asSummaryString() {
		return originalExpression.asSummaryString();
	}

	@Override
	public List<Expression> getChildren() {
		return originalExpression.getChildren();
	}

	@Override
	public <R> R accept(ExpressionVisitor<R> visitor) {
		return visitor.visit(this);
	}
}


A newly introduced rule in the ExpressionResolver takes care of removing ApiExpression wrappers from the tree as a first step. ApiExpression instances should not travel further down the stack.
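The unwrapping step can be sketched as follows. This is a hypothetical, simplified expression model (not Flink's actual ExpressionResolver): the rule walks the tree once and replaces every ApiExpression wrapper with the expression it wraps, so only plain expressions travel further.

```java
import java.util.List;
import java.util.stream.Collectors;

// Sketch of the unwrapping rule on a simplified expression model
// (hypothetical; the real rule lives inside Flink's ExpressionResolver).
public class UnwrapSketch {

    interface Expression {
        List<Expression> getChildren();
        String asSummaryString();
    }

    // Wrapper that the API hands out; delegates to the wrapped expression.
    record ApiExpression(Expression wrapped) implements Expression {
        public List<Expression> getChildren() { return wrapped.getChildren(); }
        public String asSummaryString() { return wrapped.asSummaryString(); }
    }

    record Literal(Object value) implements Expression {
        public List<Expression> getChildren() { return List.of(); }
        public String asSummaryString() { return String.valueOf(value); }
    }

    record Call(String name, List<Expression> args) implements Expression {
        public List<Expression> getChildren() { return args; }
        public String asSummaryString() {
            return name + args.stream()
                .map(Expression::asSummaryString)
                .collect(Collectors.joining(", ", "(", ")"));
        }
    }

    // First resolver step: strip ApiExpression wrappers from the whole tree.
    static Expression unwrap(Expression e) {
        if (e instanceof ApiExpression api) {
            return unwrap(api.wrapped());
        }
        if (e instanceof Call call) {
            return new Call(call.name(),
                call.args().stream().map(UnwrapSketch::unwrap).toList());
        }
        return e;
    }

    public static void main(String[] args) {
        Expression wrapped = new ApiExpression(new Call("plus",
            List.of(new ApiExpression(new Literal(1)), new Literal(2))));
        System.out.println(unwrap(wrapped).asSummaryString()); // plus(1, 2)
    }
}
```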

`ExpressionOperations` contains most methods of org.apache.flink.table.api.ImplicitExpressionOperations. However, all of these methods will return ApiExpression instead of Expression. The change is backwards compatible, as every assignment `Expression e = 'i.abs()` (now returning an ApiExpression) is still valid because ApiExpression is an Expression.

Scala's ImplicitExpressionOperations will extend ExpressionOperations.

Some methods have Scala-specific (symbolic) names; Java-friendly equivalents will be added to ExpressionOperations. The following table lists the new method names:

Scala Implicit Expression → org.apache.flink.table.api.ExpressionOperations

  • && → and(Expression)
  • || → or(Expression)
  • > → isGreater(Expression)
  • >= → isGreaterOrEqual(Expression)
  • < → isLess(Expression)
  • <= → isLessOrEqual(Expression)
  • === → isEqual(Expression)
  • !== → isNotEqual(Expression)
  • + → plus(Expression)
  • - → minus(Expression)
  • / → dividedBy(Expression)
  • * → times(Expression)
  • % → mod(Expression)
  • as(Symbol, Symbol*) → as(String, String...)
  • UserDefinedAggregateFunctionCall#distinct → distinct(Expression...)
  • to(Expression) → not supported
  • unary + → not supported

Compatibility, Deprecation, and Migration Plan

Many methods in the Table API that take a String will be deprecated.

Implementation Plan


  1. Implement the API for the Java DSL
  2. Add automatic consistency checks
    to keep the Scala package object and the Java Expressions API in sync
  3. Update existing tests
    to verify the implementation, and add a parser test to ensure backwards compatibility
  4. Deprecate the old String-based API
  5. Update the documentation
  6. Implement the API for Python
    At the beginning, we could simplify the Python API by only supporting the core expressions mentioned above and accessing the remaining expressions via the generic `call()` function.





Test Plan

Existing tests will be updated to verify the feature.

Rejected Alternatives

Having a DSL like the one mentioned above is the only way in the Java programming language to fix the issues mentioned in the motivation section.
