...

To avoid additional dependencies, we simply use a dedicated control topic (with a single partition) for all communication traffic between the coordinator and the participants. A transaction then proceeds as follows (a sketch of the control events follows this list):

  • To start a write transaction, the coordinator broadcasts a START-COMMIT message with the new commit time. On receiving START-COMMIT, each participant starts consuming the Kafka records for its partition from the Connect framework and appends them to the Hudi file group dedicated to that partition. To ensure that file groups are unique to a partition, the file group id can be encoded using the partition id.
  • At the end of a transaction interval, the coordinator sends an END-COMMIT message. On receiving END-COMMIT, each participant flushes all buffered records to the file and sends back a WRITE-STATUS message carrying the details of the write required to commit the logs in Hudi, including the offset of the last Kafka record it wrote.
  • Once the coordinator has received a WRITE-STATUS message from every partition, it writes the commit files, including the last written Kafka offset per partition, and broadcasts an ACK-COMMIT message to acknowledge the commit.
  • On receiving ACK-COMMIT, each participant commits the last written Kafka offset to the Connect platform and waits for the next START-COMMIT message before writing records to Hudi files again.
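To make the message flow concrete, below is a minimal sketch of the control events. The names (ControlEvent, EventType, the field names) are illustrative assumptions, not the actual implementation. Because the control topic has a single partition, every task observes these events in the same total order.

```
// Hypothetical sketch of the control-topic protocol; all names here are
// illustrative, not the classes the connector will actually ship.
public class ControlEvent {

  public enum EventType {
    START_COMMIT,   // coordinator -> participants: start a transaction at commitTime
    END_COMMIT,     // coordinator -> participants: flush records and report write status
    WRITE_STATUS,   // participant -> coordinator: write details + last written Kafka offset
    ACK_COMMIT      // coordinator -> participants: commit file written; offsets are safe
  }

  private final EventType type;
  private final String commitTime;            // Hudi commit time for this transaction
  private final int senderPartition;          // set by participants on WRITE_STATUS
  private final long lastWrittenKafkaOffset;  // meaningful only for WRITE_STATUS

  public ControlEvent(EventType type, String commitTime,
                      int senderPartition, long lastWrittenKafkaOffset) {
    this.type = type;
    this.commitTime = commitTime;
    this.senderPartition = senderPartition;
    this.lastWrittenKafkaOffset = lastWrittenKafkaOffset;
  }
}
```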

...


Implementation

To explain the design and implementation in detail, we depict the components of the system in the figure below. We will implement a class called HudiSinkTask that extends the SinkTask abstract class defined in Kafka Connect. For simplicity, we only show the components of a single Sink task (HudiSinkTask-0) that handles partitions 0 and 2. Since this task handles partition 0, it also instantiates the coordinator daemon, implemented by TransactionCoordinator.

...

Kafka Connect calls PRECOMMIT periodically (the interval can be configured by the user) to commit the latest Kafka offsets. The offsets are returned by TransactionParticipant, and are advanced only for records that have been committed to Hudi. Since Kafka offsets are therefore committed only as Hudi files are committed, we suggest setting the PRECOMMIT interval close to the transaction interval.

```
class HudiSinkTask implements SinkTask {

  void OPEN(partitions) {
    for (partition : partitions) {
      // The task that owns partition 0 also runs the coordinator daemon.
      if (partition == 0) {
        transactionCoordinator.start()
      }
      transactionParticipant.get(partition).start()
    }
  }

  void PUT(records) {
    // Buffer each incoming record with the participant that owns its partition.
    for (record : records) {
      transactionParticipant.get(record.partition).buffer(record)
    }
    // Let every assigned participant process its buffered records.
    for (partition : assigned_partitions) {
      transactionParticipant.get(partition).processRecords()
    }
  }

  PRECOMMIT() {
    // Report, per partition, the offset of the last record committed to Hudi.
    commitOffsets = []
    for (partition : assigned_partitions) {
      commitOffsets.put(partition, transactionParticipant.get(partition).getLatestKafkaOffset())
    }
    return commitOffsets
  }
}
```

[Figure: components of the system, showing a single Sink task (HudiSinkTask-0) handling partitions 0 and 2]
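To illustrate how a participant could expose only Hudi-committed offsets to PRECOMMIT, here is a hedged sketch. The class and method names mirror the design above, but the fields and callbacks (onRecordWritten, onAckCommit) are assumptions for illustration, not the actual implementation.

```
import java.util.concurrent.atomic.AtomicLong;

// Sketch only: tracks the highest Kafka offset that is durable in Hudi.
// The offset returned to PRECOMMIT advances only after ACK-COMMIT, so
// Kafka Connect never commits offsets for records not yet committed to Hudi.
public class TransactionParticipant {

  // Offset of the last record included in a completed Hudi commit (-1 = none yet).
  private final AtomicLong committedKafkaOffset = new AtomicLong(-1L);

  // Offset of the last record appended to the current, uncommitted file.
  private volatile long lastWrittenKafkaOffset = -1L;

  // Assumed callback: invoked as each record is appended to the Hudi file group.
  void onRecordWritten(long kafkaOffset) {
    lastWrittenKafkaOffset = kafkaOffset;
  }

  // Assumed callback: invoked when the coordinator's ACK-COMMIT is received.
  void onAckCommit() {
    committedKafkaOffset.set(lastWrittenKafkaOffset);
  }

  // Read by HudiSinkTask.PRECOMMIT(); this is the offset Kafka Connect commits.
  long getLatestKafkaOffset() {
    return committedKafkaOffset.get();
  }
}
```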


Rollout/Adoption Plan

  • <What impact (if any) will there be on existing users?>
  • <If we are changing behavior how will we phase out the older behavior?>
  • <If we need special migration tools, describe them here.>
  • <When will we remove the existing behavior?>

...