
Status

Discussion thread: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/DISCUSS-FLIP-162-Consistent-Flink-SQL-time-function-behavior-td40883.html
Vote thread: ...
JIRA: FLINK-21617
Released: 1.13.0

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, some temporal functions behave in ways that are confusing to users.

  • When users call PROCTIME() in SQL, the returned value is offset from the wall-clock time in the users' local time zone. Users have to add their local time zone offset manually to get the expected local timestamp (e.g., users in Germany need to add 1 hour).
  • Users cannot use CURRENT_DATE/CURRENT_TIME/CURRENT_TIMESTAMP to get the wall-clock timestamp in their local time zone. As a result, they have to write a UDF just to implement a simple filter like WHERE date_col = CURRENT_DATE.

...

Since the current behavior of these functions is wrong, this FLIP corrects it, and we plan to drop the legacy behavior immediately.

Proposed Changes

1. Correct the return value of time functions

I investigated the current behavior of all Flink time-related functions and compared it with other DB vendors such as PostgreSQL, Presto, Hive, Spark, and Snowflake. The results are organized in a spreadsheet [2].

...

All of these approaches are valid because SQL:2011 does not specify the function return type, and every SQL engine vendor has its own implementation [2]. CURRENT_TIMESTAMP is one example:

CURRENT_TIMESTAMP
  • Flink current behavior (return type TIMESTAMP): 2020-12-28 23:52:52 in session time zone UTC. The value is also 2020-12-28 23:52:52 in session time zone GMT+08:00, although the wall clock in GMT+08:00 reads 2020-12-29 07:52:52.
  • Flink proposed change (return type TIMESTAMP WITH LOCAL TIME ZONE): 2020-12-28 23:52:52 in session time zone UTC, and 2020-12-29 07:52:52 in session time zone GMT+08:00.
  • Other SQL vendors' behavior:
      • In MySQL and Spark, NOW() and CURRENT_TIMESTAMP return the current timestamp in the session time zone, and the return type is TIMESTAMP.
      • In PostgreSQL and Presto, NOW() and CURRENT_TIMESTAMP return the current timestamp in the session time zone, and the return type is TIMESTAMP WITH TIME ZONE.
      • In Snowflake, CURRENT_TIMESTAMP/LOCALTIMESTAMP return the current timestamp in the session time zone, and the return type is TIMESTAMP WITH LOCAL TIME ZONE.
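The following Java sketch illustrates the difference between the current and the proposed CURRENT_TIMESTAMP. It models a TIMESTAMP WITH LOCAL TIME ZONE value as a java.time.Instant and converts it to a wall-clock string in the session time zone only when the value is displayed. This is a minimal illustration under that modeling assumption, not Flink's implementation; the class LtzSemantics and its methods are hypothetical.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical model, not Flink code: a TIMESTAMP_LTZ value is an instant
// that is rendered as a wall clock only for display.
class LtzSemantics {
    private static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss");

    // Current CURRENT_TIMESTAMP: a zone-less TIMESTAMP derived from the UTC wall clock,
    // so the displayed value does not depend on the session time zone.
    static String currentBehavior(Instant now) {
        return FMT.format(LocalDateTime.ofInstant(now, ZoneOffset.UTC));
    }

    // Proposed CURRENT_TIMESTAMP: a TIMESTAMP_LTZ that keeps the instant itself
    // and is displayed as the wall clock of the session time zone.
    static String proposedBehavior(Instant now, ZoneId sessionZone) {
        return FMT.format(LocalDateTime.ofInstant(now, sessionZone));
    }
}
```

For the instant 2020-12-28T23:52:52Z, currentBehavior returns 2020-12-28 23:52:52 regardless of the session time zone. proposedBehavior returns 2020-12-28 23:52:52 for UTC and 2020-12-29 07:52:52 for GMT+08:00, which matches the values listed above.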



NOTE: Flink supports precision for TIME-related types. For simplicity, all examples in this FLIP show values only to seconds precision.

The proposed changes are listed below. All examples assume the current wall-clock time is 2020-12-29 07:52:52 in Beijing time (GMT+08:00):

CURRENT_DATE
  • Existing problem: returns the UTC date, but users expect the current date in the session time zone.
  • Current behavior (return type DATE): 2020-12-28 in session time zone UTC; 2020-12-28 in session time zone GMT+08:00.
  • Proposed change (return type DATE): return the current date in the session time zone; 2020-12-28 in session time zone UTC; 2020-12-29 in session time zone GMT+08:00.

CURRENT_TIME
  • Existing problem: returns the UTC time, but users expect the current time in the session time zone.
  • Current behavior (return type TIME): 23:52:52 in session time zone UTC; 23:52:52 in session time zone GMT+08:00.
  • Proposed change (return type TIME): return the current time in the session time zone; 23:52:52 in session time zone UTC; 07:52:52 in session time zone GMT+08:00.

CURRENT_TIMESTAMP
  • Existing problem: returns the UTC timestamp, but users expect the current timestamp in the session time zone.
  • Current behavior (return type TIMESTAMP): 2020-12-28 23:52:52 in session time zone UTC; 2020-12-28 23:52:52 in session time zone GMT+08:00.
  • Proposed change (return type TIMESTAMP WITH LOCAL TIME ZONE): return the current timestamp in the session time zone; 2020-12-28 23:52:52 in session time zone UTC; 2020-12-29 07:52:52 in session time zone GMT+08:00.

NOW()
  • Existing problem: returns the UTC timestamp, but users expect the current timestamp in the session time zone.
  • Current behavior (return type TIMESTAMP): 2020-12-28 23:52:52 in session time zone UTC; 2020-12-28 23:52:52 in session time zone GMT+08:00.
  • Proposed change (return type TIMESTAMP WITH LOCAL TIME ZONE): return the current timestamp in the session time zone; 2020-12-28 23:52:52 in session time zone UTC; 2020-12-29 07:52:52 in session time zone GMT+08:00.

PROCTIME()
  • Existing problem: returns the UTC timestamp, but users expect the current timestamp in the session time zone.
  • Current behavior (return type TIMESTAMP *PROCTIME*): 2020-12-28 23:52:52 in session time zone UTC; 2020-12-28 23:52:52 in session time zone GMT+08:00.
  • Proposed change (return type TIMESTAMP WITH LOCAL TIME ZONE *PROCTIME*): return the current timestamp in the session time zone; 2020-12-28 23:52:52 in session time zone UTC; 2020-12-29 07:52:52 in session time zone GMT+08:00.
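CURRENT_DATE and CURRENT_TIME can be sketched the same way. Both keep their types (DATE and TIME); the only change is which wall clock they are read from. The class SessionDateTime is hypothetical and uses java.time.LocalDate and java.time.LocalTime to stand in for Flink's DATE and TIME.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

// Hypothetical model, not Flink code: legacy vs. proposed CURRENT_DATE / CURRENT_TIME.
class SessionDateTime {
    // Legacy: date and time are always read from the UTC wall clock.
    static LocalDate legacyCurrentDate(Instant now) {
        return LocalDateTime.ofInstant(now, ZoneOffset.UTC).toLocalDate();
    }

    static LocalTime legacyCurrentTime(Instant now) {
        return LocalDateTime.ofInstant(now, ZoneOffset.UTC).toLocalTime();
    }

    // Proposed: date and time are read from the session time zone's wall clock.
    static LocalDate currentDate(Instant now, ZoneId sessionZone) {
        return LocalDateTime.ofInstant(now, sessionZone).toLocalDate();
    }

    static LocalTime currentTime(Instant now, ZoneId sessionZone) {
        return LocalDateTime.ofInstant(now, sessionZone).toLocalTime();
    }
}
```

Take the instant 2020-12-28T23:52:52Z with session time zone GMT+08:00. The legacy methods give 2020-12-28 and 23:52:52, and the proposed methods give 2020-12-29 and 07:52:52.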

After the proposal, the functions NOW() and CURRENT_TIMESTAMP become synonyms, and so do the functions CURRENT_TIME and LOCALTIME. All time function behaviors are also listed in reference [2].

...

These cast conversions have incorrect behavior and problematic semantics. SQL:2011 does not specify these casts, and we have never exposed them to users.

This FLIP proposes to disable the cast between numeric types and TIMESTAMP. As a migration plan, it proposes the existing functions TO_TIMESTAMP(FROM_UNIXTIME(epochSeconds - zoneOffset)), where zoneOffset is the session time zone offset in seconds.

CAST(44 AS TIMESTAMP)
  • Current behavior (return type TIMESTAMP(0) NOT NULL): 1970-01-01 00:00:44 in session time zone UTC; 1970-01-01 00:00:44 in session time zone GMT+08:00.
  • Existing problem: a time value of type BIGINT usually has Unix time semantics, i.e. the time elapsed since the epoch (1970-01-01 00:00:00 UTC+0). Converting it to a timestamp should therefore take the local time zone into account.
  • Proposed change: this behavior is invalid, so the CAST is disabled.
  • Migration plan: users who need the previous result can use the following workaround:
      • session time zone UTC: TO_TIMESTAMP(FROM_UNIXTIME(44 - 0)) returns 1970-01-01 00:00:44
      • session time zone GMT+08:00: TO_TIMESTAMP(FROM_UNIXTIME(44 - 8 * 60 * 60)) returns 1970-01-01 00:00:44

CAST(TIMESTAMP '1970-01-01 00:00:44' AS BIGINT)
  • Current behavior (return type BIGINT NOT NULL): 44 in session time zone UTC; 44 in session time zone GMT+08:00.
  • Existing problem: this is the inverse of the conversion above and is rarely used.
  • Proposed change: disabled, like the conversion above.
  • Migration plan: UNIX_TIMESTAMP(TIMESTAMP '1970-01-01 00:00:44') returns 44 in session time zone UTC and -28756 in session time zone GMT+08:00.
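The arithmetic behind the migration plan can be checked with a small Java model of FROM_UNIXTIME, TO_TIMESTAMP and UNIX_TIMESTAMP. The model rests on three assumptions, consistent with the examples above:
  • FROM_UNIXTIME formats epoch seconds as 'yyyy-MM-dd HH:mm:ss' in the session time zone.
  • TO_TIMESTAMP parses that string into a zone-less timestamp.
  • UNIX_TIMESTAMP reads a wall-clock string in the session time zone.
The class CastMigration and its methods are hypothetical.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical model, not Flink code, of the functions used in the migration plan.
class CastMigration {
    private static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss");

    // Models FROM_UNIXTIME(epochSeconds): format the instant as a wall-clock string in the session zone.
    static String fromUnixtime(long epochSeconds, ZoneId sessionZone) {
        return FMT.format(LocalDateTime.ofInstant(Instant.ofEpochSecond(epochSeconds), sessionZone));
    }

    // Models TO_TIMESTAMP(string): parse the string into a zone-less TIMESTAMP.
    static LocalDateTime toTimestamp(String value) {
        return LocalDateTime.parse(value, FMT);
    }

    // Models UNIX_TIMESTAMP(string): read the wall clock in the session zone and return epoch seconds.
    static long unixTimestamp(String value, ZoneId sessionZone) {
        return LocalDateTime.parse(value, FMT).atZone(sessionZone).toEpochSecond();
    }

    // Workaround for the legacy CAST(epochSeconds AS TIMESTAMP):
    // TO_TIMESTAMP(FROM_UNIXTIME(epochSeconds - zoneOffset)).
    // The offset is looked up at epochSeconds, which is exact for fixed-offset
    // zones such as GMT+08:00 but may be off near DST transitions.
    static LocalDateTime legacyCast(long epochSeconds, ZoneId sessionZone) {
        int zoneOffset = sessionZone.getRules()
                .getOffset(Instant.ofEpochSecond(epochSeconds))
                .getTotalSeconds();
        return toTimestamp(fromUnixtime(epochSeconds - zoneOffset, sessionZone));
    }
}
```

Consider session time zone GMT+08:00:
  • legacyCast(44, ...) evaluates FROM_UNIXTIME(44 - 28800), i.e. FROM_UNIXTIME(-28756), which formats as 1970-01-01 00:00:44.
  • unixTimestamp on the same string returns -28756.
Both results match the listing above.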

3. Disable the legacy behavior of these time functions

The return values of the functions

  • CURRENT_DATE
  • CURRENT_TIME
  • CURRENT_TIMESTAMP
  • NOW()
  • PROCTIME()

consider the local time zone. The return type of PROCTIME()/NOW()/CURRENT_TIMESTAMP is TIMESTAMP_LTZ, the return type of CURRENT_DATE is DATE, and the return type of CURRENT_TIME is TIME.

The legacy behavior of these functions does not consider the local time zone. The return type of PROCTIME()/NOW()/CURRENT_TIMESTAMP is TIMESTAMP, and the return types of CURRENT_DATE and CURRENT_TIME are DATE and TIME, respectively.

4. Support defining row time attribute on TIMESTAMP_LTZ

After this change, we can support ROWTIME/PROCTIME on type TIMESTAMP WITH LOCAL TIME ZONE, which completes the unification.

For usability, this FLIP also proposes a new function TO_TIMESTAMP_LTZ(numeric_expr [, scale]). It is similar to the TO_TIMESTAMP function used above, but it accepts a numeric epoch value and returns TIMESTAMP WITH LOCAL TIME ZONE.


TO_TIMESTAMP_LTZ(numeric_expr [, scale]) (new function)
  • Return type: TIMESTAMP(3) WITH LOCAL TIME ZONE
  • Session time zone UTC: TO_TIMESTAMP_LTZ(44) returns 1970-01-01 00:00:44
  • Session time zone GMT+08:00: TO_TIMESTAMP_LTZ(-28756) returns 1970-01-01 00:00:44
  • Note: scale selects the unit of numeric_expr, e.g. TO_TIMESTAMP_LTZ(seconds, 0) and TO_TIMESTAMP_LTZ(milliseconds, 3).
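TO_TIMESTAMP_LTZ can be modeled the same way. The numeric argument is turned into an instant according to scale, and the result is rendered in the session time zone only when displayed. This sketch covers only the scales 0 and 3 from the note above and rejects other values; that restriction is an assumption. The class ToTimestampLtz is hypothetical.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical model, not Flink code, of TO_TIMESTAMP_LTZ(numeric_expr, scale).
class ToTimestampLtz {
    private static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss");

    // scale 0: numeric_expr holds epoch seconds; scale 3: epoch milliseconds.
    static Instant toTimestampLtz(long numericExpr, int scale) {
        switch (scale) {
            case 0:
                return Instant.ofEpochSecond(numericExpr);
            case 3:
                return Instant.ofEpochMilli(numericExpr);
            default:
                throw new IllegalArgumentException("unsupported scale: " + scale);
        }
    }

    // A TIMESTAMP_LTZ value is displayed as the wall clock of the session time zone.
    static String display(Instant value, ZoneId sessionZone) {
        return FMT.format(LocalDateTime.ofInstant(value, sessionZone));
    }
}
```

The following calls both return 1970-01-01 00:00:44:
  • display(toTimestampLtz(44, 0), ZoneId.of("UTC"))
  • display(toTimestampLtz(-28756, 0), ZoneId.of("GMT+08:00"))
Also, toTimestampLtz(44000, 3) is the same instant as toTimestampLtz(44, 0).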



5. Support more conversion classes for LocalZonedTimestampType(TIMESTAMP_LTZ)

Once the functions above use type TIMESTAMP WITH LOCAL TIME ZONE, we can support java.sql.Timestamp as a conversion class for LocalZonedTimestampType. We also enable implicit casts between TIMESTAMP and TIMESTAMP_LTZ to resolve the UDF compatibility issue.

For example, suppose a user has a UDF with a LocalDateTime parameter, and the SQL column of type TIMESTAMP comes from PROCTIME()/CURRENT_TIMESTAMP. That user can migrate to TIMESTAMP_LTZ smoothly.
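If a TIMESTAMP_LTZ value is modeled as an instant, it can be exchanged with java.sql.Timestamp without consulting any time zone, using the JDK methods Timestamp.from(Instant) and Timestamp.toInstant(). The sketch below assumes that mapping; it is not Flink's converter, and the class name is hypothetical. The sketch deliberately avoids Timestamp.valueOf(LocalDateTime) and Timestamp.toLocalDateTime(), because they interpret values in the JVM default time zone.

```java
import java.sql.Timestamp;
import java.time.Instant;

// Hypothetical sketch: exchanging a TIMESTAMP_LTZ value (modeled as an Instant)
// with java.sql.Timestamp. Both directions keep the epoch value and nanoseconds
// and never consult a time zone.
class LtzSqlTimestamp {
    static Timestamp toSqlTimestamp(Instant ltzValue) {
        return Timestamp.from(ltzValue);
    }

    static Instant fromSqlTimestamp(Timestamp timestamp) {
        return timestamp.toInstant();
    }
}
```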

6. Support abbreviation name for all timestamp types 

The timestamp type names are long and inconvenient for users, so we introduce abbreviations for them. The mapping is as follows:

  • TIMESTAMP / TIMESTAMP WITHOUT TIME ZONE <=> TIMESTAMP
  • TIMESTAMP WITH LOCAL TIME ZONE <=> TIMESTAMP_LTZ
  • TIMESTAMP WITH TIME ZONE <=> TIMESTAMP_TZ (to be supported in the future, once TIMESTAMP WITH TIME ZONE is supported)

7. Support TIME(9) for Flink SQL

For historical reasons, Flink SQL does not support TIME(9) yet. We think this FLIP is a good opportunity to add it.
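TIME(p) keeps p fractional-second digits, so TIME(9) means nanosecond precision, which java.time.LocalTime can already represent. The hypothetical helper below shows what keeping only p digits means. It truncates rather than rounds; that choice is an assumption about the intended semantics, not something this FLIP specifies.

```java
import java.time.LocalTime;

// Hypothetical sketch: keep only `precision` fractional-second digits of a TIME value.
class TimePrecision {
    static LocalTime truncate(LocalTime time, int precision) {
        if (precision < 0 || precision > 9) {
            throw new IllegalArgumentException("precision must be in [0, 9]: " + precision);
        }
        // Number of nanoseconds represented by one unit of the last kept digit.
        int unit = 1;
        for (int i = precision; i < 9; i++) {
            unit *= 10;
        }
        return time.withNano(time.getNano() / unit * unit);
    }
}
```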

8. Use the execution mode to decide when time functions are evaluated, and introduce the CURRENT_ROW_TIMESTAMP function

For time functions

  • LOCALTIME
  • LOCALTIMESTAMP
  • CURRENT_DATE
  • CURRENT_TIME
  • CURRENT_TIMESTAMP
  • NOW()

Flink evaluates the time functions above according to the execution mode. In streaming mode, they are evaluated for each row; in batch mode, they are evaluated once at query start.

For batch users who want a row-level timestamp of the current time, we introduce the function CURRENT_ROW_TIMESTAMP. It can also be used in streaming mode.

Like CURRENT_TIMESTAMP, CURRENT_ROW_TIMESTAMP returns the current timestamp in the session time zone, and its return type is TIMESTAMP WITH LOCAL TIME ZONE.

The difference is that CURRENT_ROW_TIMESTAMP is always evaluated per row, while CURRENT_TIMESTAMP is evaluated according to the execution mode.
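The difference between evaluating a time function once at query start and once per row can be modeled with a clock. The clock is read either a single time for the whole query or once for every row. The mapping in this hypothetical sketch is:
  • CURRENT_TIMESTAMP in batch mode corresponds to QUERY_START.
  • CURRENT_TIMESTAMP in streaming mode, and CURRENT_ROW_TIMESTAMP in either mode, correspond to PER_ROW.
The enum and method names are illustrative only.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical model, not Flink code: when a time function's value is computed.
class TimeFunctionEvaluation {
    enum Evaluation { QUERY_START, PER_ROW }

    // Returns the value the time function yields for each of `rowCount` rows.
    static List<Instant> evaluate(int rowCount, Evaluation evaluation, Supplier<Instant> clock) {
        // In QUERY_START mode the clock is read once, before any row is processed.
        Instant queryStart = evaluation == Evaluation.QUERY_START ? clock.get() : null;
        List<Instant> values = new ArrayList<>(rowCount);
        for (int row = 0; row < rowCount; row++) {
            // In PER_ROW mode the clock is read again for every row.
            values.add(evaluation == Evaluation.QUERY_START ? queryStart : clock.get());
        }
        return values;
    }
}
```

With a clock that advances on every read, QUERY_START yields the same value for all rows, while PER_ROW yields a distinct value per row.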

General Implementations

1. Change the code generation of the five functions and the cast conversions above.

2. Support all conversion classes that TimestampType supports, such as java.time.LocalDateTime and java.sql.Timestamp, for LocalZonedTimestampType, to resolve the UDF compatibility issue.

3. The session time zone offset should still be considered for processing-time windows.

4. All connectors/formats should support TIMESTAMP WITH LOCAL TIME ZONE well.

5. We should also document these changes.


After the proposal is finished, the following user cases will work smoothly. Assume the users' local time zone is GMT+08:00 and the wall-clock time is 2020-12-29 07:52:52.





  • user case 1:
```sql
Flink SQL> SELECT NOW(), PROCTIME(), CURRENT_TIMESTAMP, CURRENT_DATE, CURRENT_TIME;
-- output:
+---------------------+---------------------+---------------------+--------------+--------------+
|               NOW() |          PROCTIME() |   CURRENT_TIMESTAMP | CURRENT_DATE | CURRENT_TIME |
+---------------------+---------------------+---------------------+--------------+--------------+
| 2020-12-29 07:52:52 | 2020-12-29 07:52:52 | 2020-12-29 07:52:52 |   2020-12-29 |     07:52:52 |
+---------------------+---------------------+---------------------+--------------+--------------+
```
  • user case 2:
```sql
Flink SQL> SELECT TUMBLE_START(proctime, INTERVAL '1' DAY),
         >        TUMBLE_END(proctime, INTERVAL '1' DAY),
         >        count(userId) as cnt
         >    FROM userLog
         >    GROUP BY TUMBLE(proctime, INTERVAL '1' DAY);

-- output:
+---------------------+---------------------+-----+
|        TUMBLE_START |          TUMBLE_END | cnt |
+---------------------+---------------------+-----+
| 2020-12-29 00:00:00 | 2020-12-30 00:00:00 | 100 |
+---------------------+---------------------+-----+
```
  • user case 3:
```sql
Flink SQL> SELECT *
         >    FROM userLog
         >    WHERE date_col >= CURRENT_DATE;

-- in the query, records earlier than 2020-12-29 will not be output.
+------------+---------------------+-------+
|   date_col |              log_ts |  user |
+------------+---------------------+-------+
| 2020-12-29 | 2020-12-29 00:00:00 | Alice |
| 2020-12-29 | 2020-12-29 00:00:01 |   Bob |
| 2020-12-29 | 2020-12-29 00:00:02 |   Tom |
+------------+---------------------+-------+
```
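User case 2 relies on the processing-time window being aligned to midnight in the session time zone, as General Implementations item 3 requires. The hypothetical sketch below computes the start and end of a 1-day tumbling window for a given processing-time instant under that assumption. It only handles fixed offsets such as GMT+08:00 and is not the window operator's actual code.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Hypothetical sketch: align a 1-day processing-time tumbling window to midnight
// in the session time zone. Assumes a fixed offset; DST zones would need per-instant offsets.
class SessionDayWindow {
    private static final long DAY_SECONDS = 24 * 60 * 60;

    // Window start as a wall-clock timestamp in the session time zone.
    static LocalDateTime windowStart(Instant procTime, ZoneOffset sessionOffset) {
        // Shift into "local seconds" so that floor division lands on local midnight.
        long localSeconds = procTime.getEpochSecond() + sessionOffset.getTotalSeconds();
        long startLocalSeconds = Math.floorDiv(localSeconds, DAY_SECONDS) * DAY_SECONDS;
        return LocalDateTime.ofEpochSecond(startLocalSeconds, 0, ZoneOffset.UTC);
    }

    static LocalDateTime windowEnd(Instant procTime, ZoneOffset sessionOffset) {
        return windowStart(procTime, sessionOffset).plusDays(1);
    }
}
```

Consider the instant 2020-12-28T23:52:52Z, which is 2020-12-29 07:52:52 in GMT+08:00:
  • With ZoneOffset.ofHours(8), the window is [2020-12-29 00:00:00, 2020-12-30 00:00:00), as in user case 2.
  • With ZoneOffset.UTC, the window would be [2020-12-28 00:00:00, 2020-12-29 00:00:00).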


Compatibility, Deprecation, and Migration Plan

  • Compatibility

This is an incompatible change: users must change their SQL if they use these time functions. Alternatively, users can write a UDF that returns the same values as the legacy time functions.

Test Plan

Will add plan tests, unit tests, window operator harness tests as well as IT tests. 

Future Plan

As a conservative plan, we do not introduce the option 'table.exec.time-function-evaluation' right now. We have agreed to introduce it in the future once we receive enough user requirements and feedback.

The option would control when the values of time functions are materialized. The time functions include:

  • LOCALTIME
  • LOCALTIMESTAMP
  • CURRENT_DATE
  • CURRENT_TIME
  • CURRENT_TIMESTAMP
  • NOW()

The valid values of table.exec.time-function-evaluation are 'auto', 'row', and 'query-start'.

  • 'auto': the default value. Flink evaluates the above time functions according to the execution mode: per record in streaming mode, and once at query start in batch mode.
  • 'row': Flink evaluates the time functions for each record.
  • 'query-start': Flink evaluates the time functions once at query start, so each time function returns the same value throughout a query.

Rejected Alternatives

1. Change the return type of CURRENT_TIMESTAMP/NOW()/PROCTIME() to TIMESTAMP WITH TIME ZONE. This would require introducing a new type, TIMESTAMP WITH TIME ZONE, and we think the benefits are not enough. Moreover, for consistency, the return type of CURRENT_TIME would then have to be TIME WITH TIME ZONE, which means introducing yet another type.

2. Change the return value of CURRENT_TIMESTAMP/NOW()/PROCTIME() to the timestamp value in the local time zone, keeping the TIMESTAMP type. This is fine if the functions are only used within Flink SQL, but we also need to consider the conversion between Table and DataStream, as the following corner case shows:
  • Assume a record is produced in the UTC+0 time zone with TIMESTAMP '1970-01-01 08:00:44', and Flink SQL processes the data with session time zone 'GMT+08:00'.
  • If the SQL program converts the Table to a DataStream, the timestamp in the StreamRecord is calculated with the session time zone (GMT+08:00), so the DataStream program gets 44.
  • This is wrong, because the expected value is 8 * 60 * 60 + 44.
This corner case shows that ROWTIME/PROCTIME in Flink are based on UTC+0. Therefore, when correcting PROCTIME(), the better way is to use TIMESTAMP WITH LOCAL TIME ZONE. It keeps the same long value as the UTC+0-based time and can still be displayed in the local time zone.
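The corner case in alternative 2 comes down to which zone a wall-clock value is interpreted in. The following sketch converts TIMESTAMP '1970-01-01 08:00:44' to epoch seconds in both zones; the class name is hypothetical.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Hypothetical sketch: the same wall-clock TIMESTAMP maps to different epoch values
// depending on the zone it is interpreted in.
class RejectedAlternativeCornerCase {
    static long epochSeconds(LocalDateTime wallClock, ZoneOffset zone) {
        return wallClock.toEpochSecond(zone);
    }
}
```

The record was produced in UTC+0, and interpreted there the value is 28844 (8 * 60 * 60 + 44). Interpreted in the session time zone GMT+08:00, it is 44, which is the wrong value the DataStream program would receive. A TIMESTAMP_LTZ value carries the epoch value itself, so no such reinterpretation happens.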


References

...

  1. https://docs.cloudera.com/documentation/enterprise/latest/topics/impala_timezone.html

...