Skip to content

Commit

Permalink
refactor: customize HTTP transport for GCS client
Browse files Browse the repository at this point in the history
This change set is the result of an investigation into the poor throughput issue (see #1).

It includes the following:

* Upgrades google-cloud-storage to latest release
* Set up the ApacheHttpTransport to use a pooling client connection manager. This addresses an issue observed with the default connection manager; after a period of inactivity, the client would fail to connect to the google API endpoint and not retry. This implementation does successfully retry when connections are severed. This change comes with a caveat in that it requires us to use deprecated HTTP Client constructors instead of their current equivalents.
* Improve the performance of hard delete by skipping an unnecessary prior GET.
* Lean down object read requests to only retrieve the MEDIA_LINK field (at this time all other GCS object attributes aren't used by this implementation).
  • Loading branch information
nblair committed Jun 28, 2018
1 parent 2cd29c9 commit e8626bc
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 36 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-storage</artifactId>
<version>1.28.0</version>
<version>1.35.0</version>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.BucketInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.Storage.BlobField;
import com.google.cloud.storage.Storage.BlobGetOption;
import com.google.cloud.storage.Storage.BlobListOption;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.LoadingCache;
Expand Down Expand Up @@ -139,6 +141,12 @@ protected void doStart() throws Exception {
metricsStore.start();
}

@Override
protected void doStop() throws Exception {
  // release the blob cache so no stale entries survive into a later restart,
  // then stop the metrics store
  liveBlobs = null;
  metricsStore.stop();
}

@Override
@Guarded(by = STARTED)
public Blob create(final InputStream inputStream, final Map<String, String> headers) {
Expand All @@ -162,7 +170,7 @@ public Blob create(final Path path, final Map<String, String> map, final long si
@Override
@Guarded(by = STARTED)
public Blob copy(final BlobId blobId, final Map<String, String> headers) {
GoogleCloudStorageBlob sourceBlob = checkNotNull(getInternal(blobId));
GoogleCloudStorageBlob sourceBlob = (GoogleCloudStorageBlob) checkNotNull(get(blobId));

return createInternal(headers, destination -> {
sourceBlob.getBlob().copyTo(getConfiguredBucketName(), destination);
Expand All @@ -175,17 +183,12 @@ public Blob copy(final BlobId blobId, final Map<String, String> headers) {
@Override
@Guarded(by = STARTED)
public Blob get(final BlobId blobId) {
  // Delegate to the two-argument variant; soft-deleted blobs are excluded by default.
  return get(blobId, false);
}

@Nullable
@Override
public Blob get(final BlobId blobId, final boolean includeDeleted) {
// TODO implement soft-delete
return getInternal(blobId);
}

GoogleCloudStorageBlob getInternal(final BlobId blobId) {
checkNotNull(blobId);

final GoogleCloudStorageBlob blob = liveBlobs.getUnchecked(blobId);
Expand All @@ -194,14 +197,14 @@ GoogleCloudStorageBlob getInternal(final BlobId blobId) {
Lock lock = blob.lock();
try {
if (blob.isStale()) {
GoogleCloudBlobAttributes blobAttributes = new GoogleCloudBlobAttributes(bucket, attributePath(blobId).toString());
GoogleCloudBlobAttributes blobAttributes = new GoogleCloudBlobAttributes(bucket, attributePath(blobId));
boolean loaded = blobAttributes.load();
if (!loaded) {
log.warn("Attempt to access non-existent blob {} ({})", blobId, blobAttributes);
return null;
}

if (blobAttributes.isDeleted()) {
if (blobAttributes.isDeleted() && !includeDeleted) {
log.warn("Attempt to access soft-deleted blob {} ({})", blobId, blobAttributes);
return null;
}
Expand All @@ -226,7 +229,7 @@ GoogleCloudStorageBlob getInternal(final BlobId blobId) {

@Override
@Guarded(by = STARTED)
public boolean delete(final BlobId blobId, final String s) {
public boolean delete(final BlobId blobId, final String reason) {
// FIXME: implement soft delete
return deleteHard(blobId);
}
Expand All @@ -243,13 +246,8 @@ public boolean deleteHard(final BlobId blobId) {
GoogleCloudBlobAttributes blobAttributes = new GoogleCloudBlobAttributes(bucket, attributePath);
Long contentSize = getContentSizeForDeletion(blobAttributes);

GoogleCloudStorageBlob blob = getInternal(blobId);
boolean blobDeleted = false;
if (blob != null) {
blobDeleted = blob.getBlob().delete();
}

blobAttributes.setDeleted(blobDeleted);
boolean blobDeleted = storage.delete(getConfiguredBucketName(), contentPath(blobId));
storage.delete(getConfiguredBucketName(), attributePath);

if (blobDeleted && contentSize != null) {
metricsStore.recordDeletion(contentSize);
Expand Down Expand Up @@ -310,7 +308,7 @@ protected Bucket getOrCreateStorageBucket() {
@Override
@Guarded(by = {NEW, STOPPED, FAILED})
public void remove() {
  // TODO delete bucket only if it is empty
}

@Override
Expand Down Expand Up @@ -413,17 +411,29 @@ Blob createInternal(final Map<String, String> headers, BlobIngester ingester) {
return blob;
}
catch (IOException e) {
// TODO delete what we created?
blob.getBlob().delete();
if (blobAttributes != null) {
blobAttributes.setDeleted(true);
}
deleteNonExplosively(attributePath);
deleteNonExplosively(blobPath);
throw new BlobStoreException(e, blobId);
}
finally {
lock.unlock();
}
}

/**
 * Best-effort delete that never propagates a failure. Intended for cleanup paths —
 * typically catch blocks that are about to throw their own {@link BlobStoreException} —
 * where a secondary delete failure must not mask the original error.
 *
 * @param contentPath the path within the configured bucket to delete
 */
private void deleteNonExplosively(final String contentPath) {
  try {
    storage.delete(getConfiguredBucketName(), contentPath);
  }
  catch (Exception failure) {
    log.warn("caught exception attempting to delete during cleanup", failure);
  }
}

/**
* Returns path for blob-id content file relative to root directory.
*/
Expand Down Expand Up @@ -473,7 +483,7 @@ public InputStream getInputStream() {
}

com.google.cloud.storage.Blob getBlob() {
  // request only the MEDIA_LINK field to keep this metadata GET as lean as possible
  final String path = contentPath(getId());
  return bucket.get(path, BlobGetOption.fields(BlobField.MEDIA_LINK));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import com.google.cloud.ReadChannel;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.Storage.BlobField;
import com.google.cloud.storage.Storage.BlobGetOption;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -45,7 +47,7 @@ public GoogleCloudPropertiesFile(final Bucket bucket, final String key) {
public void load() throws IOException {
log.debug("Loading properties: {}", key);

Blob blob = bucket.get(key);
Blob blob = bucket.get(key, BlobGetOption.fields(BlobField.MEDIA_LINK));
try (ReadChannel channel = blob.reader()) {
load(Channels.newInputStream(channel));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@
import com.google.cloud.http.HttpTransportOptions;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import org.apache.http.client.HttpClient;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.conn.PoolingClientConnectionManager;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.params.BasicHttpParams;
import org.apache.http.params.CoreConnectionPNames;
import org.apache.shiro.util.StringUtils;

import static org.sonatype.nexus.blobstore.gcloud.internal.GoogleCloudBlobStore.CONFIG_KEY;
Expand All @@ -44,9 +51,33 @@ Storage create(final BlobStoreConfiguration configuration) throws Exception {
return builder.build().getService();
}

/**
* This method overrides the default {@link com.google.auth.http.HttpTransportFactory} with the Apache HTTP Client
* backed implementation. In addition, it modifies the {@link HttpClient} used internally to use a
* {@link PoolingClientConnectionManager}.
*
* Note: at time of writing, this method uses deprecated classes that have been replaced in HttpClient with
* {@link HttpClientBuilder}. We cannot use {@link HttpClientBuilder} currently because of a problem with the
* Google Cloud Storage library's {@link ApacheHttpTransport} constructor; the {@link HttpClient} instance
* returned by {@link HttpClientBuilder#build()} throws an {@link UnsupportedOperationException} for
* {@link HttpClient#getParams()}.
*
* @see PoolingHttpClientConnectionManager
* @see HttpClientBuilder
* @return customized {@link TransportOptions} to use for our {@link Storage} client instance
*/
TransportOptions transportOptions() {
  // Replicate the default connection and protocol parameters used within
  // ApacheHttpTransport, but swap in a pooling connection manager so severed
  // connections are retried after periods of inactivity.
  PoolingClientConnectionManager connManager = new PoolingClientConnectionManager();
  connManager.setDefaultMaxPerRoute(20);
  connManager.setMaxTotal(200);

  BasicHttpParams params = new BasicHttpParams();
  params.setParameter(CoreConnectionPNames.STALE_CONNECTION_CHECK, false);
  params.setParameter(CoreConnectionPNames.SOCKET_BUFFER_SIZE, 8192);
  // NOTE: deliberately uses deprecated DefaultHttpClient; HttpClientBuilder#build()
  // returns a client whose getParams() throws UnsupportedOperationException, which
  // breaks the ApacheHttpTransport constructor — see the javadoc above.
  DefaultHttpClient client = new DefaultHttpClient(connManager, params);

  return HttpTransportOptions.newBuilder()
      .setHttpTransportFactory(() -> new ApacheHttpTransport(client))
      .build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import com.google.api.gax.paging.Page
import com.google.cloud.storage.Bucket
import com.google.cloud.storage.Storage
import com.google.cloud.storage.Storage.BlobListOption
import org.apache.commons.io.IOUtils
import spock.lang.Specification

class GoogleCloudBlobStoreTest
Expand Down Expand Up @@ -131,8 +132,8 @@ class GoogleCloudBlobStoreTest
storage.get('mybucket') >> bucket
blobStore.init(config)
blobStore.doStart()
bucket.get('content/existing.properties') >> mockGoogleObject(tempFileAttributes)
bucket.get('content/existing.bytes') >> mockGoogleObject(tempFileBytes)
bucket.get('content/existing.properties', _) >> mockGoogleObject(tempFileAttributes)
bucket.get('content/existing.bytes', _) >> mockGoogleObject(tempFileBytes)

when: 'call create'
Blob blob = blobStore.get(new BlobId('existing'))
Expand Down Expand Up @@ -168,7 +169,7 @@ class GoogleCloudBlobStoreTest
def 'start will accept a metadata.properties originally created with file blobstore'() {
given: 'metadata.properties comes from a file blobstore'
storage.get('mybucket') >> bucket
2 * bucket.get('metadata.properties') >> mockGoogleObject(fileMetadata)
2 * bucket.get('metadata.properties', _) >> mockGoogleObject(fileMetadata)

when: 'doStart is called'
blobStore.init(config)
Expand All @@ -182,7 +183,7 @@ class GoogleCloudBlobStoreTest
given: 'metadata.properties comes from some unknown blobstore'
storage.get('mybucket') >> bucket
storage.get('mybucket') >> bucket
2 * bucket.get('metadata.properties') >> mockGoogleObject(otherMetadata)
2 * bucket.get('metadata.properties', _) >> mockGoogleObject(otherMetadata)

when: 'doStart is called'
blobStore.init(config)
Expand All @@ -197,7 +198,7 @@ class GoogleCloudBlobStoreTest
storage.get('mybucket') >> bucket
blobStore.init(config)
blobStore.doStart()
bucket.get('content/existing.properties') >> mockGoogleObject(tempFileAttributes)
bucket.get('content/existing.properties', _) >> mockGoogleObject(tempFileAttributes)

when: 'call exists'
boolean exists = blobStore.exists(new BlobId('existing'))
Expand All @@ -224,7 +225,7 @@ class GoogleCloudBlobStoreTest
storage.get('mybucket') >> bucket
blobStore.init(config)
blobStore.doStart()
bucket.get('content/existing.properties') >> { throw new IOException("this is a test") }
bucket.get('content/existing.properties', _) >> { throw new IOException("this is a test") }

when: 'call exists'
blobStore.exists(new BlobId('existing'))
Expand All @@ -236,6 +237,7 @@ class GoogleCloudBlobStoreTest
// Stub a GCS Blob whose reader() and getContent() both serve the given file's bytes.
private mockGoogleObject(File file) {
  com.google.cloud.storage.Blob blob = Mock()
  blob.reader() >> new DelegatingReadChannel(FileChannel.open(file.toPath()))
  blob.getContent() >> IOUtils.toByteArray(new FileInputStream(file))
  blob
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import java.nio.channels.FileChannel

import com.google.cloud.storage.Blob
import com.google.cloud.storage.Bucket
import org.apache.commons.io.IOUtils
import spock.lang.Specification

/**
Expand All @@ -42,7 +43,8 @@ class GoogleCloudPropertiesFileTest
def setup() {
  // Common fixture: any bucket.get('mykey', <options>) returns a Blob backed by tempFile,
  // serving its bytes via both reader() and getContent().
  Blob blob = Mock()
  blob.reader() >> new DelegatingReadChannel(FileChannel.open(tempFile.toPath()))
  blob.getContent() >> IOUtils.toByteArray(new FileInputStream(tempFile))
  bucket.get('mykey', _) >> blob
}

def "Load ingests properties from google cloud storage object"() {
Expand Down

0 comments on commit e8626bc

Please sign in to comment.