Proposers

Approvers

Status

Current state"Under Discussion"

Status
colourBlue
titleINACTIVE

JIRA:   HUDI-86

Released: TBD

Abstract

Hudi stores multiple versions of data in a def~table over time while providing `snapshot isolation`. The number of versions is configurable and depends on how much space one is willing to trade for those older versions. Hence, as one can imagine, Hudi can support accessing the state of a def~table at a certain point/instant in time. This can be done by allowing read mechanisms on a Hudi def~table that go back and read the version of the data written at a particular point/instant in time (see def~instant-time).

Background

Hudi provides 3 different def~query-types to access data written in 2 different def~table-types - def~copy-on-write (COW) and def~merge-on-read (MOR).

  1. def~read-optimized-query: reads data written in columnar format, with columnar read performance
  2. def~snapshot-query: reads a combination of avro and parquet data, optimizing for data latency over read performance (right now applicable only to MOR)
  3. def~incremental-query: lets one extract incremental change logs from a Hudi def~table. The incremental query really builds on top of the other 2 query types.

Hudi supports 2 types of queries - Snapshot & Incremental.

  • Snapshot queries provide the latest state of a dataset. The default scan mode is set to LATEST.
  • Incremental queries provide incremental changes between time (t1, t2]; one can control this by using the scan mode called INCREMENTAL.

All of these query types are exposed by intercepting the planning stage through the HoodieInputFormat. Before we dive further into incremental changes, I want to bring to everyone's attention what incremental changes from Hudi really mean. For example, if you have a log table (say log events from Kafka), incremental_pull (t1, t2] would provide you all the records that were ingested between those instants.

If you have a Hudi def~table into which you are ingesting database change logs (updates), then incremental_pull (t1, t2] would provide you all the records that were either inserted or updated between those time instants. These records are a full representation of the updated record, meaning, updated_record = (unchanged_columns + changed_columns).
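To make those semantics concrete, here is a minimal sketch of such an incremental pull via the Spark DataSource, assuming the 0.5.x option keys (VIEW_TYPE_OPT_KEY, BEGIN_INSTANTTIME_OPT_KEY, END_INSTANTTIME_OPT_KEY); the table path and instant values are hypothetical:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class IncrementalPullExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("incremental-pull").getOrCreate();

    // incremental_pull(t1, t2]: the begin instant is exclusive, the end instant inclusive
    Dataset<Row> changes = spark.read()
        .format("org.apache.hudi")
        .option("hoodie.datasource.view.type", "incremental")               // VIEW_TYPE_OPT_KEY
        .option("hoodie.datasource.read.begin.instanttime", "20191201000000") // t1
        .option("hoodie.datasource.read.end.instanttime", "20191202000000")   // t2
        .load("/data/hudi/log_events");                                     // hypothetical path

    // Each row is the full representation of the record as of its last update
    changes.show();
  }
}
```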

Hive queries

We set certain configurations for Hudi to understand whether this is a snapshot-based query (def~query-scan-mode SNAPSHOT) or an incremental query (def~query-scan-mode INCREMENTAL). If it is an incremental query, the user can provide the (min, max) lastInstant bounds to pull incremental changes. For def~read-optimized-queries, we are able to figure out exactly which files changed, and can limit the number of files read (and hence the amount of data) during the planning phase. For def~snapshot-queries, since we are not able to easily figure out which log files have data from which commits/instants (def~commit / def~timeline), we rely on Hive to read the entire data and on the user to apply the filters in their query, such as: select * from table where _hoodie_commit_time > min and _hoodie_commit_time < max (see def~commit-time).
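For illustration, a minimal sketch of how these configurations could be set on the JobConf, assuming the hoodie.&lt;table&gt;.consume.* property patterns from HoodieHiveUtil (the table name and timestamps are hypothetical):

```java
import org.apache.hadoop.mapred.JobConf;

public class HiveIncrementalConfExample {
  public static void main(String[] args) {
    JobConf conf = new JobConf();
    String table = "hudi_trips"; // hypothetical table name

    // Ask for an incremental scan instead of the default snapshot scan
    conf.set(String.format("hoodie.%s.consume.mode", table), "INCREMENTAL");
    // Pull changes that happened strictly after this instant (the "min" bound)
    conf.set(String.format("hoodie.%s.consume.start.timestamp", table), "20191201000000");

    // The "max" bound is then applied by the user inside the query itself:
    //   select * from hudi_trips where _hoodie_commit_time <= '20191202000000'
  }
}
```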

Presto queries

For both def~read-optimized-queries and def~snapshot-queries, we rely on Presto to read the entire def~table and on the user to apply the filters in their query, such as: select * from table where _hoodie_commit_time > min and _hoodie_commit_time < max.

Spark queries

For Spark-based incremental pull on def~read-optimized-queries, Hudi has a special technique to avoid the planning phase through the HoodieInputFormat and directly access the data that changed. A similar implementation is required for def~snapshot-queries; for now, we fall back on similar paths of applying query filters as described above.
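For intuition, the "special technique" amounts to reading commit metadata off the timeline and jumping straight to the files each commit wrote, rather than listing partitions at planning time. A rough sketch against the 0.5.x timeline APIs (class and method names should be treated as assumptions):

```java
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;

public class ChangedFilesExample {
  public static void main(String[] args) throws Exception {
    String basePath = "/data/hudi/log_events"; // hypothetical path
    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(new Configuration(), basePath);

    // Completed commits in (t1, t2]
    HoodieTimeline commits = metaClient.getActiveTimeline().getCommitTimeline()
        .filterCompletedInstants()
        .findInstantsInRange("20191201000000", "20191202000000");

    for (HoodieInstant instant : commits.getInstants().collect(Collectors.toList())) {
      // Commit metadata records exactly which files this commit wrote
      HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(
          commits.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
      metadata.getFileIdAndFullPaths(basePath).values().forEach(System.out::println);
    }
  }
}
```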

-----------------

As you can imagine, there is already a "hacky" way to achieve time travel using the following methodology: select * from table where _hoodie_commit_time <= point_in_time. Obviously, this is not very efficient and can also lead to inconsistent results, since the point_in_time might be different from the hoodie def~instant-time. If another commit is inflight between point_in_time and the def~instant-time closest to it, this query will provide different results for different runs.

Note that there are ways to improve these queries via predicate pushdown on the _hoodie_commit_time field, which is not in the scope of this RFC.

We would like to build on these existing capabilities of Hudi to provide point-in-time queries - accessing the state and contents of the table at a particular instant in time in the past.

Implementation

Introduce another type of snapshot def~query-scan-mode, POINT_IN_TIME. This will enable scanning the Hudi def~table for files created on or before the supplied point_in_time. The challenge, as described above, is to have a solution around inconsistent results when a user runs the same POINT_IN_TIME query multiple times.
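A minimal sketch of what this filtering could look like during planning, keeping only files whose commit time is at or before the supplied instant (FSUtils.getCommitTime and the surrounding wiring are assumptions; Hudi instant times are fixed-width timestamps, so plain string comparison orders them):

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hudi.common.util.FSUtils;

public class PointInTimeFilter {
  /** Keeps only the files created at or before pointInTime. */
  public static List<FileStatus> filter(List<FileStatus> candidates, String pointInTime) {
    List<FileStatus> result = new ArrayList<>();
    for (FileStatus file : candidates) {
      // Hudi data file names embed the commit time that created them
      String commitTime = FSUtils.getCommitTime(file.getPath().getName());
      if (commitTime.compareTo(pointInTime) <= 0) {
        result.add(file);
      }
    }
    return result;
  }
}
```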

There are 2 options here:

1) Hudi provides a list of timestamps that can be supplied by the user as the point_in_time to query against. Hudi writes the commit/def~instant-times to a timeline metadata folder and provides APIs to read the timeline. At the moment there are 2 ways to read the timeline: a) The HoodieActiveTimeline class can be instantiated on the client to get a list of the completed def~commits. Hudi also exposes a HoodieReadClient that enables reading certain contents and performing certain read-level operations on a Hudi def~table; I'm proposing we add a method getCompletedInstants() that allows users to programmatically get a list of all the timestamps against which the table can be queried with consistent results. b) Users of Hive queries who are more SQL-friendly can run a query such as select _hoodie_commit_time from table sort by _hoodie_commit_time, which provides an ordered list of all the def~commit-times.

The client can perform some windowing operations on this returned list of timestamps to get the closest commit time that matches the point_in_time that the user wants to query against. 
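A minimal sketch of that windowing, assuming the proposed getCompletedInstants() returns commit times as strings (the method does not exist yet; it is what this RFC proposes):

```java
import java.util.List;
import java.util.TreeSet;

public class PointInTimeResolver {
  /**
   * Maps a caller-supplied point_in_time to the closest completed commit at or
   * before it; returns null if the table has no commit that old.
   */
  public static String closestCommit(List<String> completedInstants, String pointInTime) {
    // Commit times are fixed-width timestamps, so lexical order == time order
    return new TreeSet<>(completedInstants).floor(pointInTime);
  }
}

// Hypothetical usage against the proposed API:
//   List<String> instants = readClient.getCompletedInstants();
//   String commitToQuery = PointInTimeResolver.closestCommit(instants, "20191201153000");
```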


Hive queries

Users can pass a new def~query-scan-mode via the following config in JobConf → https://github.com/apache/incubator-hudi/blob/master/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveUtil.java#L30

We will introduce a new config called HOODIE_END_COMMIT_PATTERN, which will represent the def~commit-time that the user wants to scan the def~table against. This can be thought of as asking: select * from hudi_table where timeAsOf (commit_time).
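A sketch of how the proposed config might be set, modeled on the existing hoodie.&lt;table&gt;.consume.* patterns (the property string and scan-mode value are assumptions, since HOODIE_END_COMMIT_PATTERN does not exist yet):

```java
import org.apache.hadoop.mapred.JobConf;

public class PointInTimeConfExample {
  public static void main(String[] args) {
    JobConf conf = new JobConf();
    String table = "hudi_trips"; // hypothetical table name

    // Proposed: switch the scan mode to the new point-in-time mode ...
    conf.set(String.format("hoodie.%s.consume.mode", table), "POINT_IN_TIME");
    // ... and supply the commit time to travel to (HOODIE_END_COMMIT_PATTERN)
    conf.set(String.format("hoodie.%s.consume.end.timestamp", table), "20191201153000");
  }
}
```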


Presto queries

We don't have a way to perform incremental pull in Presto right now. Since we are building on similar concepts to support time travel, the only way this will work is through a query as described above:

select * from table where _hoodie_commit_time <= timeAsOf (commit_time)

This RFC does not propose implementation/changes for Presto.


Spark queries

Incremental pull using Spark works with a custom implementation of org.apache.spark.sql.sources.BaseRelation (called IncrementalRelation), chosen based on the VIEW_TYPE that the user passes via the Spark DataSource. This is slightly different from the way users query Hive/Presto tables. The Hive/Presto tables don't really expose the query types (def~read-optimized-query, def~snapshot-query, def~incremental-query) to users but instead expose InputFormats that build on those query types. As part of those input formats, we support different def~query-scan-modes such as SNAPSHOT or INCREMENTAL (and now our proposed POINT_IN_TIME_SNAPSHOT).

We might need a way to standardize the way users think about querying Hudi tables using Spark/Hive/Presto. At the moment, I'm proposing another VIEW_TYPE (def~query-type?), POINT_IN_TIME, to be introduced in Spark, which will work in conjunction with the already present config END_INSTANTTIME_OPT_KEY to provide the snapshot view of a table at a particular instant in time (similar to select * from table where _hoodie_commit_time <= timeAsOf (commit_time)).
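A sketch of what that could look like from the Spark DataSource, reusing END_INSTANTTIME_OPT_KEY with the proposed view type (the point_in_time view-type value is an assumption, since it does not exist yet):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class PointInTimeSparkExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("time-travel").getOrCreate();

    Dataset<Row> asOf = spark.read()
        .format("org.apache.hudi")
        .option("hoodie.datasource.view.type", "point_in_time")             // proposed VIEW_TYPE
        .option("hoodie.datasource.read.end.instanttime", "20191201153000") // END_INSTANTTIME_OPT_KEY
        .load("/data/hudi/log_events"); // hypothetical path

    asOf.show(); // state of the table as of the supplied instant
  }
}
```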

Caveats/Open Items

  • The number of versions to keep should match the number of def~commits the client wants to travel back. Need a way to enforce this (see the retention sketch after this list).
  • Proposed approach requires the client to perform some work and enforces some limitations
    • Can only time-travel based on the def~commit-times of a Hudi def~table. The clients have to figure out a way to map the timestamp they want to travel to onto the def~commit-time that matches closest to it.
    • Clients have to get a list of the valid timestamps (hudi def~commit-times) to time travel against 
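On the write side, the retention window could be aligned with the travel window via the cleaner configs, as in this sketch (builder names follow the 0.5.x write client; treat them as assumptions):

```java
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;

public class RetentionConfigExample {
  public static void main(String[] args) {
    // Keep enough commits around that a point-in-time query can reach back;
    // here, the last 48 commits stay queryable before the cleaner reclaims them.
    HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
        .withPath("/data/hudi/log_events") // hypothetical path
        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
            .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
            .retainCommits(48)
            .build())
        .build();
  }
}
```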

Rollout/Adoption Plan

  • This change introduces new configs to enable time travel for Hudi tables, but at the same time does not change existing features.

Test Plan

  • Unit tests including ones that cover different cases of time-travel to ensure consistent results.
  • Unit tests including ones that cover what happens if the required versions are not present.
  • Testing with the Hudi test suite, by running a test workflow for a few days, with the cleaner etc. running alongside queries.


