...

The TransactionParticipant class implements the processing of records for a single partition. Since Task0 is responsible for partitions 0 and 2 in this example, HudiSinkTask instantiates two instances of TransactionParticipant, each handling one partition. Each participant maintains a buffer for incoming Kafka records and an event queue, into which the KafkaCommunicationAgent asynchronously submits the control messages it receives from the Kafka control topic. The state machine manager tracks the state of each participant based on the events received from the coordinator via the control topic. Finally, HudiJavaWriter is the Java-based implementation of the Hudi writer; it reads records from the incoming buffer and appends them to the Hudi file group.
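The split between the communication agent and the participant can be illustrated with a minimal sketch. It assumes a simplified protocol: the control events (`START_COMMIT`, `END_COMMIT`, `ACK_COMMIT`), the states (`IDLE`, `WRITING`, `AWAITING_ACK`) and the classes `BufferedRecord` and `QueueParticipant` are hypothetical stand-ins, and the Hudi writer is replaced by an in-memory list. What it shows is the threading model: the agent thread only enqueues events on a thread-safe queue, while the task thread drains that queue, advances the state machine, and only then appends buffered records.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical control events sent by the coordinator over the control topic.
enum ControlEvent { START_COMMIT, END_COMMIT, ACK_COMMIT }

// Hypothetical participant states driven by those events.
enum ParticipantState { IDLE, WRITING, AWAITING_ACK }

// Simplified Kafka record: only the offset matters for this sketch.
class BufferedRecord {
  final long offset;
  final String value;
  BufferedRecord(long offset, String value) { this.offset = offset; this.value = value; }
}

class QueueParticipant {
  // Records handed over by the sink task's PUT call (task thread only).
  private final List<BufferedRecord> buffer = new ArrayList<>();
  // Thread-safe: filled by the communication agent, drained by the task thread.
  private final ConcurrentLinkedQueue<ControlEvent> events = new ConcurrentLinkedQueue<>();
  // Stand-in for HudiJavaWriter: records appended in the open transaction.
  private final List<BufferedRecord> written = new ArrayList<>();
  private ParticipantState state = ParticipantState.IDLE;
  private long committedOffset = -1; // last offset covered by an acknowledged commit

  void buffer(BufferedRecord r) { buffer.add(r); }

  // Called from the communication agent's thread.
  void submitEvent(ControlEvent e) { events.offer(e); }

  // Called from the task thread: apply pending events, then write if a transaction is open.
  void processRecords() {
    ControlEvent e;
    while ((e = events.poll()) != null) {
      if (e == ControlEvent.START_COMMIT && state == ParticipantState.IDLE) {
        state = ParticipantState.WRITING;
      } else if (e == ControlEvent.END_COMMIT && state == ParticipantState.WRITING) {
        state = ParticipantState.AWAITING_ACK; // stop writing until the commit is acknowledged
      } else if (e == ControlEvent.ACK_COMMIT && state == ParticipantState.AWAITING_ACK) {
        if (!written.isEmpty()) committedOffset = written.get(written.size() - 1).offset;
        written.clear();
        state = ParticipantState.IDLE;
      }
    }
    if (state == ParticipantState.WRITING) {
      written.addAll(buffer); // append buffered records to the (simulated) Hudi file group
      buffer.clear();
    }
  }

  ParticipantState state() { return state; }
  long committedOffset() { return committedOffset; }
  int buffered() { return buffer.size(); }
}
```

Records that arrive while no transaction is open simply stay in the buffer, so nothing is written outside a coordinator-driven transaction in this model.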




Connect Sink Task

The pseudocode of HudiSinkTask, with its key interface implementations, is shown below. On initialization, or whenever a new partition is assigned to the task (the OPEN API), a new instance of TransactionParticipant is created for that partition. If the task manages partition 0, an instance of TransactionCoordinator is also created, which runs continuously in a separate thread.
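The OPEN logic, including the coordinator thread, can be sketched in plain Java. This is a simplified model, not the connector's code: partitions are plain `int`s of a single topic (Kafka Connect itself passes `TopicPartition` objects), and `SketchCoordinator`, `OpenParticipant` and `OpenSketchTask` are hypothetical names. The coordinator's `start()` is idempotent, so a repeated assignment of partition 0 does not spawn a second thread.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical coordinator: loops on its own thread until stopped.
class SketchCoordinator implements Runnable {
  private final AtomicBoolean running = new AtomicBoolean(false);
  private Thread thread;

  // Idempotent: only the first call spawns the background thread.
  synchronized void start() {
    if (running.compareAndSet(false, true)) {
      thread = new Thread(this, "transaction-coordinator");
      thread.setDaemon(true);
      thread.start();
    }
  }

  synchronized void stop() {
    running.set(false);
    if (thread == null) return;
    thread.interrupt();
    try {
      thread.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  boolean isRunning() { return running.get(); }

  @Override
  public void run() {
    while (running.get()) {
      // A real coordinator would start and end transactions on a timer here.
      try {
        Thread.sleep(10);
      } catch (InterruptedException e) {
        return; // stop() interrupts the sleep
      }
    }
  }
}

// Hypothetical participant: only records whether it was started.
class OpenParticipant {
  final int partition;
  boolean started;
  OpenParticipant(int partition) { this.partition = partition; }
  void start() { started = true; }
}

class OpenSketchTask {
  static final int COORDINATOR_PARTITION = 0;
  final Map<Integer, OpenParticipant> participants = new HashMap<>();
  final SketchCoordinator coordinator = new SketchCoordinator();

  // Mirrors OPEN: one participant per newly assigned partition; coordinator only for partition 0.
  void open(Collection<Integer> partitions) {
    for (int partition : partitions) {
      if (partition == COORDINATOR_PARTITION) {
        coordinator.start();
      }
      participants.computeIfAbsent(partition, OpenParticipant::new).start();
    }
  }
}
```

With partitions 0 and 2, the task starts the coordinator plus two participants; a task holding partitions 1 and 3 starts only participants.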

Kafka Connect calls PUT periodically with the records read from Kafka. First, each record is added to the buffer of the TransactionParticipant that owns its partition. Then the processRecords method of each TransactionParticipant is executed, which runs the state machine and then writes the buffered records to Hudi files. The implementation of TransactionParticipant is explained in more detail in the next section.
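The two phases of PUT can be sketched as follows, again with hypothetical classes (`PutRecord`, `PutParticipant`, `PutSinkTask`) and `int` partitions. One detail worth making explicit: the second loop runs over all assigned partitions rather than only those present in the batch, which presumably lets a participant react to pending control events even when it received no new records.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical minimal record: partition, offset and payload.
class PutRecord {
  final int partition;
  final long offset;
  final String value;
  PutRecord(int partition, long offset, String value) {
    this.partition = partition;
    this.offset = offset;
    this.value = value;
  }
}

// Hypothetical participant that only tracks its buffer and how often it was processed.
class PutParticipant {
  final List<PutRecord> buffer = new ArrayList<>();
  int processCalls = 0;
  void buffer(PutRecord r) { buffer.add(r); }
  void processRecords() { processCalls++; } // state machine + Hudi write would go here
}

class PutSinkTask {
  final Map<Integer, PutParticipant> participants = new LinkedHashMap<>();

  PutSinkTask(int... assigned) {
    for (int p : assigned) participants.put(p, new PutParticipant());
  }

  void put(List<PutRecord> records) {
    // 1. Route each record to the participant owning its partition. Kafka preserves
    //    order within a partition, so appending keeps each buffer in offset order.
    for (PutRecord r : records) {
      PutParticipant p = participants.get(r.partition);
      if (p == null) throw new IllegalStateException("partition not assigned: " + r.partition);
      p.buffer(r);
    }
    // 2. Process every assigned participant, including those with no new records.
    for (PutParticipant p : participants.values()) {
      p.processRecords();
    }
  }
}
```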

Kafka Connect calls PRECOMMIT periodically (the interval is configurable by the user) to commit the latest Kafka offsets. The offset for each partition is returned by its TransactionParticipant and only advances as written records are committed to Hudi. Since Kafka offsets are committed only when the corresponding Hudi files are committed, we suggest setting the PRECOMMIT interval close to the transaction interval.
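How a participant might compute the offset it returns can be sketched as below. This is an illustration under assumptions, not the connector's logic: `OffsetTracker` and `PreCommitSketch` are hypothetical, and it assumes no records are written between the end of a commit and its acknowledgement. It follows the standard Kafka convention that a committed offset is the *next* offset to consume, hence the `+ 1`, and it omits partitions with nothing committed to Hudi so that no offset is committed for them. In Kafka Connect, the interval at which offsets are flushed is, as far as we know, governed by the worker setting `offset.flush.interval.ms`.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical per-partition offset tracking inside a participant.
class OffsetTracker {
  private long lastWrittenOffset = -1;   // last offset appended to the open Hudi transaction
  private long lastCommittedOffset = -1; // last offset covered by a successful Hudi commit

  void onRecordWritten(long offset) { lastWrittenOffset = Math.max(lastWrittenOffset, offset); }

  // Called when the Hudi commit covering the written records is acknowledged.
  void onHudiCommit() { lastCommittedOffset = lastWrittenOffset; }

  // Kafka convention: the committed offset is the next offset to consume; -1 means "nothing yet".
  long latestKafkaOffset() { return lastCommittedOffset < 0 ? -1 : lastCommittedOffset + 1; }
}

class PreCommitSketch {
  // Mirrors PRECOMMIT: report only partitions with data already committed to Hudi.
  static Map<Integer, Long> preCommit(Map<Integer, OffsetTracker> participants) {
    Map<Integer, Long> commitOffsets = new TreeMap<>();
    for (Map.Entry<Integer, OffsetTracker> e : participants.entrySet()) {
      long next = e.getValue().latestKafkaOffset();
      if (next >= 0) commitOffsets.put(e.getKey(), next);
    }
    return commitOffsets;
  }
}
```

Records written but not yet committed to Hudi never move the reported offset, so after a failure Kafka Connect would redeliver them rather than lose them.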


```
class HudiSinkTask extends SinkTask {

  // Called on initialization and whenever partitions are assigned to the task
  void OPEN(partitions) {
    for (partition : partitions) {
      if (partition == 0) {
        transactionCoordinator.start()   // runs in a separate thread
      }
      transactionParticipant.put(partition, new TransactionParticipant(partition))
      transactionParticipant.get(partition).start()
    }
  }

  void PUT(records) {
    for (record : records) {
      transactionParticipant.get(record.partition).buffer(record)
    }
    // Run each participant's state machine, then write buffered records to Hudi
    for (partition : assigned_partitions) {
      transactionParticipant.get(partition).processRecords()
    }
  }

  Map PRECOMMIT() {
    commitOffsets = {}
    for (partition : assigned_partitions) {
      commitOffsets.put(partition, transactionParticipant.get(partition).getLatestKafkaOffset())
    }
    return commitOffsets
  }
}
```


Implementation

Rollout/Adoption Plan

...