
Proposers

Approvers

Status

Current state: "Under Discussion"

JIRA: HUDI-86

Released: TBD

Abstract

Hudi allows us to store multiple versions of data in a table over time while providing snapshot isolation. The number of versions is configurable and depends on how much space one is willing to trade off for retaining older versions. Hence, as one can imagine, Hudi has the capability to provide access to the state of a table at a certain instant in time. This can be done through read mechanisms that go back and read the version of the data that was written at a particular instant in time.

Background

Hudi provides 3 different views to access data written in 2 different table types - COW and MOR. 1) The RO view allows reading data written in columnar format with columnar read performance. 2) The RT view is a combination of avro and parquet and trades off latency vs read performance (right now applicable only to MOR). 3) The Incremental view allows one to extract incremental change logs from a Hudi table; it really builds on top of the other 2 views. Hudi supports 2 types of queries - Snapshot & Incremental.

  • Snapshot queries access the latest state of a dataset/table. The default scan mode is set to LATEST.
  • Incremental queries provide the incremental changes between times (t1, t2]; this can be controlled by using a scan mode called INCREMENTAL.

All of these views are exposed by intercepting the planning stage through the HoodieInputFormat. Before we dive further into incremental changes, I want to bring to everyone's attention what incremental changes from Hudi really mean. For example, if you have a log table (say log events from Kafka), incremental_pull (t1, t2] would provide you all the records that were ingested between those instants.

If you have a hudi table/dataset into which you are ingesting database change logs (updates), then incremental_pull (t1, t2] would provide you all the records that were either inserted or updated between those time instants. These records are a full representation of the updated record, meaning updated_record = (unchanged_columns + changed_columns).
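The (t1, t2] interval semantics above can be sketched as a simple filter over commit times. This is a minimal illustrative sketch, not Hudi's actual implementation; the class and method names are hypothetical, and records are modeled as a plain map from record key to its _hoodie_commit_time:

```java
import java.util.*;
import java.util.stream.*;

// Hypothetical sketch of incremental_pull (t1, t2]: return records whose
// _hoodie_commit_time falls strictly after t1 and at or before t2.
public class IncrementalPullSketch {
    public static List<String> incrementalPull(Map<String, String> recordToCommitTime,
                                               String t1, String t2) {
        // Hudi commit times are fixed-width timestamp strings, so lexicographic
        // comparison matches chronological order.
        return recordToCommitTime.entrySet().stream()
                .filter(e -> e.getValue().compareTo(t1) > 0 && e.getValue().compareTo(t2) <= 0)
                .map(Map.Entry::getKey)
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, String> records = new HashMap<>();
        records.put("r1", "20190101010101"); // ingested at t1 -> excluded (open at t1)
        records.put("r2", "20190102010101"); // ingested between t1 and t2 -> included
        records.put("r3", "20190103010101"); // ingested at t2 -> included (closed at t2)
        records.put("r4", "20190104010101"); // ingested after t2 -> excluded
        System.out.println(incrementalPull(records, "20190101010101", "20190103010101"));
    }
}
```

For a changelog table, each returned record would carry the full row (unchanged plus changed columns), not a column-level diff.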

Hive queries

We set certain configurations for hoodie to understand whether this is a snapshot-based query or an incremental query. If it is an incremental query, the user can provide the (min, max) lastInstant to pull incremental changes. For the RO view, we are able to figure out exactly which files changed and can limit the number of files read (and hence the amount of data) during the planning phase. For the RT view, since we are not able to easily figure out which log files have data from which commits/instants, we rely on Hive to read the entire data and on the user to apply the filters in their query, such as: select * from table where _hoodie_commit_time > min and _hoodie_commit_time < max.

Presto queries

For both RO and RT views, we rely on Presto to read the entire data and on the user to apply the filters in their query, such as: select * from table where _hoodie_commit_time > min and _hoodie_commit_time < max.

Spark queries

For Spark-based incremental pull on RO views, Hudi has a special technique that avoids the planning phase through the HoodieInputFormat and directly accesses the data that changed. A similar implementation is required for the RT view; for now, RT views fall back on similar paths of applying query filters as described above.

-----------------

As you can imagine, there is already a "hacky" way to achieve time travel using the following methodology: select * from table where _hoodie_commit_time <= point_in_time. Obviously, this is not very efficient and can also lead to inconsistent results, since the point_in_time might be different from the hoodie instant time. If another commit is inflight between point_in_time and the instant_time closest to it, this query will provide different results for different runs.
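The inconsistency can be made concrete with a small sketch. Suppose a commit is inflight during the first run of the query and completes before the second run; because its commit time is at or before point_in_time, new rows appear on re-execution. All names below are illustrative, and records are again modeled as a map from key to _hoodie_commit_time:

```java
import java.util.*;
import java.util.stream.*;

// Illustrative sketch (not Hudi code): the same point_in_time predicate over
// _hoodie_commit_time can return different rows across runs if a commit that
// was inflight during run 1 completes before run 2.
public class TimeTravelInconsistency {
    // Models: select * from table where _hoodie_commit_time <= pointInTime
    static List<String> query(Map<String, String> recordToCommit, String pointInTime) {
        return recordToCommit.entrySet().stream()
                .filter(e -> e.getValue().compareTo(pointInTime) <= 0)
                .map(Map.Entry::getKey).sorted().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        String pointInTime = "105";
        // Run 1: commit "102" is still inflight, so only commit "101" is visible.
        Map<String, String> run1 = Map.of("a", "101");
        // Run 2: commit "102" has completed; it is <= pointInTime, so a new row appears.
        Map<String, String> run2 = Map.of("a", "101", "b", "102");
        System.out.println(query(run1, pointInTime)); // [a]
        System.out.println(query(run2, pointInTime)); // [a, b]
    }
}
```

This is exactly why the proposal below asks clients to snap to a completed hudi instant time rather than querying with an arbitrary wall-clock timestamp.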

Note that there are ways to improve these queries by predicate pushdown of the _hoodie_commit_time field, but that is not in the scope of discussion for this RFC.

We would like to build on these existing capabilities of Hudi to be able to provide point in time queries - access the state and contents of the table at a particular instant in time in the past.

Implementation

Introduce another type of snapshot scan mode, POINT_IN_TIME_SCAN. This will enable scanning the Hudi dataset for files created on or before the supplied point_in_time. The challenge, as described above, is to have a solution around inconsistent results when a user runs the same POINT_IN_TIME query multiple times. There are 2 options here:
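During planning, a POINT_IN_TIME_SCAN could resolve, for each file group, the latest file version written at or before the supplied point_in_time. The following is a hedged sketch of that selection logic under simplified assumptions (files modeled as {fileId, commitTime} pairs; class and method names are hypothetical, not Hudi's planner):

```java
import java.util.*;

// Hypothetical sketch of POINT_IN_TIME_SCAN planning: for each file group,
// keep the newest file version whose commit time is <= point_in_time and
// skip any version written after it.
public class PointInTimeScanSketch {
    public static Map<String, String> planScan(List<String[]> files /* {fileId, commitTime} */,
                                               String pointInTime) {
        Map<String, String> latestPerFileGroup = new HashMap<>();
        for (String[] f : files) {
            String fileId = f[0], commitTime = f[1];
            if (commitTime.compareTo(pointInTime) > 0) continue; // written after point_in_time
            String current = latestPerFileGroup.get(fileId);
            if (current == null || commitTime.compareTo(current) > 0) {
                latestPerFileGroup.put(fileId, commitTime); // newest eligible version so far
            }
        }
        return latestPerFileGroup;
    }

    public static void main(String[] args) {
        List<String[]> files = Arrays.asList(
                new String[]{"f1", "100"}, new String[]{"f1", "103"}, // two versions of f1
                new String[]{"f2", "102"}, new String[]{"f2", "105"}); // two versions of f2
        // At point_in_time 103, f1 resolves to version 103 and f2 to version 102.
        System.out.println(planScan(files, "103"));
    }
}
```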

1) Hudi provides a list of timestamps that can be supplied by the user as the point_in_time to query against. Hudi writes the commit/instant times to a timeline metadata folder and provides APIs to read the timeline. At the moment there are 2 ways to read the timeline: a) The HoodieActiveTimeline class can be instantiated on the client to get a list of the completed commits. Hudi exposes a HoodieReadClient to users to enable reading certain contents and performing certain read-level operations on the Hudi dataset. I'm proposing we add another method, getCompletedInstants(), that allows users to programmatically get a list of all the timestamps against which they can query the table to get consistent results. b) Users of Hive queries who are more SQL friendly can perform a query such as: select _hoodie_commit_time from table sort by _hoodie_commit_time, which provides an ordered list of all the commit times.

The client can perform some windowing operations on this returned list of timestamps to get the closest commit time that matches the point_in_time that the user wants to query against. 
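That client-side windowing step can be sketched as follows: given the ordered list of completed commit times (obtained, for example, from the proposed getCompletedInstants() or the SQL query above), find the greatest commit time at or before the desired point_in_time. The class name is illustrative; a TreeSet's floor operation does the work:

```java
import java.util.*;

// Sketch of the client-side "windowing": snap an arbitrary point_in_time to
// the closest completed hudi commit time at or before it, so repeated runs
// of the same query see the same snapshot.
public class ClosestCommitFinder {
    public static Optional<String> closestCommitAtOrBefore(List<String> completedInstants,
                                                           String pointInTime) {
        TreeSet<String> ordered = new TreeSet<>(completedInstants);
        // floor() returns the greatest element <= pointInTime, or null if none exists.
        return Optional.ofNullable(ordered.floor(pointInTime));
    }

    public static void main(String[] args) {
        List<String> instants = Arrays.asList("20190101000000", "20190102000000", "20190104000000");
        // point_in_time falls between two commits; snap to the earlier one.
        System.out.println(closestCommitAtOrBefore(instants, "20190103120000").orElse("none"));
    }
}
```

Querying with the snapped commit time instead of the raw point_in_time avoids the inflight-commit inconsistency described earlier.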


Hive queries

Users can pass a new scan mode via the following config in JobConf → https://github.com/apache/incubator-hudi/blob/master/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveUtil.java#L30

We will introduce a new config called HOODIE_END_COMMIT_PATTERN, which will represent the commit time that the user wants to scan the table against. This can be thought of as asking the question -> select * from hudi_table where timeAsOf (commit_time)
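To make the proposed Hive-side configuration concrete, here is a hedged sketch of how a client might set it up. java.util.Properties stands in for Hadoop's JobConf so the example is self-contained; the property strings follow the existing "hoodie.<table>.consume.*" naming pattern in HoodieHiveUtil, but the exact key for the proposed HOODIE_END_COMMIT_PATTERN and the mode value are assumptions, not the final API:

```java
import java.util.Properties;

// Hypothetical sketch of configuring the proposed point-in-time scan for Hive.
// Properties stands in for JobConf; key strings below are assumed, following
// the existing hoodie.<table>.consume.* pattern in HoodieHiveUtil.
public class PointInTimeJobConfSketch {
    public static Properties configure(String tableName, String endCommitTime) {
        Properties conf = new Properties();
        // Proposed scan mode, alongside the existing SNAPSHOT and INCREMENTAL modes.
        conf.setProperty(String.format("hoodie.%s.consume.mode", tableName), "POINT_IN_TIME_SCAN");
        // Proposed HOODIE_END_COMMIT_PATTERN: the commit time to scan the table "as of".
        conf.setProperty(String.format("hoodie.%s.consume.end.timestamp", tableName), endCommitTime);
        return conf;
    }

    public static void main(String[] args) {
        Properties conf = configure("hudi_table", "20190102000000");
        System.out.println(conf.getProperty("hoodie.hudi_table.consume.mode"));
    }
}
```

The endCommitTime passed here would be one of the completed instant times obtained from the timeline, as described in the previous section.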


Presto queries

We don't have a way to perform incremental pull in Presto right now. Since we are building on similar concepts to support time travel, the only way this will work is through a query as described above:

select * from table where _hoodie_commit_time <= timeAsOf (commit_time)

This RFC does not propose implementation/changes for Presto.


Spark queries

Incremental pull using Spark works with a custom implementation of org.apache.spark.sql.sources.BaseRelation (called IncrementalRelation), which is chosen based on the VIEW_TYPE that the user can pass from a Spark DataSource. This is slightly different from the way users query Hive/Presto tables. The Hive/Presto tables don't really expose the RO/RT/IC views to users but instead expose InputFormats that build on those views. As part of those input formats, we support different scan modes such as SNAPSHOT or INCREMENTAL (and now our proposed POINT_IN_TIME_SCAN).

We might need a way to standardize the way users think about querying hudi tables using Spark/Hive/Presto. At the moment, I'm proposing another VIEW_TYPE, POINT_IN_TIME, to be introduced in Spark, which will work in conjunction with the already present config END_INSTANTTIME_OPT_KEY to provide the snapshot view of a table at a particular instant in time (similar to select * from table where _hoodie_commit_time <= timeAsOf (commit_time)).
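As a sketch of the Spark DataSource side, the options a reader might pass under this proposal could look like the following. Building the options as a plain map keeps the example runnable without a Spark session; the "point_in_time" view-type value is the new proposal, END_INSTANTTIME_OPT_KEY already exists in Hudi's DataSource read options, and the exact key strings shown are assumptions:

```java
import java.util.*;

// Hypothetical sketch of the DataSource options for the proposed POINT_IN_TIME
// view type. Key strings are assumed; they would be passed via
// spark.read().format("hudi").options(...) in real usage.
public class SparkPointInTimeOptions {
    public static Map<String, String> readOptions(String endInstantTime) {
        Map<String, String> options = new LinkedHashMap<>();
        // Proposed new VIEW_TYPE value, alongside read_optimized/realtime/incremental.
        options.put("hoodie.datasource.view.type", "point_in_time");
        // Existing END_INSTANTTIME_OPT_KEY, reused as the "as of" instant.
        options.put("hoodie.datasource.read.end.instanttime", endInstantTime);
        return options;
    }

    public static void main(String[] args) {
        System.out.println(readOptions("20190102000000"));
    }
}
```

As with Hive, the endInstantTime should be a completed commit time snapped from the timeline so repeated reads return the same snapshot.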

Caveats

  • The number of versions to keep should match the number of commits the client wants to travel back. We need a way to enforce this.
  • The proposed approach pushes the client to perform some work and enforces some limitations:
    • Clients can only time-travel based on the commit times of a hudi dataset. They have to figure out a way to map the timestamp they want to travel to against the commit time that matches closest to it.
    • Clients have to get a list of the valid timestamps (hudi commit times) to time travel against.

Rollout/Adoption Plan

  • This change introduces new configs to enable time travel for hudi tables but at the same time does not change any existing features.

Test Plan

  • Unit tests, including ones that cover different cases of time travel, to ensure consistent results.
  • Unit tests, including ones that cover what happens if the required versions are not present.
  • Testing with the hudi test suite, by running a test workflow for a few days, with the cleaner etc. running alongside queries.





