Status

Current state: Under Discussion

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The key class in a Sink Connector is its `SinkTask` implementation. The `SinkTask.put(...)` method is the entry point through which the framework delivers records from the KafkaConsumer to the connector-specific code, and the current abstract class declares it as:

    public abstract void put(Collection<SinkRecord> records);

Usually, the implementer uses a for-each loop to iterate through the records one by one and act on each. There is only one caller, WorkerSinkTask, which always passes an ArrayList.
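A typical `put(...)` body therefore has roughly the shape of the following sketch. To stay self-contained it uses a hypothetical `Rec` class in place of `SinkRecord` and a hypothetical `writeOne` step in place of a connector's real write to an external system; only the shape of the loop is meant to be representative.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

class ForEachPutSketch {
    // Hypothetical stand-in for SinkRecord, holding only what this sketch needs.
    static final class Rec {
        final String topic;
        final int partition;
        final long offset;

        Rec(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    // Hypothetical destination: records a description of each write.
    static final List<String> written = new ArrayList<>();

    // Shape of a typical put(...): walk the Collection and act on each record.
    static void put(Collection<Rec> records) {
        for (Rec r : records) {
            writeOne(r);
        }
    }

    // Hypothetical per-record write to the external system.
    static void writeOne(Rec r) {
        written.add(r.topic + "-" + r.partition + "@" + r.offset);
    }

    public static void main(String[] args) {
        List<Rec> batch = new ArrayList<>();
        batch.add(new Rec("orders", 0, 41));
        batch.add(new Rec("orders", 0, 42));
        put(batch);
        System.out.println(written); // [orders-0@41, orders-0@42]
    }
}
```

Note that the loop only ever sees the records in whatever order the Collection's iterator yields them.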

There are two shortcomings with this signature:

  1. The Connector implementation is not provided with any ordering guarantees for the records.
  2. The signature favors flexibility for the caller of the method over flexibility for its implementer.

Ordering Guarantees

Kafka gives ordering guarantees within a topic-partition: records have strictly increasing offsets as they are consumed. The current SinkTask forfeits this guarantee when it passes the records to the task via the put(Collection<SinkRecord>) method. From the task's perspective, it receives a collection of maybe-ordered records. If a task wants to write records to S3 with one file per topic-partition, it must sort the collection before writing, since it is not guaranteed any ordering. We happen to know that the only caller of SinkTask.put(...), WorkerSinkTask, does call put(...) with an ArrayList sorted in the order the consumer consumed the records. But why is this agreement tacit rather than explicit? Requiring the records to be transferred in a datatype that maintains ordering would harden the API.
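As a minimal sketch of the defensive work described above, assuming a hypothetical `Rec` class in place of `SinkRecord`: a task that writes one file per topic-partition, and that cannot rely on the Collection's order, groups the records by topic-partition and re-sorts each group by offset before writing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class DefensiveSortSketch {
    // Hypothetical stand-in for SinkRecord.
    static final class Rec {
        final String topic;
        final int partition;
        final long offset;

        Rec(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    // Groups records by "topic-partition" (e.g. one S3 file per key) and sorts
    // each group by offset, because a Collection promises no iteration order.
    static Map<String, List<Rec>> byPartitionInOffsetOrder(Collection<Rec> records) {
        Map<String, List<Rec>> groups = new TreeMap<>();
        for (Rec r : records) {
            groups.computeIfAbsent(r.topic + "-" + r.partition, k -> new ArrayList<>()).add(r);
        }
        for (List<Rec> group : groups.values()) {
            group.sort(Comparator.comparingLong(r -> r.offset));
        }
        return groups;
    }

    // Extracts the offsets of one group, for printing and checking.
    static List<Long> offsets(List<Rec> group) {
        List<Long> out = new ArrayList<>();
        for (Rec r : group) {
            out.add(r.offset);
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, List<Rec>> groups = byPartitionInOffsetOrder(Arrays.asList(
                new Rec("logs", 1, 7), new Rec("logs", 0, 3), new Rec("logs", 1, 5)));
        for (Map.Entry<String, List<Rec>> e : groups.entrySet()) {
            System.out.println(e.getKey() + " " + offsets(e.getValue()));
        }
        // logs-0 [3]
        // logs-1 [5, 7]
    }
}
```

If the API guaranteed consumer order, the per-group sort would be unnecessary; only the grouping would remain.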

Flexibility for Implementers

Generally, whenever we choose a signature for a method, we balance the flexibility of its callers against that of its implementers. The caller wants as general an interface as possible, since that lets it pass a variety of objects as a valid argument. Conversely, the implementer wants to know specifically what kind of object it must handle: a more specific type unlocks more methods and lets the implementer rely on assumptions that are forbidden in the general case (e.g. constant-time lookup by index is valid to assume for an ArrayList but not for a Collection in general).

Concretely, as a connector implementer, I want to access the records in the put(...) method by index. I can only do this if the framework gives up some of its flexibility and tells me specifically to expect an ArrayList; then I can do more as an implementer, such as accessing elements by index in constant time. Of course, we forfeit some things other Collection implementations are good at, like checking containment in constant time, but the Connect framework has never provided that anyway, since the records arrive from the Consumer as an ordered sequence.
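A minimal sketch of the difference, using plain `Long` offsets in place of `SinkRecord` objects (the method names are illustrative only): finding the last record of a batch, which holds the highest offset if the batch is in consumer order, takes a full walk over a `Collection`, while an `ArrayList` answers in constant time.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;

class IndexAccessSketch {
    // With only a Collection, reaching the last element means iterating
    // the whole batch: O(n).
    static Long lastViaCollection(Collection<Long> offsets) {
        Long last = null;
        for (Long o : offsets) {
            last = o;
        }
        return last;
    }

    // With an ArrayList, positional access is O(1).
    static Long lastViaArrayList(ArrayList<Long> offsets) {
        return offsets.isEmpty() ? null : offsets.get(offsets.size() - 1);
    }

    public static void main(String[] args) {
        ArrayList<Long> batch = new ArrayList<>(Arrays.asList(100L, 101L, 102L));
        System.out.println(lastViaCollection(batch)); // 102
        System.out.println(lastViaArrayList(batch));  // 102
    }
}
```

Both methods return the same answer; the point is that only the second can be written without iterating.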

The purpose of this KIP is to reconsider the interface's use of "Collection<SinkRecord>", in favor of the more specific datatype "ArrayList<SinkRecord>".

Public Interfaces

Add the following method to the "SinkTask" abstract class:

    public abstract void put(ArrayList<SinkRecord> records);

Proposed Changes

Add the following method to the "SinkTask" abstract class:

    public abstract void put(ArrayList<SinkRecord> records);

Implement the existing abstract method so that it calls the new method, in a backwards-compatible way:

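    public void put(Collection<SinkRecord> records) {
        // Reuse the argument when it is already an ArrayList; otherwise copy it
        // into one, preserving the collection's iteration order.
        ArrayList<SinkRecord> recordsList = records instanceof ArrayList
            ? (ArrayList<SinkRecord>) records
            : new ArrayList<>(records);
        // Delegate to the new abstract put(ArrayList<SinkRecord>) method.
        put(recordsList);
    }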

Compatibility, Deprecation, and Migration Plan

It is critical that Connect frameworks both before and after the change are compatible with connectors implemented against either signature. It is less critical that connector developers be able to choose which signature to implement when they upgrade their dependency to a new Apache Kafka (AK) version.

The framework will not change; only the abstract class will. The existing (unchanged) call `task.put(new ArrayList<>(messageBatch));` within WorkerSinkTask will successfully call the right method whether the connector has implemented the new or the old signature.

Connector developers will need to change their SinkTask implementations when they update their dependency, but this is acceptable since they will find out at compile time during development, not at runtime in production.
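Java chooses among overloads at compile time, picking the most specific applicable method for the argument's static type. The following self-contained sketch mirrors the proposed pair of methods on a hypothetical `Task` class (with `String` standing in for `SinkRecord`) to show where calls land: an argument whose static type is `ArrayList`, as in the WorkerSinkTask call, binds directly to `put(ArrayList)`, while an argument typed as `Collection` goes through the adapter, which copies it into an `ArrayList` in iteration order before delegating.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;

class OverloadSketch {
    // Hypothetical mirror of the proposed SinkTask shape (String stands in for SinkRecord).
    abstract static class Task {
        public abstract void put(ArrayList<String> records);

        // Adapter from the old signature, as in the proposal.
        public void put(Collection<String> records) {
            ArrayList<String> recordsList = records instanceof ArrayList
                ? (ArrayList<String>) records
                : new ArrayList<>(records);
            put(recordsList);
        }
    }

    // A task written against the new signature; it logs each call it receives.
    static final class NewStyleTask extends Task {
        final List<String> calls = new ArrayList<>();

        @Override
        public void put(ArrayList<String> records) {
            calls.add("put(ArrayList) " + records);
        }
    }

    public static void main(String[] args) {
        NewStyleTask task = new NewStyleTask();
        // Static type ArrayList: put(ArrayList) is the most specific overload.
        ArrayList<String> list = new ArrayList<>(Arrays.asList("a", "b"));
        task.put(list);
        // Static type Collection: only put(Collection) applies; it adapts and delegates.
        Collection<String> set = new LinkedHashSet<>(Arrays.asList("c", "d"));
        task.put(set);
        System.out.println(task.calls); // [put(ArrayList) [a, b], put(ArrayList) [c, d]]
    }
}
```

A task that overrides only `put(Collection)` would not compile against this `Task`, because `put(ArrayList)` is abstract; that is the compile-time change connector developers are expected to make.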

Rejected Alternatives

List

List is more ambiguous, and there is no reason to expect a LinkedList or any other List implementation.
