Skip to content

Commit

Permalink
Support retrying to create BigQuery read session
Browse files Browse the repository at this point in the history
  • Loading branch information
vlad-lyutenko committed Nov 15, 2022
1 parent ccc06a5 commit 8054f84
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 13 deletions.
5 changes: 5 additions & 0 deletions plugin/trino-bigquery/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,11 @@
<artifactId>validation-api</artifactId>
</dependency>

<dependency>
<groupId>net.jodah</groupId>
<artifactId>failsafe</artifactId>
</dependency>

<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public class BigQuerySplitManager
private final boolean viewEnabled;
private final Duration viewExpiration;
private final NodeManager nodeManager;
private final int maxReadRowsRetries;

@Inject
public BigQuerySplitManager(
Expand All @@ -83,6 +84,7 @@ public BigQuerySplitManager(
this.viewEnabled = config.isViewsEnabled();
this.viewExpiration = config.getViewExpireDuration();
this.nodeManager = requireNonNull(nodeManager, "nodeManager cannot be null");
this.maxReadRowsRetries = config.getMaxReadRowsRetries();
}

@Override
Expand Down Expand Up @@ -136,7 +138,7 @@ private List<BigQuerySplit> readFromBigQuery(ConnectorSession session, TableDefi
if (isSkipViewMaterialization(session) && type == VIEW) {
return ImmutableList.of(BigQuerySplit.forViewStream(columns, filter));
}
ReadSession readSession = new ReadSessionCreator(bigQueryClientFactory, bigQueryReadClientFactory, viewEnabled, viewExpiration)
ReadSession readSession = new ReadSessionCreator(bigQueryClientFactory, bigQueryReadClientFactory, viewEnabled, viewExpiration, maxReadRowsRetries)
.create(session, remoteTableId, projectedColumnsNames, filter, actualParallelism);

return readSession.getStreamsList().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ public final class BigQueryUtil
private static final Set<String> INTERNAL_ERROR_MESSAGES = ImmutableSet.of(
"HTTP/2 error code: INTERNAL_ERROR",
"Connection closed with unknown cause",
"Received unexpected EOS on DATA frame from server");
"Received unexpected EOS on DATA frame from server",
"INTERNAL: request failed: internal error");

private BigQueryUtil() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@
import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
import com.google.cloud.bigquery.storage.v1.DataFormat;
import com.google.cloud.bigquery.storage.v1.ReadSession;
import io.airlift.log.Logger;
import io.airlift.units.Duration;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.TableNotFoundException;
import net.jodah.failsafe.Failsafe;
import net.jodah.failsafe.RetryPolicy;

import java.util.List;
import java.util.Optional;
Expand All @@ -34,26 +37,32 @@
import static com.google.cloud.bigquery.TableDefinition.Type.VIEW;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static java.lang.String.format;
import static java.time.temporal.ChronoUnit.MILLIS;
import static java.util.stream.Collectors.toList;

// A helper class, also handles view materialization
public class ReadSessionCreator
{
private static final Logger log = Logger.get(ReadSessionCreator.class);

private final BigQueryClientFactory bigQueryClientFactory;
private final BigQueryReadClientFactory bigQueryReadClientFactory;
private final boolean viewEnabled;
private final Duration viewExpiration;
private final int maxCreateReadSessionRetries;

public ReadSessionCreator(
BigQueryClientFactory bigQueryClientFactory,
BigQueryReadClientFactory bigQueryReadClientFactory,
boolean viewEnabled,
Duration viewExpiration)
Duration viewExpiration,
int maxCreateReadSessionRetries)
{
this.bigQueryClientFactory = bigQueryClientFactory;
this.bigQueryReadClientFactory = bigQueryReadClientFactory;
this.viewEnabled = viewEnabled;
this.viewExpiration = viewExpiration;
this.maxCreateReadSessionRetries = maxCreateReadSessionRetries;
}

public ReadSession create(ConnectorSession session, TableId remoteTable, List<String> selectedFields, Optional<String> filter, int parallelism)
Expand All @@ -73,17 +82,21 @@ public ReadSession create(ConnectorSession session, TableId remoteTable, List<St
.addAllSelectedFields(filteredSelectedFields);
filter.ifPresent(readOptions::setRowRestriction);

ReadSession readSession = bigQueryReadClient.createReadSession(
CreateReadSessionRequest.newBuilder()
.setParent("projects/" + client.getProjectId())
.setReadSession(ReadSession.newBuilder()
.setDataFormat(DataFormat.AVRO)
.setTable(toTableResourceName(actualTable.getTableId()))
.setReadOptions(readOptions))
.setMaxStreamCount(parallelism)
.build());
CreateReadSessionRequest createReadSessionRequest = CreateReadSessionRequest.newBuilder()
.setParent("projects/" + client.getProjectId())
.setReadSession(ReadSession.newBuilder()
.setDataFormat(DataFormat.AVRO)
.setTable(toTableResourceName(actualTable.getTableId()))
.setReadOptions(readOptions))
.setMaxStreamCount(parallelism)
.build();

return readSession;
return Failsafe.with(new RetryPolicy<>()
.withMaxRetries(maxCreateReadSessionRetries)
.withBackoff(10, 500, MILLIS)
.onRetry(event -> log.debug("Request failed, retrying: %s", event.getLastFailure()))
.abortOn(failure -> !BigQueryUtil.isRetryable(failure)))
.get(() -> bigQueryReadClient.createReadSession(createReadSessionRequest));
}
}

Expand Down

0 comments on commit 8054f84

Please sign in to comment.