Abstract

Hudi DeltaStreamer does not have direct support for pulling data in CSV format from Kafka or HDFS logs. Currently, the only way to ingest CSV data into a Hudi dataset is to first convert it into JSON/Avro before pulling it in through DeltaStreamer. This RFC proposes a mechanism to directly support sources in CSV format.

Background

Introduce any background context that is relevant or necessary to understand the feature and design choices.

Implementation

Extend the DeltaStreamer by implementing a CSV source (Kafka/HDFS):

  • We can use the existing FilebasedSchemaProvider class for decoding CSV data.
  • If the CSV data does not contain a header, we can use _c0, _c1, _c2, etc. as the field names in the source schema, according to each field's position in the record. (This applies to Kafka data and headerless CSV files.)
  • If a header is present in the CSV data, the header information should be used for the field names.
  • A configuration property needs to be introduced to indicate whether a header is present.
  • A new configuration property needs to be introduced to support configurable delimiters. (A sketch of these two properties follows this list.)
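
As a rough illustration of those two properties, the key names and defaults below are assumptions made for this sketch, not existing Hudi configuration keys:

```java
// Illustrative sketch only: these key names and defaults are assumptions
// for this RFC, not existing Hudi configuration properties.
public class CsvSourceConfig {

  // Whether the first line of the CSV input is a header row carrying field names.
  public static final String CSV_HEADER_PRESENT_PROP =
      "hoodie.deltastreamer.source.csv.header";
  public static final boolean DEFAULT_CSV_HEADER_PRESENT = false;

  // The delimiter separating fields within a CSV record.
  public static final String CSV_DELIMITER_PROP =
      "hoodie.deltastreamer.source.csv.delimiter";
  public static final String DEFAULT_CSV_DELIMITER = ",";
}
```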

Alternative Implementation

Spark supports loading a CSV file into a Dataset of Rows (example). Hudi already has a RowSource that handles data sources producing Rows. The idea is to transform CSV into Row format using Spark's functionality, passing in the counterpart CSV options, and then reuse the existing Row-based logic. A minimal snippet of the Spark side follows.
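
For instance, a self-contained Java snippet showing Spark's CSV reader producing a Dataset<Row>; the input path and option values are illustrative:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class CsvToRowsExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("csv-to-rows").master("local[*]").getOrCreate();

    // Spark's built-in CSV support parses the files into a Dataset<Row>.
    // Each .option() below corresponds to a CSV option this RFC would
    // surface as a Hudi configuration.
    Dataset<Row> rows = spark.read()
        .format("csv")
        .option("header", "true")       // first line carries the field names
        .option("sep", ",")             // configurable delimiter
        .option("inferSchema", "true")  // derive field types from the data
        .load("/path/to/source/*.csv"); // illustrative input path

    rows.printSchema();
    spark.stop();
  }
}
```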

A CSVSource can be implemented by extending RowSource (similar to the JDBCSource in this PR):

  • Use `spark.read.format("csv")` as the reader of CSV files, relying on Spark's internal CSV parsing logic to convert them into a Dataset of Rows.
  • Define new Hudi configurations for the CSV source that are on par with Spark's CSV options, and pass these configs through to the reader via `.option()`.
  • The smallest unit of incremental pull will be one CSV file. Assuming the CSV files are named after monotonically increasing timestamps, the filename of the last ingested CSV file can be taken as the checkpoint (see the sketch after this list).
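
As a rough sketch of that checkpointing idea, a hypothetical helper selecting the files for the next batch could look like the following; it assumes timestamp-named files and glosses over how RowSource actually wires in:

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CsvCheckpointHelper {

  /**
   * Hypothetical helper: pick the CSV files to ingest in the next batch.
   * Files are assumed to be named after monotonically increasing timestamps,
   * so any file whose name sorts after the last checkpoint is new. The new
   * checkpoint after the batch is simply the name of the last file returned.
   */
  public static List<Path> filesAfterCheckpoint(
      FileSystem fs, Path sourceDir, String lastCheckpoint) throws IOException {
    return Arrays.stream(fs.listStatus(sourceDir))
        .map(FileStatus::getPath)
        .filter(p -> p.getName().endsWith(".csv"))
        .filter(p -> lastCheckpoint == null
            || p.getName().compareTo(lastCheckpoint) > 0)
        .sorted(Comparator.comparing(Path::getName))
        .collect(Collectors.toList());
  }
}
```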

Rollout/Adoption Plan

  • There won’t be any impact on existing users; this is purely a new feature.
  • New configurations will be added to the documentation.

Test Plan


  • Add a new unit test class for the CSVSource (a sketch of one such test follows this list)
  • Run integration tests around CSVSource
  • Add one CSVSource example in the demo
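
As a sketch of what such a unit test might check, here is a plain JUnit + local Spark test exercising the header and delimiter handling; it does not use Hudi's actual test harness, and the class and file names are made up for illustration:

```java
import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.junit.Test;

public class TestCsvParsing {

  @Test
  public void headerAndDelimiterOptionsAreHonored() throws IOException {
    // Write a small ';'-delimited file with a header row to a temp directory.
    Path dir = Files.createTempDirectory("csv-source-test");
    Files.write(dir.resolve("001.csv"), "id;name\n1;apple\n2;banana\n".getBytes());

    SparkSession spark = SparkSession.builder()
        .appName("csv-source-test").master("local[1]").getOrCreate();
    try {
      Dataset<Row> rows = spark.read()
          .option("header", "true") // first line holds the field names
          .option("sep", ";")       // non-default delimiter
          .csv(dir.toString());

      assertEquals(2, rows.count());
      assertEquals("name", rows.schema().fieldNames()[1]);
    } finally {
      spark.stop();
    }
  }
}
```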