...

To appreciate the design proposed in this RFC, it is important to understand Kafka Connect, a framework for streaming data into and out of Apache Kafka. The core components of the Connect framework relevant to this RFC are connectors, tasks, and workers. A connector instance is a logical job that manages the copying of data between Kafka and another system. A connector instance manages a set of tasks that actually copy the data; using multiple tasks allows for parallelism and scalable data copying. Connectors and tasks are logical execution units that are scheduled onto workers. In distributed mode, workers are run across a cluster to provide scalability and fault tolerance. All workers are configured with the same group.id, and the Connect framework automatically manages the execution of tasks across all available workers. As shown in the figure below, tasks are distributed across workers, and each task manages one or more distinct partitions of the Kafka topic.

[Figure: connector tasks and their assigned topic partitions distributed across Connect workers]
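
For context, a distributed Connect cluster is typically configured through per-worker properties along these lines; the property names are standard Kafka Connect settings, but the values shown are placeholders rather than part of this proposal:

    # connect-distributed.properties (one copy per worker; placeholder values)
    bootstrap.servers=broker1:9092,broker2:9092
    # Workers sharing this group.id form one Connect cluster and rebalance tasks among themselves.
    group.id=hudi-connect-cluster
    key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
    value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
    offset.storage.topic=connect-offsets
    config.storage.topic=connect-configs
    status.storage.topic=connect-status

The sink connector submitted to such a cluster sets tasks.max to bound how many tasks, and hence how much copy parallelism, the framework may create.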

On system initialization, the workers rebalance the set of tasks so that each worker has a similar amount of work. The system may also rebalance dynamically when the number of partitions or tasks changes. In addition, on the failure of a worker, its tasks are re-assigned to the remaining workers to ensure fault tolerance, as shown in the figure below.



[Figure: re-assignment of tasks to the remaining workers after a worker failure]


Goals

We propose to build a Kafka Connect Sink for Hudi with the following assumptions:

  1. Initially, we will only support bulk insert, i.e., append-only mode. Incoming (immutable) records from Kafka will be appended to log files in the Hudi format.
  2. We will guarantee exactly-once delivery: no records are missed and no de-duplication is required.
  3. The sink will be built so that the transactions are centrally coordinated, making it easier to provide other file management optimizations in Hudi, such as clustering and compaction. This ensures that query performance on the target Hudi tables remains optimal despite streaming/incremental ingestion.


Proposed Solution

The high-level architecture is shown in the figure below. To coordinate the write transactions across the different Connect tasks, we require a coordinator, participants that manage each partition, and a communication channel between the coordinator and all the participants. Since Kafka Connect already assigns unique partitions across tasks, we simply pick the task assigned partition-0 to run the coordinator instance. We instantiate one or more participant instances within the Connect tasks to manage the data writes per partition. For instance, in the figure below, Task2 has two participants, one for partition-3 and the other for partition-4.


[Figure: high-level architecture, with the coordinator running on the task that owns partition-0 and one participant per assigned partition]
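
A minimal sketch of the coordinator election rule, assuming the assigned partitions are reported to the task by the Connect framework (e.g., via SinkTask.open()); the class and method names are illustrative and not part of the proposal:

    import java.util.Collection;
    import org.apache.kafka.common.TopicPartition;

    // Illustrative only: the task that is assigned partition-0 also runs the
    // coordinator; every assigned partition gets its own participant instance.
    public class CoordinatorElectionSketch {

      static final int COORDINATOR_PARTITION = 0;

      static boolean shouldRunCoordinator(Collection<TopicPartition> assigned) {
        return assigned.stream().anyMatch(tp -> tp.partition() == COORDINATOR_PARTITION);
      }
    }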

To avoid additional dependencies, we use a dedicated control topic (with a single partition) for all communication traffic between the coordinator and the participants. To start a write transaction, the coordinator broadcasts a START-COMMIT message with the new commit time. On receiving the START-COMMIT message, each participant starts consuming the Kafka records for the corresponding partition from the Connect framework and appends the records to the Hudi file group that is specific to the partition. To ensure that file groups are unique to a partition, we can encode the file group id using the partition id. At the end of a transaction interval, the coordinator sends an END-COMMIT message. Once the participants receive an END-COMMIT, they flush all buffered records to the file and send back a WRITE-STATUS message with the details of the write required to commit the logs in Hudi, including the offset of the last written Kafka record. Once the coordinator receives all the WRITE-STATUS messages, one per partition, it writes the commit files, including the last written Kafka offsets per partition, and sends back an ACK-COMMIT message to acknowledge the commit. Once a participant receives the ACK-COMMIT message, it commits the last written Kafka offset to the Connect platform and waits until the next START-COMMIT message before starting to write records to Hudi files again.
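
The control message types could be modeled roughly as follows; the exact wire format and field set are not specified by this RFC, so the names below (including the file group id encoding) are assumptions for illustration:

    // Illustrative control protocol types; not a finalized wire format.
    public class ControlProtocolSketch {

      enum MessageType {
        START_COMMIT,  // coordinator -> participants: begin a commit with the given commit time
        END_COMMIT,    // coordinator -> participants: end of the transaction interval, flush writes
        WRITE_STATUS,  // participant -> coordinator: write details plus last written Kafka offset
        ACK_COMMIT     // coordinator -> participants: commit files written, safe to commit offsets
      }

      static class ControlMessage {
        final MessageType type;
        final String commitTime;       // e.g. the Hudi commit/instant time
        final int kafkaPartition;      // sender's partition (meaningful for WRITE_STATUS)
        final long lastWrittenOffset;  // last Kafka offset included in the write (WRITE_STATUS)

        ControlMessage(MessageType type, String commitTime, int kafkaPartition, long lastWrittenOffset) {
          this.type = type;
          this.commitTime = commitTime;
          this.kafkaPartition = kafkaPartition;
          this.lastWrittenOffset = lastWrittenOffset;
        }
      }

      // One possible way to keep file groups unique per partition: derive the
      // file group id from the Kafka partition id (an assumption, not mandated here).
      static String fileGroupId(String topic, int partition) {
        return topic + "-" + partition;
      }
    }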


Detailed Design

To explain the design and implementation in detail, we depict the components of the system in the figure below. We will implement a class called HudiSinkTask that extends the SinkTask abstract class defined in Kafka Connect. For simplicity, we only show the components of a single sink task (HudiSinkTask-0) that handles partitions 0 and 2. Since the task handles partition 0, it will also instantiate a coordinator daemon, implemented by the TransactionCoordinator class.
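
A minimal skeleton of such a task, assuming the standard Kafka Connect SinkTask lifecycle; the method bodies are placeholders sketching where the pieces described in this section would be wired in:

    import java.util.Collection;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.connect.sink.SinkRecord;
    import org.apache.kafka.connect.sink.SinkTask;

    // Illustrative skeleton only; the real HudiSinkTask would wire in the
    // coordinator, participants, and communication agent described in this section.
    public class HudiSinkTask extends SinkTask {

      @Override
      public String version() {
        return "0.1.0"; // placeholder
      }

      @Override
      public void start(Map<String, String> props) {
        // Parse connector/task configuration (control topic name, target table, etc.).
      }

      @Override
      public void open(Collection<TopicPartition> partitions) {
        // Instantiate one TransactionParticipant per assigned partition, and the
        // TransactionCoordinator daemon if partition 0 is among them.
      }

      @Override
      public void put(Collection<SinkRecord> records) {
        // Route each record to the buffer of the participant owning its partition.
      }

      @Override
      public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
        // Report only offsets acknowledged by the coordinator (ACK-COMMIT), so Connect
        // never commits Kafka offsets ahead of a Hudi commit.
        return currentOffsets; // placeholder
      }

      @Override
      public void close(Collection<TopicPartition> partitions) {
        // Tear down participants (and the coordinator, if running) for revoked partitions.
      }

      @Override
      public void stop() {
        // Release remaining resources.
      }
    }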

KafkaCommunicationAgent is a singleton class with one instance per Connect worker, shared across all tasks belonging to the same worker process. KafkaCommunicationAgent implements a Kafka consumer subscribed to the Kafka control topic in a dedicated thread. Each instance uses a unique group id based on the worker id to ensure that each instance belongs to a different Kafka consumer group, so all workers receive all the messages in the topic. On initialization, the KafkaCommunicationAgent subscribes to the topic at the latest offset, so it only receives messages posted after its instantiation. While we could improve the efficiency of the system during failures by processing past state messages, we made this design choice to favor simplicity over efficiency.
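
A sketch of how that consumer could be configured, assuming string-serialized control messages; the configuration values and the dispatch step are illustrative:

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    // Illustrative consumer setup: each worker's agent joins a distinct consumer
    // group so every worker sees every control message, and starts from the
    // latest offset so only messages posted after instantiation are processed.
    public class KafkaCommunicationAgentSketch implements Runnable {

      private final KafkaConsumer<String, String> consumer;
      private final String controlTopic;

      public KafkaCommunicationAgentSketch(String bootstrapServers, String controlTopic, String workerId) {
        this.controlTopic = controlTopic;
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // Unique group per worker => broadcast semantics for the control topic.
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "hudi-control-" + workerId);
        // Start from the end of the topic; older control messages are ignored.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        this.consumer = new KafkaConsumer<>(props);
      }

      @Override
      public void run() {
        consumer.subscribe(Collections.singletonList(controlTopic));
        try {
          while (!Thread.currentThread().isInterrupted()) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            for (ConsumerRecord<String, String> record : records) {
              // Deserialize the control message and dispatch it to the event queues of
              // the coordinator/participants registered on this worker.
            }
          }
        } finally {
          consumer.close();
        }
      }
    }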

The TransactionParticipant class implements the processing of records per partition. Since in this case Task0 is responsible for partitions 0 and 2, the HudiSinkTask will instantiate two instances of TransactionParticipant, each handling a single partition. Each participant maintains a buffer for incoming Kafka records and an event queue that is used by the KafkaCommunicationAgent to asynchronously submit control messages received from the Kafka control topic. The state machine manager tracks the state of each participant based on events received from the coordinator via the control topic. Finally, HudiJavaWriter is the Java-based implementation of the Hudi writer that reads records from the incoming buffer and appends them to the Hudi file group.
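
A structural sketch of a participant, assuming string-typed control events for brevity; the state names and transitions are illustrative rather than a finalized state machine:

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import org.apache.kafka.connect.sink.SinkRecord;

    // Illustrative sketch of a participant: a record buffer filled by
    // HudiSinkTask.put(), an event queue filled by the communication agent, and a
    // simple state machine driven by the coordinator's control messages.
    public class TransactionParticipantSketch {

      enum State { IDLE, WRITING, AWAITING_ACK }

      private final Deque<SinkRecord> buffer = new ArrayDeque<>();
      private final BlockingQueue<String> controlEvents = new LinkedBlockingQueue<>();
      private State state = State.IDLE;
      private long lastWrittenOffset = -1L;

      // Called by the sink task for every incoming record of this partition.
      public void bufferRecord(SinkRecord record) {
        buffer.add(record);
      }

      // Called by the communication agent when a control message arrives.
      public void onControlEvent(String event) {
        controlEvents.add(event);
      }

      // Periodically invoked processing step; the Hudi writer drains the buffer
      // while in the WRITING state.
      public void processEvents() {
        String event = controlEvents.poll();
        if (event == null) {
          return;
        }
        switch (event) {
          case "START-COMMIT":
            state = State.WRITING; // begin appending buffered records via the Hudi writer
            break;
          case "END-COMMIT":
            state = State.AWAITING_ACK; // flush, then send WRITE-STATUS with lastWrittenOffset
            break;
          case "ACK-COMMIT":
            state = State.IDLE; // commit lastWrittenOffset to Connect, await next START-COMMIT
            break;
          default:
            break;
        }
      }
    }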


[Figure: components of a single sink task (HudiSinkTask-0): coordinator, transaction participants, communication agent, state machines, and Hudi writers]



Implementation

Rollout/Adoption Plan

...