...
All of these ways are valid because SQL:2011 does not specify the function return type, and every SQL engine vendor has its own implementation[2]; take CURRENT_TIMESTAMP for example.
NOTE: Flink supports TIME-related types with precision well; in all examples in this FLIP the precision is limited to seconds for simplification.
function | Flink current behavior | Flink proposed changes | other SQL vendors' behavior
---|---|---|---
CURRENT_TIMESTAMP | return type: TIMESTAMP; session timezone UTC: 2020-12-28 23:52:52; session timezone UTC+8: 2020-12-28 23:52:52 | return type: TIMESTAMP WITH LOCAL TIME ZONE; session timezone UTC: 2020-12-28 23:52:52; session timezone UTC+8: 2020-12-29 07:52:52 | In MySQL and Spark, the functions NOW() and CURRENT_TIMESTAMP return the current timestamp value in the session time zone; the return type is TIMESTAMP. In PostgreSQL and Presto, NOW() and CURRENT_TIMESTAMP return the current timestamp in the session time zone; the return type is TIMESTAMP WITH TIME ZONE. In Snowflake, CURRENT_TIMESTAMP/LOCALTIMESTAMP return the current timestamp in the session time zone; the return type is TIMESTAMP WITH LOCAL TIME ZONE.
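The difference between the current and proposed behavior comes down to rendering one absolute instant under different session time zones. The following standalone Python sketch (an illustration, not Flink code) mimics the TIMESTAMP WITH LOCAL TIME ZONE semantics: the stored value is an instant, and only its display depends on the session time zone.

```python
from datetime import datetime, timezone, timedelta

# One absolute instant, stored internally as in TIMESTAMP WITH LOCAL
# TIME ZONE; only the rendering depends on the session time zone.
instant = datetime(2020, 12, 28, 23, 52, 52, tzinfo=timezone.utc)

utc_view = instant.astimezone(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
beijing_view = instant.astimezone(timezone(timedelta(hours=8))).strftime("%Y-%m-%d %H:%M:%S")

print(utc_view)      # 2020-12-28 23:52:52
print(beijing_view)  # 2020-12-29 07:52:52
```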
This FLIP proposes option 1, which changes the return type. The proposed changes are as follows, given that the current wall-clock time is 2020-12-29 07:52:52 in Beijing time (UTC+8):
function | existed problem | current behavior | proposed changes
---|---|---|---
CURRENT_DATE | returns the UTC date, but the user expects the current date in the session time zone | return type: DATE; session timezone UTC: 2020-12-28; session timezone UTC+8: 2020-12-28 | return the current date in the session time zone; the return type should be DATE; session timezone UTC: 2020-12-28; session timezone UTC+8: 2020-12-29
CURRENT_TIME | returns the UTC time, but the user expects the current time in the session time zone | return type: TIME; session timezone UTC: 23:52:52; session timezone UTC+8: 23:52:52 | return the current time in the session time zone; the return type should be TIME; session timezone UTC: 23:52:52; session timezone UTC+8: 07:52:52
CURRENT_TIMESTAMP | returns the UTC timestamp, but the user expects the current timestamp in the session time zone | return type: TIMESTAMP; session timezone UTC: 2020-12-28 23:52:52; session timezone UTC+8: 2020-12-28 23:52:52 | return the current timestamp in the session time zone; the return type should be TIMESTAMP WITH LOCAL TIME ZONE; session timezone UTC: 2020-12-28 23:52:52; session timezone UTC+8: 2020-12-29 07:52:52
NOW() | returns the UTC timestamp, but the user expects the current timestamp in the session time zone | return type: TIMESTAMP; session timezone UTC: 2020-12-28 23:52:52; session timezone UTC+8: 2020-12-28 23:52:52 | return the current timestamp in the session time zone; the return type should be TIMESTAMP WITH LOCAL TIME ZONE; session timezone UTC: 2020-12-28 23:52:52; session timezone UTC+8: 2020-12-29 07:52:52
PROCTIME() | returns the UTC timestamp, but the user expects the current timestamp in the session time zone | return type: TIMESTAMP *PROCTIME*; session timezone UTC: 2020-12-28 23:52:52; session timezone UTC+8: 2020-12-28 23:52:52 | return the current timestamp in the session time zone for PROCTIME(); the return type should be TIMESTAMP WITH LOCAL TIME ZONE *PROCTIME*; session timezone UTC: 2020-12-28 23:52:52; session timezone UTC+8: 2020-12-29 07:52:52
After this proposal, the functions NOW(), CURRENT_TIMESTAMP, and LOCALTIMESTAMP become synonyms, and the functions CURRENT_TIME and LOCALTIME become synonyms. You can also look up all time function behaviors in reference [2].
...
function | current behavior | existed problem | proposed changes
---|---|---|---
CAST(44 AS TIMESTAMP) | return type: TIMESTAMP(6) NOT NULL; session timezone UTC: 1970-01-01T00:00:44; session timezone UTC+8: 1970-01-01T00:00:44 | A time in BIGINT type usually carries unixtime semantics, i.e. the elapsed time since the Java epoch (1970-01-01 00:00:00 UTC+0); when converting it to a timestamp we should consider the local time zone | Support the function TO_TIMESTAMP(seconds) with return type TIMESTAMP(3); session timezone UTC: TO_TIMESTAMP(44) = 1970-01-01T00:00:44; session timezone UTC+8: TO_TIMESTAMP(-28756) = 1970-01-01T00:00:44
CAST(TIMESTAMP '1970-01-01 00:00:44' AS BIGINT) | return type: BIGINT NOT NULL; session timezone UTC: 44; session timezone UTC+8: 44 | The inverse of the conversion above; this conversion is rarely used | Support the function UNIX_TIMESTAMP(timestamp); session timezone UTC: UNIX_TIMESTAMP(TIMESTAMP '1970-01-01 00:00:44') = 44; session timezone UTC+8: UNIX_TIMESTAMP(TIMESTAMP '1970-01-01 00:00:44') = -28756
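The semantics proposed in the table can be sketched in plain Python (the `to_timestamp`/`unix_timestamp` helpers below are stand-ins for the proposed SQL functions, not Flink code): epoch seconds are interpreted as an instant and rendered in the session time zone, and the inverse interprets a wall-clock value in the session time zone.

```python
from datetime import datetime, timezone, timedelta

UTC = timezone.utc
UTC8 = timezone(timedelta(hours=8))

def to_timestamp(seconds, session_tz):
    # Interpret epoch seconds as an absolute instant, then render the
    # wall-clock value in the session time zone.
    return datetime.fromtimestamp(seconds, tz=session_tz).strftime("%Y-%m-%dT%H:%M:%S")

def unix_timestamp(wall_clock, session_tz):
    # Interpret a wall-clock string in the session time zone and return
    # the epoch seconds of the corresponding instant.
    naive = datetime.strptime(wall_clock, "%Y-%m-%d %H:%M:%S")
    return int(naive.replace(tzinfo=session_tz).timestamp())

print(to_timestamp(44, UTC))        # 1970-01-01T00:00:44
print(to_timestamp(-28756, UTC8))   # 1970-01-01T00:00:44
print(unix_timestamp("1970-01-01 00:00:44", UTC))   # 44
print(unix_timestamp("1970-01-01 00:00:44", UTC8))  # -28756
```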
3. Consider the session time zone when converting a Table from/to DataStream
...
In the DataStream world, the row time is stored in a separate Long field of StreamRecord, whose value is the milliseconds since the Java epoch (1970-01-01 00:00:00 UTC+0); it definitely carries Instant semantics.
When converting a Table that contains a row time attribute from/to a DataStream, the timestamp in the StreamRecord is exchanged with the Table's row time column.
We need to consider the time zone offset here, just like in the cast conversion between TIMESTAMP (wall-clock semantics) and BIGINT (Instant semantics). This way both DataStream users and SQL/Table API users get intuitive behavior.
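A minimal sketch of this exchange, with hypothetical helper names (the real conversion lives in Flink's Table/DataStream bridge): the StreamRecord timestamp stays epoch millis, while the row time column is rendered and parsed with the session time zone.

```python
from datetime import datetime, timezone, timedelta

SESSION_TZ = timezone(timedelta(hours=8))  # assumed session time zone

def record_ts_to_rowtime(epoch_millis):
    # StreamRecord timestamp (Instant semantics, epoch millis) rendered
    # as the Table's row time in the session time zone.
    return datetime.fromtimestamp(epoch_millis / 1000, tz=SESSION_TZ)

def rowtime_to_record_ts(wall_clock):
    # Inverse direction: a naive wall-clock value in the session time
    # zone becomes epoch millis for the StreamRecord.
    return int(wall_clock.replace(tzinfo=SESSION_TZ).timestamp() * 1000)

rowtime = record_ts_to_rowtime(44_000)  # instant 1970-01-01 00:00:44 UTC
print(rowtime.strftime("%Y-%m-%d %H:%M:%S"))                 # 1970-01-01 08:00:44
print(rowtime_to_record_ts(datetime(1970, 1, 1, 8, 0, 44)))  # 44000
```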
4. Introduce an option that enables fallback to the legacy behavior
The default value of table.exec.fallback-legacy-time-function is 'false', which means:
(1) the return values of the functions
- CURRENT_DATE
- CURRENT_TIME
- CURRENT_TIMESTAMP
- NOW()
- PROCTIME()
consider the local time zone;
(2) the cast between NUMERIC and TIMESTAMP is forbidden;
(3) the session time zone offset is considered when converting a Table from/to DataStream.
Users can set the option to 'true', in which case:
(1) these functions keep the legacy behavior;
(2) the cast between NUMERIC and TIMESTAMP is supported;
(3) the session time zone offset is not considered when converting a Table from/to DataStream.
General Implementations
1. Change the codegen implementations of the above five functions and cast conversions according to the value of the introduced table option table.exec.fallback-legacy-time-function.
2. Consider the session time zone offset when materializing a PROCTIME() attribute, both in the PROCTIME_MATERIALIZE function and in the processing timestamp used to register timers in the window operator, because after changing the return value of the PROCTIME() function, windows based on processing time should be triggered by the changed processing-time value.
3. Support all conversion classes (e.g. LocalDateTime, java.sql.Timestamp) that TimestampType supports for LocalZonedTimestampType as well, to resolve the UDF compatibility issue.
4. Make sure all connectors/formats support TIMESTAMP WITH LOCAL TIME ZONE well.
5. Record these behavior changes in the documentation.
After the proposal is finished, the above user cases will work smoothly. Assume the user's local time zone is UTC+8 and the wall-clock time is 2020-12-29 07:52:52.
- user case 1 :
Code Block

Flink SQL> SELECT NOW(), PROCTIME(), CURRENT_TIMESTAMP, CURRENT_DATE, CURRENT_TIME;
-- output:
+-------------------------+-------------------------+-------------------------+--------------+--------------+
|                   NOW() |              PROCTIME() |       CURRENT_TIMESTAMP | CURRENT_DATE | CURRENT_TIME |
+-------------------------+-------------------------+-------------------------+--------------+--------------+
|     2020-12-29 07:52:52 |     2020-12-29 07:52:52 |     2020-12-29 07:52:52 |   2020-12-29 |     07:52:52 |
+-------------------------+-------------------------+-------------------------+--------------+--------------+
...
Code Block

Flink SQL> SELECT TUMBLE_START(proctime, INTERVAL '1' DAY),
         >        TUMBLE_END(proctime, INTERVAL '1' DAY),
         >        count(userId) as cnt
         > FROM userLog
         > GROUP BY TUMBLE(proctime, INTERVAL '1' DAY);
-- output:
+-------------------------+-------------------------+-------------------------+
|            TUMBLE_START |              TUMBLE_END |           count(userId) |
+-------------------------+-------------------------+-------------------------+
|     2020-12-29 00:00:00 |     2020-12-30 00:00:00 |                     100 |
+-------------------------+-------------------------+-------------------------+
...
Code Block

Flink SQL> SELECT *
         > FROM userLog
         > WHERE date_col >= CURRENT_DATE;
-- in the query, records earlier than 2020-12-29 will not be output.
+--------------+-------------------------+--------+
|     date_col |                  log_ts |   user |
+--------------+-------------------------+--------+
|   2020-12-29 |     2020-12-29 00:00:00 |  Alice |
|   2020-12-29 |     2020-12-29 00:00:01 |    Bob |
|   2020-12-29 |     2020-12-29 00:00:02 |    Tom |
+--------------+-------------------------+--------+
Compatibility, Deprecation, and Migration Plan
- Compatibility
This is an incompatible change. We introduce the SQL/API option table.exec.fallback-legacy-time-function for compatibility with the current wrong behavior, with a default value of 'false'. Users who want to keep the legacy behavior need to set this option to 'true' manually; this will be noted in the release notes.
Test Plan
Will add plan tests, unit tests, window operator harness tests as well as IT tests.
Rejected Alternatives
1. Change the return type of the functions CURRENT_TIMESTAMP/NOW()/PROCTIME() to TIMESTAMP WITH TIME ZONE. This proposal requires introducing a new type, TIMESTAMP WITH TIME ZONE, and we think there is not enough benefit. If we did this, the return type of CURRENT_TIME would have to be TIME WITH TIME ZONE for consistency, so we would need to introduce yet another type.
2. Change the return type of the functions CURRENT_TIMESTAMP/NOW()/PROCTIME() to TIMESTAMP WITH LOCAL TIME ZONE. This proposal leads to an awkward situation for the function CURRENT_TIME, because no DB vendor uses the type TIME WITH LOCAL TIME ZONE yet, and the instant semantics of TIMESTAMP WITH LOCAL TIME ZONE are complex for normal users to understand.
3. Change the return value of the functions CURRENT_TIMESTAMP/NOW()/PROCTIME() to the timestamp value in the local time zone while keeping the return type TIMESTAMP. This proposal is fine if we only use it in the Flink SQL world, but we need to consider the conversion between Table and DataStream. Assume a record produced in the UTC+0 time zone with TIMESTAMP '1970-01-01 08:00:44', and a Flink SQL program processing the data with session time zone UTC+8. If the program converts the Table to a DataStream, the timestamp in the StreamRecord is calculated with the session time zone (UTC+8), so the DataStream program gets 44, which is wrong: the expected value is (8 * 60 * 60 + 44). This corner case tells us that ROWTIME/PROCTIME in Flink are based on UTC+0; when correcting the PROCTIME() function, the better way is to use TIMESTAMP WITH LOCAL TIME ZONE, which keeps the same long value as time based on UTC+0 and can still be rendered in the local time zone.
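The arithmetic in the corner case above can be checked with a short Python illustration (assumptions: the wall-clock value and the UTC+0/UTC+8 offsets as stated in the text):

```python
from datetime import datetime, timezone, timedelta

# Wall clock '1970-01-01 08:00:44' produced in UTC+0 denotes the instant
# 28844 seconds after the epoch; re-interpreting the same wall clock in
# the UTC+8 session time zone yields 44 seconds instead.
wall = datetime(1970, 1, 1, 8, 0, 44)
expected = int(wall.replace(tzinfo=timezone.utc).timestamp())
wrong = int(wall.replace(tzinfo=timezone(timedelta(hours=8))).timestamp())

print(expected)  # 28844, i.e. 8 * 60 * 60 + 44
print(wrong)     # 44
```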
References
...