Over several years of usage we have run into a number of complaints about the scala client. This is a proposal for addressing these complaints.

Goals

We would like to accomplish the following:

  • General code cleanup: both producer and consumer code are extremely hard to understand
  • Generalize the producer API
    • Give back a return value containing error code, offset, etc
  • Generalize the consumer API
    • Handle consuming from known partitions
    • Handle consumer-driven offset changes.
    • Get rid of the need for simple consumer
  • Make the producer fully async
  • Redo the request serialization layer to avoid all the custom request definition objects
  • Remove all threads from the consumer
  • Remove all library dependencies from clients to avoid version clashes
  • Create an RPC-based protocol for partition assignment to replace the direct zookeeper usage as prototyped here.
    • This will allow a number of simplifications
      • Centralization of all complexity on the server side (clients get easier to write in all languages)
      • Easier for languages with poor zk compatibility
      • More scalable with # of partitions
      • Easier to get correctness under partial-failure conditions
  • Modularize the clients so that the client and server do not share a jar
  • Rewrite the clients in Java
    • Though scala is a nice language it has proven to be a painful dependency for people wanting to integrate the client. The server would remain in scala for our convenience, but the clients would move to java. The following are the scala complaints:
      • Bad stacktraces
      • Leakage of scala classes into the Java api
      • Non-existent scala compatibility (binary compatibility breaks every 6 months)
      • Hard to get javadocs
      • People can't read the code--ideally we want people using the client to be able to read the client code.

The idea would be to roll out the new api as a separate jar, leaving the existing client intact but deprecated for one or two releases before removing the old client. This should allow a gradual migration.

Proposed Producer API

SendResponse send(KafkaMessage... message);


Usage:
Producer producer = new Producer(new ProducerConfig(props));
SendResponse r = producer.send(new KafkaMessage(topic, key, message));
r.onCompletion(new Runnable() {
  public void run() { System.out.println("All done"); }
});
r.getOffset();
r.getError();

Changes from current API

  • The key and message in KafkaMessage would have type Object instead of parameterized types. The parameterized types have not played well with the fact that different topics may take different types. Since the serializer is instantiated via reflection (at runtime), the parametric types add no actual type safety.
  • The producer will always attempt to batch data and will always immediately return a SendResponse which acts as a Future to allow the client to await the completion of the request.
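To make the Future-like behavior concrete, here is a minimal sketch of what SendResponse could look like. Only the names SendResponse, onCompletion, getOffset and getError come from this proposal; the complete() method, the blocking behavior of the getters, and the use of an Exception for the error are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of SendResponse; internals are assumptions, not a settled design.
class SendResponse {
    private final List<Runnable> callbacks = new ArrayList<>();
    private boolean done = false;
    private long offset = -1L;
    private Exception error = null;

    // Registers a callback; runs it immediately if the request already completed.
    void onCompletion(Runnable callback) {
        synchronized (this) {
            if (!done) {
                callbacks.add(callback);
                return;
            }
        }
        callback.run();
    }

    // Assumed hook: called by the background sender thread when the broker responds.
    void complete(long offset, Exception error) {
        List<Runnable> toRun;
        synchronized (this) {
            if (done) return;
            this.offset = offset;
            this.error = error;
            this.done = true;
            toRun = new ArrayList<>(callbacks);
            callbacks.clear();
            notifyAll();
        }
        // Run callbacks outside the lock so they cannot block other waiters.
        for (Runnable r : toRun) r.run();
    }

    // Blocks until the request completes, then returns the assigned offset.
    synchronized long getOffset() {
        awaitDone();
        return offset;
    }

    // Blocks until the request completes; returns null if there was no error.
    synchronized Exception getError() {
        awaitDone();
        return error;
    }

    private synchronized void awaitDone() {
        boolean interrupted = false;
        while (!done) {
            try {
                wait();
            } catch (InterruptedException e) {
                interrupted = true;
            }
        }
        if (interrupted) Thread.currentThread().interrupt();
    }
}
```

A caller that does not care about the result can simply ignore the returned object; a caller that wants synchronous behavior can call getOffset() right after send().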

Implementation

As today, we will have a single background thread that does message collation, serialization, and compression. However, this thread will now also run an event loop to simultaneously send requests to and receive responses from all brokers.
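The collation step could be sketched roughly as follows. This is an illustration only: MessageAccumulator, PendingMessage, and the per-topic grouping are hypothetical names and choices, and the real implementation would presumably group by broker and partition rather than by topic.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical holder for a message waiting to be sent.
class PendingMessage {
    final String topic;
    final Object payload;

    PendingMessage(String topic, Object payload) {
        this.topic = topic;
        this.payload = payload;
    }
}

// Hypothetical hand-off point between user threads and the background thread.
class MessageAccumulator {
    private final ConcurrentLinkedQueue<PendingMessage> queue = new ConcurrentLinkedQueue<>();

    // Called from user threads by send(); never touches the network.
    void append(String topic, Object payload) {
        queue.add(new PendingMessage(topic, payload));
    }

    // Called by the single background thread once per event-loop iteration.
    // Collates up to maxMessages queued messages into one batch per topic.
    Map<String, List<Object>> drain(int maxMessages) {
        Map<String, List<Object>> batches = new LinkedHashMap<>();
        PendingMessage m;
        int taken = 0;
        while (taken < maxMessages && (m = queue.poll()) != null) {
            batches.computeIfAbsent(m.topic, t -> new ArrayList<>()).add(m.payload);
            taken++;
        }
        return batches;
    }
}
```

Each loop iteration would then serialize and compress the drained batches, write them to the relevant sockets, and read whatever responses are ready.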

Consumer API

Here is an example of the proposed consumer API:

Consumer consumer = new Consumer(props);
consumer.addTopic("my-topic"); // consume dynamically assigned partitions
consumer.addTopic("my-other-topic", 3); // consume statically assigned partition 3
long timeout = 1000;
while(true) {
  List<MessageAndOffset> messages = consumer.poll(timeout);
  process(messages);
  consumer.commit(); // alternately consumer.commit(topic) or consumer.commit(topic, partition)
}

As before the consumer group is set in the properties and for simplicity each consumer can belong to only one group.

addTopic is used to change the set of topics the consumer consumes. If the user gives only the topic name it will automatically be assigned a set of partitions based on the group membership protocol below. If the user specifies a partition they will be statically assigned that partition.
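A sketch of the bookkeeping behind the two addTopic variants might look like this. The Subscriptions class and its methods are hypothetical. Rejecting a topic that is registered both ways, and needing group membership only for dynamically assigned topics, are assumptions; the proposal does not specify either.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical bookkeeping for dynamic vs. static subscriptions.
class Subscriptions {
    private final Set<String> dynamicTopics = new TreeSet<>();
    private final Map<String, Set<Integer>> staticPartitions = new TreeMap<>();

    // Topic only: partitions will be assigned via the group membership protocol.
    void addTopic(String topic) {
        if (staticPartitions.containsKey(topic))
            throw new IllegalStateException(topic + " already has static partitions");
        dynamicTopics.add(topic);
    }

    // Topic and partition: the consumer is statically assigned that partition.
    void addTopic(String topic, int partition) {
        if (dynamicTopics.contains(topic))
            throw new IllegalStateException(topic + " is already dynamically assigned");
        staticPartitions.computeIfAbsent(topic, t -> new TreeSet<>()).add(partition);
    }

    // Assumption: only dynamically assigned topics need the group protocol.
    boolean needsGroupMembership() {
        return !dynamicTopics.isEmpty();
    }

    Set<Integer> staticPartitionsFor(String topic) {
        return staticPartitions.getOrDefault(topic, new TreeSet<>());
    }
}
```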

The actual partition assignment can be made pluggable (see the proposal on group membership below) by setting an assignment strategy in the properties.

As today, this api allows alternative methods of managing offsets: simply disable autocommit and do not call commit().

Implementation

The client would be entirely single threaded.

The poll() method executes the network event loop, which includes the following:

  • Checks for group membership changes
  • Sends a heartbeat to the controller if needed
  • Issues a fetch request to all brokers from which the consumer is currently consuming
  • Reads any available fetch responses
  • When requests fail, issues metadata requests to get the new cluster topology

The timeout the user specifies will be purely to ensure we have a mechanism to give control back to the user even when no messages are delivered. It is up to the user to ensure poll() is called again within the heartbeat frequency set for the consumer group. Internally our select() may use a shorter timeout to ensure the heartbeat frequency is met even when no messages are delivered.
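The relationship between the user's timeout and the internal select() timeout can be sketched as a small calculation. PollTimeouts and its parameter names are hypothetical, and all times are assumed to be in milliseconds.

```java
// Hypothetical helper; not part of the proposed API.
class PollTimeouts {
    // Returns how long the next select() may block: never past the user's
    // poll() deadline, and never past the moment the next heartbeat is due.
    static long selectTimeoutMs(long nowMs, long pollDeadlineMs,
                                long lastHeartbeatMs, long heartbeatIntervalMs) {
        long untilDeadline = Math.max(0L, pollDeadlineMs - nowMs);
        long untilHeartbeat = Math.max(0L, lastHeartbeatMs + heartbeatIntervalMs - nowMs);
        return Math.min(untilDeadline, untilHeartbeat);
    }
}
```

For example, with poll(1000) and a heartbeat interval of 300 ms, the first select() blocks for at most 300 ms; the loop then sends a heartbeat and selects again until the 1000 ms deadline passes or messages arrive.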

Consumer Group Membership Protocol

We will introduce a set of RPC apis for managing partition assignment on the consumer side. This will be based on the prototype here. This set of APIs is orthogonal to the APIs for producing and consuming data; it is responsible for group membership.

My proposal would be to actually not handle partition assignment at all. The only facility we need to offer is a group membership facility. If all consumers agree on who is in the group, the consumer client implementation can implement any partition assignment it likes.
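To illustrate why agreed membership is sufficient, here is a sketch of one possible client-side strategy. Round-robin over a sorted member list is just an example, and RoundRobinAssignment is a hypothetical name. Each consumer runs the same function locally on the same membership list, so the results are disjoint and cover every partition without any further coordination.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical assignment strategy computed independently by each consumer.
class RoundRobinAssignment {
    // Returns the partitions owned by 'me', given the agreed group members.
    static List<Integer> partitionsFor(String me, List<String> members, int numPartitions) {
        // Sort so every consumer sees the members in the same order.
        List<String> sorted = new ArrayList<>(members);
        Collections.sort(sorted);
        int myIndex = sorted.indexOf(me);
        List<Integer> mine = new ArrayList<>();
        if (myIndex < 0) return mine; // not a member of this generation
        for (int p = myIndex; p < numPartitions; p += sorted.size()) {
            mine.add(p);
        }
        return mine;
    }
}
```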

| api | sender | fields | returns | description | issue to |
|---|---|---|---|---|---|
| list_groups | client | group_name (optional) | group_name, controller_id/host/port | Returns metadata for one or more groups. If issued with a group_name it returns metadata for just that group. If no group_name is given it returns metadata for all active groups. This request can be issued to any server. | any server |
| create_group | client | group_name, heart_beat_freq, ephemeral | group_name, controller_id/host/port | Create a new group with the given name and the specified minimum heartbeat frequency. Return the id/host/port of the server acting as the controller for that group. If the ephemeral flag is set the group will disappear when the last client exits. | any server |
| delete_group | client | group_name | ack | Deletes a non-ephemeral group (but only if it has no members) | controller |
| join_group | client | group_name | no response | Ask to join the given group | controller |
| begin_group_change | server | | ack | This message is sent from the controller to the group members. Group change happens in two phases: first all group members are notified of the change; once they have all acknowledged, they are sent the new group membership list. | client |
| group_changed | server | group_generation, group_members | no response | This message alerts all group members of the current group membership. This is the second phase of the group membership change. | client |
| im_alive | client | group_generation | no response | This heartbeat message must be sent by the consumer to the controller within the SLA specified in the create_group command. The client can and should issue these more frequently to avoid accidentally timing out. | controller |

The use of this protocol would be as follows:

  • on startup the consumer issues a list_groups(my_group) to find out the location of the controller for its group (each server is the controller for some groups)
  • knowing where the controller is, it connects and issues a join_group command, then it awaits a begin_group_change and group_changed message
  • on receiving the join_group command the server issues a begin_group_change message to all alive group members and waits for an acknowledgement from each consumer. The two-phase group change ensures that it is never the case that two consumers have different views of the group at the same time.
  • on receiving acknowledgements of the group membership change from all consumers, the controller sends a group_changed message to all the new group members
  • on receiving the new group membership the consumer is now able to start consuming its partitions and must now start sending heartbeat messages back to the controller with the current generation id.
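The controller side of the steps above can be sketched as a small state machine. This is a simplification under stated assumptions: GroupController and its methods are hypothetical, outbound messages are recorded as strings rather than sent over the network, heartbeat timeouts and member removal are omitted, and a join that arrives mid-change simply restarts the acknowledgement phase.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical controller-side sketch of the two-phase group change.
class GroupController {
    private final Set<String> members = new TreeSet<>();     // agreed membership
    private final Set<String> joining = new TreeSet<>();     // sent join_group, not yet admitted
    private final Set<String> awaitingAck = new TreeSet<>(); // have not acked begin_group_change
    private int generation = 0;
    final List<String> sent = new ArrayList<>();             // outbound messages, for illustration

    // Phase one: notify current members and joiners that a change is starting.
    void onJoinGroup(String client) {
        joining.add(client);
        awaitingAck.clear();
        awaitingAck.addAll(members);
        awaitingAck.addAll(joining);
        for (String m : awaitingAck) {
            sent.add("begin_group_change -> " + m);
        }
    }

    // Phase two starts only once every notified consumer has acknowledged.
    void onAck(String member) {
        if (!awaitingAck.remove(member) || !awaitingAck.isEmpty()) return;
        members.addAll(joining);
        joining.clear();
        generation++;
        for (String m : members) {
            sent.add("group_changed(gen=" + generation + ", members=" + members + ") -> " + m);
        }
    }

    int generation() {
        return generation;
    }
}
```

Because group_changed is sent only after every acknowledgement arrives, no consumer starts acting on the new generation while another is still consuming under the old one.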

Low-level RPC API

TBD: we need to design an underlying client network abstraction that can be used to send requests. This should ideally handle metadata and addressing by broker id, multiplex over multiple connections, and support both blocking and non-blocking operations.

Something like

KafkaConnection connection = new KafkaConnection(bootstrapUrl, socket_props);
// List<Response> poll(long timeout, Request... requests)
List<Response> responses = connection.poll(timeout, requests);

This is an async api, so the responses returned by a call have no necessary relationship to the requests sent in that call. This is meant to be called from within an event loop, i.e. each call to poll corresponds to one iteration of the select() loop. poll() with no request simply checks for any new responses to previous requests or non-client-initiated communication.
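Since responses can come back from a later poll() than the one that sent the request, the caller needs some way to match them up. A sketch of one approach follows, assuming each request carries a correlation id that the response echoes. The Request and Response fields, FakeConnection, and PendingRequests are all hypothetical; FakeConnection is an in-memory stand-in that answers each request on the following poll().

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical request/response shapes; the correlation id is an assumption.
class Request {
    final int correlationId;
    final String api;

    Request(int correlationId, String api) {
        this.correlationId = correlationId;
        this.api = api;
    }
}

class Response {
    final int correlationId;
    final String body;

    Response(int correlationId, String body) {
        this.correlationId = correlationId;
        this.body = body;
    }
}

// In-memory stand-in for the connection: a request sent in one poll()
// is answered by the next poll(). No real networking happens here.
class FakeConnection {
    private final Queue<Request> inFlight = new ArrayDeque<>();

    List<Response> poll(long timeoutMs, Request... requests) {
        // timeoutMs is ignored by this stand-in; nothing ever blocks.
        List<Response> ready = new ArrayList<>();
        for (Request r : inFlight) {
            ready.add(new Response(r.correlationId, "ok:" + r.api));
        }
        inFlight.clear();
        for (Request r : requests) {
            inFlight.add(r);
        }
        return ready;
    }
}

// Caller-side table used to match responses back to their requests.
class PendingRequests {
    private final Map<Integer, Request> pending = new HashMap<>();

    void sent(Request r) {
        pending.put(r.correlationId, r);
    }

    // Returns the matching request, or null for unsolicited communication.
    Request match(Response resp) {
        return pending.remove(resp.correlationId);
    }
}
```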

Misc Notes and questions

Currently we do a lot of logging and metrics recording in the client. I think that instead of handling the reporting of metrics and logging ourselves, we should incorporate this feedback in the client apis and allow the user to log or monitor it in the manner they choose. This will avoid dependence on a particular logging or metrics library and seems like a good policy.
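One possible shape for this is a listener the user passes in, sketched below. ClientEventListener, InstrumentedClient, and the event names are hypothetical and only illustrate the idea of pushing measurements to user code instead of to a bundled library.

```java
// Hypothetical callback interface implemented by the user.
interface ClientEventListener {
    void onEvent(String name, long value);
}

// Hypothetical client fragment that reports through the listener only.
class InstrumentedClient {
    private final ClientEventListener listener;

    InstrumentedClient(ClientEventListener listener) {
        this.listener = listener;
    }

    // Called internally after each send; no logging or metrics library involved.
    void recordSend(int bytes, long latencyMs) {
        listener.onEvent("send.bytes", bytes);
        listener.onEvent("send.latency.ms", latencyMs);
    }
}
```

The user can then forward these events to whatever logging or metrics system they already run.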

It is not clear how to support changing the offset either manually or programmatically while consumption is happening. Simply using the api to set the offset will likely collide with commit calls from the active consumer. Perhaps this is not needed?

How many jars should we have? I think we could do either

  • Option A: kafka-clients.jar and kafka-server.jar or
  • Option B: kafka-common.jar, kafka-producer.jar, kafka-consumer.jar, and kafka-server.jar

I prefer Option A as it is simpler if we add an AdminApi--we won't need to introduce a whole new jar for it.
