From 7636978df929c2e268046ffde0ae284ef2b08fb9 Mon Sep 17 00:00:00 2001 From: BenWhitehead Date: Mon, 9 Sep 2024 15:43:31 -0400 Subject: [PATCH] fix: close pending zero-copy responses when Storage#close is called Update gRPC based Storage instances to close out in progress reads when zero-copy is used and Storage#close() is called. --- .../cloud/storage/GrpcStorageOptions.java | 62 +++++++++++-------- .../ResponseContentLifecycleManager.java | 7 ++- 2 files changed, 41 insertions(+), 28 deletions(-) diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java index 64376cc107..07616d0ef1 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java @@ -70,7 +70,6 @@ import com.google.storage.v2.StorageSettings; import com.google.storage.v2.stub.GrpcStorageCallableFactory; import com.google.storage.v2.stub.GrpcStorageStub; -import com.google.storage.v2.stub.StorageStub; import com.google.storage.v2.stub.StorageStubSettings; import io.grpc.ClientInterceptor; import io.grpc.Detachable; @@ -89,9 +88,9 @@ import java.time.Clock; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.IdentityHashMap; +import java.util.Iterator; import java.util.List; import java.util.Locale; import java.util.Map; @@ -908,9 +907,27 @@ private Object readResolve() { private static final class InternalStorageClient extends StorageClient { - private InternalStorageClient(StorageStub stub) { + private InternalStorageClient(InternalZeroCopyGrpcStorageStub stub) { super(stub); } + + @Override + public void shutdownNow() { + try { + // GrpcStorageStub#close() is final and we can't override it + // instead hook in here to close out the zero-copy marshaller + getStub().getObjectMediaResponseMarshaller.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + super.shutdownNow(); + } + } + + @Override + public InternalZeroCopyGrpcStorageStub getStub() { + return (InternalZeroCopyGrpcStorageStub) super.getStub(); + } } private static final class InternalZeroCopyGrpcStorageStub extends GrpcStorageStub @@ -1071,30 +1088,21 @@ public void close() throws IOException { * them all as suppressed exceptions on the first occurrence. */ @VisibleForTesting - static void closeAllStreams(Collection inputStreams) throws IOException { - IOException ioException = - inputStreams.stream() - .map( - stream -> { - try { - stream.close(); - return null; - } catch (IOException e) { - return e; - } - }) - .filter(Objects::nonNull) - .reduce( - null, - (l, r) -> { - if (l != null) { - l.addSuppressed(r); - return l; - } else { - return r; - } - }, - (l, r) -> l); + static void closeAllStreams(Iterable inputStreams) throws IOException { + Iterator iterator = inputStreams.iterator(); + IOException ioException = null; + while (iterator.hasNext()) { + InputStream next = iterator.next(); + try { + next.close(); + } catch (IOException e) { + if (ioException == null) { + ioException = e; + } else { + ioException.addSuppressed(e); + } + } + } if (ioException != null) { throw ioException; diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/ResponseContentLifecycleManager.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/ResponseContentLifecycleManager.java index 3236513398..732cc5cb8b 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/ResponseContentLifecycleManager.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/ResponseContentLifecycleManager.java @@ -16,10 +16,15 @@ package com.google.cloud.storage; import com.google.storage.v2.ReadObjectResponse; +import java.io.Closeable; +import java.io.IOException; -interface ResponseContentLifecycleManager { +interface ResponseContentLifecycleManager extends Closeable { ResponseContentLifecycleHandle get(ReadObjectResponse response); + @Override + default void close() throws IOException {} + static ResponseContentLifecycleManager noop() { return response -> new ResponseContentLifecycleHandle(