New Consumer API Design

Huajun Qin edited this page Dec 3, 2016 · 4 revisions

Overview

This document is a draft of REST Proxy API changes to be made as part of supporting the "new" 0.9+ Java consumer. The "new" Java consumer is sufficiently different from the "old" consumer that it warrants REST API changes.

Note that this document is still very much in draft form. Many APIs have not been specified yet, and some of the APIs listed here may change.

Versioning

The "new" Java consumer will not replace the "old" Java consumer in existing REST API endpoints. Instead, a new consumer REST API will be introduced that is powered by the "new" Java consumer.

Exactly how versioning is done is TBD. One option is to introduce a version number in the URL, e.g. /v2/... Another option is to use the content type. Perhaps there are other alternatives to consider, too.

New API

Creating a consumer instance in a consumer group

POST /consumers/(string: group_name)

The request JSON object will contain the following configuration keys:

  • name
  • consumer-configs – forwarded to the Java consumer, e.g. max.poll.records

Response:

  • instance_id
  • base_uri – the REST proxy instance with the KafkaConsumer instance, where all subsequent poll and commit calls should be made
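
As a rough illustration of the request described above, the following Python sketch builds the method, path, and JSON body for creating a consumer instance. The helper name build_create_request and the sample values are hypothetical. Only the path shape and the name and consumer-configs keys come from this draft; treating consumer-configs as a flat JSON object is an assumption.

```python
import json
from urllib.parse import quote


def build_create_request(group_name, name, consumer_configs=None):
    """Return (method, path, body) for creating a consumer instance.

    Hypothetical helper: the draft does not fix the body schema, so
    "consumer-configs" is assumed here to be a flat JSON object.
    """
    path = "/consumers/" + quote(group_name, safe="")
    body = {"name": name}
    if consumer_configs:
        # Forwarded to the Java consumer unchanged, e.g. max.poll.records.
        body["consumer-configs"] = dict(consumer_configs)
    return "POST", path, json.dumps(body, sort_keys=True)


method, path, body = build_create_request(
    "my_group", "my_instance", {"max.poll.records": 100}
)
print(method, path)
print(body)
```

The sketch stops short of sending the request, since the final URL prefix and content type depend on the versioning decision that is still open.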

Closing a consumer

DELETE /consumers/(string: group_name)/instances/(string: instance_id)

Subscribing

POST /consumers/(string: group_name)/instances/(string: instance_id)/subscription

The request JSON object will contain the following configuration keys:

  • topics – a list of topic names, or:
  • topic_pattern – a regular expression matched against topic names
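
The two alternative keys above could be assembled as in the Python sketch below. The function name build_subscription_body is hypothetical, and rejecting requests that set both keys (or neither) is an assumption; the draft only says the body carries one or the other.

```python
import json


def build_subscription_body(topics=None, topic_pattern=None):
    """Return the JSON body for a subscription request.

    Assumption: exactly one of topics or topic_pattern is supplied,
    since the draft lists them as alternatives.
    """
    if (topics is None) == (topic_pattern is None):
        raise ValueError("supply exactly one of topics or topic_pattern")
    if topics is not None:
        return json.dumps({"topics": list(topics)})
    return json.dumps({"topic_pattern": topic_pattern})


print(build_subscription_body(topics=["foo", "bar"]))
print(build_subscription_body(topic_pattern="foo.*"))
```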

Get current subscription

GET /consumers/(string: group_name)/instances/(string: instance_id)/subscription

Unsubscribing

DELETE /consumers/(string: group_name)/instances/(string: instance_id)/subscription

Consuming

GET /consumers/(string: group_name)/instances/(string: instance_id)/records

timeout is a query parameter.

Response:

[
  {
    "topic": "foo",
    "key": "a2V5",
    "value": "Y29uZmx1ZW50",
    "partition": 1,
    "offset": 100
  }
]
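
A client would typically decode the key and value before using them. The Python sketch below assumes both fields are base64-encoded bytes, which the sample strings suggest but this draft does not state; the function name decode_records is hypothetical.

```python
import base64
import json

# Sample response copied from the draft above.
SAMPLE_RESPONSE = """
[
  {"topic": "foo", "key": "a2V5", "value": "Y29uZmx1ZW50",
   "partition": 1, "offset": 100}
]
"""


def decode_records(response_text):
    """Parse a records response and base64-decode key and value.

    Assumption: key and value are base64-encoded; a null key or value
    is passed through as None.
    """
    decoded = []
    for record in json.loads(response_text):
        item = dict(record)
        for field in ("key", "value"):
            if item.get(field) is not None:
                item[field] = base64.b64decode(item[field])
        decoded.append(item)
    return decoded


records = decode_records(SAMPLE_RESPONSE)
print(records[0]["key"], records[0]["value"])
```

Under that assumption, the sample key decodes to the bytes "key" and the sample value to "confluent".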

Partition assignment

POST /consumers/(string: group_name)/instances/(string: instance_id)/assignment

Request

[ {"topic": "foo", "partition": 0}, {"topic": "bar", "partition": 1} ]

GET /consumers/(string: group_name)/instances/(string: instance_id)/assignment

Response

[ {"topic": "foo", "partition": 0}, {"topic": "bar", "partition": 1} ]
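
The request and response share the same list shape, so a client can round-trip them with two small helpers. This Python sketch uses hypothetical names (assignment_body, parse_assignment) and represents each partition as a (topic, partition) tuple, which is a choice made here for illustration.

```python
import json


def assignment_body(partitions):
    """Serialize (topic, partition) pairs into the assignment request list."""
    return json.dumps(
        [{"topic": topic, "partition": partition} for topic, partition in partitions]
    )


def parse_assignment(response_text):
    """Turn an assignment response into a set of (topic, partition) pairs."""
    return {(tp["topic"], tp["partition"]) for tp in json.loads(response_text)}


body = assignment_body([("foo", 0), ("bar", 1)])
print(body)
print(parse_assignment(body))
```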

Seek to positions

POST /consumers/(string: group_name)/instances/(string: instance_id)/positions

Seek to an offset

Request

{"topic": "foo", "partition": 0, "offset": 12}

POST /consumers/(string: group_name)/instances/(string: instance_id)/positions/beginning

Seek to the beginning

Request

[{"topic": "foo", "partition": 0}, {"topic": "bar", "partition": 2}]

POST /consumers/(string: group_name)/instances/(string: instance_id)/positions/end

Seek to the end

Request

[{"topic": "foo", "partition": 0}, {"topic": "bar", "partition": 2}]
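
The three seek variants differ only in path suffix and body shape, which the Python sketch below makes explicit. All function names are hypothetical. The bodies follow the samples in this draft: a single object with an offset for an explicit seek, and a list of topic/partition objects for beginning and end.

```python
import json
from urllib.parse import quote


def positions_path(group_name, instance_id):
    """Build the base .../positions path for a consumer instance."""
    return "/consumers/{}/instances/{}/positions".format(
        quote(group_name, safe=""), quote(instance_id, safe="")
    )


def seek_to_offset(base_path, topic, partition, offset):
    """Return (path, body) for seeking one partition to an explicit offset."""
    body = {"topic": topic, "partition": partition, "offset": offset}
    return base_path, json.dumps(body)


def seek_to_edge(base_path, edge, partitions):
    """Return (path, body) for seeking partitions to "beginning" or "end"."""
    if edge not in ("beginning", "end"):
        raise ValueError("edge must be 'beginning' or 'end'")
    body = [{"topic": t, "partition": p} for t, p in partitions]
    return base_path + "/" + edge, json.dumps(body)


base = positions_path("my_group", "my_instance")
print(seek_to_offset(base, "foo", 0, 12))
print(seek_to_edge(base, "end", [("foo", 0), ("bar", 2)]))
```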

Committing offsets

POST /consumers/(string: group_name)/instances/(string: instance_id)/offsets

Optional query parameter: async -- if present, commit asynchronously.

Commit the offsets returned by the last .../records call for all subscribed topics and partitions.

POST /consumers/(string: group_name)/instances/(string: instance_id)/offsets

Optional query parameter: async -- if present, commit asynchronously.

Commit specific offsets

Request

[{"topic": "foo", "partition": 0, "offset": 12}, {"topic": "bar", "partition": 1, "offset": 101}]
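
Both commit forms use the same endpoint and differ only in whether a body is sent. The Python sketch below covers the two cases plus the async flag. The name commit_request is hypothetical, and sending async as a bare query parameter with no value is an assumption based on the "if present" wording.

```python
import json
from urllib.parse import quote


def commit_request(group_name, instance_id, offsets=None, use_async=False):
    """Return (method, path, body) for a commit call.

    offsets is None to commit what the last records call returned, or
    a list of (topic, partition, offset) tuples to commit explicitly.
    """
    path = "/consumers/{}/instances/{}/offsets".format(
        quote(group_name, safe=""), quote(instance_id, safe="")
    )
    if use_async:
        # Assumption: only the presence of the parameter matters.
        path += "?async"
    body = None
    if offsets is not None:
        body = json.dumps(
            [{"topic": t, "partition": p, "offset": o} for t, p, o in offsets]
        )
    return "POST", path, body


print(commit_request("my_group", "my_instance"))
print(commit_request("my_group", "my_instance",
                     offsets=[("foo", 0, 12), ("bar", 1, 101)],
                     use_async=True))
```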

GET /consumers/(string: group_name)/instances/(string: instance_id)/offsets

Get the last committed offset for each of the given partitions (whether the commit was made by this process or another).

Request

[{"topic": "foo", "partition": 0}, {"topic": "bar", "partition": 2}]

TODO:

The following API endpoints still need to be specified and must be included in this new API. Note that several API endpoints (e.g., committed) are not included and can come in later releases. Ordered roughly by importance/likelihood of implementation:

  • close (DELETE)
  • commit (sync/async/with or without a specified offset)
  • seek (general, toBeginning, toEnd)
  • assign