Skip to content

Commit

Permalink
Merge pull request #1104 from prithvijit-dasgupta/master
Browse files Browse the repository at this point in the history
createTopic error restructure
  • Loading branch information
Nevon authored May 24, 2021
2 parents 906668d + eab1e5a commit 7292734
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 16 deletions.
37 changes: 35 additions & 2 deletions src/admin/__tests__/createTopics.spec.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
const createAdmin = require('../index')
const { KafkaJSProtocolError } = require('../../errors')
const {
KafkaJSProtocolError,
KafkaJSAggregateError,
KafkaJSCreateTopicError,
} = require('../../errors')
const { createErrorFromCode } = require('../../protocol/error')

const { secureRandom, createCluster, newLogger } = require('testHelpers')

const NOT_CONTROLLER = 41
const TOPIC_ALREADY_EXISTS = 36
const INVALID_TOPIC_EXCEPTION = 17

describe('Admin', () => {
let topicName, admin
Expand Down Expand Up @@ -93,7 +98,9 @@ describe('Admin', () => {
cluster.refreshMetadata = jest.fn()
cluster.findControllerBroker = jest.fn(() => broker)
broker.createTopics.mockImplementationOnce(() => {
throw new KafkaJSProtocolError(createErrorFromCode(TOPIC_ALREADY_EXISTS))
throw new KafkaJSAggregateError('error', [
new KafkaJSCreateTopicError(createErrorFromCode(TOPIC_ALREADY_EXISTS), topicName),
])
})

admin = createAdmin({ cluster, logger: newLogger() })
Expand Down Expand Up @@ -132,5 +139,31 @@ describe('Admin', () => {
expect(broker.metadata).toHaveBeenCalledTimes(1)
expect(broker.metadata).toHaveBeenCalledWith([topicName, topic2, topic3])
})

test('forward non ignorable errors with topic name metadata', async () => {
const cluster = createCluster()
const broker = { createTopics: jest.fn(), metadata: jest.fn(() => true) }

cluster.refreshMetadata = jest.fn()
cluster.findControllerBroker = jest.fn(() => broker)

broker.createTopics.mockImplementationOnce(() => {
throw new KafkaJSAggregateError('error', [
new KafkaJSCreateTopicError(createErrorFromCode(INVALID_TOPIC_EXCEPTION), topicName),
])
})
admin = createAdmin({ cluster, logger: newLogger() })

await expect(
admin.createTopics({
waitForLeaders: true,
topics: [{ topic: topicName }],
})
).rejects.toBeInstanceOf(KafkaJSAggregateError)

expect(cluster.refreshMetadata).toHaveBeenCalledTimes(1)
expect(cluster.findControllerBroker).toHaveBeenCalledTimes(1)
expect(broker.createTopics).toHaveBeenCalledTimes(1)
})
})
})
7 changes: 5 additions & 2 deletions src/admin/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const {
KafkaJSDeleteGroupsError,
KafkaJSBrokerNotFound,
KafkaJSDeleteTopicRecordsError,
KafkaJSAggregateError,
} = require('../errors')
const { staleMetadata } = require('../protocol/error')
const CONFIG_RESOURCE_TYPES = require('../protocol/configResourceTypes')
Expand Down Expand Up @@ -155,8 +156,10 @@ module.exports = ({
throw e
}

if (e.type === 'TOPIC_ALREADY_EXISTS') {
return false
if (e instanceof KafkaJSAggregateError) {
if (e.errors.every(error => error.type === 'TOPIC_ALREADY_EXISTS')) {
return false
}
}

bail(e)
Expand Down
17 changes: 17 additions & 0 deletions src/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,21 @@ class KafkaJSInvalidLongError extends KafkaJSNonRetriableError {
}
}

class KafkaJSCreateTopicError extends KafkaJSProtocolError {
constructor(e, topicName) {
super(e)
this.topic = topicName
this.name = 'KafkaJSCreateTopicError'
}
}
class KafkaJSAggregateError extends Error {
constructor(message, errors) {
super(message)
this.errors = errors
this.name = 'KafkaJSAggregateError'
}
}

module.exports = {
KafkaJSError,
KafkaJSNonRetriableError,
Expand Down Expand Up @@ -252,4 +267,6 @@ module.exports = {
KafkaJSInvariantViolation,
KafkaJSInvalidVarIntError,
KafkaJSInvalidLongError,
KafkaJSCreateTopicError,
KafkaJSAggregateError,
}
8 changes: 7 additions & 1 deletion src/protocol/requests/createTopics/v0/response.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
const Decoder = require('../../../decoder')
const { failure, createErrorFromCode } = require('../../../error')
const { KafkaJSAggregateError, KafkaJSCreateTopicError } = require('../../../../errors')

/**
* CreateTopics Response (Version: 0) => [topic_errors]
Expand All @@ -25,7 +26,12 @@ const decode = async rawData => {
const parse = async data => {
const topicsWithError = data.topicErrors.filter(({ errorCode }) => failure(errorCode))
if (topicsWithError.length > 0) {
throw createErrorFromCode(topicsWithError[0].errorCode)
throw new KafkaJSAggregateError(
'Topic creation errors',
topicsWithError.map(
error => new KafkaJSCreateTopicError(createErrorFromCode(error.errorCode), error.topic)
)
)
}

return data
Expand Down
13 changes: 2 additions & 11 deletions src/protocol/requests/createTopics/v1/response.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
const Decoder = require('../../../decoder')
const { failure, createErrorFromCode } = require('../../../error')
const { parse: parseV0 } = require('../v0/response')

/**
* CreateTopics Response (Version: 1) => [topic_errors]
Expand All @@ -24,16 +24,7 @@ const decode = async rawData => {
}
}

const parse = async data => {
const topicsWithError = data.topicErrors.filter(({ errorCode }) => failure(errorCode))
if (topicsWithError.length > 0) {
throw createErrorFromCode(topicsWithError[0].errorCode)
}

return data
}

module.exports = {
decode,
parse,
parse: parseV0,
}

0 comments on commit 7292734

Please sign in to comment.