Skip to content

Commit

Permalink
err: Add new exception
Browse files Browse the repository at this point in the history
1. Add a serverError `TransactionComponentLoadFailed`
2. Add a new PulsarClientException `TransactionComponentLoadFailedException`.
2. Add a new BrokerServiceException `TransactionComponentLoadFailedException`.
  • Loading branch information
liangyepianzhou committed Nov 16, 2023
1 parent f652bce commit 5d5b51f
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,13 @@ public TopicBacklogQuotaExceededException(BacklogQuota.RetentionPolicy retention
}
}

// Todo: clean up transaction exception to make the transactional exception follow up the way of other component.
public static class TransactionComponentLoadFailedException extends BrokerServiceException {
public TransactionComponentLoadFailedException(String msg) {
super(msg);
}
}

public static org.apache.pulsar.common.api.proto.ServerError getClientErrorCode(Throwable t) {
return getClientErrorCode(t, true);
}
Expand Down Expand Up @@ -277,6 +284,8 @@ private static ServerError getClientErrorCode(Throwable t, boolean checkCauseIfU
return ServerError.TransactionConflict;
} else if (t instanceof CoordinatorException.TransactionNotFoundException) {
return ServerError.TransactionNotFound;
} else if (t instanceof TransactionComponentLoadFailedException) {
return ServerError.TransactionComponentLoadFailed;
} else {
if (checkCauseIfUnknown) {
return getClientErrorCode(t.getCause(), false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -943,6 +943,12 @@ public TransactionHasOperationFailedException(String msg) {
}
}

public static class TransactionComponentLoadFailedException extends PulsarClientException {
public TransactionComponentLoadFailedException(String msg) {
super(msg);
}
}

// wrap an exception to enriching more info messages.
public static Throwable wrap(Throwable t, String msg) {
msg += "\n" + t.getMessage();
Expand Down Expand Up @@ -1007,6 +1013,8 @@ public static Throwable wrap(Throwable t, String msg) {
return new TransactionConflictException(msg);
} else if (t instanceof TransactionHasOperationFailedException) {
return new TransactionHasOperationFailedException(msg);
} else if (t instanceof TransactionComponentLoadFailedException) {
return new TransactionComponentLoadFailedException(msg);
} else if (t instanceof PulsarClientException) {
return new PulsarClientException(msg);
} else if (t instanceof CompletionException) {
Expand Down Expand Up @@ -1107,6 +1115,8 @@ public static PulsarClientException unwrap(Throwable t) {
newException = new NotFoundException(msg);
} else if (cause instanceof TransactionHasOperationFailedException) {
newException = new TransactionHasOperationFailedException(msg);
} else if (cause instanceof TransactionComponentLoadFailedException) {
newException = new TransactionComponentLoadFailedException(msg);
} else {
newException = new PulsarClientException(t);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1300,6 +1300,8 @@ public static PulsarClientException getPulsarClientException(ServerError error,
return new PulsarClientException.TransactionConflictException(errorMsg);
case ProducerFenced:
return new PulsarClientException.ProducerFencedException(errorMsg);
case TransactionComponentLoadFailed:
return new PulsarClientException.TransactionComponentLoadFailedException(errorMsg);
case UnknownError:
default:
return new PulsarClientException(errorMsg);
Expand Down
3 changes: 3 additions & 0 deletions pulsar-common/src/main/proto/PulsarApi.proto
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,9 @@ enum ServerError {
// use this error to indicate that this producer is now permanently
// fenced. Applications are now supposed to close it and create a
// new producer
TransactionComponentLoadFailed = 26; // When TransactionBuffer, TransactionPendingAck
// init failed with no-retryable exception.
// All operations related to this component should be failed with this error.
}

enum AuthMethod {
Expand Down

0 comments on commit 5d5b51f

Please sign in to comment.