 HIP-1 : CSV Source Support for Delta Streamer

Proposer

Approver

Status

Current state:"Under Discussion" 

Status: In Progress

Discussion thread: here

JIRA: here

Released: N/A

Prior doc link : https://docs.google.com/document/d/1bj-xpkRomVtbzvLb_4BRngDIGkkMR5yzxXRRzkA7QVo/edit#heading=h.di66rda5xhp2

Abstract

Hudi DeltaStreamer does not directly support pulling data in CSV format from Kafka/HDFS logs. The only alternative for ingesting CSV data into a Hudi dataset is to first convert it to JSON/Avro before pulling it in through DeltaStreamer. This proposal describes a mechanism to directly support sources in CSV format.

Background

Introduce any background context that is relevant or necessary to understand the feature and design choices.

Implementation

Extend DeltaStreamer by implementing a CSV source (Kafka/HDFS).

Spark supports loading a CSV file into a Dataset of Rows (example). Hudi has a RowSource that handles data sources producing Rows. The idea is to transform CSV into Row format using Spark's built-in functionality, passing in the corresponding CSV options, and then reuse the existing logic that ingests from Rows.

Currently, the conversion from Row to Avro in Hudi is not efficient. However, an improvement is under way, so wrapping Spark's CSV reader keeps the implementation simple and extensible compared to parsing CSV and constructing records from scratch.
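As a rough illustration of wrapping Spark's CSV reader, the sketch below translates Hudi-side properties into Spark CSV reader options and loads the files as a DataFrame (a Dataset of Rows). The property names `hoodie.deltastreamer.csv.header` and `hoodie.deltastreamer.csv.sep` and both function names are hypothetical, chosen only for this example; `header` and `sep` are existing Spark CSV options. The actual source would be written against Hudi's RowSource, so treat this as a sketch of the idea rather than the implementation.

```python
from typing import Any, Dict

# Hypothetical Hudi-side property names; the proposal does not define them yet.
HEADER_PROP = "hoodie.deltastreamer.csv.header"
SEP_PROP = "hoodie.deltastreamer.csv.sep"


def csv_reader_options(props: Dict[str, str]) -> Dict[str, str]:
    """Translate the (hypothetical) Hudi properties into Spark CSV reader options."""
    return {
        # "header" and "sep" are option names understood by Spark's CSV reader.
        "header": props.get(HEADER_PROP, "false"),
        "sep": props.get(SEP_PROP, ","),
    }


def read_csv_as_rows(spark: Any, path: str, props: Dict[str, str]) -> Any:
    """Load the CSV files under `path` as a DataFrame (a Dataset of Rows).

    `spark` is expected to be an existing SparkSession.
    """
    reader = spark.read.format("csv").options(**csv_reader_options(props))
    return reader.load(path)
```

Keeping the option translation in its own function means further Spark CSV options can be exposed later by adding entries to one dictionary, without touching the read path.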

Implement a CSVSource by extending RowSource (similar to JDBCSource in this PR):

  • Use `spark.read.format("csv")` as the reader of CSV files, relying on Spark's internal CSV parsing logic to convert them to a Dataset of Rows
  • Define new Hudi configurations related to the CSV source to be on par with Spark CSV options, and pass these configs to the reader through `.option()`
  • The smallest unit of incremental pull will be one CSV file. Assuming the CSV files are named after monotonically increasing timestamps, the filename of the last ingested CSV file can serve as the checkpoint. The directory that holds all CSV files to be ingested will be given in the config
  • We can use the existing FilebasedSchemaProvider class for decoding CSV data
  • If the CSV data does not contain a header, we can use _c0, _c1, _c2, etc. as the field names in the source schema, according to each field's position in the record (this applies to Kafka data and header-less CSV files)
  • If a header is present in the CSV data, the header information must be used for the field names
  • A configuration property is needed to indicate whether a header is present
  • A new configuration property is needed to support configurable delimiters
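The checkpointing and field-naming rules above can be sketched in plain Python, independent of Spark. This is only an illustration under stated assumptions: the function names are hypothetical, and filenames are assumed to be fixed-width timestamps (for example `20190801120000.csv`) so that lexicographic order matches chronological order.

```python
import csv
import io
from typing import List, Optional


def files_after_checkpoint(filenames: List[str],
                           last_checkpoint: Optional[str]) -> List[str]:
    """Return the CSV files newer than the checkpoint, oldest first.

    Assumes fixed-width timestamp filenames, so string comparison
    orders the files chronologically.
    """
    csv_files = sorted(f for f in filenames if f.endswith(".csv"))
    if last_checkpoint is None:
        # No checkpoint yet: every CSV file is pending.
        return csv_files
    return [f for f in csv_files if f > last_checkpoint]


def field_names(first_line: str, has_header: bool, sep: str = ",") -> List[str]:
    """Use the header row if present, otherwise positional names _c0, _c1, ..."""
    columns = next(csv.reader(io.StringIO(first_line), delimiter=sep), [])
    if has_header:
        return columns
    return [f"_c{i}" for i in range(len(columns))]
```

After a batch is ingested, the last element of the list returned by `files_after_checkpoint` would become the new checkpoint.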

Rollout/Adoption Plan

  • There won’t be any impact on existing users. This is purely a new feature.

  • New configurations will be added to the documentation.

Test Plan

...

  • Add a new unit test class for the CSVSource
  • Run integration tests around CSVSource
  • Add one CSVSource example in the demo