Commit: More refactoring

grishick committed Dec 24, 2022
1 parent 9047f39 commit a3f7ed2
Showing 6 changed files with 132 additions and 184 deletions.
File: BigQueryDestination.java
@@ -177,7 +177,7 @@ protected BigQuery getBigQuery(final JsonNode config) {
}
}

private static GoogleCredentials getServiceAccountCredentials(final JsonNode config) throws IOException {
public static GoogleCredentials getServiceAccountCredentials(final JsonNode config) throws IOException {
if (!BigQueryUtils.isUsingJsonCredentials(config)) {
LOGGER.info("No service account key json is provided. It is required if you are using Airbyte cloud.");
LOGGER.info("Using the default service account credential from environment.");
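The visibility change above (private to public) lets the shared test utilities build clients through this helper instead of parsing the key JSON themselves (see BigQueryDestinationTestUtils.initBigQuery below). A minimal sketch of the resulting call pattern, mirroring what the utility does:

    // Sketch: build a BigQuery client via the now-public helper.
    // The helper falls back to application-default credentials when no key JSON is in the config.
    final GoogleCredentials credentials = BigQueryDestination.getServiceAccountCredentials(config);
    final BigQuery bigquery = BigQueryOptions.newBuilder()
        .setProjectId(config.get(BigQueryConsts.CONFIG_PROJECT_ID).asText())
        .setCredentials(credentials)
        .build()
        .getService();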
File: AbstractBigQueryDestinationAcceptanceTest.java
@@ -12,9 +12,6 @@
import com.google.cloud.bigquery.ConnectionProperty;
import com.google.cloud.bigquery.Dataset;
import com.google.cloud.bigquery.FieldList;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableResult;
import com.google.common.collect.Streams;
@@ -33,23 +30,20 @@
import java.util.List;
import java.util.Optional;
import java.util.TimeZone;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.junit.jupiter.api.Assertions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BigQueryDestinationAcceptanceTest extends DestinationAcceptanceTest {
public abstract class AbstractBigQueryDestinationAcceptanceTest extends DestinationAcceptanceTest {

private static final NamingConventionTransformer NAME_TRANSFORMER = new BigQuerySQLNameTransformer();
private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryDestinationAcceptanceTest.class);
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractBigQueryDestinationAcceptanceTest.class);

protected static final String CONFIG_PROJECT_ID = "project_id";
protected static final String CONFIG_DATASET_LOCATION = "dataset_location";

protected Path secretsFile;
protected BigQuery bigquery;
protected Dataset dataset;
protected boolean bqTornDown = false;

protected JsonNode config;
protected final StandardNameTransformer namingResolver = new StandardNameTransformer();
@@ -155,7 +149,7 @@ protected List<JsonNode> retrieveRecords(final TestDestinationEnv env,
.collect(Collectors.toList());
}

private List<JsonNode> retrieveRecordsFromTable(final String tableName, final String schema) throws InterruptedException {
protected List<JsonNode> retrieveRecordsFromTable(final String tableName, final String schema) throws InterruptedException {
TimeZone.setDefault(TimeZone.getTimeZone("UTC"));

final QueryJobConfiguration queryConfig =
@@ -167,75 +161,27 @@ private List<JsonNode> retrieveRecordsFromTable(final String tableName, final St
.setConnectionProperties(Collections.singletonList(ConnectionProperty.of("time_zone", "UTC")))
.build();

final TableResult queryResults = executeQuery(bigquery, queryConfig).getLeft().getQueryResults();
final TableResult queryResults = BigQueryUtils.executeQuery(bigquery, queryConfig).getLeft().getQueryResults();
final FieldList fields = queryResults.getSchema().getFields();
BigQuerySourceOperations sourceOperations = new BigQuerySourceOperations();

return Streams.stream(queryResults.iterateAll())
.map(fieldValues -> sourceOperations.rowToJson(new BigQueryResultSet(fieldValues, fields))).collect(Collectors.toList());
}

@Override
protected void setup(final TestDestinationEnv testEnv) throws Exception {
protected void setUpBigQuery() throws IOException {
// secrets file should be set by the inheriting class
Assertions.assertNotNull(secretsFile);
final String datasetId = Strings.addRandomSuffix("airbyte_tests", "_", 8);
config = BigQueryDestinationTestUtils.createConfig(Path.of("secrets/credentials-standard.json"), datasetId);
bigquery = null;
dataset = null;
bqTornDown = false;

setUpBigQuery(config, datasetId);
addShutdownHook();
}
config = BigQueryDestinationTestUtils.createConfig(secretsFile, datasetId);

protected void setUpBigQuery(JsonNode config, String datasetId) throws IOException {
String projectId = config.get(BigQueryConsts.CONFIG_PROJECT_ID).asText();
final String projectId = config.get(BigQueryConsts.CONFIG_PROJECT_ID).asText();
bigquery = BigQueryDestinationTestUtils.initBigQuery(config, projectId);
try {
dataset = BigQueryDestinationTestUtils.initDataSet(config, bigquery, datasetId);
} catch(Exception ex) {
//ignore
}
}

protected void addShutdownHook() {
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
if (!bqTornDown) {
bqTornDown = BigQueryDestinationTestUtils.tearDownBigQuery(bigquery, dataset, LOGGER);
}
}));
}

@Override
protected void tearDown(final TestDestinationEnv testEnv) {
bqTornDown = BigQueryDestinationTestUtils.tearDownBigQuery(bigquery, dataset, LOGGER);
}

// todo (cgardens) - figure out how to share these helpers. they are currently copied from
// BigQueryDestination.
private static ImmutablePair<Job, String> executeQuery(final BigQuery bigquery, final QueryJobConfiguration queryConfig) {
final JobId jobId = JobId.of(UUID.randomUUID().toString());
final Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());
return executeQuery(queryJob);
dataset = BigQueryDestinationTestUtils.initDataSet(config, bigquery, datasetId);
}

private static ImmutablePair<Job, String> executeQuery(final Job queryJob) {
final Job completedJob = waitForQuery(queryJob);
if (completedJob == null) {
throw new RuntimeException("Job no longer exists");
} else if (completedJob.getStatus().getError() != null) {
// You can also look at queryJob.getStatus().getExecutionErrors() for all
// errors, not just the latest one.
return ImmutablePair.of(null, (completedJob.getStatus().getError().toString()));
}

return ImmutablePair.of(completedJob, null);
protected void tearDownBigQuery() {
BigQueryDestinationTestUtils.tearDownBigQuery(bigquery, dataset, LOGGER);
}

private static Job waitForQuery(final Job queryJob) {
try {
return queryJob.waitFor();
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
}
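Since the acceptance test is now an abstract base, each concrete test supplies its own secrets file before setup runs. A hypothetical subclass (the class name and secrets path are illustrative, not taken from this commit; only the secretsFile contract comes from the diff above):

    public class BigQueryStandardDestinationAcceptanceTest extends AbstractBigQueryDestinationAcceptanceTest {

      @Override
      protected void setup(final TestDestinationEnv testEnv) throws Exception {
        // The base class asserts that secretsFile is non-null before building the config.
        secretsFile = Path.of("secrets/credentials-standard.json");
        setUpBigQuery();
      }

      @Override
      protected void tearDown(final TestDestinationEnv testEnv) {
        tearDownBigQuery();
      }
    }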
File: BigQueryDestinationTest.java
@@ -13,8 +13,6 @@
import static org.mockito.Mockito.spy;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.Dataset;
@@ -53,7 +51,6 @@
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -112,7 +109,6 @@ class BigQueryDestinationTest {
private static final NamingConventionTransformer NAMING_RESOLVER = new BigQuerySQLNameTransformer();

protected static String projectId;
protected JsonNode credentialsJson;
protected static String datasetId;
protected static JsonNode config;
protected static JsonNode configWithProjectId;
@@ -124,8 +120,6 @@ class BigQueryDestinationTest {
protected Dataset dataset;
protected static Map<String, JsonNode> configs;
protected static ConfiguredAirbyteCatalog catalog;
protected boolean bqTornDown = false;
protected boolean gcsTornDown = false;

private AmazonS3 s3Client;

@@ -162,13 +156,13 @@ public static void beforeAll() throws IOException {
throw new IllegalStateException("""
Json config not found. Must provide path to a big query credentials file,
please add file with creds to
../destination-bigquery/secrets/credentials-with-missed-dataset-creation-role.json.""");
<...>/destination-bigquery/secrets/credentials-with-missed-dataset-creation-role.json.""");
}
if (!Files.exists(CREDENTIALS_NON_BILLABLE_PROJECT_PATH)) {
throw new IllegalStateException("""
Json config not found. Must provide path to a big query credentials file,
please add file with creds to
../destination-bigquery/secrets/credentials-non-billable-project.json""");
<...>/destination-bigquery/secrets/credentials-non-billable-project.json""");
}
if (!Files.exists(CREDENTIALS_WITH_GCS_STAGING_PATH)) {
throw new IllegalStateException(
@@ -238,64 +232,18 @@ void setup(final TestInfo info) throws IOException {
}
bigquery = null;
dataset = null;
bqTornDown = false;
gcsTornDown = false;
final GcsDestinationConfig gcsDestinationConfig = GcsDestinationConfig
.getGcsDestinationConfig(BigQueryUtils.getGcsJsonNodeConfig(gcsStagingConfig));
this.s3Client = gcsDestinationConfig.getS3Client();

addShutdownHook();
}

protected void addShutdownHook() {
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
if (!bqTornDown) {
bqTornDown = BigQueryDestinationTestUtils.tearDownBigQuery(bigquery, dataset, LOGGER);
}
if(!gcsTornDown) {
tearDownGcs();
}
}));
}

@AfterEach
void tearDown(final TestInfo info) {
if (info.getDisplayName().equals("testSpec()")) {
return;
}
bqTornDown = BigQueryDestinationTestUtils.tearDownBigQuery(bigquery, dataset, LOGGER);
tearDownGcs();
}

/**
* Remove all the GCS output from the tests.
*/
protected void tearDownGcs() {
if(this.s3Client == null) {
return;
}

final JsonNode properties = gcsStagingConfig.get(BigQueryConsts.LOADING_METHOD);
final String gcsBucketName = properties.get(BigQueryConsts.GCS_BUCKET_NAME).asText();
final String gcs_bucket_path = properties.get(BigQueryConsts.GCS_BUCKET_PATH).asText();

final List<KeyVersion> keysToDelete = new LinkedList<>();
final List<S3ObjectSummary> objects = s3Client
.listObjects(gcsBucketName, gcs_bucket_path)
.getObjectSummaries();
for (final S3ObjectSummary object : objects) {
keysToDelete.add(new KeyVersion(object.getKey()));
}

if (keysToDelete.size() > 0) {
LOGGER.info("Tearing down test bucket path: {}/{}", gcsBucketName, gcs_bucket_path);
// Google Cloud Storage doesn't accept request to delete multiple objects
for (final KeyVersion keyToDelete : keysToDelete) {
s3Client.deleteObject(gcsBucketName, keyToDelete.getKey());
}
LOGGER.info("Deleted {} file(s).", keysToDelete.size());
}
gcsTornDown = true;
BigQueryDestinationTestUtils.tearDownBigQuery(bigquery, dataset, LOGGER);
BigQueryDestinationTestUtils.tearDownGcs(s3Client, config, LOGGER);
}

@Test
File: BigQueryDestinationTestUtils.java
@@ -4,17 +4,14 @@
import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.auth.oauth2.ServiceAccountCredentials;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.Dataset;
import com.google.cloud.bigquery.DatasetInfo;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMap.Builder;
import io.airbyte.commons.json.Jsons;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.LinkedList;
@@ -23,25 +20,28 @@

public class BigQueryDestinationTestUtils {

/**
* Parses the config file and replaces the dataset with the datasetId randomly generated by the test
* @param configFile
* @param datasetId
* @return
* @throws IOException
*/
public static JsonNode createConfig(Path configFile, String datasetId) throws IOException {
final String tmpConfigAsString = Files.readString(configFile);
final JsonNode tmpConfigJson = Jsons.deserialize(tmpConfigAsString);
final JsonNode tmpCredentialsJson = tmpConfigJson.get(BigQueryConsts.BIGQUERY_BASIC_CONFIG);
Builder<Object, Object> mapBuilder = ImmutableMap.builder();
mapBuilder.put(BigQueryConsts.CONFIG_PROJECT_ID, tmpCredentialsJson.get(BigQueryConsts.CONFIG_PROJECT_ID).asText());
mapBuilder.put(BigQueryConsts.CONFIG_CREDS, tmpCredentialsJson.toString());
mapBuilder.put(BigQueryConsts.CONFIG_DATASET_ID, datasetId);
mapBuilder.put(BigQueryConsts.CONFIG_DATASET_LOCATION, tmpConfigJson.get(BigQueryConsts.CONFIG_DATASET_LOCATION).asText());

//if current test config includes GCS staging - add the staging configuration to the returned config object
if(tmpConfigJson.has(BigQueryConsts.LOADING_METHOD)) {
final JsonNode loadingMethodJson = tmpConfigJson.get(BigQueryConsts.LOADING_METHOD);
mapBuilder.put(BigQueryConsts.LOADING_METHOD, loadingMethodJson);
}

return Jsons.jsonNode(mapBuilder.build());
return Jsons.jsonNode(((ObjectNode)tmpConfigJson).put(BigQueryConsts.CONFIG_DATASET_ID, datasetId));
}

/**
* Get a handle for the BigQuery dataset instance used by the test.
* This dataset instance is used to verify the results of test operations
* and to clean up after the test runs
* @param config
* @param bigquery
* @param datasetId
* @return
*/
public static Dataset initDataSet(JsonNode config, BigQuery bigquery, String datasetId) {
final DatasetInfo datasetInfo = DatasetInfo.newBuilder(datasetId)
.setLocation(config.get(BigQueryConsts.CONFIG_DATASET_LOCATION).asText()).build();
@@ -55,44 +55,57 @@ public static Dataset initDataSet(JsonNode config, BigQuery bigquery, String dat
return null;
}

/**
* Initializes a BigQuery instance that will be used for verifying results of test operations
* and for cleaning up the BigQuery dataset after the test
* @param config
* @param projectId
* @return
* @throws IOException
*/
public static BigQuery initBigQuery(JsonNode config, String projectId) throws IOException {
final ServiceAccountCredentials credentials = ServiceAccountCredentials
.fromStream(new ByteArrayInputStream(config.get(BigQueryConsts.CONFIG_CREDS).asText().getBytes(StandardCharsets.UTF_8)));
final GoogleCredentials credentials = BigQueryDestination.getServiceAccountCredentials(config);
return BigQueryOptions.newBuilder()
.setProjectId(projectId)
.setCredentials(credentials)
.build()
.getService();
}

public static boolean tearDownBigQuery(BigQuery bigquery, Dataset dataset, Logger LOGGER) {
/**
* Deletes the BigQuery dataset created during the test
* @param bigquery
* @param dataset
* @param LOGGER
*/
public static void tearDownBigQuery(BigQuery bigquery, Dataset dataset, Logger LOGGER) {
// allows deletion of a dataset that has contents
final BigQuery.DatasetDeleteOption option = BigQuery.DatasetDeleteOption.deleteContents();
if(bigquery == null || dataset == null) {
return false;
return;
}
try {
final boolean success = bigquery.delete(dataset.getDatasetId(), option);
if (success) {
LOGGER.info("BQ Dataset " + dataset + " deleted...");
return true;
} else {
LOGGER.info("BQ Dataset cleanup for " + dataset + " failed!");
return false;
}
} catch (Exception ex) {
return false;
LOGGER.error("Failed to remove BigQuery resources after the test", ex);
}
}

/**
* Remove all the GCS output from the tests.
*/
public static boolean tearDownGcs(AmazonS3 s3Client, JsonNode config, Logger LOGGER) {
public static void tearDownGcs(AmazonS3 s3Client, JsonNode config, Logger LOGGER) {
if(s3Client == null) {
return false;
return;
}
if(BigQueryUtils.getLoadingMethod(config) != UploadingMethod.GCS) {
return;
}

final JsonNode properties = config.get(BigQueryConsts.LOADING_METHOD);
final String gcsBucketName = properties.get(BigQueryConsts.GCS_BUCKET_NAME).asText();
final String gcs_bucket_path = properties.get(BigQueryConsts.GCS_BUCKET_PATH).asText();
@@ -113,9 +126,8 @@ public static boolean tearDownGcs(AmazonS3 s3Client, JsonNode config, Logger LOG
}
LOGGER.info("Deleted {} file(s).", keysToDelete.size());
}
return true;
} catch (Exception ex) {
return false;
LOGGER.error("Failed to remove GCS resources after the test", ex);
}
}
}
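Taken together, these utilities support a test lifecycle along the following lines. Every method call appears in the diff above; the try/finally wiring is illustrative:

    // Illustrative lifecycle for a test built on BigQueryDestinationTestUtils.
    final String datasetId = Strings.addRandomSuffix("airbyte_tests", "_", 8);
    final JsonNode config = BigQueryDestinationTestUtils.createConfig(Path.of("secrets/credentials-standard.json"), datasetId);
    final String projectId = config.get(BigQueryConsts.CONFIG_PROJECT_ID).asText();
    final BigQuery bigquery = BigQueryDestinationTestUtils.initBigQuery(config, projectId);
    final Dataset dataset = BigQueryDestinationTestUtils.initDataSet(config, bigquery, datasetId);
    try {
      // ... exercise the destination against the dataset ...
    } finally {
      BigQueryDestinationTestUtils.tearDownBigQuery(bigquery, dataset, LOGGER);
      BigQueryDestinationTestUtils.tearDownGcs(s3Client, config, LOGGER); // no-op unless GCS staging is configured
    }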