Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[C++] Handle OAuth 2.0 exceptional cases gracefully #12335

Merged
6 changes: 3 additions & 3 deletions pulsar-client-cpp/include/pulsar/Authentication.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class PULSAR_PUBLIC Authentication {
*
* @param[out] authDataContent the shared pointer of AuthenticationData. The content of AuthenticationData
* is changed to the internal data of the current instance.
* @return ResultOk
* @return ResultOk or ResultAuthenticationError if authentication failed
*/
virtual Result getAuthData(AuthenticationDataPtr& authDataContent) {
authDataContent = authData_;
Expand Down Expand Up @@ -450,7 +450,7 @@ class CachedToken {
/**
* Get AuthenticationData from the current instance
*
* @return ResultOk
* @return ResultOk or ResultAuthenticationError if authentication failed
*/
virtual AuthenticationDataPtr getAuthData() = 0;

Expand Down Expand Up @@ -504,7 +504,7 @@ class PULSAR_PUBLIC AuthOauth2 : public Authentication {
*
* @param[out] authDataOauth2 the shared pointer of AuthenticationData. The content of AuthenticationData
* is changed to the internal data of the current instance.
* @return ResultOk
* @return ResultOk or ResultAuthenticationError if authentication failed
*/
Result getAuthData(AuthenticationDataPtr& authDataOauth2);

Expand Down
2 changes: 1 addition & 1 deletion pulsar-client-cpp/lib/BinaryProtoLookupService.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ void BinaryProtoLookupService::sendPartitionMetadataLookupRequest(const std::str
const ClientConnectionWeakPtr& clientCnx,
LookupDataResultPromisePtr promise) {
if (result != ResultOk) {
promise->setFailed(ResultConnectError);
promise->setFailed(result);
Future<Result, LookupDataResultPtr> future = promise->getFuture();
return;
}
Expand Down
21 changes: 17 additions & 4 deletions pulsar-client-cpp/lib/ClientConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,14 @@ void ClientConnection::handleHandshake(const boost::system::error_code& err) {
}

bool connectingThroughProxy = logicalAddress_ != physicalAddress_;
SharedBuffer buffer = Commands::newConnect(authentication_, logicalAddress_, connectingThroughProxy);
Result result = ResultOk;
SharedBuffer buffer =
Commands::newConnect(authentication_, logicalAddress_, connectingThroughProxy, result);
if (result != ResultOk) {
LOG_ERROR(cnxString_ << "Failed to establish connection: " << result);
close(result);
return;
}
// Send CONNECT command to broker
asyncWrite(buffer.const_asio_buffer(), std::bind(&ClientConnection::handleSentPulsarConnect,
shared_from_this(), std::placeholders::_1, buffer));
Expand Down Expand Up @@ -1144,7 +1151,13 @@ void ClientConnection::handleIncomingCommand() {
case BaseCommand::AUTH_CHALLENGE: {
LOG_DEBUG(cnxString_ << "Received auth challenge from broker");

SharedBuffer buffer = Commands::newAuthResponse(authentication_);
Result result;
SharedBuffer buffer = Commands::newAuthResponse(authentication_, result);
if (result != ResultOk) {
LOG_ERROR(cnxString_ << "Failed to send auth response: " << result);
close(result);
break;
}
asyncWrite(buffer.const_asio_buffer(),
std::bind(&ClientConnection::handleSentAuthResponse, shared_from_this(),
std::placeholders::_1, buffer));
Expand Down Expand Up @@ -1472,7 +1485,7 @@ void ClientConnection::handleConsumerStatsTimeout(const boost::system::error_cod
startConsumerStatsTimer(consumerStatsRequests);
}

void ClientConnection::close() {
void ClientConnection::close(Result result) {
Lock lock(mutex_);
if (isClosed()) {
return;
Expand Down Expand Up @@ -1529,7 +1542,7 @@ void ClientConnection::close() {
HandlerBase::handleDisconnection(ResultConnectError, shared_from_this(), it->second);
}

connectPromise_.setFailed(ResultConnectError);
connectPromise_.setFailed(result);

// Fail all pending requests, all these type are map whose value type contains the Promise object
for (auto& kv : pendingRequests) {
Expand Down
2 changes: 1 addition & 1 deletion pulsar-client-cpp/lib/ClientConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
*/
void tcpConnectAsync();

void close();
void close(Result result = ResultConnectError);

bool isClosed() const;

Expand Down
18 changes: 14 additions & 4 deletions pulsar-client-cpp/lib/Commands.cc
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ PairSharedBuffer Commands::newSend(SharedBuffer& headers, BaseCommand& cmd, uint
}

SharedBuffer Commands::newConnect(const AuthenticationPtr& authentication, const std::string& logicalAddress,
bool connectingThroughProxy) {
bool connectingThroughProxy, Result& result) {
BaseCommand cmd;
cmd.set_type(BaseCommand::CONNECT);
CommandConnect* connect = cmd.mutable_connect();
Expand All @@ -228,13 +228,18 @@ SharedBuffer Commands::newConnect(const AuthenticationPtr& authentication, const
}

AuthenticationDataPtr authDataContent;
if (authentication->getAuthData(authDataContent) == ResultOk && authDataContent->hasDataFromCommand()) {
result = authentication->getAuthData(authDataContent);
if (result != ResultOk) {
return SharedBuffer::allocate(0);
BewareMyPower marked this conversation as resolved.
Show resolved Hide resolved
}

if (authDataContent->hasDataFromCommand()) {
connect->set_auth_data(authDataContent->getCommandData());
}
return writeMessageWithSize(cmd);
}

SharedBuffer Commands::newAuthResponse(const AuthenticationPtr& authentication) {
SharedBuffer Commands::newAuthResponse(const AuthenticationPtr& authentication, Result& result) {
BaseCommand cmd;
cmd.set_type(BaseCommand::AUTH_RESPONSE);
CommandAuthResponse* authResponse = cmd.mutable_authresponse();
Expand All @@ -244,7 +249,12 @@ SharedBuffer Commands::newAuthResponse(const AuthenticationPtr& authentication)
authData->set_auth_method_name(authentication->getAuthMethodName());

AuthenticationDataPtr authDataContent;
if (authentication->getAuthData(authDataContent) == ResultOk && authDataContent->hasDataFromCommand()) {
result = authentication->getAuthData(authDataContent);
if (result != ResultOk) {
return SharedBuffer::allocate(0);
}

if (authDataContent->hasDataFromCommand()) {
authData->set_auth_data(authDataContent->getCommandData());
}

Expand Down
4 changes: 2 additions & 2 deletions pulsar-client-cpp/lib/Commands.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ class Commands {
const static int checksumSize = 4;

static SharedBuffer newConnect(const AuthenticationPtr& authentication, const std::string& logicalAddress,
bool connectingThroughProxy);
bool connectingThroughProxy, Result& result);

static SharedBuffer newAuthResponse(const AuthenticationPtr& authentication);
static SharedBuffer newAuthResponse(const AuthenticationPtr& authentication, Result& result);

static SharedBuffer newPartitionMetadataRequest(const std::string& topic, uint64_t requestId);

Expand Down
5 changes: 1 addition & 4 deletions pulsar-client-cpp/lib/HTTPLookupService.cc
Original file line number Diff line number Diff line change
Expand Up @@ -190,10 +190,7 @@ Result HTTPLookupService::sendHTTPRequest(const std::string completeUrl, std::st
AuthenticationDataPtr authDataContent;
Result authResult = authenticationPtr_->getAuthData(authDataContent);
if (authResult != ResultOk) {
LOG_ERROR(
"All Authentication methods should have AuthenticationData and return true on getAuthData for "
"url "
<< completeUrl);
LOG_ERROR("Failed to getAuthData: " << authResult);
curl_easy_cleanup(handle);
return authResult;
}
Expand Down
42 changes: 32 additions & 10 deletions pulsar-client-cpp/lib/auth/AuthOauth2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,10 @@ static size_t curlWriteCallback(void* contents, size_t size, size_t nmemb, void*
}

void ClientCredentialFlow::initialize() {
if (issuerUrl_.empty()) {
LOG_ERROR("Failed to initialize ClientCredentialFlow: issuer_url is not set");
return;
}
if (!keyFile_.isValid()) {
return;
}
Expand Down Expand Up @@ -189,6 +193,9 @@ void ClientCredentialFlow::initialize() {
curl_easy_setopt(handle, CURLOPT_SSL_VERIFYPEER, 0L);
curl_easy_setopt(handle, CURLOPT_SSL_VERIFYHOST, 0L);

char errorBuffer[CURL_ERROR_SIZE];
curl_easy_setopt(handle, CURLOPT_ERRORBUFFER, errorBuffer);

// Make get call to server
res = curl_easy_perform(handle);

Expand Down Expand Up @@ -218,8 +225,8 @@ void ClientCredentialFlow::initialize() {
}
break;
default:
LOG_ERROR("Response failed for getting the well-known configuration " << issuerUrl_
<< ". Error Code " << res);
LOG_ERROR("Response failed for getting the well-known configuration "
<< issuerUrl_ << ". Error Code " << res << ": " << errorBuffer);
break;
}
// Free header list
Expand Down Expand Up @@ -283,6 +290,9 @@ Oauth2TokenResultPtr ClientCredentialFlow::authenticate() {

curl_easy_setopt(handle, CURLOPT_POSTFIELDS, jsonBody.c_str());

char errorBuffer[CURL_ERROR_SIZE];
curl_easy_setopt(handle, CURLOPT_ERRORBUFFER, errorBuffer);

// Make get call to server
res = curl_easy_perform(handle);

Expand All @@ -303,19 +313,26 @@ Oauth2TokenResultPtr ClientCredentialFlow::authenticate() {
break;
}

resultPtr->setAccessToken(root.get<std::string>("access_token"));
resultPtr->setExpiresIn(root.get<uint32_t>("expires_in"));

LOG_DEBUG("access_token: " << resultPtr->getAccessToken()
<< " expires_in: " << resultPtr->getExpiresIn());
resultPtr->setAccessToken(root.get<std::string>("access_token", ""));
resultPtr->setExpiresIn(
root.get<uint32_t>("expires_in", Oauth2TokenResult::undefined_expiration));
resultPtr->setRefreshToken(root.get<std::string>("refresh_token", ""));
resultPtr->setIdToken(root.get<std::string>("id_token", ""));

if (!resultPtr->getAccessToken().empty()) {
LOG_DEBUG("access_token: " << resultPtr->getAccessToken()
<< " expires_in: " << resultPtr->getExpiresIn());
} else {
LOG_ERROR("Response doesn't contain access_token, the response is: " << responseData);
}
} else {
LOG_ERROR("Response failed for issuerurl " << issuerUrl_ << ". response Code "
<< response_code << " passedin: " << jsonBody);
}
break;
default:
LOG_ERROR("Response failed for issuerurl " << issuerUrl_ << ". Error Code " << res
<< " passedin: " << jsonBody);
LOG_ERROR("Response failed for issuerurl " << issuerUrl_ << ". ErrorCode " << res << ": "
<< errorBuffer << " passedin: " << jsonBody);
break;
}
// Free header list
Expand Down Expand Up @@ -363,7 +380,12 @@ const std::string AuthOauth2::getAuthMethodName() const { return "token"; }

Result AuthOauth2::getAuthData(AuthenticationDataPtr& authDataContent) {
if (cachedTokenPtr_ == nullptr || cachedTokenPtr_->isExpired()) {
cachedTokenPtr_ = CachedTokenPtr(new Oauth2CachedToken(flowPtr_->authenticate()));
try {
cachedTokenPtr_ = CachedTokenPtr(new Oauth2CachedToken(flowPtr_->authenticate()));
} catch (const std::runtime_error& e) {
// The real error logs have already been printed in authenticate()
return ResultAuthenticationError;
}
}

authDataContent = cachedTokenPtr_->getAuthData();
Expand Down
50 changes: 47 additions & 3 deletions pulsar-client-cpp/tests/AuthPluginTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -366,9 +366,7 @@ TEST(AuthPluginTest, testOauth2WrongSecret) {
LOG_INFO("PARAMS: " << params);
pulsar::AuthenticationPtr auth = pulsar::AuthOauth2::create(params);
ASSERT_EQ(auth->getAuthMethodName(), "token");

EXPECT_THROW(auth->getAuthData(data), std::runtime_error)
<< "Expected fail for wrong secret when to get token from server";
ASSERT_EQ(auth->getAuthData(data), ResultAuthenticationError);
}

TEST(AuthPluginTest, testOauth2CredentialFile) {
Expand Down Expand Up @@ -420,3 +418,49 @@ TEST(AuthPluginTest, testOauth2RequestBody) {
ClientCredentialFlow flow2(params);
ASSERT_EQ(flow2.generateJsonBody(), expectedJson);
}

TEST(AuthPluginTest, testOauth2Failure) {
ParamMap params;
auto addKeyValue = [&](const std::string& key, const std::string& value) {
params[key] = value;
LOG_INFO("Configure \"" << key << "\" to \"" << value << "\"");
};

auto createClient = [&]() -> Client {
ClientConfiguration conf;
conf.setAuth(AuthOauth2::create(params));
return {"pulsar://localhost:6650", conf};
};

const std::string topic = "AuthPluginTest-testOauth2Failure";
Producer producer;

// No issuer_url
auto client1 = createClient();
ASSERT_EQ(client1.createProducer(topic, producer), ResultAuthenticationError);
client1.close();

// Invalid issuer_url
addKeyValue("issuer_url", "hello");
auto client2 = createClient();
ASSERT_EQ(client2.createProducer(topic, producer), ResultAuthenticationError);
client2.close();

addKeyValue("issuer_url", "https://google.com");
auto client3 = createClient();
ASSERT_EQ(client3.createProducer(topic, producer), ResultAuthenticationError);
client3.close();

// No client id and secret
addKeyValue("issuer_url", "https://dev-kt-aa9ne.us.auth0.com");
auto client4 = createClient();
ASSERT_EQ(client4.createProducer(topic, producer), ResultAuthenticationError);
client4.close();

// Invalid client_id and client_secret
addKeyValue("client_id", "my_id");
addKeyValue("client_secret", "my-secret");
auto client5 = createClient();
ASSERT_EQ(client5.createProducer(topic, producer), ResultAuthenticationError);
client5.close();
}