diff --git a/eventhubs/client/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/ActiveClientTokenManager.java b/eventhubs/client/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/ActiveClientTokenManager.java index ef5a9291786b8..f48ad6210dbaf 100644 --- a/eventhubs/client/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/ActiveClientTokenManager.java +++ b/eventhubs/client/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/ActiveClientTokenManager.java @@ -64,35 +64,33 @@ Flux getAuthorizationResults() { /** * Invokes an authorization call on the CBS node. + * + * @return A Mono that completes with the milliseconds corresponding to when this token should be refreshed. */ - Mono authorize() { + Mono authorize() { if (hasDisposed.get()) { return Mono.error(new AzureException("Cannot authorize with CBS node when this token manager has been disposed of.")); } return cbsNode.flatMap(cbsNode -> cbsNode.authorize(tokenAudience)) .map(expiresOn -> { - if (!hasScheduled.getAndSet(true)) { - logger.asInfo().log("Scheduling refresh token."); - Duration between = Duration.between(OffsetDateTime.now(ZoneOffset.UTC), expiresOn); + final Duration between = Duration.between(OffsetDateTime.now(ZoneOffset.UTC), expiresOn); - // We want to refresh the token when 90% of the time before expiry has elapsed. - long refreshSeconds = (long) Math.floor(between.getSeconds() * 0.9); - long refreshIntervalMS = refreshSeconds * 1000; + // We want to refresh the token when 90% of the time before expiry has elapsed. + final long refreshSeconds = (long) Math.floor(between.getSeconds() * 0.9); + // This converts it to milliseconds + final long refreshIntervalMS = refreshSeconds * 1000; - lastRefreshInterval.set(refreshIntervalMS); + lastRefreshInterval.set(refreshIntervalMS); - // This converts it to milliseconds - try { - this.timer.schedule(new RefreshAuthorizationToken(), refreshIntervalMS); - } catch (IllegalStateException e) { - logger.asWarning().log("Unable to reschedule timer task.", e); - hasScheduled.set(false); - } + // If this is the first time authorize is called, the task will not have been scheduled yet. + if (!hasScheduled.getAndSet(true)) { + logger.asInfo().log("Scheduling refresh token task."); + scheduleRefreshTokenTask(refreshIntervalMS); } - return expiresOn; - }).then(); + return refreshIntervalMS; + }); } @Override @@ -106,39 +104,42 @@ public void close() { } } + private void scheduleRefreshTokenTask(Long refreshIntervalInMS) { + try { + timer.schedule(new RefreshAuthorizationToken(), refreshIntervalInMS); + } catch (IllegalStateException e) { + logger.asWarning().log("Unable to schedule RefreshAuthorizationToken task.", e); + hasScheduled.set(false); + } + } + private class RefreshAuthorizationToken extends TimerTask { @Override public void run() { logger.asInfo().log("Refreshing authorization token."); authorize().subscribe( - (Void response) -> { - logger.asInfo().log("Response acquired."); - }, error -> { - if ((error instanceof AmqpException) && ((AmqpException) error).isTransient()) { - logger.asError().log("Error is transient. Rescheduling authorization task.", error); - timer.schedule(new RefreshAuthorizationToken(), lastRefreshInterval.get()); - } else { - logger.asError().log("Error occurred while refreshing token that is not retriable. Not scheduling" - + " refresh task. Use ActiveClientTokenManager.authorize() to schedule task again.", error); - hasScheduled.set(false); - } + (Long refreshIntervalInMS) -> { - sink.error(error); - }, () -> { if (hasDisposed.get()) { logger.asInfo().log("Token manager has been disposed of. Not rescheduling."); return; } - logger.asInfo().log("Success. Rescheduling refresh authorization task."); + logger.asInfo().log("Authorization successful. Refreshing token in {} ms.", refreshIntervalInMS); sink.next(AmqpResponseCode.ACCEPTED); - try { - timer.schedule(new RefreshAuthorizationToken(), lastRefreshInterval.get()); - } catch (IllegalStateException e) { - logger.asWarning().log("Unable to reschedule timer task.", e); + scheduleRefreshTokenTask(refreshIntervalInMS); + }, error -> { + if ((error instanceof AmqpException) && ((AmqpException) error).isTransient()) { + logger.asError().log("Error is transient. Rescheduling authorization task.", error); + scheduleRefreshTokenTask(lastRefreshInterval.get()); + } else { + logger.asError().log("Error occurred while refreshing token that is not retriable. Not scheduling" + + " refresh task. Use ActiveClientTokenManager.authorize() to schedule task again.", error); hasScheduled.set(false); } + + sink.error(error); }); } } diff --git a/eventhubs/client/azure-eventhubs/src/test/java/com/azure/messaging/eventhubs/implementation/ActiveClientTokenManagerTest.java b/eventhubs/client/azure-eventhubs/src/test/java/com/azure/messaging/eventhubs/implementation/ActiveClientTokenManagerTest.java index 7aebfc762b9c8..6b3e4191fa3b9 100644 --- a/eventhubs/client/azure-eventhubs/src/test/java/com/azure/messaging/eventhubs/implementation/ActiveClientTokenManagerTest.java +++ b/eventhubs/client/azure-eventhubs/src/test/java/com/azure/messaging/eventhubs/implementation/ActiveClientTokenManagerTest.java @@ -119,9 +119,9 @@ public void getAuthorizationResultsRetriableError() { final AmqpException error = new AmqpException(true, ErrorCondition.TIMEOUT_ERROR, "Timed out", new ErrorContext("Test-context-namespace")); - when(cbsNode.authorize(any())).thenReturn(getNextExpiration(2), Mono.error(error), - getNextExpiration(5), getNextExpiration(5), - getNextExpiration(45), getNextExpiration(60)); + when(cbsNode.authorize(any())).thenReturn(getNextExpiration(3), Mono.error(error), + getNextExpiration(5), getNextExpiration(10), + getNextExpiration(45)); // Act & Assert try (ActiveClientTokenManager tokenManager = new ActiveClientTokenManager(cbsNodeMono, AUDIENCE)) {