Skip to content

Commit

Permalink
[C++] Handle OAuth 2.0 exceptional cases gracefully (apache#12335)
Browse files Browse the repository at this point in the history
Fixes apache#12324

### Motivation

Currently if any error happened during OAuth 2.0 authentication in C++ client, a runtime error would be thrown and could only be caught when creating an `AuthOauth` object, but could not be caught in `Client`'s method like `createProducer`. It's not graceful. What's worse, there's no way for Python client that is a wrapper of C++ client to caught this exception.

### Modifications

When `ClientCredentialFlow::authenticate` returns an invalid `Oauth2TokenResult`, catch the `runtime_error` thrown in `Oauth2CachedToken`'s constructor and returns `ResultAuthenticationError` as `AuthOauth2::getAuthData`'s returned value. Since `getAuthData` always returns `ResultOk` before this PR, the related docs are also modified.

Then when a CONNECT or AUTH_RESPONSE command is created, expose the result of `getAuthData`. If it's not `ResultOk`, close the connection and complete the connection's future with the result. After that, the `Client`'s API will be completed with the result.

In addition, this PR also makes the error code of libcurl human readable by configuring `CURLOPT_ERRORBUFFER`.

### Verifying this change

- [x] Make sure that the change passes the CI checks.

This change added tests `AuthPluginTest.testOauth2Failure` to verify when OAuth 2.0 authentication failed, the `createProducer` would return `ResultAuthenticationError` without any exception thrown.
  • Loading branch information
BewareMyPower authored and ciaocloud committed Oct 28, 2021
1 parent 367ee25 commit b159b68
Show file tree
Hide file tree
Showing 9 changed files with 118 additions and 32 deletions.
6 changes: 3 additions & 3 deletions pulsar-client-cpp/include/pulsar/Authentication.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class PULSAR_PUBLIC Authentication {
*
* @param[out] authDataContent the shared pointer of AuthenticationData. The content of AuthenticationData
* is changed to the internal data of the current instance.
* @return ResultOk
* @return ResultOk or ResultAuthenticationError if authentication failed
*/
virtual Result getAuthData(AuthenticationDataPtr& authDataContent) {
authDataContent = authData_;
Expand Down Expand Up @@ -450,7 +450,7 @@ class CachedToken {
/**
* Get AuthenticationData from the current instance
*
* @return ResultOk
* @return ResultOk or ResultAuthenticationError if authentication failed
*/
virtual AuthenticationDataPtr getAuthData() = 0;

Expand Down Expand Up @@ -504,7 +504,7 @@ class PULSAR_PUBLIC AuthOauth2 : public Authentication {
*
* @param[out] authDataOauth2 the shared pointer of AuthenticationData. The content of AuthenticationData
* is changed to the internal data of the current instance.
* @return ResultOk
* @return ResultOk or ResultAuthenticationError if authentication failed
*/
Result getAuthData(AuthenticationDataPtr& authDataOauth2);

Expand Down
2 changes: 1 addition & 1 deletion pulsar-client-cpp/lib/BinaryProtoLookupService.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ void BinaryProtoLookupService::sendPartitionMetadataLookupRequest(const std::str
const ClientConnectionWeakPtr& clientCnx,
LookupDataResultPromisePtr promise) {
if (result != ResultOk) {
promise->setFailed(ResultConnectError);
promise->setFailed(result);
Future<Result, LookupDataResultPtr> future = promise->getFuture();
return;
}
Expand Down
21 changes: 17 additions & 4 deletions pulsar-client-cpp/lib/ClientConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,14 @@ void ClientConnection::handleHandshake(const boost::system::error_code& err) {
}

bool connectingThroughProxy = logicalAddress_ != physicalAddress_;
SharedBuffer buffer = Commands::newConnect(authentication_, logicalAddress_, connectingThroughProxy);
Result result = ResultOk;
SharedBuffer buffer =
Commands::newConnect(authentication_, logicalAddress_, connectingThroughProxy, result);
if (result != ResultOk) {
LOG_ERROR(cnxString_ << "Failed to establish connection: " << result);
close(result);
return;
}
// Send CONNECT command to broker
asyncWrite(buffer.const_asio_buffer(), std::bind(&ClientConnection::handleSentPulsarConnect,
shared_from_this(), std::placeholders::_1, buffer));
Expand Down Expand Up @@ -1144,7 +1151,13 @@ void ClientConnection::handleIncomingCommand() {
case BaseCommand::AUTH_CHALLENGE: {
LOG_DEBUG(cnxString_ << "Received auth challenge from broker");

SharedBuffer buffer = Commands::newAuthResponse(authentication_);
Result result;
SharedBuffer buffer = Commands::newAuthResponse(authentication_, result);
if (result != ResultOk) {
LOG_ERROR(cnxString_ << "Failed to send auth response: " << result);
close(result);
break;
}
asyncWrite(buffer.const_asio_buffer(),
std::bind(&ClientConnection::handleSentAuthResponse, shared_from_this(),
std::placeholders::_1, buffer));
Expand Down Expand Up @@ -1472,7 +1485,7 @@ void ClientConnection::handleConsumerStatsTimeout(const boost::system::error_cod
startConsumerStatsTimer(consumerStatsRequests);
}

void ClientConnection::close() {
void ClientConnection::close(Result result) {
Lock lock(mutex_);
if (isClosed()) {
return;
Expand Down Expand Up @@ -1529,7 +1542,7 @@ void ClientConnection::close() {
HandlerBase::handleDisconnection(ResultConnectError, shared_from_this(), it->second);
}

connectPromise_.setFailed(ResultConnectError);
connectPromise_.setFailed(result);

// Fail all pending requests, all these type are map whose value type contains the Promise object
for (auto& kv : pendingRequests) {
Expand Down
2 changes: 1 addition & 1 deletion pulsar-client-cpp/lib/ClientConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
*/
void tcpConnectAsync();

void close();
void close(Result result = ResultConnectError);

bool isClosed() const;

Expand Down
18 changes: 14 additions & 4 deletions pulsar-client-cpp/lib/Commands.cc
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ PairSharedBuffer Commands::newSend(SharedBuffer& headers, BaseCommand& cmd, uint
}

SharedBuffer Commands::newConnect(const AuthenticationPtr& authentication, const std::string& logicalAddress,
bool connectingThroughProxy) {
bool connectingThroughProxy, Result& result) {
BaseCommand cmd;
cmd.set_type(BaseCommand::CONNECT);
CommandConnect* connect = cmd.mutable_connect();
Expand All @@ -228,13 +228,18 @@ SharedBuffer Commands::newConnect(const AuthenticationPtr& authentication, const
}

AuthenticationDataPtr authDataContent;
if (authentication->getAuthData(authDataContent) == ResultOk && authDataContent->hasDataFromCommand()) {
result = authentication->getAuthData(authDataContent);
if (result != ResultOk) {
return SharedBuffer{};
}

if (authDataContent->hasDataFromCommand()) {
connect->set_auth_data(authDataContent->getCommandData());
}
return writeMessageWithSize(cmd);
}

SharedBuffer Commands::newAuthResponse(const AuthenticationPtr& authentication) {
SharedBuffer Commands::newAuthResponse(const AuthenticationPtr& authentication, Result& result) {
BaseCommand cmd;
cmd.set_type(BaseCommand::AUTH_RESPONSE);
CommandAuthResponse* authResponse = cmd.mutable_authresponse();
Expand All @@ -244,7 +249,12 @@ SharedBuffer Commands::newAuthResponse(const AuthenticationPtr& authentication)
authData->set_auth_method_name(authentication->getAuthMethodName());

AuthenticationDataPtr authDataContent;
if (authentication->getAuthData(authDataContent) == ResultOk && authDataContent->hasDataFromCommand()) {
result = authentication->getAuthData(authDataContent);
if (result != ResultOk) {
return SharedBuffer{};
}

if (authDataContent->hasDataFromCommand()) {
authData->set_auth_data(authDataContent->getCommandData());
}

Expand Down
4 changes: 2 additions & 2 deletions pulsar-client-cpp/lib/Commands.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ class Commands {
const static int checksumSize = 4;

static SharedBuffer newConnect(const AuthenticationPtr& authentication, const std::string& logicalAddress,
bool connectingThroughProxy);
bool connectingThroughProxy, Result& result);

static SharedBuffer newAuthResponse(const AuthenticationPtr& authentication);
static SharedBuffer newAuthResponse(const AuthenticationPtr& authentication, Result& result);

static SharedBuffer newPartitionMetadataRequest(const std::string& topic, uint64_t requestId);

Expand Down
5 changes: 1 addition & 4 deletions pulsar-client-cpp/lib/HTTPLookupService.cc
Original file line number Diff line number Diff line change
Expand Up @@ -190,10 +190,7 @@ Result HTTPLookupService::sendHTTPRequest(const std::string completeUrl, std::st
AuthenticationDataPtr authDataContent;
Result authResult = authenticationPtr_->getAuthData(authDataContent);
if (authResult != ResultOk) {
LOG_ERROR(
"All Authentication methods should have AuthenticationData and return true on getAuthData for "
"url "
<< completeUrl);
LOG_ERROR("Failed to getAuthData: " << authResult);
curl_easy_cleanup(handle);
return authResult;
}
Expand Down
42 changes: 32 additions & 10 deletions pulsar-client-cpp/lib/auth/AuthOauth2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,10 @@ static size_t curlWriteCallback(void* contents, size_t size, size_t nmemb, void*
}

void ClientCredentialFlow::initialize() {
if (issuerUrl_.empty()) {
LOG_ERROR("Failed to initialize ClientCredentialFlow: issuer_url is not set");
return;
}
if (!keyFile_.isValid()) {
return;
}
Expand Down Expand Up @@ -189,6 +193,9 @@ void ClientCredentialFlow::initialize() {
curl_easy_setopt(handle, CURLOPT_SSL_VERIFYPEER, 0L);
curl_easy_setopt(handle, CURLOPT_SSL_VERIFYHOST, 0L);

char errorBuffer[CURL_ERROR_SIZE];
curl_easy_setopt(handle, CURLOPT_ERRORBUFFER, errorBuffer);

// Make get call to server
res = curl_easy_perform(handle);

Expand Down Expand Up @@ -218,8 +225,8 @@ void ClientCredentialFlow::initialize() {
}
break;
default:
LOG_ERROR("Response failed for getting the well-known configuration " << issuerUrl_
<< ". Error Code " << res);
LOG_ERROR("Response failed for getting the well-known configuration "
<< issuerUrl_ << ". Error Code " << res << ": " << errorBuffer);
break;
}
// Free header list
Expand Down Expand Up @@ -283,6 +290,9 @@ Oauth2TokenResultPtr ClientCredentialFlow::authenticate() {

curl_easy_setopt(handle, CURLOPT_POSTFIELDS, jsonBody.c_str());

char errorBuffer[CURL_ERROR_SIZE];
curl_easy_setopt(handle, CURLOPT_ERRORBUFFER, errorBuffer);

// Make get call to server
res = curl_easy_perform(handle);

Expand All @@ -303,19 +313,26 @@ Oauth2TokenResultPtr ClientCredentialFlow::authenticate() {
break;
}

resultPtr->setAccessToken(root.get<std::string>("access_token"));
resultPtr->setExpiresIn(root.get<uint32_t>("expires_in"));

LOG_DEBUG("access_token: " << resultPtr->getAccessToken()
<< " expires_in: " << resultPtr->getExpiresIn());
resultPtr->setAccessToken(root.get<std::string>("access_token", ""));
resultPtr->setExpiresIn(
root.get<uint32_t>("expires_in", Oauth2TokenResult::undefined_expiration));
resultPtr->setRefreshToken(root.get<std::string>("refresh_token", ""));
resultPtr->setIdToken(root.get<std::string>("id_token", ""));

if (!resultPtr->getAccessToken().empty()) {
LOG_DEBUG("access_token: " << resultPtr->getAccessToken()
<< " expires_in: " << resultPtr->getExpiresIn());
} else {
LOG_ERROR("Response doesn't contain access_token, the response is: " << responseData);
}
} else {
LOG_ERROR("Response failed for issuerurl " << issuerUrl_ << ". response Code "
<< response_code << " passedin: " << jsonBody);
}
break;
default:
LOG_ERROR("Response failed for issuerurl " << issuerUrl_ << ". Error Code " << res
<< " passedin: " << jsonBody);
LOG_ERROR("Response failed for issuerurl " << issuerUrl_ << ". ErrorCode " << res << ": "
<< errorBuffer << " passedin: " << jsonBody);
break;
}
// Free header list
Expand Down Expand Up @@ -363,7 +380,12 @@ const std::string AuthOauth2::getAuthMethodName() const { return "token"; }

Result AuthOauth2::getAuthData(AuthenticationDataPtr& authDataContent) {
if (cachedTokenPtr_ == nullptr || cachedTokenPtr_->isExpired()) {
cachedTokenPtr_ = CachedTokenPtr(new Oauth2CachedToken(flowPtr_->authenticate()));
try {
cachedTokenPtr_ = CachedTokenPtr(new Oauth2CachedToken(flowPtr_->authenticate()));
} catch (const std::runtime_error& e) {
// The real error logs have already been printed in authenticate()
return ResultAuthenticationError;
}
}

authDataContent = cachedTokenPtr_->getAuthData();
Expand Down
50 changes: 47 additions & 3 deletions pulsar-client-cpp/tests/AuthPluginTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -366,9 +366,7 @@ TEST(AuthPluginTest, testOauth2WrongSecret) {
LOG_INFO("PARAMS: " << params);
pulsar::AuthenticationPtr auth = pulsar::AuthOauth2::create(params);
ASSERT_EQ(auth->getAuthMethodName(), "token");

EXPECT_THROW(auth->getAuthData(data), std::runtime_error)
<< "Expected fail for wrong secret when to get token from server";
ASSERT_EQ(auth->getAuthData(data), ResultAuthenticationError);
}

TEST(AuthPluginTest, testOauth2CredentialFile) {
Expand Down Expand Up @@ -420,3 +418,49 @@ TEST(AuthPluginTest, testOauth2RequestBody) {
ClientCredentialFlow flow2(params);
ASSERT_EQ(flow2.generateJsonBody(), expectedJson);
}

TEST(AuthPluginTest, testOauth2Failure) {
ParamMap params;
auto addKeyValue = [&](const std::string& key, const std::string& value) {
params[key] = value;
LOG_INFO("Configure \"" << key << "\" to \"" << value << "\"");
};

auto createClient = [&]() -> Client {
ClientConfiguration conf;
conf.setAuth(AuthOauth2::create(params));
return {"pulsar://localhost:6650", conf};
};

const std::string topic = "AuthPluginTest-testOauth2Failure";
Producer producer;

// No issuer_url
auto client1 = createClient();
ASSERT_EQ(client1.createProducer(topic, producer), ResultAuthenticationError);
client1.close();

// Invalid issuer_url
addKeyValue("issuer_url", "hello");
auto client2 = createClient();
ASSERT_EQ(client2.createProducer(topic, producer), ResultAuthenticationError);
client2.close();

addKeyValue("issuer_url", "https://google.com");
auto client3 = createClient();
ASSERT_EQ(client3.createProducer(topic, producer), ResultAuthenticationError);
client3.close();

// No client id and secret
addKeyValue("issuer_url", "https://dev-kt-aa9ne.us.auth0.com");
auto client4 = createClient();
ASSERT_EQ(client4.createProducer(topic, producer), ResultAuthenticationError);
client4.close();

// Invalid client_id and client_secret
addKeyValue("client_id", "my_id");
addKeyValue("client_secret", "my-secret");
auto client5 = createClient();
ASSERT_EQ(client5.createProducer(topic, producer), ResultAuthenticationError);
client5.close();
}

0 comments on commit b159b68

Please sign in to comment.