Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".

Discussion thread

https://lists.apache.org/thread/fyv16p40z9go0dhosc4cr2ywqclyqqq5

Vote thread

https://lists.apache.org/thread/7hls4813xmq01wbmo90jtfb5chr3mpr2


Release

1.19

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, the Flink Avro format does not support the Avro timestamp (millis/micros) with local timezone type. The Avro timestamp (millis/micros) type is supported, but it is mapped to the Flink timestamp without time zone, which is not compliant with the semantics defined in Consistent timestamp types in Hadoop SQL engines. Below are the definitions of the timestamp types in Avro and Flink.


Avro Timestamp (millisecond precision):

The timestamp-millis logical type represents an instant on the global timeline, independent of a particular time zone or calendar, with a precision of one millisecond. Please note that time zone information gets lost in this process. Upon reading a value back, we can only reconstruct the instant, but not the original representation. In practice, such timestamps are typically displayed to users in their local time zones, therefore they may be displayed differently depending on the execution environment. A timestamp-millis logical type annotates an Avro long, where the long stores the number of milliseconds from the unix epoch, 1 January 1970 00:00:00.000 UTC.


Avro LocalTimestamp (millisecond precision): 

The local-timestamp-millis logical type represents a timestamp in a local timezone, regardless of what specific time zone is considered local, with a precision of one millisecond. A local-timestamp-millis logical type annotates an Avro long, where the long stores the number of milliseconds, from 1 January 1970 00:00:00.000.


Flink TimestampType Logical Type:

Logical type of a timestamp WITHOUT time zone consisting of year-month-day hour:minute:second[.fractional] with up to nanosecond precision and values ranging from 0000-01-01 00:00:00.000000000 to 9999-12-31 23:59:59.999999999. Compared to the SQL standard, leap seconds (23:59:60 and 23:59:61) are not supported as the semantics are closer to java.time.LocalDateTime.


Flink LocalZonedTimestampType Logical Type:

Logical type of a timestamp WITH LOCAL time zone consisting of year-month-day hour:minute:second[.fractional] zone with up to nanosecond precision and values ranging from 0000-01-01 00:00:00.000000000 +14:59 to 9999-12-31 23:59:59.999999999 -14:59. Leap seconds (23:59:60 and 23:59:61) are not supported as the semantics are closer to java.time.OffsetDateTime. This type fills the gap between time zone free and time zone mandatory timestamp types by allowing the interpretation of UTC timestamps according to the configured session time zone. A conversion from and to int describes the number of seconds since epoch. A conversion from and to long describes the number of milliseconds since epoch.


Given the definitions above, both Avro Timestamp and Flink LocalZonedTimestampType describe the number of milliseconds since the epoch, which is Instant semantics. In contrast, both Avro LocalTimestamp and Flink TimestampType describe a Java LocalDateTime.
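The difference between the two semantics can be illustrated with plain java.time, without any Flink or Avro dependency. This is a minimal sketch: the class and method names are hypothetical and exist only for this illustration, and the fixed +08:00 offset stands in for an arbitrary session time zone.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Illustrative only: class and method names are hypothetical, not part of Flink or Avro.
class TimestampSemanticsDemo {

    // Instant semantics (Avro timestamp-millis, Flink TIMESTAMP WITH LOCAL TIME ZONE):
    // the long is a point on the global timeline, counted from the UTC epoch.
    static Instant asInstant(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis);
    }

    // LocalDateTime semantics (Avro local-timestamp-millis, Flink TIMESTAMP):
    // the long encodes wall-clock fields, so it is decoded without shifting by any zone.
    static LocalDateTime asLocalDateTime(long localMillis) {
        return LocalDateTime.ofEpochSecond(
                Math.floorDiv(localMillis, 1000L),
                (int) Math.floorMod(localMillis, 1000L) * 1_000_000,
                ZoneOffset.UTC);
    }

    public static void main(String[] args) {
        long value = 0L;
        Instant instant = asInstant(value);
        // The same instant renders differently depending on the zone used for display.
        System.out.println(instant.atOffset(ZoneOffset.UTC).toLocalDateTime());
        System.out.println(instant.atOffset(ZoneOffset.ofHours(8)).toLocalDateTime());
        // The local timestamp renders identically in every environment.
        System.out.println(asLocalDateTime(value));
    }
}
```

The same long value therefore yields a zone-dependent display under Instant semantics and a fixed wall-clock value under LocalDateTime semantics, which is why the two Avro types should not share one Flink mapping.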

Goals

Support Avro timestamps in compliance with the mapping semantics defined in Consistent timestamp types in Hadoop SQL engines.

Public Interfaces

Users control Avro timestamp handling through a new option in AvroFormatOptions. With the default value, the legacy semantics are kept and the Avro timestamp with local timezone type is not supported.

Option | Type | Default value
avro.timestamp-mapping.legacy | Boolean | true

Proposed Changes

Both Avro Timestamp and Flink LocalZonedTimestampType describe the number of milliseconds since the epoch, which is Instant semantics. In contrast, both Avro LocalTimestamp and Flink TimestampType describe a Java LocalDateTime.

Thus, this proposal supports the following new timestamp type mappings:

  1. Avro Timestamp <-> Flink TIMESTAMP_WITH_LOCAL_TIME_ZONE
  2. Avro LocalTimestamp <-> Flink TIMESTAMP_WITHOUT_TIME_ZONE (newly supported)
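The mappings above, together with the legacy behavior, can be summarized as a small lookup. This is a sketch under stated assumptions rather than the actual converter code: the helper name flinkTypeFor is hypothetical, and the precisions 3 and 6 for the millis and micros variants are assumed for illustration.

```java
import java.util.Optional;

// Illustrative only: this helper is hypothetical and is not the actual Flink converter code.
class AvroTimestampMappingSketch {

    // Returns the Flink SQL type for an Avro logical type name, or empty if unsupported.
    // legacyMapping mirrors the avro.timestamp-mapping.legacy option (default true).
    // Precisions 3 (millis) and 6 (micros) are assumptions made for this sketch.
    static Optional<String> flinkTypeFor(String avroLogicalType, boolean legacyMapping) {
        switch (avroLogicalType) {
            case "timestamp-millis":
                return Optional.of(legacyMapping ? "TIMESTAMP(3)" : "TIMESTAMP_LTZ(3)");
            case "timestamp-micros":
                return Optional.of(legacyMapping ? "TIMESTAMP(6)" : "TIMESTAMP_LTZ(6)");
            case "local-timestamp-millis":
                // Not supported under the legacy mapping.
                return legacyMapping ? Optional.<String>empty() : Optional.of("TIMESTAMP(3)");
            case "local-timestamp-micros":
                return legacyMapping ? Optional.<String>empty() : Optional.of("TIMESTAMP(6)");
            default:
                return Optional.empty();
        }
    }

    public static void main(String[] args) {
        for (boolean legacy : new boolean[] {true, false}) {
            System.out.println("legacy=" + legacy
                    + " timestamp-millis -> " + flinkTypeFor("timestamp-millis", legacy)
                    + ", local-timestamp-millis -> " + flinkTypeFor("local-timestamp-millis", legacy));
        }
    }
}
```

With the flag set to true, only the Avro timestamp types resolve, and they resolve to the timestamp without time zone; with the flag set to false, all four logical types resolve according to the new mapping.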

Compatibility, Deprecation, and Migration Plan

The option will be added to the optionalOptions of AvroFileFormatFactory and AvroFormatFactory, so that users can set it when defining a Kafka or FileSystem table with the Avro format. To preserve compatibility, the flag defaults to true; users can explicitly enable the new behavior by setting it to false. The temporary config option will be marked as deprecated so that it can be removed after two minor releases (1.19, 1.20), i.e., dropped in the 2.0 release.

Test Plan

The changes will be covered by unit tests.

Rejected Alternatives

Enabling the new mapping by default. This would be a breaking change for existing users, so a direct opt-in is rejected.
