Status

Current state: DISCARDED

Discussion thread: here

JIRA: KAFKA-3294 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Several open-source REST proxies are already available, and the Confluent REST Proxy offers a comprehensive interface.
Even so, we would like to add the REST server that many users ask for to the Apache Kafka repository.

We want to add a Kafka REST server to Kafka for the following reasons.

...

content types in Content-Type and Accept headers to make the format of the data explicit. This approach is borrowed from the Confluent REST Proxy.
Supported Content-Types are: application/vnd.kafka.binary.v1+json, application/vnd.kafka.json.v1+json
The REST server can be scaled easily by deploying multiple proxy instances, which spreads the load across them.
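
The content-type naming above can be sketched as a pair of small helpers. This is only an illustration in Python: the function names content_type_for and parse_content_type are hypothetical and not part of the proposal, and the only formats assumed are the two listed above.

```python
import re

# Embedded data formats named in the list of supported Content-Types.
SUPPORTED_FORMATS = ("binary", "json")

# Matches e.g. application/vnd.kafka.binary.v1+json
_CONTENT_TYPE_RE = re.compile(
    r"^application/vnd\.kafka\.(binary|json)\.v(\d+)\+json$"
)


def content_type_for(embedded_format: str) -> str:
    """Return the v1 content type for an embedded format (hypothetical helper)."""
    if embedded_format not in SUPPORTED_FORMATS:
        raise ValueError(f"unsupported embedded format: {embedded_format!r}")
    return f"application/vnd.kafka.{embedded_format}.v1+json"


def parse_content_type(value: str) -> tuple:
    """Split a content type into (embedded format, API version)."""
    match = _CONTENT_TYPE_RE.match(value.strip())
    if match is None:
        raise ValueError(f"unrecognised content type: {value!r}")
    return match.group(1), int(match.group(2))
```

A client would send content_type_for("binary") in the Content-Type header of a produce request; a server could use parse_content_type to decide how to decode the body.
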
Producer API

The REST server accepts produce requests for specific topics or partitions. Internally, it uses a Java producer instance to write messages into Kafka.

...

 

POST /topics/test/partitions/1 HTTP/1.1
Host: kafkarest.host.com
Content-Type: application/vnd.kafka.binary.v1+json
Accept: application/vnd.kafka.v1+json

{
  "records": [
    {
      "key": "a2V5",
      "value": "dmFsdWU="
    },
    {
      "value": "dmFsdWU="
    }
  ]
}


Example response:

 

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v1+json

{
  "offsets": [
    {
      "partition": 1,
      "offset": 1
    },
    {
      "partition": 1,
      "offset": 2
    }
  ]
}
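
The "key" and "value" strings in the example request look like base64 ("a2V5" decodes to "key", "dmFsdWU=" to "value"). Assuming the binary format base64-encodes keys and values, a client could build the request body roughly as follows. The helper names encode_record and build_produce_body are hypothetical.

```python
import base64
import json
from typing import Optional


def encode_record(value: bytes, key: Optional[bytes] = None) -> dict:
    """Base64-encode one record; the key is omitted when not given."""
    record = {}
    if key is not None:
        record["key"] = base64.b64encode(key).decode("ascii")
    record["value"] = base64.b64encode(value).decode("ascii")
    return record


def build_produce_body(records: list) -> str:
    """Wrap encoded records in the {"records": [...]} envelope."""
    return json.dumps({"records": records})


# Reproduces the two records from the example request.
body = build_produce_body([
    encode_record(b"value", key=b"key"),
    encode_record(b"value"),
])
```
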

Consumer API

POST /consumers/:group
Description : Create a new consumer instance in the consumer group.

Parameters:

  • group (String) - group name

Request:

  • JSON object containing the consumer instance name, the data format, and consumer properties.

Response:

  • JSON object containing the response fields.

  • The response includes a URL that contains the host, because the consumer is stateful and tied to a specific REST proxy instance.

Status Codes:

  • 404 Not Found

    • Error Code 40401 - topic Not Found

    • Error Code 40402 -  partition Not Found

  • 500 Internal Server Error

    • Error Code 50001 - Kafka Error
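
The error codes listed above appear to carry the HTTP status in their leading digits (40401 and 40402 under 404, 50001 under 500). A minimal sketch, assuming that convention holds for every code, of how a client might interpret them. RestError, KNOWN_ERRORS and describe are hypothetical names, and the table contains only the codes listed in this document.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RestError:
    error_code: int
    message: str

    @property
    def http_status(self) -> int:
        # Assumed convention: the first three digits are the HTTP status.
        return self.error_code // 100


# Only the codes that appear in the status code lists of this document.
KNOWN_ERRORS = {
    40401: "topic Not Found",
    40402: "partition Not Found",
    50001: "Kafka Error",
}


def describe(error_code: int) -> RestError:
    """Look up a code, falling back to a generic message for unknown ones."""
    return RestError(error_code, KNOWN_ERRORS.get(error_code, "Unknown error"))
```
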


Example request:

POST /consumers/group/ HTTP/1.1
Accept: application/vnd.kafka.v1+json, application/vnd.kafka+json, application/json

{
  "name": "Instance1",
  "format": "binary",
  "auto.offset.reset": "smallest",
  "auto.commit.enable": "false"
}

 

Example response:

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v1+json

{
  "id": "Instance1",
  "url": "http://kafkarest1.com/consumers/group/instances/Instance1"
}
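
Because the consumer instance is stateful, later requests have to reach the same REST proxy host that created it. The sketch below shows one way a client might keep track of that host. It assumes the GET /consumers/:group/:instance/:topic path template from this document; the helper names are hypothetical.

```python
import json
from urllib.parse import urlsplit


def build_create_consumer_body(name: str, data_format: str, consumer_props: dict) -> str:
    """Build the JSON body for POST /consumers/:group."""
    body = {"name": name, "format": data_format}
    body.update(consumer_props)
    return json.dumps(body)


def instance_base(create_response: dict) -> str:
    """Extract scheme and host of the proxy instance that owns the consumer."""
    parts = urlsplit(create_response["url"])
    return f"{parts.scheme}://{parts.netloc}"


def consume_url(create_response: dict, group: str, topic: str) -> str:
    """Address the consume endpoint on the owning proxy instance (assumed path layout)."""
    base = instance_base(create_response)
    return f"{base}/consumers/{group}/{create_response['id']}/{topic}"
```

With the example response above, instance_base returns "http://kafkarest1.com", so every follow-up request for Instance1 goes to that host rather than to an arbitrary proxy instance.
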

 

GET /consumers/:group/:instance/:topic
Description : Consume messages from the specified topic on behalf of the specified consumer group.

Parameters:

  • topic (String) - topic name

  • group (String) - consumer group id

  • instance (String) - consumer instance name

Response:

  • JSON Object contains response objects

Status Codes:

  • 404 Not Found

    • Error Code 4040X - topic Not Found

    • Error Code 4040Y - group Not Found

    • Error Code 4040Z - instance Not Found

  • 500 Internal Server Error

    • Error Code 50001 - Kafka Error

 


Example request:

GET /consumers/group/instance1/topic HTTP/1.1
Accept: application/vnd.kafka.binary.v1+json

 

Example response:

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.binary.v1+json

[
  {
    "key": "a2V5",
    "value": "dmFsdWU=",
    "partition": 1,
    "offset": 1
  },
  {
    "key": "a2V5",
    "value": "dmFsdWU=",
    "partition": 1,
    "offset": 2
  }
]
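
On the client side, a response in the binary format has to be decoded before use. A minimal sketch, assuming keys and values are base64-encoded (which matches the sample strings) and that "key" may be absent; decode_records is a hypothetical helper.

```python
import base64
import json


def decode_records(response_body: str) -> list:
    """Decode base64 keys and values, keeping partition and offset as-is."""
    decoded = []
    for record in json.loads(response_body):
        key = record.get("key")
        decoded.append({
            "key": base64.b64decode(key) if key is not None else None,
            "value": base64.b64decode(record["value"]),
            "partition": record["partition"],
            "offset": record["offset"],
        })
    return decoded


sample = '[{"key": "a2V5", "value": "dmFsdWU=", "partition": 1, "offset": 1}]'
records = decode_records(sample)
```
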

 

POST /consumers/:group/:instance/offsets
Description : Commit offsets for the consumer instance associated with group.

Parameters:

  • group (String) - consumer group id

  • instance (String) - consumer instance name


Response:

  • JSON Object contains response objects

Status Codes:

  • 404 Not Found

    • Error Code 40401 - group Not Found

    • Error Code 40402 - consumer instance Not Found

  • 500 Internal Server Error

    • Error Code 50001 - Kafka Error

 


Example request:

POST /consumers/group/instance1/offsets HTTP/1.1
Accept: application/vnd.kafka.v1+json

 

Example response:

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v1+json

[
  {
    "topic": "test",
    "partition": 1,
    "committed": 100
  },
  {
    "topic": "test",
    "partition": 2,
    "committed": 200
  },
  {
    "topic": "test2",
    "partition": 1,
    "committed": 50
  }
]
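
A client will usually want the committed offsets indexed by topic and partition. The sketch below assumes the response is a JSON array of objects with "topic", "partition" and "committed" fields, as in the example; committed_by_partition is a hypothetical helper.

```python
import json


def committed_by_partition(response_body: str) -> dict:
    """Map (topic, partition) to the committed offset reported by the server."""
    return {
        (entry["topic"], entry["partition"]): entry["committed"]
        for entry in json.loads(response_body)
    }


sample = json.dumps([
    {"topic": "test", "partition": 1, "committed": 100},
    {"topic": "test", "partition": 2, "committed": 200},
    {"topic": "test2", "partition": 1, "committed": 50},
])
committed = committed_by_partition(sample)
```
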

 

GET /topics/:topic_name/partitions/:partition?offset=(int)
Description : Consume messages from one partition of the topic.

Parameters:

  • topic_name (String) - topic name

  • partition (int) - partition number

  • offset (int) - offset to fetch

Response:

  • JSON Object contains response objects

Status Codes:

  • 404 Not Found

    • Error Code 40401 - topic Not Found

    • Error Code 40402 - partition Not Found

  • 500 Internal Server Error

    • Error Code 50001 - Kafka Error

 

Example request:

GET /consume/test/partitions/1?offset=1 HTTP/1.1
Accept: application/vnd.kafka.binary.v1+json

 

Example response:

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.binary.v1+json

[
  {
    "key": "a2V5",
    "value": "dmFsdWU=",
    "partition": 1,
    "offset": 1
  },
  {
    "key": "a2V5",
    "value": "dmFsdWU=",
    "partition": 1,
    "offset": 2
  }
]
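
Since this endpoint keeps no state on the server, the client has to track the next offset itself. A minimal sketch of that bookkeeping, assuming the offset query parameter is the first offset returned (as the example suggests) and using the GET /topics/:topic_name/partitions/:partition path template; next_fetch_path is a hypothetical helper.

```python
import json


def next_fetch_path(topic: str, partition: int, response_body: str, current_offset: int) -> str:
    """Return the request path for the next poll of one partition."""
    records = json.loads(response_body)
    if records:
        # Continue right after the highest offset received so far.
        next_offset = max(record["offset"] for record in records) + 1
    else:
        # Nothing was returned, so ask for the same offset again.
        next_offset = current_offset
    return f"/topics/{topic}/partitions/{partition}?offset={next_offset}"
```

For the example response, which ends at offset 2, the next request would ask for offset=3.
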


 

TODO
I will update required config options and response Error codes/messages.

 

Compatibility, Deprecation, and Migration Plan

This KIP only proposes additions. There should be no compatibility issues.

Rejected Alternatives

Make Kafka Rest Server an external third-party tool

The main reason for this KIP is to add a REST server to Kafka itself. Shipping the Kafka REST server as part of a normal Kafka release
makes it immediately available to every user who downloads Kafka. It also helps maintain version compatibility
between the Kafka clients and the REST server.

 
Push/Stream messages to end clients:

End clients could register to listen for events, which could be implemented using WebSocket push notifications.
However, this goes against the Kafka consumer's pull model, and it is better to preserve the Kafka consumer's semantics.