...

Hudi allows us to store multiple versions of data in the dataset over time while providing snapshot isolation. The number of versions is configurable and depends on how much space one is willing to trade off for those older versions. Hence, as one can imagine, Hudi has the capability to allow access to the state of a table (dataset) at a certain point/instant in time. This can be done by allowing read mechanisms on a Hudi table (dataset) that go back and read the version of the data that was written at a particular point/instant in time (see instant time).

...

Hudi supports 2 types of queries - Snapshot & Incremental.

  • Snapshot queries return the latest state of a dataset. The default scan mode is set to LATEST.
  • Incremental queries provide the incremental changes in a time window (t1, t2]; users can control this by using a scan mode called INCREMENTAL.

...

We set certain configurations for Hudi to understand whether this is a snapshot query (query scan mode SNAPSHOT) or an incremental query (query scan mode INCREMENTAL). If it is an incremental query, the user can provide the (min, max) lastInstant to pull incremental changes. For the read optimized (RO) view, we are able to figure out exactly which files changed and can limit the number of files read (and hence the amount of data) during the planning phase. For the realtime (RT) view, since we are not able to easily figure out which log files have data from which commits/instants, we rely on Hive to read out the entire dataset and on the user to apply the filters in their query, such as: select * from table where _hoodie_commit_time > min and _hoodie_commit_time < max.
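The commit-time filter users apply in such queries can be sketched as follows. This is an illustration, not Hudi code: the class and method names are hypothetical, and it relies on Hudi commit times being sortable timestamp strings, so plain string comparison mirrors the SQL predicate.

```java
import java.util.List;
import java.util.stream.Collectors;

public class CommitTimeFilter {
    // Keep the commit times strictly between min and max, mirroring the SQL:
    // select * from table where _hoodie_commit_time > min and _hoodie_commit_time < max
    public static List<String> between(List<String> commitTimes, String min, String max) {
        return commitTimes.stream()
                .filter(t -> t.compareTo(min) > 0 && t.compareTo(max) < 0)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> commits = List.of("20190101010101", "20190202020202", "20190303030303");
        // Only the middle commit falls strictly inside the window.
        System.out.println(between(commits, "20190101010101", "20190303030303"));
    }
}
```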

Presto queries

For both read optimized and realtime views, we rely on Presto to read out the entire dataset and on the user to apply the filters in their query, such as: select * from table where _hoodie_commit_time > min and _hoodie_commit_time < max.

...

For Spark based incremental pull, for read optimized views, Hudi has a special technique to avoid the planning phase: through the HoodieInputFormat it is able to directly access the data that changed. A similar implementation for the realtime view is required, but for realtime views we fall back on similar paths of applying query filtering as described above.

...

As you can imagine, there is already a "hacky" way to achieve time travel using the following methodology: select * from table where _hoodie_commit_time <= point_in_time. Obviously, this is not very efficient and can also lead to inconsistent results, since the point_in_time might be different from the Hudi instant time. If another commit is inflight between point_in_time and the instant time closest to it, this query will provide different results for different runs.

...

Introduce another type of snapshot query scan mode, POINT_IN_TIME_SCAN. This will enable scanning the Hudi dataset for files created on or before the supplied point_in_time. The challenge, as described above, is to have a solution around inconsistent results when a user runs the same POINT_IN_TIME query multiple times.
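The selection step of POINT_IN_TIME_SCAN can be sketched as below. This is a hedged illustration, not a Hudi implementation: the mapping of file names to the commit times that created them, and the class and method names, are hypothetical stand-ins for what the input format would resolve internally.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class PointInTimeScan {
    // Keep only the files whose creating commit time is at or before the
    // supplied point_in_time; commit times are sortable timestamp strings.
    public static List<String> filesAsOf(Map<String, String> fileToCommitTime, String pointInTime) {
        return fileToCommitTime.entrySet().stream()
                .filter(e -> e.getValue().compareTo(pointInTime) <= 0)
                .map(Map.Entry::getKey)
                .sorted()
                .collect(Collectors.toList());
    }
}
```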

There are 2 options here:

1) Hudi provides a list of timestamps that can be supplied by the user as the point_in_time to query against. Hudi writes the commit/instant times to a timeline metadata folder and provides APIs to read the timeline. At the moment there are 2 ways to read the timeline: a) The HoodieActiveTimeline class can be instantiated on the client to get a list of the completed commits. Hudi exposes a HoodieReadClient to users to enable reading certain contents and performing certain read-level operations on the Hudi dataset. I'm proposing we add another method, getCompletedInstants(), that allows users to programmatically get a list of all the timestamps against which they can query the table to get consistent results. b) Users of Hive queries who are more SQL friendly can perform a query such as: select _hoodie_commit_time from table sort by _hoodie_commit_time, which provides an ordered list of all the commit times.

The client can perform some windowing operations on this returned list of timestamps to get the closest commit time that matches the point_in_time that the user wants to query against. 
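That client-side windowing step can be sketched as follows: given the list of completed commit times (e.g. as the proposed getCompletedInstants() would return them), pick the latest commit at or before the user's point_in_time. The helper class and method names here are illustrative, not part of Hudi.

```java
import java.util.List;
import java.util.Optional;

public class ClosestCommit {
    // Return the latest completed commit time that is <= pointInTime,
    // or empty if the point in time predates every completed commit.
    public static Optional<String> closestAtOrBefore(List<String> completedCommits, String pointInTime) {
        return completedCommits.stream()
                .filter(t -> t.compareTo(pointInTime) <= 0)
                .max(String::compareTo);
    }
}
```

Querying with the returned commit time, rather than the raw point_in_time, is what makes repeated runs of the same point-in-time query consistent.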

...

Hive queries

Users can pass a new query scan mode via the following config in JobConf → https://github.com/apache/incubator-hudi/blob/master/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveUtil.java#L30

We will introduce a new config called HOODIE_END_COMMIT_PATTERN, which will represent a commit time that the user wants to scan the table (dataset) against. This can be thought of as asking a question → select * from hudi_table where timeAsOf (commit_time)
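A minimal sketch of how this might be wired up, with two loudly labeled assumptions: java.util.Properties stands in for Hadoop's JobConf (to keep the sketch self-contained), and the key "hoodie.%s.consume.end.timestamp" is my guess modeled on the existing hoodie.%s.consume.* patterns in HoodieHiveUtil — the RFC does not fix the exact key name.

```java
import java.util.Properties;

public class EndCommitConfig {
    // Assumed key shape for the proposed HOODIE_END_COMMIT_PATTERN,
    // following the per-table hoodie.%s.consume.* convention.
    static final String HOODIE_END_COMMIT_PATTERN = "hoodie.%s.consume.end.timestamp";

    // Build the per-table configs a point-in-time query would set.
    // Properties is a stand-in for org.apache.hadoop.mapred.JobConf here.
    public static Properties pointInTimeScan(String tableName, String endCommitTime) {
        Properties conf = new Properties();
        conf.setProperty(String.format("hoodie.%s.consume.mode", tableName), "POINT_IN_TIME_SCAN");
        conf.setProperty(String.format(HOODIE_END_COMMIT_PATTERN, tableName), endCommitTime);
        return conf;
    }
}
```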

...

We don't have a way to perform incremental pull right now. Since we are building on similar concepts to support time-travel, the only way this will work is through a query as described above.

...

This RFC does not propose implementation/changes for Presto.


Spark queries

Incremental pull using Spark works with a custom implementation of org.apache.spark.sql.sources.BaseRelation (called IncrementalRelation), which is chosen based on the VIEW_TYPE that the user can pass from a Spark DataSource. This is slightly different from the way users query Hive/Presto tables. The Hive/Presto tables don't really expose the read optimized/realtime/incremental views to users but instead expose InputFormats that build on those views. As part of those input formats, we support different query scan modes such as SNAPSHOT or INCREMENTAL (and now our proposed POINT_IN_TIME_SCAN).

We might need a way to standardize the way users think about querying Hudi tables using Spark/Hive/Presto. At the moment, I'm proposing another VIEW_TYPE to be introduced in Spark, POINT_IN_TIME, which will work in conjunction with the already present config END_INSTANTTIME_OPT_KEY to provide the snapshot view of a table at a particular instant in time (similar to select * from table where _hoodie_commit_time <= timeAsOf (commit_time))
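The proposed Spark usage can be sketched as a plain option map, keeping the sketch free of a Spark dependency. The key strings "hoodie.datasource.view.type" and "hoodie.datasource.read.end.instanttime" are my assumption of what VIEW_TYPE and END_INSTANTTIME_OPT_KEY resolve to in the Hudi DataSource of that era, and the POINT_IN_TIME value is the one proposed by this RFC, not an existing option.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SparkPointInTimeOptions {
    // Build the DataSource options a point-in-time read would pass,
    // e.g. via spark.read.format("org.apache.hudi").options(...).
    public static Map<String, String> build(String endInstantTime) {
        Map<String, String> opts = new LinkedHashMap<>();
        opts.put("hoodie.datasource.view.type", "POINT_IN_TIME");            // proposed VIEW_TYPE
        opts.put("hoodie.datasource.read.end.instanttime", endInstantTime);  // existing END_INSTANTTIME_OPT_KEY
        return opts;
    }
}
```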

...

  • The number of versions to keep should match the number of commits the client wants to travel. Need a way to enforce this.
  • Proposed approach requires the client to perform some work and enforces some limitations
    • Clients can only time-travel based on the commit times of a Hudi dataset. They have to figure out a way to map the timestamp they want to travel to onto the closest matching commit time.
    • Clients have to get a list of the valid timestamps (Hudi commit times) to time travel against

...