Status

Current state: Under Discussion

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

TLDR: When a developer implements `SinkTask.put(...)`, it's a much better experience to be guaranteed an `ArrayList<SinkRecord>` than the current `Collection<SinkRecord>`, because the connector developer will have access to the full set of ArrayList methods and not just the limited set of Collection methods.

When a developer writes a new Sink Connector, the key class to implement is `SinkTask`, and the key method is `SinkTask.put(...)`. This method is the entrypoint through which records are delivered from the KafkaConsumer to the Connector-specific code, so it's critical that developers have the right method signature to work with. Its current abstract class specifies:

...

Usually, the implementer will use a for-each loop to iterate through the records one by one and act on them. There is only one caller, WorkerSinkTask, which always passes an ArrayList.

There are two shortcomings with this signature:

  1. The Connector implementation is not provided with any ordering guarantees for the Records.
  2. The signature values the flexibility of the method's caller over that of its implementer. The proposed change gives flexibility to the task implementation by giving it more knowledge, at the expense of the method caller, which would prefer to give few guarantees.

Ordering Guarantees

Kafka gives ordering guarantees within a Topic-Partition: Records will have strictly increasing offsets as they are consumed. The current SinkTask forfeits any such guarantee when it passes the records to the Task via the `put(Collection<SinkRecord>)` method. From the Task's perspective, it is getting a collection of maybe-ordered records. If a task wants to put records in S3 with a file for each topic-partition, it must sort the collection of records before putting them into S3, since it is not guaranteed any ordering from the caller. We happen to know that the only caller of SinkTask.put(...), WorkerSinkTask, does indeed call put(...) with an ArrayList that is sorted in the order the consumer consumes. But why is this agreement tacit and not explicit? It would harden the API to explicitly require the transfer of records to be in a datatype that maintains ordering.
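To make the cost concrete, here is a minimal sketch of the defensive sort a task has to perform when all it is handed is a Collection. `Rec` is a hypothetical stand-in for SinkRecord, reduced to topic, partition, and offset so the example needs no Kafka dependency; `DefensiveOrdering` and `inOffsetOrder` are likewise illustrative names, not part of Connect.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-in for SinkRecord; only the fields relevant to ordering.
final class Rec {
    final String topic;
    final int partition;
    final long offset;

    Rec(String topic, int partition, long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }
}

final class DefensiveOrdering {
    // A Collection promises nothing about iteration order, so the task
    // copies the batch and sorts it by (topic, partition, offset) before
    // writing one file per topic-partition.
    static List<Rec> inOffsetOrder(Collection<Rec> records) {
        List<Rec> sorted = new ArrayList<>(records);
        sorted.sort(Comparator.comparing((Rec r) -> r.topic)
                .thenComparingInt(r -> r.partition)
                .thenComparingLong(r -> r.offset));
        return sorted;
    }
}
```

If the signature guaranteed an ordered type, this copy-and-sort step would be redundant for batches that already arrive in consumption order.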

Flexibility for Implementers

Generally, whenever we choose a signature for a function, we balance a tension between the flexibility of the callers of the method and the implementations of the signature. The caller will always want the most general interface possible: this lets the caller pass in a variety of objects as a valid argument. Conversely, the implementer will want to know specifically what kind of object it needs to be able to handle. A more specific type "unlocks" more methods, and lets the implementer rely on assumptions that it cannot make in the general case (e.g. constant-time lookup by index is valid to assume for an ArrayList but not for a Collection in general).

Concretely, as an implementer of a connector's SinkTask.put(...) method, I want to access elements in the collection of records by index; I want to do things like `records.get(int i)`. I can only do this if the framework forfeits its flexibility and tells me specifically that I can expect an ArrayList. Then I can do a lot more as an implementer, like access elements by index in constant time or call size() knowing it runs in constant time. Of course, we are forfeiting the possibility of doing some things other Collection implementations are good at, like checking containment in constant time, but to date the Connect framework has never provided that anyway, since it always passes an ArrayList.
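A small sketch of the difference, using a batch of `Long` offsets as a simplified stand-in for records. The class and method names (`IndexAccess`, `lastViaCollection`, `lastViaArrayList`) are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.Collection;

final class IndexAccess {
    // With only a Collection, finding the last element means walking the
    // whole iterator, and "last" is only meaningful if order is preserved.
    static long lastViaCollection(Collection<Long> offsets) {
        long last = -1L;
        for (long offset : offsets) {
            last = offset;
        }
        return last;
    }

    // With an ArrayList, the same lookup is a constant-time index access.
    static long lastViaArrayList(ArrayList<Long> offsets) {
        return offsets.isEmpty() ? -1L : offsets.get(offsets.size() - 1);
    }
}
```

Both methods return -1 for an empty batch; that sentinel is an assumption of this sketch, not something the proposal specifies.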


The purpose of this KIP is to reconsider the interface's use of "Collection<SinkRecord>", in favor of the more specific datatype "ArrayList<SinkRecord>".

Public Interfaces

Add to the "SinkTask" abstract class a method:

    public abstract void put(ArrayList<SinkRecord> records);

Proposed Changes

Add to the "SinkTask" abstract class a method:

...

    public void put(Collection<SinkRecord> records) {
        ArrayList<SinkRecord> recordsList = records instanceof ArrayList
                ? (ArrayList<SinkRecord>) records : new ArrayList<>(records);
        put(recordsList);
    }
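The behavior of this bridging method can be sketched without a Kafka dependency. `SketchTask` below is a hypothetical, generic stand-in for SinkTask, and `CapturingTask` is an illustrative subclass that just remembers the batch it was given. The sketch shows the two paths: an ArrayList is passed through without copying, and any other Collection is copied into a new ArrayList.

```java
import java.util.ArrayList;
import java.util.Collection;

// Hypothetical stand-in for SinkTask, generic so no Kafka types are needed.
abstract class SketchTask<T> {
    // The new, more specific method that implementations would override.
    abstract void put(ArrayList<T> records);

    // The existing entrypoint, kept so an unchanged caller still works.
    void put(Collection<T> records) {
        ArrayList<T> recordsList = records instanceof ArrayList
                ? (ArrayList<T>) records   // already an ArrayList: no copy
                : new ArrayList<>(records); // anything else: copy once
        put(recordsList);
    }
}

// Illustrative implementation that records the batch it received.
final class CapturingTask extends SketchTask<String> {
    ArrayList<String> lastBatch;

    @Override
    void put(ArrayList<String> records) {
        lastBatch = records;
    }
}
```

Since the only known caller already passes an ArrayList, the copy branch would not be expected to run in practice; it is there as a safety net for any caller that passes a different Collection.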

Compatibility, Deprecation, and Migration Plan

It is critical that Connect frameworks before and after the change are compatible with connectors implemented with either signature. It is less critical that connector developers be able to pick which signature to implement when they upgrade their artifacts to depend on the new AK version.

...

Connector developers will need to change their SinkTask implementations if they update their artifacts, but this is acceptable since they'll find out at compile time during their development process, not at runtime in production.

There is only one caller of this method, `WorkerSinkTask`, which always passes an ArrayList as the Collection.

Rejected Alternatives

List

List is just more ambiguous, and there's no reason to expect a LinkedList or other list implementation.