From 38cdeb9f038c9f15209646ffdb46cf988395bb76 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Knut=20Olav=20L=C3=B8ite?= Date: Mon, 10 Jun 2024 15:24:17 +0200 Subject: [PATCH] chore: remove unused multiplexed session code Removes the unused code for multiplexed sessions in the session pool. All relevant code has been moved to the MultiplexedSessionDatabaseClient. --- .../com/google/cloud/spanner/SessionPool.java | 737 +----------------- .../MultiplexedSessionMaintainerTest.java | 310 -------- .../spanner/MultiplexedSessionPoolTest.java | 182 ----- .../google/cloud/spanner/SessionPoolTest.java | 11 - 4 files changed, 30 insertions(+), 1210 deletions(-) delete mode 100644 google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionMaintainerTest.java delete mode 100644 google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionPoolTest.java diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java index f36da57a816..1819224495d 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java @@ -65,7 +65,6 @@ import com.google.cloud.spanner.SpannerImpl.ClosedException; import com.google.cloud.spanner.spi.v1.SpannerRpc; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; @@ -107,9 +106,10 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; import java.util.logging.Level; import java.util.logging.Logger; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; import org.threeten.bp.Duration; @@ -144,14 +144,6 @@ void maybeWaitOnMinSessions() { ErrorCode.DEADLINE_EXCEEDED, "Timed out after waiting " + timeoutMillis + "ms for session pool creation"); } - - if (useMultiplexedSessions() - && !waitOnMultiplexedSessionsLatch.await(timeoutNanos, TimeUnit.NANOSECONDS)) { - final long timeoutMillis = options.getWaitForMinSessions().toMillis(); - throw SpannerExceptionFactory.newSpannerException( - ErrorCode.DEADLINE_EXCEEDED, - "Timed out after waiting " + timeoutMillis + "ms for multiplexed session creation"); - } } catch (InterruptedException e) { throw SpannerExceptionFactory.propagateInterrupt(e); } @@ -241,7 +233,7 @@ public ApiFuture setCallback(Executor exec, ReadyCallback cb) { private AutoClosingReadContext( Function delegateSupplier, SessionPool sessionPool, - SessionReplacementHandler sessionReplacementHandler, + SessionReplacementHandler sessionReplacementHandler, I session, boolean isSingleUse) { this.readContextDelegateSupplier = delegateSupplier; @@ -554,7 +546,7 @@ private static class AutoClosingReadTransaction AutoClosingReadTransaction( Function txnSupplier, SessionPool sessionPool, - SessionReplacementHandler sessionReplacementHandler, + SessionReplacementHandler sessionReplacementHandler, I session, boolean isSingleUse) { super(txnSupplier, sessionPool, sessionReplacementHandler, session, isSingleUse); @@ -590,23 +582,6 @@ public PooledSessionFuture replaceSession( } } - static class MultiplexedSessionReplacementHandler - implements SessionReplacementHandler { - @Override - public MultiplexedSessionFuture replaceSession( - SessionNotFoundException e, MultiplexedSessionFuture session) { - /** - * For multiplexed sessions, we would never obtain a {@link SessionNotFoundException}. Hence, - * this method will ideally never be invoked. - */ - logger.log( - Level.WARNING, - String.format( - "Replace session invoked for multiplexed session => %s", session.getName())); - throw e; - } - } - interface SessionNotFoundHandler { /** * Handles the given {@link SessionNotFoundException} by possibly converting it to a different @@ -781,10 +756,13 @@ public ApiFuture bufferAsync(Iterable mutations) { return delegate.bufferAsync(mutations); } + @SuppressWarnings("deprecation") @Override public ResultSetStats analyzeUpdate( Statement statement, QueryAnalyzeMode analyzeMode, UpdateOption... options) { - return analyzeUpdateStatement(statement, analyzeMode, options).getStats(); + try (ResultSet resultSet = analyzeUpdateStatement(statement, analyzeMode, options)) { + return resultSet.getStats(); + } } @Override @@ -870,7 +848,7 @@ private static class AutoClosingTransactionManager AutoClosingTransactionManager( T session, - SessionReplacementHandler sessionReplacementHandler, + SessionReplacementHandler sessionReplacementHandler, TransactionOption... options) { this.session = session; this.options = options; @@ -1000,7 +978,7 @@ private static final class SessionPoolTransactionRunner private SessionPoolTransactionRunner( I session, - SessionReplacementHandler sessionReplacementHandler, + SessionReplacementHandler sessionReplacementHandler, TransactionOption... options) { this.session = session; this.options = options; @@ -1032,6 +1010,7 @@ public T run(TransactionCallable callable) { session.get().markUsed(); return result; } catch (SpannerException e) { + //noinspection ThrowableNotThrown session.get().setLastException(e); throw e; } finally { @@ -1064,7 +1043,7 @@ private static class SessionPoolAsyncRunner implements private SessionPoolAsyncRunner( I session, - SessionReplacementHandler sessionReplacementHandler, + SessionReplacementHandler sessionReplacementHandler, TransactionOption... options) { this.session = session; this.options = options; @@ -1100,7 +1079,6 @@ public ApiFuture runAsync(final AsyncWork work, Executor executor) { session = sessionReplacementHandler.replaceSession( (SessionNotFoundException) se, session); - se = null; } catch (SessionNotFoundException e) { exception = e; break; @@ -1266,39 +1244,6 @@ public PooledSessionFuture get() { } } - class MultiplexedSessionFutureWrapper implements SessionFutureWrapper { - private ISpan span; - private volatile MultiplexedSessionFuture multiplexedSessionFuture; - - public MultiplexedSessionFutureWrapper(ISpan span) { - this.span = span; - } - - @Override - public MultiplexedSessionFuture get() { - if (resourceNotFoundException != null) { - span.addAnnotation("Database has been deleted"); - throw SpannerExceptionFactory.newSpannerException( - ErrorCode.NOT_FOUND, - String.format( - "The session pool has been invalidated because a previous RPC returned 'Database not found': %s", - resourceNotFoundException.getMessage()), - resourceNotFoundException); - } - if (multiplexedSessionFuture == null) { - synchronized (lock) { - if (multiplexedSessionFuture == null) { - // Creating a new reference where the request's span state can be stored. - MultiplexedSessionFuture multiplexedSessionFuture = new MultiplexedSessionFuture(span); - this.multiplexedSessionFuture = multiplexedSessionFuture; - return multiplexedSessionFuture; - } - } - } - return multiplexedSessionFuture; - } - } - interface SessionFuture extends Session { /** @@ -1318,8 +1263,8 @@ class PooledSessionFuture extends SimpleForwardingListenableFuture(this, pooledSessionReplacementHandler, options); } @Override @@ -1563,7 +1508,7 @@ PooledSession get(final boolean eligibleForLongRunning) { res.markBusy(span); span.addAnnotation("Using Session", "sessionId", res.getName()); synchronized (lock) { - incrementNumSessionsInUse(false); + incrementNumSessionsInUse(); checkedOutSessions.add(this); } res.eligibleForLongRunning = eligibleForLongRunning; @@ -1581,247 +1526,6 @@ PooledSession get(final boolean eligibleForLongRunning) { } } - class MultiplexedSessionFuture implements SessionFuture { - - private final ISpan span; - private volatile MultiplexedSession multiplexedSession; - - MultiplexedSessionFuture(ISpan span) { - this.span = span; - } - - @Override - public Timestamp write(Iterable mutations) throws SpannerException { - return writeWithOptions(mutations).getCommitTimestamp(); - } - - @Override - public CommitResponse writeWithOptions( - Iterable mutations, TransactionOption... options) throws SpannerException { - try { - return get().writeWithOptions(mutations, options); - } finally { - close(); - } - } - - @Override - public Timestamp writeAtLeastOnce(Iterable mutations) throws SpannerException { - return writeAtLeastOnceWithOptions(mutations).getCommitTimestamp(); - } - - @Override - public CommitResponse writeAtLeastOnceWithOptions( - Iterable mutations, TransactionOption... options) throws SpannerException { - try { - return get().writeAtLeastOnceWithOptions(mutations, options); - } finally { - close(); - } - } - - @Override - public ServerStream batchWriteAtLeastOnce( - Iterable mutationGroups, TransactionOption... options) - throws SpannerException { - try { - return get().batchWriteAtLeastOnce(mutationGroups, options); - } finally { - close(); - } - } - - @Override - public ReadContext singleUse() { - try { - return new AutoClosingReadContext<>( - session -> { - MultiplexedSession multiplexedSession = session.get(); - return multiplexedSession.getDelegate().singleUse(); - }, - SessionPool.this, - multiplexedSessionReplacementHandler, - this, - true); - } catch (Exception e) { - close(); - throw e; - } - } - - @Override - public ReadContext singleUse(final TimestampBound bound) { - try { - return new AutoClosingReadContext<>( - session -> { - MultiplexedSession multiplexedSession = session.get(); - return multiplexedSession.getDelegate().singleUse(bound); - }, - SessionPool.this, - multiplexedSessionReplacementHandler, - this, - true); - } catch (Exception e) { - close(); - throw e; - } - } - - @Override - public ReadOnlyTransaction singleUseReadOnlyTransaction() { - return internalReadOnlyTransaction( - session -> { - MultiplexedSession multiplexedSession = session.get(); - return multiplexedSession.getDelegate().singleUseReadOnlyTransaction(); - }, - true); - } - - @Override - public ReadOnlyTransaction singleUseReadOnlyTransaction(final TimestampBound bound) { - return internalReadOnlyTransaction( - session -> { - MultiplexedSession multiplexedSession = session.get(); - return multiplexedSession.getDelegate().singleUseReadOnlyTransaction(bound); - }, - true); - } - - @Override - public ReadOnlyTransaction readOnlyTransaction() { - return internalReadOnlyTransaction( - session -> { - MultiplexedSession multiplexedSession = session.get(); - return multiplexedSession.getDelegate().readOnlyTransaction(); - }, - false); - } - - @Override - public ReadOnlyTransaction readOnlyTransaction(final TimestampBound bound) { - return internalReadOnlyTransaction( - session -> { - MultiplexedSession multiplexedSession = session.get(); - return multiplexedSession.getDelegate().readOnlyTransaction(bound); - }, - false); - } - - private ReadOnlyTransaction internalReadOnlyTransaction( - Function transactionSupplier, - boolean isSingleUse) { - try { - return new AutoClosingReadTransaction<>( - transactionSupplier, - SessionPool.this, - multiplexedSessionReplacementHandler, - this, - isSingleUse); - } catch (Exception e) { - close(); - throw e; - } - } - - @Override - public TransactionRunner readWriteTransaction(TransactionOption... options) { - return new SessionPoolTransactionRunner<>( - this, multiplexedSessionReplacementHandler, options); - } - - @Override - public TransactionManager transactionManager(TransactionOption... options) { - return new AutoClosingTransactionManager<>( - this, multiplexedSessionReplacementHandler, options); - } - - @Override - public AsyncRunner runAsync(TransactionOption... options) { - return new SessionPoolAsyncRunner(this, multiplexedSessionReplacementHandler, options); - } - - @Override - public AsyncTransactionManager transactionManagerAsync(TransactionOption... options) { - return new SessionPoolAsyncTransactionManager<>( - multiplexedSessionReplacementHandler, this, options); - } - - @Override - public long executePartitionedUpdate(Statement stmt, UpdateOption... options) { - try { - return get().executePartitionedUpdate(stmt, options); - } finally { - close(); - } - } - - @Override - public String getName() { - return get().getName(); - } - - @Override - public void close() { - try { - asyncClose().get(); - } catch (InterruptedException e) { - throw SpannerExceptionFactory.propagateInterrupt(e); - } catch (ExecutionException e) { - throw asSpannerException(e.getCause()); - } - } - - @Override - public ApiFuture asyncClose() { - MultiplexedSession delegate = getOrNull(); - if (delegate != null) { - return delegate.asyncClose(); - } - return ApiFutures.immediateFuture(Empty.getDefaultInstance()); - } - - private MultiplexedSession getOrNull() { - try { - return get(); - } catch (Throwable ignore) { - // this exception will never be thrown for a multiplexed session since the Future - // object is already initialised. - return null; - } - } - - @Override - public MultiplexedSession get() { - try { - if (multiplexedSession == null) { - boolean created = false; - synchronized (this) { - if (multiplexedSession == null) { - SessionImpl sessionImpl = - new SessionImpl( - sessionClient.getSpanner(), currentMultiplexedSessionReference.get().get()); - MultiplexedSession multiplexedSession = new MultiplexedSession(sessionImpl); - multiplexedSession.markBusy(span); - span.addAnnotation("Using Session", "sessionId", multiplexedSession.getName()); - this.multiplexedSession = multiplexedSession; - created = true; - } - } - if (created) { - synchronized (lock) { - incrementNumSessionsInUse(true); - } - } - } - return multiplexedSession; - } catch (ExecutionException e) { - throw SpannerExceptionFactory.newSpannerException(e.getCause()); - } catch (InterruptedException e) { - throw SpannerExceptionFactory.propagateInterrupt(e); - } - } - } - interface CachedSession extends Session { SessionImpl getDelegate(); @@ -1832,9 +1536,6 @@ interface CachedSession extends Session { SpannerException setLastException(SpannerException exception); - // TODO This method can be removed once we fully migrate to multiplexed sessions. - boolean isAllowReplacing(); - AsyncTransactionManagerImpl transactionManagerAsync(TransactionOption... options); void setAllowReplacing(boolean b); @@ -2024,7 +1725,7 @@ public void close() { if ((lastException != null && isSessionNotFound(lastException)) || isRemovedFromPool) { invalidateSession(this); } else { - if (lastException != null && isDatabaseOrInstanceNotFound(lastException)) { + if (isDatabaseOrInstanceNotFound(lastException)) { // Mark this session pool as no longer valid and then release the session into the pool as // there is nothing we can do with it anyways. synchronized (lock) { @@ -2116,8 +1817,7 @@ public SpannerException setLastException(SpannerException exception) { return exception; } - @Override - public boolean isAllowReplacing() { + boolean isAllowReplacing() { return this.allowReplacing; } @@ -2127,172 +1827,12 @@ public TransactionManager transactionManager(TransactionOption... options) { } } - class MultiplexedSession implements CachedSession { - final SessionImpl delegate; - private volatile SpannerException lastException; - - MultiplexedSession(SessionImpl session) { - this.delegate = session; - } - - @Override - public boolean isAllowReplacing() { - // for multiplexed session there is only 1 session, hence there is nothing that we - // can replace. - return false; - } - - @Override - public void setAllowReplacing(boolean allowReplacing) { - // for multiplexed session there is only 1 session, there is nothing that can be replaced. - // hence this is no-op. - } - - @Override - public void markBusy(ISpan span) { - this.delegate.setCurrentSpan(span); - } - - @Override - public void markUsed() { - // no-op for a multiplexed session since we don't track the last-used time - // in case of multiplexed session - } - - @Override - public SpannerException setLastException(SpannerException exception) { - this.lastException = exception; - return exception; - } - - @Override - public SessionImpl getDelegate() { - return delegate; - } - - @Override - public Timestamp write(Iterable mutations) throws SpannerException { - throw SpannerExceptionFactory.newSpannerException( - ErrorCode.UNIMPLEMENTED, "Unimplemented with Multiplexed Session"); - } - - @Override - public CommitResponse writeWithOptions( - Iterable mutations, TransactionOption... options) throws SpannerException { - throw SpannerExceptionFactory.newSpannerException( - ErrorCode.UNIMPLEMENTED, "Unimplemented with Multiplexed Session"); - } - - @Override - public Timestamp writeAtLeastOnce(Iterable mutations) throws SpannerException { - throw SpannerExceptionFactory.newSpannerException( - ErrorCode.UNIMPLEMENTED, "Unimplemented with Multiplexed Session"); - } - - @Override - public CommitResponse writeAtLeastOnceWithOptions( - Iterable mutations, TransactionOption... options) throws SpannerException { - throw SpannerExceptionFactory.newSpannerException( - ErrorCode.UNIMPLEMENTED, "Unimplemented with Multiplexed Session"); - } - - @Override - public ServerStream batchWriteAtLeastOnce( - Iterable mutationGroups, TransactionOption... options) - throws SpannerException { - throw SpannerExceptionFactory.newSpannerException( - ErrorCode.UNIMPLEMENTED, "Unimplemented with Multiplexed Session"); - } - - @Override - public ReadContext singleUse() { - return delegate.singleUse(); - } - - @Override - public ReadContext singleUse(TimestampBound bound) { - return delegate.singleUse(bound); - } - - @Override - public ReadOnlyTransaction singleUseReadOnlyTransaction() { - return delegate.singleUseReadOnlyTransaction(); - } - - @Override - public ReadOnlyTransaction singleUseReadOnlyTransaction(TimestampBound bound) { - return delegate.singleUseReadOnlyTransaction(bound); - } - - @Override - public ReadOnlyTransaction readOnlyTransaction() { - return delegate.readOnlyTransaction(); - } - - @Override - public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) { - return delegate.readOnlyTransaction(bound); - } - - @Override - public TransactionRunner readWriteTransaction(TransactionOption... options) { - throw SpannerExceptionFactory.newSpannerException( - ErrorCode.UNIMPLEMENTED, "Unimplemented with Multiplexed Session"); - } - - @Override - public TransactionManager transactionManager(TransactionOption... options) { - throw SpannerExceptionFactory.newSpannerException( - ErrorCode.UNIMPLEMENTED, "Unimplemented with Multiplexed Session"); - } - - @Override - public AsyncRunner runAsync(TransactionOption... options) { - throw SpannerExceptionFactory.newSpannerException( - ErrorCode.UNIMPLEMENTED, "Unimplemented with Multiplexed Session"); - } - - @Override - public AsyncTransactionManagerImpl transactionManagerAsync(TransactionOption... options) { - throw SpannerExceptionFactory.newSpannerException( - ErrorCode.UNIMPLEMENTED, "Unimplemented with Multiplexed Session"); - } - - @Override - public long executePartitionedUpdate(Statement stmt, UpdateOption... options) { - throw SpannerExceptionFactory.newSpannerException( - ErrorCode.UNIMPLEMENTED, "Unimplemented with Multiplexed Session"); - } - - @Override - public String getName() { - return delegate.getName(); - } - - @Override - public void close() { - synchronized (lock) { - if (lastException != null && isDatabaseOrInstanceNotFound(lastException)) { - SessionPool.this.resourceNotFoundException = - MoreObjects.firstNonNull( - SessionPool.this.resourceNotFoundException, - (ResourceNotFoundException) lastException); - } - } - } - - @Override - public ApiFuture asyncClose() { - close(); - return ApiFutures.immediateFuture(Empty.getDefaultInstance()); - } - } - private final class WaiterFuture extends ForwardingListenableFuture { private static final long MAX_SESSION_WAIT_TIMEOUT = 240_000L; private final SettableFuture waiter = SettableFuture.create(); @Override + @Nonnull protected ListenableFuture delegate() { return waiter; } @@ -2310,7 +1850,7 @@ public PooledSession get() { long currentTimeout = options.getInitialWaitForSessionTimeoutMillis(); while (true) { ISpan span = tracer.spanBuilder(WAIT_FOR_SESSION); - try (IScope waitScope = tracer.withSpan(span)) { + try (IScope ignore = tracer.withSpan(span)) { PooledSession s = pollUninterruptiblyWithTimeout(currentTimeout, options.getAcquireSessionTimeout()); if (s == null) { @@ -2395,9 +1935,6 @@ private PooledSession pollUninterruptiblyWithTimeout( */ final class PoolMaintainer { - // Delay post which the maintainer will retry creating/replacing the current multiplexed session - private final Duration multiplexedSessionCreationRetryDelay = Duration.ofMinutes(10); - // Length of the window in millis over which we keep track of maximum number of concurrent // sessions in use. private final Duration windowLength = Duration.ofMillis(TimeUnit.MINUTES.toMillis(10)); @@ -2421,8 +1958,6 @@ final class PoolMaintainer { */ @VisibleForTesting Instant lastExecutionTime; - @VisibleForTesting Instant multiplexedSessionReplacementAttemptTime; - /** * The previous numSessionsAcquired seen by the maintainer. This is used to calculate the * transactions per second, which again is used to determine whether to randomize the order of @@ -2440,7 +1975,6 @@ final class PoolMaintainer { void init() { lastExecutionTime = clock.instant(); - multiplexedSessionReplacementAttemptTime = clock.instant(); // Scheduled pool maintenance worker. synchronized (lock) { @@ -2483,7 +2017,6 @@ void maintainPool() { this.prevNumSessionsAcquired = SessionPool.this.numSessionsAcquired; } Instant currTime = clock.instant(); - maintainMultiplexedSession(currTime); removeIdleSessions(currTime); // Now go over all the remaining sessions and see if they need to be kept alive explicitly. keepAliveSessions(currTime); @@ -2652,44 +2185,6 @@ private void removeLongRunningSessions( } } } - - void maintainMultiplexedSession(Instant currentTime) { - try { - if (useMultiplexedSessions()) { - if (currentMultiplexedSessionReference.get().isDone()) { - SessionReference sessionReference = getMultiplexedSessionInstance(); - if (sessionReference != null - && isMultiplexedSessionStale(sessionReference, currentTime)) { - final Instant minExecutionTime = - multiplexedSessionReplacementAttemptTime.plus( - multiplexedSessionCreationRetryDelay); - if (currentTime.isBefore(minExecutionTime)) { - return; - } - /* - This will attempt to create a new multiplexed session. if successfully created then - the existing session will be replaced. Note that there maybe active transactions - running on the stale session. Hence, it is important that we only replace the reference - and not invoke a DeleteSession RPC. - */ - maybeCreateMultiplexedSession(multiplexedMaintainerConsumer); - - // update this only after we have attempted to replace the multiplexed session - multiplexedSessionReplacementAttemptTime = currentTime; - } - } - } - } catch (final Throwable t) { - logger.log(Level.WARNING, "Failed to maintain multiplexed session", t); - } - } - - boolean isMultiplexedSessionStale(SessionReference sessionReference, Instant currentTime) { - final Duration durationFromCreationTime = - Duration.between(sessionReference.getCreateTime(), currentTime); - return durationFromCreationTime.compareTo(options.getMultiplexedSessionMaintenanceDuration()) - > 0; - } } enum Position { @@ -2754,9 +2249,6 @@ enum Position { @GuardedBy("lock") private ResourceNotFoundException resourceNotFoundException; - @GuardedBy("lock") - private boolean stopAutomaticPrepare; - @GuardedBy("lock") private final LinkedList sessions = new LinkedList<>(); @@ -2766,9 +2258,6 @@ enum Position { @GuardedBy("lock") private int numSessionsBeingCreated = 0; - @GuardedBy("lock") - private boolean multiplexedSessionBeingCreated = false; - @GuardedBy("lock") private int numSessionsInUse = 0; @@ -2790,10 +2279,7 @@ enum Position { @GuardedBy("lock") private long numLeakedSessionsRemoved = 0; - private AtomicLong numWaiterTimeouts = new AtomicLong(); - - private final AtomicReference> - currentMultiplexedSessionReference = new AtomicReference<>(SettableApiFuture.create()); + private final AtomicLong numWaiterTimeouts = new AtomicLong(); @GuardedBy("lock") private final Set allSessions = new HashSet<>(); @@ -2807,21 +2293,12 @@ enum Position { private final SessionConsumer sessionConsumer = new SessionConsumerImpl(); - private final MultiplexedSessionInitializationConsumer multiplexedSessionInitializationConsumer = - new MultiplexedSessionInitializationConsumer(); - private final MultiplexedSessionMaintainerConsumer multiplexedMaintainerConsumer = - new MultiplexedSessionMaintainerConsumer(); - @VisibleForTesting Function idleSessionRemovedListener; @VisibleForTesting Function longRunningSessionRemovedListener; - @VisibleForTesting Function multiplexedSessionRemovedListener; private final CountDownLatch waitOnMinSessionsLatch; - private final CountDownLatch waitOnMultiplexedSessionsLatch; - private final SessionReplacementHandler pooledSessionReplacementHandler = + private final PooledSessionReplacementHandler pooledSessionReplacementHandler = new PooledSessionReplacementHandler(); - private static final SessionReplacementHandler multiplexedSessionReplacementHandler = - new MultiplexedSessionReplacementHandler(); /** * Create a session pool with the given options and for the given database. It will also start @@ -2965,13 +2442,6 @@ private SessionPool( openTelemetry, attributes, numMultiplexedSessionsAcquired, numMultiplexedSessionsReleased); this.waitOnMinSessionsLatch = options.getMinSessions() > 0 ? new CountDownLatch(1) : new CountDownLatch(0); - this.waitOnMultiplexedSessionsLatch = new CountDownLatch(1); - } - - // TODO: Remove once all code for multiplexed sessions has been removed from the pool. - private boolean useMultiplexedSessions() { - // Multiplexed sessions have moved to MultiplexedSessionDatabaseClient - return false; } /** @@ -3007,7 +2477,7 @@ Dialect getDialect() { } } - SessionReplacementHandler getPooledSessionReplacementHandler() { + PooledSessionReplacementHandler getPooledSessionReplacementHandler() { return pooledSessionReplacementHandler; } @@ -3087,13 +2557,6 @@ int getTotalSessionsPlusNumSessionsBeingCreated() { } } - @VisibleForTesting - boolean isMultiplexedSessionBeingCreated() { - synchronized (lock) { - return multiplexedSessionBeingCreated; - } - } - @VisibleForTesting long getNumWaiterTimeouts() { return numWaiterTimeouts.get(); @@ -3105,9 +2568,6 @@ private void initPool() { if (options.getMinSessions() > 0) { createSessions(options.getMinSessions(), true); } - if (useMultiplexedSessions()) { - maybeCreateMultiplexedSession(multiplexedSessionInitializationConsumer); - } } } @@ -3173,36 +2633,8 @@ boolean isValid() { * Returns a multiplexed session. The method fallbacks to a regular session if {@link * SessionPoolOptions#getUseMultiplexedSession} is not set. */ - SessionFutureWrapper getMultiplexedSessionWithFallback() throws SpannerException { - if (useMultiplexedSessions()) { - ISpan span = tracer.getCurrentSpan(); - try { - return getWrappedMultiplexedSessionFuture(span); - } catch (Throwable t) { - span.addAnnotation("No multiplexed session available."); - throw asSpannerException(t.getCause()); - } - } else { - return new PooledSessionFutureWrapper(getSession()); - } - } - - SessionFutureWrapper getWrappedMultiplexedSessionFuture(ISpan span) { - return new MultiplexedSessionFutureWrapper(span); - } - - /** - * This method is a blocking method. It will block until the underlying {@code - * SettableApiFuture} is resolved. - */ - SessionReference getMultiplexedSessionInstance() { - try { - return currentMultiplexedSessionReference.get().get(); - } catch (InterruptedException e) { - throw SpannerExceptionFactory.propagateInterrupt(e); - } catch (ExecutionException e) { - throw asSpannerException(e.getCause()); - } + PooledSessionFutureWrapper getMultiplexedSessionWithFallback() throws SpannerException { + return new PooledSessionFutureWrapper(getSession()); } /** @@ -3271,14 +2703,12 @@ private PooledSessionFuture checkoutSession( return res; } - private void incrementNumSessionsInUse(boolean isMultiplexed) { + private void incrementNumSessionsInUse() { synchronized (lock) { - if (!isMultiplexed) { - if (maxSessionsInUse < ++numSessionsInUse) { - maxSessionsInUse = numSessionsInUse; - } - numSessionsAcquired++; + if (maxSessionsInUse < ++numSessionsInUse) { + maxSessionsInUse = numSessionsInUse; } + numSessionsAcquired++; } } @@ -3496,7 +2926,7 @@ static boolean isUnbalanced( private void handleCreateSessionsFailure(SpannerException e, int count) { synchronized (lock) { for (int i = 0; i < count; i++) { - if (waiters.size() > 0) { + if (!waiters.isEmpty()) { waiters.poll().put(e); } else { break; @@ -3638,20 +3068,6 @@ private boolean canCreateSession() { } } - private void maybeCreateMultiplexedSession(SessionConsumer sessionConsumer) { - synchronized (lock) { - if (!multiplexedSessionBeingCreated) { - logger.log(Level.FINE, String.format("Creating multiplexed sessions")); - try { - multiplexedSessionBeingCreated = true; - sessionClient.asyncCreateMultiplexedSession(sessionConsumer); - } catch (Throwable ignore) { - // such an exception will never be thrown. the exception will be passed onto the consumer. - } - } - } - } - private void createSessions(final int sessionCount, boolean distributeOverChannels) { logger.log(Level.FINE, String.format("Creating %d sessions", sessionCount)); synchronized (lock) { @@ -3674,99 +3090,6 @@ private void createSessions(final int sessionCount, boolean distributeOverChanne } } - /** - * Callback interface which is invoked when a multiplexed session is being replaced by the - * background maintenance thread. When a multiplexed session creation fails during background - * thread, it would simply log the exception and retry the session creation in the next background - * thread invocation. - * - *

This consumer is not used when the multiplexed session is getting initialized for the first - * time during application startup. We instead use {@link - * MultiplexedSessionInitializationConsumer} for the first time when multiplexed session is - * getting created. - */ - class MultiplexedSessionMaintainerConsumer implements SessionConsumer { - @Override - public void onSessionReady(SessionImpl sessionImpl) { - final SessionReference sessionReference = sessionImpl.getSessionReference(); - final SettableFuture settableFuture = SettableFuture.create(); - settableFuture.set(sessionReference); - - synchronized (lock) { - SessionReference oldSession = null; - if (currentMultiplexedSessionReference.get().isDone()) { - oldSession = getMultiplexedSessionInstance(); - } - SettableApiFuture settableApiFuture = SettableApiFuture.create(); - settableApiFuture.set(sessionReference); - currentMultiplexedSessionReference.set(settableApiFuture); - if (oldSession != null) { - logger.log( - Level.INFO, - String.format( - "Removed Multiplexed Session => %s created at => %s", - oldSession.getName(), oldSession.getCreateTime())); - if (multiplexedSessionRemovedListener != null) { - multiplexedSessionRemovedListener.apply(oldSession); - } - } - multiplexedSessionBeingCreated = false; - } - } - - /** - * Method which logs the exception so that session creation can be re-attempted in the next - * background thread invocation. - */ - @Override - public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount) { - synchronized (lock) { - multiplexedSessionBeingCreated = false; - } - logger.log( - Level.WARNING, - String.format( - "Failed to create multiplexed session. " - + "Pending replacing stale multiplexed session", - t)); - } - } - - /** - * Callback interface which is invoked when a multiplexed session is getting initialised for the - * first time when a session is getting created. - */ - class MultiplexedSessionInitializationConsumer implements SessionConsumer { - @Override - public void onSessionReady(SessionImpl sessionImpl) { - final SessionReference sessionReference = sessionImpl.getSessionReference(); - synchronized (lock) { - SettableApiFuture settableApiFuture = - currentMultiplexedSessionReference.get(); - settableApiFuture.set(sessionReference); - multiplexedSessionBeingCreated = false; - waitOnMultiplexedSessionsLatch.countDown(); - } - } - - /** - * When a multiplexed session fails during initialization we would like all pending threads to - * receive the exception and throw the error. This is done because at the time of start up there - * is no other multiplexed session which could have been assigned to the pending requests. - */ - @Override - public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount) { - synchronized (lock) { - multiplexedSessionBeingCreated = false; - if (isDatabaseOrInstanceNotFound(asSpannerException(t))) { - setResourceNotFoundException((ResourceNotFoundException) t); - poolMaintainer.close(); - } - currentMultiplexedSessionReference.get().setException(asSpannerException(t)); - } - } - } - /** * {@link SessionConsumer} that receives the created sessions from a {@link SessionClient} and * releases these into the pool. The session pool only needs one instance of this, as all sessions diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionMaintainerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionMaintainerTest.java deleted file mode 100644 index ca7f8386894..00000000000 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionMaintainerTest.java +++ /dev/null @@ -1,310 +0,0 @@ -/* - * Copyright 2024 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.google.cloud.spanner; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assume.assumeTrue; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; -import static org.mockito.MockitoAnnotations.initMocks; - -import com.google.cloud.Timestamp; -import com.google.cloud.spanner.SessionPool.CachedSession; -import com.google.cloud.spanner.SessionPool.MultiplexedSessionInitializationConsumer; -import com.google.cloud.spanner.SessionPool.MultiplexedSessionMaintainerConsumer; -import com.google.cloud.spanner.SessionPool.Position; -import com.google.cloud.spanner.SessionPool.SessionFutureWrapper; -import io.opencensus.trace.Tracing; -import io.opentelemetry.api.OpenTelemetry; -import java.util.ArrayList; -import java.util.List; -import java.util.Set; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.stream.Collectors; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.Mock; -import org.threeten.bp.Duration; -import org.threeten.bp.Instant; - -@RunWith(JUnit4.class) -public class MultiplexedSessionMaintainerTest extends BaseSessionPoolTest { - - private ExecutorService executor = Executors.newSingleThreadExecutor(); - private @Mock SpannerImpl client; - private @Mock SessionClient sessionClient; - private @Mock SpannerOptions spannerOptions; - private DatabaseId db = DatabaseId.of("projects/p/instances/i/databases/unused"); - private SessionPoolOptions options; - private FakeClock clock = new FakeClock(); - private List multiplexedSessionsRemoved = new ArrayList<>(); - - @BeforeClass - public static void checkUsesMultiplexedSessionPool() { - assumeTrue("Only run if the maintainer in the session pool is used", false); - } - - @Before - public void setUp() { - initMocks(this); - when(client.getOptions()).thenReturn(spannerOptions); - when(client.getSessionClient(db)).thenReturn(sessionClient); - when(sessionClient.getSpanner()).thenReturn(client); - when(spannerOptions.getNumChannels()).thenReturn(4); - when(spannerOptions.getDatabaseRole()).thenReturn("role"); - options = - SessionPoolOptions.newBuilder() - .setMinSessions(1) - .setMaxIdleSessions(1) - .setMaxSessions(5) - .setIncStep(1) - .setKeepAliveIntervalMinutes(2) - .setUseMultiplexedSession(true) - .setPoolMaintainerClock(clock) - .build(); - when(spannerOptions.getSessionPoolOptions()).thenReturn(options); - assumeTrue(options.getUseMultiplexedSession()); - multiplexedSessionsRemoved.clear(); - } - - @Test - public void testMaintainMultiplexedSession_whenNewSessionCreated_assertThatStaleSessionIsRemoved() - throws Exception { - doAnswer( - invocation -> { - MultiplexedSessionInitializationConsumer consumer = - invocation.getArgument(0, MultiplexedSessionInitializationConsumer.class); - ReadContext mockContext = mock(ReadContext.class); - Timestamp timestamp = - Timestamp.ofTimeSecondsAndNanos( - Instant.ofEpochMilli(clock.currentTimeMillis.get()).getEpochSecond(), 0); - consumer.onSessionReady( - setupMockSession( - buildMockMultiplexedSession(client, mockContext, timestamp.toProto()), - mockContext)); - return null; - }) - .when(sessionClient) - .asyncCreateMultiplexedSession(any(MultiplexedSessionInitializationConsumer.class)); - doAnswer( - invocation -> { - MultiplexedSessionMaintainerConsumer consumer = - invocation.getArgument(0, MultiplexedSessionMaintainerConsumer.class); - ReadContext mockContext = mock(ReadContext.class); - Timestamp timestamp = - Timestamp.ofTimeSecondsAndNanos( - Instant.ofEpochMilli(clock.currentTimeMillis.get()).getEpochSecond(), 0); - consumer.onSessionReady( - setupMockSession( - buildMockMultiplexedSession(client, mockContext, timestamp.toProto()), - mockContext)); - return null; - }) - .when(sessionClient) - .asyncCreateMultiplexedSession(any(MultiplexedSessionMaintainerConsumer.class)); - - SessionPool pool = createPool(); - - // Run one maintenance loop. - CachedSession session1 = pool.getMultiplexedSessionWithFallback().get().get(); - runMaintenanceLoop(clock, pool, 1); - assertTrue(multiplexedSessionsRemoved.isEmpty()); - - // Advance clock by 8 days - clock.currentTimeMillis.addAndGet(Duration.ofDays(8).toMillis()); - - // Run second maintenance loop. the first session would now be stale since it has now existed - // for more than 7 days. - runMaintenanceLoop(clock, pool, 1); - - CachedSession session2 = pool.getMultiplexedSessionWithFallback().get().get(); - assertNotEquals(session1.getName(), session2.getName()); - assertEquals(1, multiplexedSessionsRemoved.size()); - assertTrue(getNameOfSessionRemoved().contains(session1.getName())); - - // Advance clock by 8 days - clock.currentTimeMillis.addAndGet(Duration.ofDays(8).toMillis()); - - // Run third maintenance loop. the second session would now be stale since it has now existed - // for more than 7 days - runMaintenanceLoop(clock, pool, 1); - - CachedSession session3 = pool.getMultiplexedSessionWithFallback().get().get(); - assertNotEquals(session2.getName(), session3.getName()); - assertEquals(2, multiplexedSessionsRemoved.size()); - assertTrue(getNameOfSessionRemoved().contains(session2.getName())); - } - - @Test - public void - testMaintainMultiplexedSession_whenMultiplexedSessionNotStale_assertThatSessionIsNotRemoved() { - doAnswer( - invocation -> { - MultiplexedSessionInitializationConsumer consumer = - invocation.getArgument(0, MultiplexedSessionInitializationConsumer.class); - ReadContext mockContext = mock(ReadContext.class); - Timestamp timestamp = - Timestamp.ofTimeSecondsAndNanos( - Instant.ofEpochMilli(clock.currentTimeMillis.get()).getEpochSecond(), 0); - consumer.onSessionReady( - setupMockSession( - buildMockMultiplexedSession(client, mockContext, timestamp.toProto()), - mockContext)); - return null; - }) - .when(sessionClient) - .asyncCreateMultiplexedSession(any(MultiplexedSessionInitializationConsumer.class)); - SessionPool pool = createPool(); - - // Run one maintenance loop. - SessionFutureWrapper session1 = pool.getMultiplexedSessionWithFallback(); - runMaintenanceLoop(clock, pool, 1); - assertTrue(multiplexedSessionsRemoved.isEmpty()); - - // Advance clock by 4 days - clock.currentTimeMillis.addAndGet(Duration.ofDays(4).toMillis()); - // Run one maintenance loop. the first session would not be stale yet since it has now existed - // for less than 7 days. - runMaintenanceLoop(clock, pool, 1); - SessionFutureWrapper session2 = pool.getMultiplexedSessionWithFallback(); - assertTrue(multiplexedSessionsRemoved.isEmpty()); - assertEquals(session1.get().getName(), session2.get().getName()); - } - - @Test - public void - testMaintainMultiplexedSession_whenMultiplexedSessionCreationFailed_testRetryAfterDelay() { - doAnswer( - invocation -> { - MultiplexedSessionInitializationConsumer consumer = - invocation.getArgument(0, MultiplexedSessionInitializationConsumer.class); - ReadContext mockContext = mock(ReadContext.class); - Timestamp timestamp = - Timestamp.ofTimeSecondsAndNanos( - Instant.ofEpochMilli(clock.currentTimeMillis.get()).getEpochSecond(), 0); - consumer.onSessionReady( - setupMockSession( - buildMockMultiplexedSession(client, mockContext, timestamp.toProto()), - mockContext)); - return null; - }) - .when(sessionClient) - .asyncCreateMultiplexedSession(any(MultiplexedSessionInitializationConsumer.class)); - doAnswer( - invocation -> { - MultiplexedSessionMaintainerConsumer consumer = - invocation.getArgument(0, MultiplexedSessionMaintainerConsumer.class); - consumer.onSessionCreateFailure( - SpannerExceptionFactory.newSpannerException(ErrorCode.DEADLINE_EXCEEDED, ""), 1); - return null; - }) - .when(sessionClient) - .asyncCreateMultiplexedSession(any(MultiplexedSessionMaintainerConsumer.class)); - SessionPool pool = createPool(); - - // Advance clock by 8 days - clock.currentTimeMillis.addAndGet(Duration.ofDays(8).toMillis()); - - // Run one maintenance loop. Attempt replacing stale session should fail. - SessionFutureWrapper session1 = pool.getMultiplexedSessionWithFallback(); - runMaintenanceLoop(clock, pool, 1); - assertTrue(multiplexedSessionsRemoved.isEmpty()); - verify(sessionClient, times(1)) - .asyncCreateMultiplexedSession(any(MultiplexedSessionMaintainerConsumer.class)); - - // Advance clock by 10s and now mock session creation to be successful. - clock.currentTimeMillis.addAndGet(Duration.ofSeconds(10).toMillis()); - doAnswer( - invocation -> { - MultiplexedSessionMaintainerConsumer consumer = - invocation.getArgument(0, MultiplexedSessionMaintainerConsumer.class); - ReadContext mockContext = mock(ReadContext.class); - Timestamp timestamp = - Timestamp.ofTimeSecondsAndNanos( - Instant.ofEpochMilli(clock.currentTimeMillis.get()).getEpochSecond(), 0); - consumer.onSessionReady( - setupMockSession( - buildMockMultiplexedSession(client, mockContext, timestamp.toProto()), - mockContext)); - return null; - }) - .when(sessionClient) - .asyncCreateMultiplexedSession(any(MultiplexedSessionMaintainerConsumer.class)); - // Run one maintenance loop. Attempt should be ignored as it has not been 10 minutes since last - // attempt. - runMaintenanceLoop(clock, pool, 1); - SessionFutureWrapper session2 = pool.getMultiplexedSessionWithFallback(); - assertTrue(multiplexedSessionsRemoved.isEmpty()); - assertEquals(session1.get().getName(), session2.get().getName()); - verify(sessionClient, times(1)) - .asyncCreateMultiplexedSession(any(MultiplexedSessionMaintainerConsumer.class)); - - // Advance clock by 15 minutes - clock.currentTimeMillis.addAndGet(Duration.ofMinutes(15).toMillis()); - // Run one maintenance loop. Attempt should succeed since its already more than 10 minutes since - // the last attempt. - runMaintenanceLoop(clock, pool, 1); - SessionFutureWrapper session3 = pool.getMultiplexedSessionWithFallback(); - assertTrue(getNameOfSessionRemoved().contains(session1.get().get().getName())); - assertNotEquals(session1.get().getName(), session3.get().getName()); - verify(sessionClient, times(2)) - .asyncCreateMultiplexedSession(any(MultiplexedSessionMaintainerConsumer.class)); - } - - private SessionImpl setupMockSession(final SessionImpl session, final ReadContext mockContext) { - final ResultSet mockResult = mock(ResultSet.class); - when(mockContext.executeQuery(any(Statement.class))).thenAnswer(invocation -> mockResult); - when(mockResult.next()).thenReturn(true); - return session; - } - - private SessionPool createPool() { - // Allow sessions to be added to the head of the pool in all cases in this test, as it is - // otherwise impossible to know which session exactly is getting pinged at what point in time. - SessionPool pool = - SessionPool.createPool( - options, - new TestExecutorFactory(), - client.getSessionClient(db), - clock, - Position.FIRST, - new TraceWrapper(Tracing.getTracer(), OpenTelemetry.noop().getTracer(""), false), - OpenTelemetry.noop()); - pool.multiplexedSessionRemovedListener = - input -> { - multiplexedSessionsRemoved.add(input); - return null; - }; - return pool; - } - - Set getNameOfSessionRemoved() { - return multiplexedSessionsRemoved.stream() - .map(session -> session.getName()) - .collect(Collectors.toSet()); - } -} diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionPoolTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionPoolTest.java deleted file mode 100644 index fcad1ff22c6..00000000000 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionPoolTest.java +++ /dev/null @@ -1,182 +0,0 @@ -/* - * Copyright 2024 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.google.cloud.spanner; - -import static com.google.common.truth.Truth.assertThat; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertThrows; -import static org.junit.Assert.assertTrue; -import static org.junit.Assume.assumeTrue; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; -import static org.mockito.MockitoAnnotations.initMocks; - -import com.google.cloud.spanner.SessionPool.MultiplexedSessionFuture; -import com.google.cloud.spanner.SessionPool.MultiplexedSessionInitializationConsumer; -import com.google.cloud.spanner.SessionPool.SessionFutureWrapper; -import com.google.cloud.spanner.SpannerImpl.ClosedException; -import io.opencensus.trace.Tracing; -import io.opentelemetry.api.OpenTelemetry; -import java.io.PrintWriter; -import java.io.StringWriter; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; -import org.mockito.Mock; -import org.threeten.bp.Duration; - -/** - * Tests for {@link com.google.cloud.spanner.SessionPool.MultiplexedSession} component within the - * {@link SessionPool} class. - */ -public class MultiplexedSessionPoolTest extends BaseSessionPoolTest { - - @Mock SpannerImpl client; - @Mock SessionClient sessionClient; - @Mock SpannerOptions spannerOptions; - private final DatabaseId db = DatabaseId.of("projects/p/instances/i/databases/unused"); - private final TraceWrapper tracer = - new TraceWrapper(Tracing.getTracer(), OpenTelemetry.noop().getTracer(""), false); - SessionPoolOptions options; - SessionPool pool; - - private SessionPool createPool() { - return SessionPool.createPool( - options, - new TestExecutorFactory(), - client.getSessionClient(db), - tracer, - OpenTelemetry.noop()); - } - - @BeforeClass - public static void checkUsesMultiplexedSessionPool() { - assumeTrue("Only run if the maintainer in the session pool is used", false); - } - - @Before - public void setUp() { - initMocks(this); - SpannerOptions.resetActiveTracingFramework(); - SpannerOptions.enableOpenTelemetryTraces(); - when(client.getOptions()).thenReturn(spannerOptions); - when(client.getSessionClient(db)).thenReturn(sessionClient); - when(sessionClient.getSpanner()).thenReturn(client); - when(spannerOptions.getNumChannels()).thenReturn(4); - when(spannerOptions.getDatabaseRole()).thenReturn("role"); - options = - SessionPoolOptions.newBuilder() - .setMinSessions(2) - .setMaxSessions(2) - .setUseMultiplexedSession(true) - .build(); - when(spannerOptions.getSessionPoolOptions()).thenReturn(options); - assumeTrue(options.getUseMultiplexedSession()); - } - - @Test - public void testGetMultiplexedSession_whenSessionInitializationSucceeded_assertSessionReturned() { - setupMockMultiplexedSessionCreation(); - - pool = createPool(); - assertTrue(pool.isValid()); - - // create 5 requests which require a session - for (int i = 0; i < 5; i++) { - // checking out a multiplexed session - SessionFutureWrapper multiplexedSessionFuture = pool.getMultiplexedSessionWithFallback(); - assertNotNull(multiplexedSessionFuture.get()); - } - verify(sessionClient, times(1)) - .asyncCreateMultiplexedSession(any(MultiplexedSessionInitializationConsumer.class)); - } - - @Test - public void testGetMultiplexedSession_whenClosedPool_assertSessionReturned() { - setupMockMultiplexedSessionCreation(); - - pool = createPool(); - assertTrue(pool.isValid()); - closePoolWithStacktrace(); - - // checking out a multiplexed session does not throw error even if pool is closed - MultiplexedSessionFuture multiplexedSessionFuture = - (MultiplexedSessionFuture) pool.getMultiplexedSessionWithFallback().get(); - assertNotNull(multiplexedSessionFuture); - - // checking out a regular session throws error. - IllegalStateException e = assertThrows(IllegalStateException.class, () -> pool.getSession()); - assertThat(e.getCause()).isInstanceOf(ClosedException.class); - StringWriter sw = new StringWriter(); - e.getCause().printStackTrace(new PrintWriter(sw)); - assertThat(sw.toString()).contains("closePoolWithStacktrace"); - } - - private void closePoolWithStacktrace() { - pool.closeAsync(new SpannerImpl.ClosedException()); - } - - @Test - public void testGetMultiplexedSession_whenSessionCreationFailed_assertErrorForWaiters() { - doAnswer( - invocation -> { - MultiplexedSessionInitializationConsumer consumer = - invocation.getArgument(0, MultiplexedSessionInitializationConsumer.class); - consumer.onSessionCreateFailure( - SpannerExceptionFactory.newSpannerException(ErrorCode.DEADLINE_EXCEEDED, ""), 1); - return null; - }) - .when(sessionClient) - .asyncCreateMultiplexedSession(any(MultiplexedSessionInitializationConsumer.class)); - options = - options - .toBuilder() - .setMinSessions(2) - .setUseMultiplexedSession(true) - .setAcquireSessionTimeout( - Duration.ofMillis(50)) // block for a max of 50 ms for session to be available - .build(); - pool = createPool(); - - // create 5 requests which require a session - for (int i = 0; i < 5; i++) { - SpannerException e = - assertThrows( - SpannerException.class, () -> pool.getMultiplexedSessionWithFallback().get().get()); - assertEquals(ErrorCode.DEADLINE_EXCEEDED, e.getErrorCode()); - } - // assert that all 5 requests failed with exception - assertEquals(0, pool.getNumWaiterTimeouts()); - assertEquals(0, pool.getNumberOfSessionsInPool()); - } - - private void setupMockMultiplexedSessionCreation() { - doAnswer( - invocation -> { - MultiplexedSessionInitializationConsumer consumer = - invocation.getArgument(0, MultiplexedSessionInitializationConsumer.class); - consumer.onSessionReady(mockSession()); - return null; - }) - .when(sessionClient) - .asyncCreateMultiplexedSession(any(MultiplexedSessionInitializationConsumer.class)); - } -} diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java index 8ffc4f21a10..564e2b97aa0 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java @@ -64,7 +64,6 @@ import com.google.cloud.spanner.MetricRegistryTestUtils.PointWithFunction; import com.google.cloud.spanner.ReadContext.QueryAnalyzeMode; import com.google.cloud.spanner.SessionClient.SessionConsumer; -import com.google.cloud.spanner.SessionPool.MultiplexedSessionInitializationConsumer; import com.google.cloud.spanner.SessionPool.PooledSession; import com.google.cloud.spanner.SessionPool.PooledSessionFuture; import com.google.cloud.spanner.SessionPool.Position; @@ -2212,16 +2211,6 @@ public void testWaitOnMinSessionsWhenSessionsAreCreatedBeforeTimeout() { })) .when(sessionClient) .asyncBatchCreateSessions(Mockito.eq(1), Mockito.anyBoolean(), any(SessionConsumer.class)); - doAnswer( - invocation -> - executor.submit( - () -> { - MultiplexedSessionInitializationConsumer consumer = - invocation.getArgument(0, MultiplexedSessionInitializationConsumer.class); - consumer.onSessionReady(mockMultiplexedSession()); - })) - .when(sessionClient) - .asyncCreateMultiplexedSession(any(MultiplexedSessionInitializationConsumer.class)); pool = createPool(new FakeClock(), new FakeMetricRegistry(), SPANNER_DEFAULT_LABEL_VALUES); pool.maybeWaitOnMinSessions();