
...

  1. Initially, we will only support bulk insert, aka append-only. This means that incoming records (immutable data) from Kafka will be appended to log files in the Hudi format.
  2. We will guarantee exactly-once delivery: no missing records and no de-duplication required.
  3. The sink will be built so that transactions are centrally coordinated, making it easier to provide other file management optimizations in Hudi, such as clustering and compaction. This ensures that the target Hudi table's query performance remains optimal, despite streaming/incremental ingestion.

...

KafkaCommunicationAgent is a singleton class with one instance per Connect worker, shared across all tasks belonging to the same worker process. KafkaCommunicationAgent implements a Kafka consumer subscribed to the Kafka control topic in a dedicated thread. Each instance uses a unique group id derived from the worker id, ensuring that each instance belongs to a different Kafka consumer group and hence all workers receive all the messages in the topic. On initialization, the KafkaCommunicationAgent subscribes to the topic at the latest offset, so it only receives messages posted after its instantiation. While processing past state messages could improve the efficiency of the system during failures, we made this design choice to favor simplicity over efficiency.
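
To make the agent concrete, below is a minimal, hypothetical sketch of how such a per-worker singleton could be wired up with the standard Kafka consumer API. The consumer configuration mirrors the description above; the class shape, the group-id naming scheme, and the dispatchToParticipants hook are illustrative assumptions, not the actual implementation.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaCommunicationAgent implements Runnable {

  private final KafkaConsumer<String, byte[]> consumer;
  private volatile boolean running = true;

  public KafkaCommunicationAgent(String bootstrapServers, String workerId, String controlTopic) {
    Properties props = new Properties();
    props.put("bootstrap.servers", bootstrapServers);
    // A unique group id per worker puts every worker in its own consumer group,
    // so all workers receive all messages on the control topic.
    props.put("group.id", "hudi-control-" + workerId);
    // Start at the latest offset: only messages posted after instantiation are seen.
    props.put("auto.offset.reset", "latest");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    this.consumer = new KafkaConsumer<>(props);
    this.consumer.subscribe(Collections.singletonList(controlTopic));
  }

  @Override
  public void run() {
    // Runs in a dedicated thread, fanning control messages out to the
    // TransactionParticipant event queues on this worker.
    while (running) {
      for (ConsumerRecord<String, byte[]> record : consumer.poll(Duration.ofMillis(100))) {
        dispatchToParticipants(record);
      }
    }
    consumer.close();
  }

  private void dispatchToParticipants(ConsumerRecord<String, byte[]> record) {
    // Omitted: deserialize the control message and enqueue it into each
    // participant's event queue.
  }

  public void shutdown() {
    running = false;
  }
}
```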

TransactionParticipant class implements the processing of records per partition. Since in this case Task0 is responsible for partitions 0 and 2, the HudiSinkTask will instantiate two instances of TransactionParticipant, each handling a single partition. Each participant maintains a buffer for incoming Kafka records and an event queue that the KafkaCommunicationAgent uses to asynchronously submit control messages received from the Kafka control topic. The state machine manager represents the state of each participant based on events received from the coordinator via the control topic. Finally, the HudiJavaWriter is the Java-based implementation of the Hudi writer that reads records from the incoming buffer and appends them to the Hudi file group.
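
The following sketch illustrates the shape such a participant could take. The field layout follows the description above, but all names, the State and EventType enums, and the ControlEvent and HudiJavaWriter placeholders are assumptions, introduced here to keep the later sketches on this page self-contained.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;

public class TransactionParticipant {

  // Hypothetical participant states, driven by coordinator control messages.
  enum State { IDLE, WRITING, AWAITING_ACK }

  // Hypothetical control-message types mirroring the protocol in this design.
  enum EventType { START_COMMIT, END_COMMIT, ACK_COMMIT }

  // Placeholder for a deserialized message from the Kafka control topic.
  interface ControlEvent {
    EventType type();
    String commitTime();
    long committedKafkaOffset();
  }

  // Placeholder for the Java-based Hudi writer described above.
  interface HudiJavaWriter {
    void writeRecord(SinkRecord record);
  }

  private final TopicPartition partition;
  private final SinkTaskContext context;  // used to pause/resume Kafka consumption
  private final HudiJavaWriter writer;    // appends buffered records to the Hudi file group
  private final Deque<SinkRecord> buffer = new ArrayDeque<>();                    // incoming Kafka records
  private final BlockingQueue<ControlEvent> events = new LinkedBlockingQueue<>(); // filled by KafkaCommunicationAgent
  private State state = State.IDLE;
  private long committedOffset;      // last offset committed in Hudi
  private long lastWrittenOffset;    // last offset written in the ongoing transaction
  private String currentCommitTime;  // commit time of the ongoing transaction

  TransactionParticipant(TopicPartition partition, SinkTaskContext context, HudiJavaWriter writer) {
    this.partition = partition;
    this.context = context;
    this.writer = writer;
    this.context.pause(partition);  // consume nothing until a START-COMMIT arrives
  }

  void bufferRecord(SinkRecord record) {
    buffer.add(record);
  }

  void processRecords() {
    processControlEvents();  // fleshed out later in this page
    writeRecords();          // fleshed out later in this page
  }

  private void processControlEvents() { /* see the event-handling sketch below */ }
  private void writeRecords() { /* see the writeRecords sketch below */ }
}
```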


Connect Sink Task

The HudiSinkTask pseudo code with key interface implementations is shown below. On initialization, or when a new partition is assigned to the task (via the Connect OPEN API), a new instance of TransactionParticipant is assigned to the partition. If the task manages partition 0, then an instance of TransactionCoordinator is also created, which runs continuously in a separate thread.

...
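
Since the pseudo code itself is elided on this page, the sketch below only illustrates the lifecycle just described, built on the real Connect SinkTask API (open, put, start, stop); the participant and coordinator constructors, the numPartitions field, and the createWriter helper are assumptions carried over from the earlier participant sketch.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

public class HudiSinkTask extends SinkTask {

  private final Map<TopicPartition, TransactionParticipant> participants = new HashMap<>();
  private TransactionCoordinator coordinator;
  private int numPartitions;  // total partitions of the sink topic (from connector config)

  @Override
  public void start(Map<String, String> props) {
    // Omitted: read connector configuration, including numPartitions.
  }

  @Override
  public void open(Collection<TopicPartition> partitions) {
    for (TopicPartition tp : partitions) {
      participants.put(tp, new TransactionParticipant(tp, context, createWriter(tp)));
      if (tp.partition() == 0) {
        // The task that owns partition 0 also hosts the coordinator in its own thread.
        coordinator = new TransactionCoordinator(numPartitions);
        new Thread(coordinator, "hudi-transaction-coordinator").start();
      }
    }
  }

  @Override
  public void put(Collection<SinkRecord> records) {
    // Buffer records per partition, then let each participant advance its state machine.
    for (SinkRecord record : records) {
      TopicPartition tp = new TopicPartition(record.topic(), record.kafkaPartition());
      participants.get(tp).bufferRecord(record);
    }
    participants.values().forEach(TransactionParticipant::processRecords);
  }

  @Override
  public void stop() {
    // Omitted: stop the coordinator thread and release writers.
  }

  @Override
  public String version() {
    return "sketch";
  }

  private TransactionParticipant.HudiJavaWriter createWriter(TopicPartition tp) {
    return record -> { /* omitted: append to the Hudi file group for this partition */ };
  }
}
```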

The coordinator state machine, as implemented by TransactionCoordinator, is shown in the figure below. On initialization of the system, after a pre-configured delay, the coordinator sends a START-COMMIT control message. If the worker where the coordinator is running fails and the Connect framework re-assigns partition 0 to another Connect worker, the new coordinator does not have any state from the previous coordinator. The only state the new coordinator has is the Kafka offsets that were committed for each partition, which it reads from the latest Hudi commit file. The new coordinator sends a START-COMMIT message to start a new transaction. If a transaction initiated by the previous coordinator is ongoing, all participants will discontinue that transaction in favor of the new one.

After a pre-configured, fixed transaction interval, the coordinator broadcasts an END-COMMIT and waits to receive a WRITE-STATUS from all participants. The WRITE-STATUS messages contain information about the previous transaction, including the Hudi files where records were appended, the commit time, the last written Kafka offset, etc. At this stage, the coordinator also starts a timeout that can be configured by the user. If the WRITE-STATUS is not received from all participants before this timeout expires, the coordinator will discard the current transaction and send a START-COMMIT to trigger the start of a new transaction. This handles failures of one or more tasks during the transaction: although the Connect framework will re-assign their partitions to other existing tasks, we prevent the newly assigned participants from writing records for an ongoing transaction.

If the coordinator does receive the WRITE-STATUS from all participants, it writes the Hudi commit file for this transaction and sends an ACK-COMMIT to the participants so they can commit the Kafka offsets. Finally, the coordinator sends a START-COMMIT to trigger a new transaction.
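
Putting the last three paragraphs together, a coordinator loop along these lines would implement the protocol. The control-message names mirror the protocol above, while the helper methods, the timing defaults, and the WriteStatus placeholder are assumptions.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class TransactionCoordinator implements Runnable {

  private final long startDelayMs = 10_000;          // pre-configured start delay (assumed value)
  private final long transactionIntervalMs = 60_000; // fixed transaction interval (assumed value)
  private final long writeStatusTimeoutMs = 30_000;  // user-configurable WRITE-STATUS timeout (assumed value)
  private final int numPartitions;
  private final Map<Integer, WriteStatus> writeStatuses = new HashMap<>();

  public TransactionCoordinator(int numPartitions) {
    this.numPartitions = numPartitions;
  }

  @Override
  public void run() {
    sleepQuietly(startDelayMs);
    broadcast("START-COMMIT");  // begin the first transaction
    while (!Thread.currentThread().isInterrupted()) {
      sleepQuietly(transactionIntervalMs);
      broadcast("END-COMMIT");  // ask participants to flush and report
      writeStatuses.clear();
      long deadline = System.currentTimeMillis() + writeStatusTimeoutMs;
      while (writeStatuses.size() < numPartitions && System.currentTimeMillis() < deadline) {
        pollWriteStatus().ifPresent(ws -> writeStatuses.put(ws.partition, ws));
      }
      if (writeStatuses.size() == numPartitions) {
        writeHudiCommitFile(writeStatuses);  // make the transaction durable in Hudi
        broadcast("ACK-COMMIT");             // participants may now commit Kafka offsets
      }
      // On success and on timeout alike, a START-COMMIT begins the next transaction;
      // a timed-out transaction is simply discarded, and its uncommitted files are
      // cleaned up later by Hudi.
      broadcast("START-COMMIT");
    }
  }

  private void broadcast(String message) { /* omitted: publish to the Kafka control topic */ }
  private Optional<WriteStatus> pollWriteStatus() { /* omitted: poll WRITE-STATUS messages */ return Optional.empty(); }
  private void writeHudiCommitFile(Map<Integer, WriteStatus> statuses) { /* omitted: commit via Hudi */ }

  private static void sleepQuietly(long ms) {
    try { Thread.sleep(ms); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
  }

  // Placeholder for the per-partition result a participant reports in WRITE-STATUS:
  // files appended, commit time, last written Kafka offset.
  static class WriteStatus { int partition; }
}
```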

...

When initialized, TransactionParticipant pauses Kafka message consumption using [SinkTaskContext](https://kafka.apache.org/26/javadoc/org/apache/kafka/connect/sink/SinkTaskContext.html). This signals to the Connect platform to avoid consuming records until we receive a START-COMMIT from the coordinator. On every PUT call in the sink task, the processRecords method is called. First, it processes the latest events sent by the coordinator over the Kafka control topic and adjusts the state accordingly. When a START-COMMIT is received, the current transaction, if one is ongoing, is discarded. This may happen when the worker running the coordinator thread fails, causing the Connect framework to re-assign partition 0 to another worker. The records written in the previous transaction will still exist, but since they were not committed by Hudi, they will be cleaned up later. In addition, the local Kafka commit offset is reset to the one sent in the START-COMMIT message from the coordinator. In most cases the local offset should match the coordinator's, but in case of task failures the local offset may need to be updated; we then treat the Kafka offset from the coordinator as the source of truth. This avoids duplication of records, since that offset was committed in Hudi and is in sync with the data records written and committed in Hudi. A new transaction is then started, and Kafka message consumption is resumed using the resume API provided by the Connect framework in SinkTaskContext.

If an END-COMMIT is received that is meant for the current ongoing transaction, Kafka message consumption is paused, and the write status of the transaction is sent to the coordinator. Following that, an ACK-COMMIT event is received confirming that the coordinator has committed the transaction, and the Kafka commit offset is reset to represent the latest record that was written and committed by Hudi. In the TransactionParticipant, we do not use timeouts to handle failures; instead, we rely on receiving a START-COMMIT from the coordinator to reset the state of all participants.
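
The two paragraphs above condense into the participant's event-handling loop. In the sketch below, the pause and resume calls are the actual SinkTaskContext API; the ControlEvent accessors, the state transitions, and the sendWriteStatus helper are assumptions continued from the earlier participant sketch.

```java
// Inside TransactionParticipant: fleshing out the processControlEvents() stub
// from the earlier sketch.
private void processControlEvents() {
  ControlEvent event;
  while ((event = events.poll()) != null) {
    switch (event.type()) {
      case START_COMMIT:
        buffer.clear();                                  // discard any ongoing transaction
        committedOffset = event.committedKafkaOffset();  // coordinator's offset is the source of truth
        lastWrittenOffset = committedOffset;             // see writeRecords below
        currentCommitTime = event.commitTime();          // assumed: START-COMMIT carries the new commit time
        state = State.WRITING;
        context.resume(partition);                       // resume consumption via SinkTaskContext
        break;
      case END_COMMIT:
        if (event.commitTime().equals(currentCommitTime)) {  // only for the current transaction
          context.pause(partition);                      // stop consuming while the commit finalizes
          sendWriteStatus();                             // report files written, commit time, last offset
          state = State.AWAITING_ACK;
        }
        break;
      case ACK_COMMIT:
        committedOffset = lastWrittenOffset;             // the transaction is now durable in Hudi
        state = State.IDLE;
        break;
    }
  }
}

private void sendWriteStatus() { /* omitted: publish WRITE-STATUS to the control topic */ }
```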

Once the state machine is updated, the writeRecords method flushes all the records from the buffer and writes them to Hudi using the HudiJavaClient. A local variable storing the last written Kafka offset is maintained as records are written. At the start of the transaction, this local variable is initialized to the committed Kafka offset, ensuring we do not miss any records.
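
A minimal sketch of writeRecords under the same assumptions; the WRITING-state guard and the offset bookkeeping follow the description above, with the HudiJavaWriter placeholder standing in for the actual Hudi write client.

```java
// Inside TransactionParticipant: fleshing out the writeRecords() stub from the
// earlier sketch.
private void writeRecords() {
  if (state != State.WRITING) {
    return;  // only write while a transaction is ongoing
  }
  SinkRecord record;
  while ((record = buffer.poll()) != null) {
    writer.writeRecord(record);                // append to the Hudi file group
    lastWrittenOffset = record.kafkaOffset();  // track the latest offset written
  }
}
```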

...

We have validated the working of the protocol by building a PoC. In the current PoC, we have not integrated with the Hudi Write Client, but we have implemented the transaction protocol within the Connect platform. We implemented a simple file writer that mimics the Hudi writer and validated that no duplicate or missing records were found. We also tested cases of worker failures that caused either the Coordinator instance to fail and restart, or one or more Participant instances to be re-assigned to another worker.

...