Status
Current state: DRAFT
Discussion thread: here
JIRA: KAFKA-3294
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
The goal is to add a Kafka REST Server/Proxy to the Kafka repository. This will allow any language or tool that
can speak HTTP to produce and consume messages, and to perform administrative actions against a Kafka service.
Why add Kafka Rest Server/Proxy to Kafka?
Some open-source REST proxies are already available. Some are specific to particular use cases;
the Confluent REST Proxy has a comprehensive interface. We want to add a Kafka REST Server to Kafka for the following reasons.
1) Many data infrastructure tools come with a REST interface. It is useful to have built-in REST API support for producing and
consuming messages, plus an admin interface for integrating with external management tools.
2) Shipping the Kafka REST Server as part of a normal Kafka release makes it immediately available to every user who downloads Kafka.
3) It helps maintain version compatibility between Kafka and the REST Server.
Some good practices and ideas will be borrowed from existing tools.
Proposed Changes
Rest Server/Proxy:
Supported Content-Types are: application/vnd.kafka.binary.v1+json, application/vnd.kafka.json.v1+json
Producer API:
The REST Proxy accepts produce requests for specific topics or partitions. Internally it uses a Java producer instance to write messages to Kafka.
Consumer API:
The REST Proxy uses the consumer API to consume messages from the subscribed topic on behalf of the specified consumer group.
When a message is consumed on behalf of a consumer group for the first time, the Kafka Consumer instance joins the consumer group
and subscribes to the topic. All Consumer instances that are currently members of that group and subscribed to that topic divide
the topic partitions among themselves. If a Consumer instance has not consumed from a particular topic on behalf of a particular
consumer group for a configured interval, it unsubscribes from the topic on behalf of that group. Offset commits can be either
automatic or manual, as requested by the user. Messages can also be retrieved from a specific partition,
starting at a given offset.
We also want the community's opinion on other ways of implementing consumer group
functionality.
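The idle-timeout behaviour described above can be sketched as a small bookkeeping structure. This is only an illustration under stated assumptions: the class name `SubscriptionTracker`, its methods, and the `idle_timeout_s` parameter are hypothetical and not part of this proposal, and a real proxy would call the Kafka consumer's subscribe and unsubscribe operations where the comments indicate.

```python
import time
from typing import Callable, Dict, List, Tuple


class SubscriptionTracker:
    """Hypothetical helper: remembers when each (group, topic) pair was last consumed."""

    def __init__(self, idle_timeout_s: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._idle_timeout_s = idle_timeout_s
        self._clock = clock  # injectable so the logic can be tested without sleeping
        self._last_used: Dict[Tuple[str, str], float] = {}

    def touch(self, group: str, topic: str) -> bool:
        """Record a consume call. Returns True when the pair was not tracked before,
        which is the point where a proxy would join the group and subscribe."""
        key = (group, topic)
        is_new = key not in self._last_used
        self._last_used[key] = self._clock()
        return is_new

    def expire_idle(self) -> List[Tuple[str, str]]:
        """Drop and return every pair idle for at least the configured interval.
        A proxy would unsubscribe each returned pair from its topic."""
        now = self._clock()
        expired = [key for key, last in self._last_used.items()
                   if now - last >= self._idle_timeout_s]
        for key in expired:
            del self._last_used[key]
        return expired
```

A background thread could call `expire_idle()` periodically, while each consume request calls `touch()` before fetching.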
Admin API and Security Integration:
This will be taken up as future work after the KIP-4 implementation.
Public Interfaces
Producer APIs:
POST /topics/:topic
Description : Produce messages to a given topic
Parameters:
topic (String) - topic name
Request:
JSON Object contains array of produce records
Response:
JSON Object contains response objects
Status Codes:
404 Not Found
Error Code 40401 - topic Not Found
Error Code 40402 - Version Not Found
500 Internal Server Error
Error Code 50001 - Kafka Error
Example request:
POST /topics/test HTTP/1.1
Host: kafkarest.host.com
Content-Type: application/vnd.kafka.binary.v1+json
Accept: application/vnd.kafka.v1+json

{
  "records": [
    { "key": "a2V5", "value": "dmFsdWU=" },
    { "value": "dmFsdWU=", "partition": 1 }
  ]
}
Example response:
HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v1+json

{
  "offsets": [
    { "partition": 2, "offset": 100 },
    { "partition": 1, "offset": 101 }
  ]
}
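As a rough client-side illustration, the request above could be assembled as follows. The sketch assumes that the binary content type carries keys and values as base64 strings (the example values "a2V5" and "dmFsdWU=" decode to "key" and "value"); the function name `build_produce_request` and the base URL are hypothetical. The request is only built, not sent.

```python
import base64
import json
import urllib.request
from typing import Iterable, Optional, Tuple
from urllib.parse import quote

BINARY_V1 = "application/vnd.kafka.binary.v1+json"
ACCEPT_V1 = "application/vnd.kafka.v1+json"

# (key, value, partition); key and partition may be None.
ProduceRecord = Tuple[Optional[bytes], bytes, Optional[int]]


def _b64(data: bytes) -> str:
    """Encode raw bytes as an ASCII base64 string (assumed wire encoding)."""
    return base64.b64encode(data).decode("ascii")


def build_produce_request(base_url: str, topic: str,
                          records: Iterable[ProduceRecord]) -> urllib.request.Request:
    """Build, without sending, a POST /topics/:topic request."""
    payload = {"records": []}
    for key, value, partition in records:
        record = {}
        if key is not None:
            record["key"] = _b64(key)
        record["value"] = _b64(value)
        if partition is not None:
            record["partition"] = partition
        payload["records"].append(record)
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/topics/{quote(topic, safe='')}",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": BINARY_V1, "Accept": ACCEPT_V1},
        method="POST",
    )
```

Passing the returned object to `urllib.request.urlopen` would send it to a running proxy.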
POST /topics/:topicName/partitions/:partition
Description : Produce messages to one partition of the topic
Parameters:
topicName (String) - topic name
partition (int) - partition number
Request:
JSON Object contains array of produce records
Response:
JSON Object contains response objects
Status Codes:
404 Not Found
Error Code 40401 - topic Not Found
Error Code 40402 - partition Not Found
500 Internal Server Error
Error Code 50001 - Kafka Error
Example request:
POST /topics/test/partitions/1 HTTP/1.1
Host: kafkarest.host.com
Content-Type: application/vnd.kafka.binary.v1+json
Accept: application/vnd.kafka.v1+json

{
  "records": [
    { "key": "a2V5", "value": "dmFsdWU=" },
    { "value": "dmFsdWU=" }
  ]
}
Example response:
HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v1+json

{
  "offsets": [
    { "partition": 1, "offset": 100 },
    { "partition": 1, "offset": 101 }
  ]
}
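The error codes listed for the producer endpoints (40401, 40402, 50001) appear to carry the HTTP status in their first three digits. Assuming that convention holds for every code, which this proposal does not state explicitly, a client could recover the status class from a code like this. The function name `http_status_for` is hypothetical.

```python
from http import HTTPStatus


def http_status_for(error_code: int) -> HTTPStatus:
    """Map a five-digit error code such as 40401 to its HTTP status.

    Assumes the first three digits of the code are the HTTP status.
    """
    if not 10000 <= error_code <= 59999:
        raise ValueError(f"not a five-digit error code: {error_code}")
    # Integer division drops the two-digit sub-code; HTTPStatus raises
    # ValueError itself if the remaining number is not a known status.
    return HTTPStatus(error_code // 100)
```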
Consumer APIs:
GET /topics/:topic_name/partitions/:partition?offset=(int)
Description : Consume messages from one partition of the topic.
Parameters:
topic_name (String) - topic name
partition (int) - partition number
offset (int) - offset to fetch
Response:
JSON Object contains response objects
Status Codes:
404 Not Found
Error Code 40401 - topic Not Found
Error Code 40402 - partition Not Found
500 Internal Server Error
Error Code 50001 - Kafka Error
Example request:
GET /topics/test/partitions/1?offset=1 HTTP/1.1
Accept: application/vnd.kafka.binary.v1+json
Example response:
HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.binary.v1+json

[
  { "key": "a2V5", "value": "dmFsdWU=", "partition": 1, "offset": 1 },
  { "key": "a2V5", "value": "dmFsdWU=", "partition": 1, "offset": 2 }
]
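A client reading one partition by offset needs to decode each record and work out which offset to request next. The following sketch shows one way to do that, assuming base64-encoded keys and values as in the example response; the names `Record`, `decode_records`, and `next_offset` are hypothetical.

```python
import base64
import json
from typing import List, NamedTuple, Optional


class Record(NamedTuple):
    key: Optional[bytes]
    value: Optional[bytes]
    partition: int
    offset: int


def _decode(field: Optional[str]) -> Optional[bytes]:
    """Base64-decode a field, passing through a missing or null value."""
    return base64.b64decode(field) if field is not None else None


def decode_records(body: str) -> List[Record]:
    """Parse a binary-format consume response (a JSON array of records)."""
    return [
        Record(
            key=_decode(item.get("key")),
            value=_decode(item.get("value")),
            partition=item["partition"],
            offset=item["offset"],
        )
        for item in json.loads(body)
    ]


def next_offset(records: List[Record], requested_offset: int) -> int:
    """Offset to pass in the next request: one past the highest offset seen,
    or the same offset again when the response was empty."""
    if not records:
        return requested_offset
    return max(r.offset for r in records) + 1
```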
GET /topics/:topic/messages?group=<group>
Description : Consume messages from the specified topic on behalf of the specified consumer group.
Parameters:
topic (String) - topic name
group (String) - consumer group id
Response:
JSON Object contains response objects
Status Codes:
404 Not Found
Error Code 40401 - topic Not Found
500 Internal Server Error
Error Code 50001 - Kafka Error
Example binary request:
GET /topics/test/messages?group=testgroup HTTP/1.1
Accept: application/vnd.kafka.binary.v1+json
Example response:
HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.binary.v1+json

[
  { "key": "a2V5", "value": "dmFsdWU=", "partition": 1, "offset": 1 },
  { "key": "a2V5", "value": "dmFsdWU=", "partition": 1, "offset": 2 }
]
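Because the group id travels in the query string and the topic in the path, both need URL encoding when they contain reserved characters. A minimal sketch of building the group consume URL, with the hypothetical helper name `group_consume_url` and an assumed base URL:

```python
from urllib.parse import quote, urlencode


def group_consume_url(base_url: str, topic: str, group: str) -> str:
    """Build the URL for GET /topics/:topic/messages?group=<group>."""
    path_topic = quote(topic, safe="")      # percent-encode "/" and other reserved characters
    query = urlencode({"group": group})     # form-encode the group id
    return f"{base_url.rstrip('/')}/topics/{path_topic}/messages?{query}"
```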
POST /topics/:group/offsets
Description : Commit offsets for the consumer instance associated with group.
Parameters:
group (String) - consumer group id
Response:
JSON Object contains response objects
Status Codes:
404 Not Found
Error Code 40401 - group Not Found
500 Internal Server Error
Error Code 50001 - Kafka Error
Example request:
POST /topics/testgroup/offsets HTTP/1.1
Accept: application/vnd.kafka.v1+json
Example response:
HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v1+json

[
  { "topic": "topic", "partition": 1, "committed": 1 },
  { "topic": "topic", "partition": 2, "committed": 2 }
]
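A client may want the commit response indexed by topic and partition rather than as a flat list. The sketch below assumes the response shape shown in the example; the function name `committed_offsets` is hypothetical.

```python
import json
from typing import Dict, Tuple


def committed_offsets(body: str) -> Dict[Tuple[str, int], int]:
    """Index a commit response by (topic, partition) -> committed offset."""
    return {
        (entry["topic"], entry["partition"]): entry["committed"]
        for entry in json.loads(body)
    }
```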
Compatibility, Deprecation, and Migration Plan
This KIP only proposes additions. There should be no compatibility issues.
Rejected Alternatives
Make Kafka Rest Server an external third-party tool:
The main reason for this KIP is to add the REST Server to Kafka itself. Shipping the Kafka REST Server as part of a normal Kafka release
makes it immediately available to every user who downloads Kafka. It also helps maintain version compatibility
between Kafka clients and the REST Server.