Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rest: fix error code #190

Merged
merged 2 commits into from
Mar 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 63 additions & 13 deletions karapace/kafka_rest_apis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,11 @@ def validate_partition_id(partition_id: str, content_type: str) -> int:
try:
return int(partition_id)
except ValueError:
KafkaRest.not_found(message=f"Partition {partition_id} not found", content_type=content_type, sub_code=404)
KafkaRest.not_found(
message=f"Partition {partition_id} not found",
content_type=content_type,
sub_code=RESTErrorCodes.HTTP_NOT_FOUND.value,
)

@staticmethod
def is_valid_schema_request(data: dict, prefix: str) -> bool:
Expand Down Expand Up @@ -483,18 +487,32 @@ def get_partition_info(self, topic: str, partition: str, content_type: str) -> d
for p in partitions:
if p["partition"] == partition:
return p
self.not_found(message=f"Partition {partition} not found", content_type=content_type, sub_code=40402)
self.not_found(
message=f"Partition {partition} not found",
content_type=content_type,
sub_code=RESTErrorCodes.PARTITION_NOT_FOUND.value,
)
except UnknownTopicOrPartitionError:
self.not_found(message=f"Partition {partition} not found", content_type=content_type, sub_code=40402)
self.not_found(
message=f"Partition {partition} not found",
content_type=content_type,
sub_code=RESTErrorCodes.PARTITION_NOT_FOUND.value,
)
except KeyError:
self.not_found(message=f"Topic {topic} not found", content_type=content_type, sub_code=40401)
self.not_found(
message=f"Topic {topic} not found",
content_type=content_type,
sub_code=RESTErrorCodes.TOPIC_NOT_FOUND.value,
)
return {}

def get_topic_info(self, topic: str, content_type: str) -> dict:
md = self.cluster_metadata()["topics"]
if topic not in md:
self.not_found(
message=f"Topic {topic} not found in {list(md.keys())}", content_type=content_type, sub_code=40401
message=f"Topic {topic} not found in {list(md.keys())}",
content_type=content_type,
sub_code=RESTErrorCodes.TOPIC_NOT_FOUND.value,
)
return md[topic]

Expand Down Expand Up @@ -531,11 +549,19 @@ async def validate_publish_request_format(self, data: dict, formats: dict, conte

# disallow missing or non empty 'records' key , plus any other keys
if "records" not in data or set(data.keys()).difference(PUBLISH_KEYS) or not data["records"]:
self.unprocessable_entity(message="Invalid request format", content_type=content_type, sub_code=422)
self.unprocessable_entity(
message="Invalid request format",
content_type=content_type,
sub_code=RESTErrorCodes.HTTP_UNPROCESSABLE_ENTITY.value,
)
for r in data["records"]:
convert_to_int(r, "partition", content_type)
if set(r.keys()).difference(RECORD_KEYS):
self.unprocessable_entity(message="Invalid request format", content_type=content_type, sub_code=422)
self.unprocessable_entity(
message="Invalid request format",
content_type=content_type,
sub_code=RESTErrorCodes.HTTP_UNPROCESSABLE_ENTITY.value,
)
# disallow missing id and schema for any key/value list that has at least one populated element
if formats["embedded_format"] in {"avro", "jsonschema"}:
for prefix, code in zip(RECORD_KEYS, RECORD_CODES):
Expand All @@ -551,7 +577,11 @@ async def validate_publish_request_format(self, data: dict, formats: dict, conte
try:
await self.validate_schema_info(data, prefix, content_type, topic, formats["embedded_format"])
except InvalidMessageSchema as e:
self.unprocessable_entity(message=str(e), content_type=content_type, sub_code=42205)
self.unprocessable_entity(
message=str(e),
content_type=content_type,
sub_code=RESTErrorCodes.INVALID_DATA.value,
)

async def produce_message(self, *, topic: str, key: bytes, value: bytes, partition: int = None) -> dict:
prod = None
Expand Down Expand Up @@ -594,21 +624,33 @@ def topic_details(self, content_type: str, *, topic: str):
metadata = self.cluster_metadata([topic])
config = self.get_topic_config(topic)
if topic not in metadata["topics"]:
self.not_found(message=f"Topic {topic} not found", content_type=content_type, sub_code=40401)
self.not_found(
message=f"Topic {topic} not found",
content_type=content_type,
sub_code=RESTErrorCodes.TOPIC_NOT_FOUND.value,
)
data = metadata["topics"][topic]
data["name"] = topic
data["configs"] = config
self.r(data, content_type)
except UnknownTopicOrPartitionError:
self.not_found(message=f"Topic {topic} not found", content_type=content_type, sub_code=40401)
self.not_found(
message=f"Topic {topic} not found",
content_type=content_type,
sub_code=RESTErrorCodes.UNKNOWN_TOPIC_OR_PARTITION.value,
)

def list_partitions(self, content_type: str, *, topic: Optional[str]):
self.log.info("Retrieving partition details for topic %s", topic)
try:
topic_details = self.cluster_metadata([topic])["topics"]
self.r(topic_details[topic]["partitions"], content_type)
except (UnknownTopicOrPartitionError, KeyError):
self.not_found(message=f"Topic {topic} not found", content_type=content_type, sub_code=40401)
self.not_found(
message=f"Topic {topic} not found",
content_type=content_type,
sub_code=RESTErrorCodes.TOPIC_NOT_FOUND.value,
)

def partition_details(self, content_type: str, *, topic: str, partition_id: str):
self.log.info("Retrieving partition details for topic %s and partition %s", topic, partition_id)
Expand All @@ -623,8 +665,16 @@ def partition_offsets(self, content_type: str, *, topic: str, partition_id: str)
except UnknownTopicOrPartitionError as e:
# Do a topics request on failure, figure out faster ways once we get correctness down
if topic not in self.cluster_metadata()["topics"]:
self.not_found(message=f"Topic {topic} not found: {e}", content_type=content_type, sub_code=40401)
self.not_found(message=f"Partition {partition_id} not found: {e}", content_type=content_type, sub_code=40402)
self.not_found(
message=f"Topic {topic} not found: {e}",
content_type=content_type,
sub_code=RESTErrorCodes.TOPIC_NOT_FOUND.value,
)
self.not_found(
message=f"Partition {partition_id} not found: {e}",
content_type=content_type,
sub_code=RESTErrorCodes.PARTITION_NOT_FOUND.value,
)

def list_brokers(self, content_type: str):
metadata = self.cluster_metadata()
Expand Down
5 changes: 3 additions & 2 deletions karapace/kafka_rest_apis/error_codes.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
from enum import Enum, unique
from enum import Enum
from http import HTTPStatus


@unique
class RESTErrorCodes(Enum):
HTTP_BAD_REQUEST = HTTPStatus.BAD_REQUEST.value
HTTP_NOT_FOUND = HTTPStatus.NOT_FOUND.value
HTTP_INTERNAL_SERVER_ERROR = HTTPStatus.INTERNAL_SERVER_ERROR.value
HTTP_UNPROCESSABLE_ENTITY = HTTPStatus.UNPROCESSABLE_ENTITY.value
TOPIC_NOT_FOUND = 40401
PARTITION_NOT_FOUND = 40402
CONSUMER_NOT_FOUND = 40403
UNKNOWN_TOPIC_OR_PARTITION = 40403
UNSUPPORTED_FORMAT = 40601
SCHEMA_RETRIEVAL_ERROR = 40801
CONSUMER_ALREADY_EXISTS = 40902
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/test_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ async def test_topics(rest_async_client, admin_client):
assert data["partitions"][0]["replicas"][0]["in_sync"], "Replica should be in sync"
res = await rest_async_client.get(f"/topics/{topic_foo}")
assert res.status_code == 404, f"Topic {topic_foo} should not exist, status_code={res.status_code}"
assert res.json()["error_code"] == 40401, "Error code does not match"
assert res.json()["error_code"] == 40403, "Error code does not match"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't want to update the whole test_rest.py right now, there is lots of places using the hardcoded constant and not the enum there.



async def test_publish(rest_async_client, admin_client):
Expand Down