Skip to content

Commit

Permalink
Fixing bug where the lastRefreshInterval was not properly set. (Azure…
Browse files Browse the repository at this point in the history
…#4069)

* Fixing bug where the lastRefreshInterval was not properly set.
  • Loading branch information
conniey authored and sima-zhu committed Jun 29, 2019
1 parent 02e6334 commit 7fd418f
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,35 +64,33 @@ Flux<AmqpResponseCode> getAuthorizationResults() {

/**
* Invokes an authorization call on the CBS node.
*
* @return A Mono that completes with the milliseconds corresponding to when this token should be refreshed.
*/
Mono<Void> authorize() {
Mono<Long> authorize() {
if (hasDisposed.get()) {
return Mono.error(new AzureException("Cannot authorize with CBS node when this token manager has been disposed of."));
}

return cbsNode.flatMap(cbsNode -> cbsNode.authorize(tokenAudience))
.map(expiresOn -> {
if (!hasScheduled.getAndSet(true)) {
logger.asInfo().log("Scheduling refresh token.");
Duration between = Duration.between(OffsetDateTime.now(ZoneOffset.UTC), expiresOn);
final Duration between = Duration.between(OffsetDateTime.now(ZoneOffset.UTC), expiresOn);

// We want to refresh the token when 90% of the time before expiry has elapsed.
long refreshSeconds = (long) Math.floor(between.getSeconds() * 0.9);
long refreshIntervalMS = refreshSeconds * 1000;
// We want to refresh the token when 90% of the time before expiry has elapsed.
final long refreshSeconds = (long) Math.floor(between.getSeconds() * 0.9);
// This converts it to milliseconds
final long refreshIntervalMS = refreshSeconds * 1000;

lastRefreshInterval.set(refreshIntervalMS);
lastRefreshInterval.set(refreshIntervalMS);

// This converts it to milliseconds
try {
this.timer.schedule(new RefreshAuthorizationToken(), refreshIntervalMS);
} catch (IllegalStateException e) {
logger.asWarning().log("Unable to reschedule timer task.", e);
hasScheduled.set(false);
}
// If this is the first time authorize is called, the task will not have been scheduled yet.
if (!hasScheduled.getAndSet(true)) {
logger.asInfo().log("Scheduling refresh token task.");
scheduleRefreshTokenTask(refreshIntervalMS);
}

return expiresOn;
}).then();
return refreshIntervalMS;
});
}

@Override
Expand All @@ -106,39 +104,42 @@ public void close() {
}
}

private void scheduleRefreshTokenTask(Long refreshIntervalInMS) {
try {
timer.schedule(new RefreshAuthorizationToken(), refreshIntervalInMS);
} catch (IllegalStateException e) {
logger.asWarning().log("Unable to schedule RefreshAuthorizationToken task.", e);
hasScheduled.set(false);
}
}

private class RefreshAuthorizationToken extends TimerTask {
@Override
public void run() {
logger.asInfo().log("Refreshing authorization token.");
authorize().subscribe(
(Void response) -> {
logger.asInfo().log("Response acquired.");
}, error -> {
if ((error instanceof AmqpException) && ((AmqpException) error).isTransient()) {
logger.asError().log("Error is transient. Rescheduling authorization task.", error);
timer.schedule(new RefreshAuthorizationToken(), lastRefreshInterval.get());
} else {
logger.asError().log("Error occurred while refreshing token that is not retriable. Not scheduling"
+ " refresh task. Use ActiveClientTokenManager.authorize() to schedule task again.", error);
hasScheduled.set(false);
}
(Long refreshIntervalInMS) -> {

sink.error(error);
}, () -> {
if (hasDisposed.get()) {
logger.asInfo().log("Token manager has been disposed of. Not rescheduling.");
return;
}

logger.asInfo().log("Success. Rescheduling refresh authorization task.");
logger.asInfo().log("Authorization successful. Refreshing token in {} ms.", refreshIntervalInMS);
sink.next(AmqpResponseCode.ACCEPTED);

try {
timer.schedule(new RefreshAuthorizationToken(), lastRefreshInterval.get());
} catch (IllegalStateException e) {
logger.asWarning().log("Unable to reschedule timer task.", e);
scheduleRefreshTokenTask(refreshIntervalInMS);
}, error -> {
if ((error instanceof AmqpException) && ((AmqpException) error).isTransient()) {
logger.asError().log("Error is transient. Rescheduling authorization task.", error);
scheduleRefreshTokenTask(lastRefreshInterval.get());
} else {
logger.asError().log("Error occurred while refreshing token that is not retriable. Not scheduling"
+ " refresh task. Use ActiveClientTokenManager.authorize() to schedule task again.", error);
hasScheduled.set(false);
}

sink.error(error);
});
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,9 @@ public void getAuthorizationResultsRetriableError() {
final AmqpException error = new AmqpException(true, ErrorCondition.TIMEOUT_ERROR, "Timed out",
new ErrorContext("Test-context-namespace"));

when(cbsNode.authorize(any())).thenReturn(getNextExpiration(2), Mono.error(error),
getNextExpiration(5), getNextExpiration(5),
getNextExpiration(45), getNextExpiration(60));
when(cbsNode.authorize(any())).thenReturn(getNextExpiration(3), Mono.error(error),
getNextExpiration(5), getNextExpiration(10),
getNextExpiration(45));

// Act & Assert
try (ActiveClientTokenManager tokenManager = new ActiveClientTokenManager(cbsNodeMono, AUDIENCE)) {
Expand Down

0 comments on commit 7fd418f

Please sign in to comment.