Status

Discussion threadhttp://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/DISCUSS-FLIP-162-Consistent-Flink-SQL-time-function-behavior-td40883.html
Vote thread
JIRA

Unable to render Jira issues macro, execution error.

Release1.13

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently some temporal function behaviors are weird to users. 

  • When users use a PROCTIME() in SQL, the return value of PROCTIME() has a timezone offset with the wall-clock time in users' local time zone, users need to add their local time zone offset manually to get expected local timestamp(e.g: Users in Germany need to +1h to get expected local timestamp). 
  • Users can not use CURRENT_DATE/CURRENT_TIME/CURRENT_TIMESTAMP  to get wall-clock timestamp in local time zone, and thus they need write UDF in their SQL just for implementing a simple filter like WHERE date_col =  CURRENT_DATE. 
  • Another common case  is the time window  with day interval based on PROCTIME(), user plan to put all data from one day into the same window, but the window is assigned using timestamp in UTC+0 timezone rather than the session timezone which leads to the window starts with an offset(e.g: Users in China need to add -8h in their business sql start and then +8h when output the result, the conversion like a magic for users). 

These problems come from the fact that lots of time-related functions like PROCTIME(), NOW(), CURRENT_DATE, CURRENT_TIME and CURRENT_TIMESTAMP are returning time values based on UTC+0 time zone.

This FLIP aims to consistent the timestamp function behavior and eventually improve the usability.

Public Interfaces 

As we knew some functions' behavior is wrong currently, after we correct these function, we plan to drop legacy behavior immediately.


Proposed Changes

1. Correct the return value of time functions

I invested all Flink time-related functions current behavior and compared with other DB vendors like Pg,Presto, Hive, Spark, Snowflake,  I made an excel [2] to organize them well.

To correct our current behavior, we need to make the function return type clear, especially for timestamp type, i.e.

  • TIMESTAMP/TIMESTAMP WITHOUT TIME ZONE
  • TIMESTAMP WITH LOCAL TIME ZONE
  • TIMESTAMP WITH TIME ZONE

In order to understand these types better, I wrote a document[3]. BTW, Flink also keeps same semantics for three timestamp types comparing with Hadoop ecosystem.

From my investigation, to correct this time functions' behavior, we have several options
(1) change the function return type
(2) change the function return value
(3) change them both.

All of those way are valid because SQL:2011 does not specify the function return type and every SQL engine vendor has its own implementation[2], for example CURRENT_TIMESTAMP.

functionFlink current behavior

Flink proposed changes

other SQL vendors' behavior

CURRENT_TIMESTAMP

return type: TIMESTAMP 

#session timezone: UTC
2020-12-28 23:52:52

#session timezone: GMT+08:00
2020-12-28 23:52:52

wall clock:
GMT+08:00:2020-12-29 07:52:52

return type: TIMESTAMP WITH LOCAL TIME ZONE

#session timezone: UTC
2020-12-28 23:52:52

#session timezone: GMT+08:00
2020-12-29 07:52:52

In MySQL, Spark, the function NOW() and CURRENT_TIMESTAMP return current timestamp value in session time zone,the return type is TIMESTAMP

In
Pg, Presto, the function NOW() and CURRENT_TIMESTAMP return current timestamp in session time zone,the return type is TIMESTAMP WITH TIME ZONE

In
Snowflake the function CURRENT_TIMESTAMP/LOCALTIMESTAMP return current timestamp in session time zone,the return type is TIMESTAMP WITH LOCAL TIME ZONE



NOTE: Flink supports TIME-related types with precision well,  all example in this FLIP the precision just retains to seconds for simplification purpose.

This FLIP proposes option 1 which changes the return typethe proposed changes as following, the current wall-clock is 2020-12-29 07:52:52 in Beijing time(GMT+08:00):

function

existed problem

current behavior

proposed changes

CURRENT_DATE

returns UTC date, but user expects current date in session time zone

return type: DATE

#session timezone: UTC

2020-12-28

#session timezone: GMT+08:00

2020-12-28

 return current date in session time zone, the return type should be DATE

#session timezone: UTC

2020-12-28

#session timezone: GMT+08:00

2020-12-29

CURRENT_TIME

returns UTC time, but user expects current time in session time zone

return type:  TIME 

#session timezone: UTC

23:52:52

#session timezone: GMT+08:00

23:52:52

return current time in session time zone, the return type should be TIME

#session timezone: UTC

23:52:52

#session timezone: GMT+08:00

07:52:52

CURRENT_TIMESTAMP

returns UTC timestamp, but user expects current timestamp in session time zone


return type:  TIMESTAMP

#session timezone: UTC

2020-12-28 23:52:52

#session timezone: GMT+08:00

2020-12-28 23:52:52

return current timestamp in session time zone, the return type should be

TIMESTAMP WITH LOCAL TIME ZONE

#session timezone: UTC

2020-12-28 23:52:52

#session timezone: GMT+08:00

2020-12-29 07:52:52

NOW()

returns UTC timestamp, but user expects current timestamp in session time zone


return type: TIMESTAMP

#session timezone: UTC

2020-12-28 23:52:52

#session timezone: GMT+08:00

2020-12-28 23:52:52

return current timestamp in session time zone, the return type should be

TIMESTAMP WITH LOCAL TIME ZONE

#session timezone: UTC

2020-12-28 23:52:52

#session timezone: GMT+08:00

2020-12-29 07:52:52

PROCTIME()

returns UTC timestamp, but user expects current timestamp in session time zone

return type: TIMESTAMP  *PROCTIME*

#session timezone: UTC

2020-12-28 23:52:52

#session timezone: GMT+08:00

2020-12-28 23:52:52

return current timestamp in session time zone for PROCTIME(), the return type should be TIMESTAMP  WITH LOCAL TIME ZONE *PROCTIME*

#session timezone: UTC

2020-12-28 23:52:52

#session timezone: GMT+08:00

2020-12-29 07:52:52

After the proposal, the function NOW() and CURRENT_TIMESTAMP  become  synonyms, the function CURRENT_TIME and LOCALTIME become synonyms, you can also lookup all time function behaviors in reference [2]. 

2. Disable CAST between NUMERIC  and TIMESTAMP

Currently, the following CAST conversion behaviors are wrong which does not consider the session time zone. It should use session time zone when cast between NUMERIC type and TIMESTAMP type if there’re strong requirements to support this , the numeric type include TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, they keep same conversion behavior, for example the cast between BIGINT and TIMESTAMP. Although we didn’t expose this feature in the document, some users may use them.

These cast conversions have wrong behavior and problematic semantics,  because SQL:2011 does not contains these cast specification and we never expose to user.

This FLIP propose to disable the cast between numeric and timestamp,  and proposed use current function TO_TIMESTAMP(FROM_UNIXTIME(epochSeconds - zoneoffset)) as a migration plan. 

function

current behavior

existed problem

migration plan

CAST(44 AS TIMESTAMP) 

TIMESTAMP(0) NOT NULL

#session timezone: UTC

1970-01-01 00:00:44 

#session timezone: GMT+08:00

1970-01-01 00:00:44 

The time in BIGINT type usually represents a unixtime semantic, which represents the elapsed time since java epoch(1970-01-01 00:00:00 UTC+0), when convert to a timestamp we should consider local time zone

This is an invalid behavior, disable the invalid CAST behavior, to get same behavior, user can workaround with: 

#session timezone: UTC

  • TO_TIMESTAMP(FROM_UNIXTIME(44 - 0))

1970-01-01 00:00:44 

#session timezone: GMT+08:00

TO_TIMESTAMP(FROM_UNIXTIME(44 - 8 * 60 * 60))

1970-01-01 00:00:44

CAST(TIMESTAMP ‘1970-01-01 00:00:44’ AS BIGINT) 

BIGINT NOT NULL

#session timezone: UTC

44 

#session timezone: GMT+08:00

44

The inverse conversion of above, this conversion is used rarely.

UNIX_TIMESTAMP(TIMESTAMP ‘1970-01-01 00:00:44’)

#session timezone: UTC

44

#session timezone: GMT+08:00

-28756

3. Disable legacy behavior of theses time functions

The return values of functions

  • CURRENT_DATE
  • CURRENT_TIME
  • CURRENT_TIMESTAMP
  • NOW()
  • PROCTIME()

consider the local timezone and the return type of PROCTIME()/NOW()/CURRENT_TIMESTAMP is TIMESTAMP_LTZ, the return type of CURRENT_DATE is DATE, the return type of CURRENT_TIME is TIME.

The legacy behavior of these function doesn't consider the local time zone, and the return type of  PROCTIME()/NOW()/CURRENT_TIMESTAMP is TIMESTAMP, the return type of CURRENT_DATE is DATE, the return type of CURRENT_TIME is TIME.

4. Support defining row time attribute on TIMESTAMP_LTZ

After this, we can support ROWTIME/PROCTIME on type TIMESTAMP WITH LOCAL TIME ZONE, which complements the unifcation.

For usability consideration, this FLIP also propose to introduce a function TO_TIMETSTAMP_LTZ(numeric [,]) which is similar to above TO_TIMESTAMP function but the returns type is TIMESTAMP WITH LOCAL TIME ZONE.


proposed changes

note

Support function  TO_TIMESTAMP_LTZ(numeric_expr, [,scale])

return type: TIMESTAMP(3) WITH LOCAL TIME ZONE

#session timezone: UTC

TO_TIMESTAMP_LTZ(44)

1970-01-01 00:00:44 

#session timezone: GMT+08:00

TO_TIMESTAMP_LTZ(-28756)

1970-01-01 00:00:44

TO_TIMESTAMP_LTZ(numeric_expr [,scale])

TO_TIMESTAMP_LTZ(seconds, 0)

TO_TIMESTAMP_LTZ(milliseconds, 3)



5. Support more conversion classes for LocalZonedTimestampType(TIMESTAMP_LTZ)

After we use  type TIMESTAMP WITH LOCAL TIME ZONE in above functions, we can support the conversion class java.sql.Timestamp  for LocalZonedTimestampType, and we enable the implicit cast conversion between TIMESTAMP and TIMESTAMP_LTZ to resolve the UDF compatibility issue.

For example, if user used a UDF which parameter types contains LocalDateTime, and the SQL column data type is TIMESTAMP comes from PROCTIME()/CURRENT_TIMESTAMP, they can migrate to TIMESTAMP_LTZ smoothly.

6. Support abbreviation name for all timestamp types 

The type name of timestamp is pretty long and inconvenient for users, we can introduce abbreviation for them to make it more convenient.  The abbreviation name mapping as following:

  • TIMESTAMP / TIMESTAMP WITHOUT TIME ZONE         <=> TIMESTAMP 
  • TIMESTAMP WITH LOCAL TIME ZONE                            <=> TIMESTAMP_LTZ  
  • TIMESTAMP WITH TIME ZONE                                         <=> TIMESTAMP_TZ     (supports them in the future until we support TIMESTAMP WITH TIME ZONE )

7. Support TIME(9) for Flink SQL

Due to the historical reason, we didn't not support TIME(9) yet, we think It's a good time point to support it in this FLIP.

8. Use execution mode to hint time function evaluation, introduce CURRENT_ROW_TIMESTAMP function.

For time functions

  • LOCALTIME
  • LOCALTIMESTAMP
  • CURRENT_DATE
  • CURRENT_TIME
  • CURRENT_TIMESTAMP
  • NOW()

Flink evaluates above time function values according to execution mode, i.e. Flink evaluates time function value for row level in Streaming mode, evaluates the time function value at query start for batch mode.

For batch users who want to obtain row level timestamp of now, we introduce function CURRENT_ROW_TIMESTAMP, the function can also be used in streaming mode.

It's similar to  CURRENT_TIMESTAMP function, CURRENT_ROW_TIMESTAMP always returns current timestamp in session time zone, the return type should be TIMESTAMP WITH LOCAL TIME ZONE.

The difference is function CURRENT_ROW_TIMESTAMP always evaluates in row-level, but  function CURRENT_TIMESTAMP evaluates according to the execution mode.

General Implementations

1.Change the codegen implementations for above five functions/cast conversions.

2. Supports all  conversion classes like java.time.LocalDateTime, java.sql.Timestamp that TimestampType supported  for LocalZonedTimestampType to resolve the UDF compatibility issue

3. The session timezone offset for processing-time window should still be considered

4. All connectors/formats should supports TIMESTAMP WITH LOCAL TIME ZONE well 

5we also should record in document  


After the proposal is finished, the above user cases will work smoothly.  Assume users' local time zone is GMT+08:00, the wall-clock is 2020-12-29 07:52:52.

  • user case 1 :
Flink SQL> SELECT NOW(), PROCTIME(), CURRENT_TIMESTAMP, CURRENT_DATE, CURRENT_TIME;
-- output:
+-------------------------+-------------------------+-------------------------+--------------+--------------+
|                   NOW() |              PROCTIME() |       CURRENT_TIMESTAMP | CURRENT_DATE | CURRENT_TIME |
+-------------------------+-------------------------+-------------------------+--------------+--------------+
|     2020-12-29 07:52:52 |     2020-12-29 07:52:52 |     2020-12-29 07:52:52 |   2020-12-29 |      07:52:52|
+-------------------------+-------------------------+-------------------------+--------------+--------------+
  • user case 2: 
Flink SQL> SELECT TUMBLE_START(proctime, INTERVAL ‘1’ DAY),
         >        TUMBLE_END(proctime, INTERVAL ‘1’ DAY),
         >        count(userId) as cnt
         >    FROM userLog
         >    GROUP BY TUMBLE_WINDOW(proctime, INTERVAL ‘1’ DAY);

-- output:
+-------------------------+-------------------------+-------------------------+ 
|            TUMBLE_START |              TUMBLE_END |           count(userId) | 
+-------------------------+-------------------------+-------------------------+ 
|     2020-12-29 00:00:00 |     2020-12-30 00:00:00 |     		  		  100 |
+-------------------------+-------------------------+-------------------------+ 
  • user case 3:
Flink SQL> SELECT  *
         >    FROM userLog
         >    WHERE date_col >= CURRENT_DATE;

-- in the query, records earlier than 2020-12-29 will not be output.
+-------------------------+-------------------------+-------------------------+ 
|                date_col |                  log_ts |                    user | 
+-------------------------+-------------------------+-------------------------+ 
|             2020-12-29  |     2020-12-29 00:00:00 |     		  		Alice |
+-------------------------+-------------------------+-------------------------+ 
|             2020-12-29  |     2020-12-29 00:00:01 |     		  		  Bob |
+-------------------------+-------------------------+-------------------------+ 
|             2020-12-29  |     2020-12-29 00:00:02 |     		  		  Tom |
+-------------------------+-------------------------+-------------------------+ 

Compatibility, Deprecation, and Migration Plan

  • Compatibility

This is an incompatible change,  that users must change their SQL if they used these time functions,  user can also write UDF to obtain same of legacy time function return value.

Test Plan

Will add plan tests, unit tests, window operator harness tests as well as IT tests. 

Future Plan

As a conservative plan, we don't introduce the config 'table.exec.time-function-evaluation' right now, but we have discussed that we should introduce it in the future once we receive enough user requirements / feedbacks.

The option is used to control the materialize time point of time function value. The time function includes:

  • LOCALTIME
  • LOCALTIMESTAMP
  • CURRENT_DATE
  • CURRENT_TIME
  • CURRENT_TIMESTAMP
  • NOW()

The valid value of table.exec.time-function-evaluation includes 'auto', 'row', 'query-start'.

  • 'auto', the default value which means Flink evaluates above time function values according to execution mode, i.e. Flink evaluates time function value for per record in Streaming mode, evaluates the time function value at query start for batch mode.
  • 'row', Flink evaluates time function value for per record.
  • 'query-start', Flink evaluate above time function value at query-start, the time function always returns same value in a query. 

Rejected Alternatives

1. Change the return type of function CURRENT_TIMESTAMP/NOW()/PROCETIME() to TIMESTAMP WITH TIME ZONE. This proposal needs to introduce a new type TIMESTAMP WITH TIME ZONE, and we think there are no enough benefits. If we do this, the return type of function CURRENT_TIME must be TIME WITH TIME ZONE for consistent consideration, we need to introduce another type.

2. Change the return value of function  CURRENT_TIMESTAMP/NOW()/PROCETIME() to the timestamp value in local timezone. The proposal is fine if we only use it in FLINK SQL world, but we need to consider the conversion between Table/DataStream, assume a record produced in UTC+0 timezone with TIMESTAMP '1970-01-01 08:00:44'  and the Flink SQL processes the data with session time zone 'GMT+08:00', if the sql program need to convert the Table to DataStream, then we need to calculate the timestamp in StreamRecord with session time zone (GMT+08:00), then we will get 44 in DataStream program, but it is wrong because the expected value should be (8 * 60 * 60 + 44). The corner case tell us that the ROWTIME/PROCTIME in Flink are based on UTC+0, when correct the PROCTIME() function, the better way is to use TIMESTAMP WITH LOCAL TIME ZONE which keeps same long value with time based on UTC+0 and can be expressed with  local timezone.


References

  1. https://docs.cloudera.com/documentation/enterprise/latest/topics/impala_timezone.html

      2. https://docs.google.com/document/d/1iY3eatV8LBjmF0gWh2JYrQR0FlTadsSeuCsksOVp_iA/edit?usp=sharing

      3. https://docs.google.com/spreadsheets/d/1T178krh9xG-WbVpN7mRVJ8bzFnaSJx3l-eg1EWZe_X4/edit?usp=sharing