...

TLDR: When a developer implements `SinkTask.put(...)`, it's a much better experience to be guaranteed a `List<SinkRecord>` than the current `Collection<SinkRecord>`, because the connector developer will have access to the full set of List methods and not just the limited set of Collection methods.

...

Kafka gives ordering guarantees within a topic-partition: records will have strictly increasing offsets as they are consumed. The current SinkTask forfeits this guarantee when it passes the records to the task via the `put(Collection<SinkRecord>)` method. From the task's perspective, it is getting a collection of maybe-ordered records. If a task wants to put records in S3 with a file for each topic-partition, it must sort the collection of records before putting them into S3, since it is not guaranteed any ordering from the caller. We happen to know that the only caller of `SinkTask.put(...)` (which happens to be WorkerSinkTask) does indeed call `put(...)` with a List that is sorted in the order the consumer consumes. But why is this agreement tacit and not explicit? It would harden the API to explicitly require the transfer of records to be in a datatype that maintains ordering.
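To make the cost of the tacit agreement concrete, here is a minimal sketch of the defensive sort a task has to do when it only receives a Collection, next to the ordering check that an explicit contract would make unnecessary. `OrderingSketch` and `Rec` are hypothetical stand-ins invented for this illustration (a real task would work with `SinkRecord`), and the sketch uses a single topic, so a partition number alone identifies the topic-partition.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OrderingSketch {
    // Hypothetical stand-in for SinkRecord: only the fields needed here.
    static final class Rec {
        final int partition;
        final long offset;

        Rec(int partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    // What a task must do when handed an unordered Collection:
    // copy the records and sort them by partition, then by offset.
    static List<Rec> sortedCopy(Collection<Rec> records) {
        List<Rec> copy = new ArrayList<>(records);
        copy.sort(Comparator.comparingInt((Rec r) -> r.partition)
                .thenComparingLong(r -> r.offset));
        return copy;
    }

    // Returns true when offsets strictly increase within each partition,
    // in the iteration order of the list.
    static boolean isOrderedPerPartition(List<Rec> records) {
        Map<Integer, Long> lastOffset = new HashMap<>();
        for (Rec r : records) {
            Long previous = lastOffset.put(r.partition, r.offset);
            if (previous != null && previous >= r.offset) {
                return false;
            }
        }
        return true;
    }
}
```

If the contract stated that records arrive in consumption order, a task could skip `sortedCopy` entirely and write records out as it iterates.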

...

Generally, whenever we choose a signature for a function, we balance a tension between the flexibility of the callers of the method and the implementations of the signature. The caller will always want the most general interface possible: this lets the caller pass in a variety of objects as a valid argument. Conversely, the implementer will want to know specifically what kind of object it needs to be able to handle. It will "unlock" more methods if it is guaranteed a more specific type of object, and can rely on assumptions about specific types that it is forbidden from making in the general case (e.g. lookup by index is available on a List but not on a Collection in general).

Concretely, as an implementer of a connector's `SinkTask.put(...)` method, I want to do things like `records.get(int i)`. I can only do this if the framework gives me a more specific subtype of Collection, like List.
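As a rough illustration of the difference, the sketch below fetches the i-th record from a Collection and from a List. The class and method names (`IndexAccessSketch`, `nthFromCollection`, `nthFromList`) are made up for this example and are not part of the Connect API.

```java
import java.util.Collection;
import java.util.Iterator;
import java.util.List;

public class IndexAccessSketch {
    // With only a Collection, reaching the i-th element means
    // walking the iterator past the first i elements.
    static <T> T nthFromCollection(Collection<T> records, int i) {
        Iterator<T> it = records.iterator();
        for (int skipped = 0; skipped < i; skipped++) {
            it.next();
        }
        return it.next();
    }

    // With a List, positional access is part of the interface.
    static <T> T nthFromList(List<T> records, int i) {
        return records.get(i);
    }
}
```

Both methods return the same element for an ordered input; the List version simply states the intent directly and lets the implementation choose how to serve it.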


The purpose of this KIP is to reconsider the interface's use of "Collection<SinkRecord>", in favor of the more specific datatype "List<SinkRecord>".

Public Interfaces

Add to the "SinkTask" abstract class a method:

    public abstract void put(List<SinkRecord> records);

Proposed Changes

...

    public abstract void put(List<SinkRecord> records);

Implement the existing abstract method to call the new method in a backwards-compatible way:

...

The framework will not change, just the abstract class. The existing (unchanged) call within WorkerSinkTask of `task.put(new ArrayList<>(messageBatch));` will successfully call the right function whether the connector has implemented the new or old signature.
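One way to picture the overload arrangement is the sketch below. It is not the KIP's actual code: `SketchSinkTask` and `RecordingTask` are hypothetical names, `String` stands in for `SinkRecord`, and the body of the delegating method is an assumption about what such a bridge could look like. It only covers a connector that implements the new `put(List)` signature.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

public class PutOverloadSketch {
    // Hypothetical stand-in for the SinkTask abstract class.
    abstract static class SketchSinkTask {
        // New, more specific signature that connectors implement.
        public abstract void put(List<String> records);

        // Existing signature, now a concrete method that delegates.
        // Assumed bridging logic: reuse the argument if it is already
        // a List, otherwise copy it in iteration order.
        public void put(Collection<String> records) {
            List<String> asList = (records instanceof List)
                    ? (List<String>) records
                    : new ArrayList<>(records);
            // Static type is List, so this resolves to put(List),
            // not back to this method.
            put(asList);
        }
    }

    // Hypothetical connector task that implements only the new signature.
    static class RecordingTask extends SketchSinkTask {
        final List<String> seen = new ArrayList<>();

        @Override
        public void put(List<String> records) {
            seen.addAll(records);
        }
    }
}
```

A call such as `task.put(new ArrayList<>(messageBatch))` has the static argument type ArrayList, so the compiler picks the more specific `put(List)` overload; a caller holding only a Collection reference goes through the delegating method and ends up in the same place.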

...

There is only one caller of this method, `WorkerSinkTask`, which always passes an ArrayList as the Collection.

Rejected Alternatives

ArrayList

Considered using ArrayList since you'd get guarantees about the runtime of indexing, but decided it is a bad tradeoff to get this guarantee at the expense ...