Skip to content

Commit

Permalink
Add more tests for SnowflakeFileTransferAgent (#1293)
Browse files Browse the repository at this point in the history
* add more tests for SnowflakeFileTransferAgent
  • Loading branch information
sfc-gh-ext-simba-lb authored Mar 16, 2023
1 parent 0e49d60 commit 520dd90
Show file tree
Hide file tree
Showing 2 changed files with 196 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,15 @@ public Void call() throws Exception {

if (uploadStream == null) {
try {

// Normal flow will never hit here. This is only for testing purposes
if (isInjectedFileTransferExceptionEnabled()
&& SnowflakeFileTransferAgent.injectedFileTransferException
instanceof FileNotFoundException) {
throw (FileNotFoundException)
SnowflakeFileTransferAgent.injectedFileTransferException;
}

uploadStream = new FileInputStream(srcFilePath);
} catch (FileNotFoundException ex) {
metadata.resultStatus = ResultStatus.ERROR;
Expand Down Expand Up @@ -1411,6 +1420,13 @@ private void uploadStream() throws SnowflakeSQLException {
threadExecutor.shutdown();

try {
// Normal flow will never hit here. This is only for testing purposes
if (isInjectedFileTransferExceptionEnabled()
&& SnowflakeFileTransferAgent.injectedFileTransferException
instanceof InterruptedException) {
throw (InterruptedException) SnowflakeFileTransferAgent.injectedFileTransferException;
}

// wait for the task to complete
uploadTask.get();
} catch (InterruptedException ex) {
Expand Down Expand Up @@ -2273,6 +2289,13 @@ private void filterExistingFiles() throws SnowflakeSQLException {
logger.debug("start dragging object summaries from remote storage");
do {
try {
// Normal flow will never hit here. This is only for testing purposes
if (isInjectedFileTransferExceptionEnabled()
&& SnowflakeFileTransferAgent.injectedFileTransferException
instanceof StorageProviderException) {
throw (StorageProviderException)
SnowflakeFileTransferAgent.injectedFileTransferException;
}
objectSummaries =
storageClient.listObjects(
storeLocation.location,
Expand Down
176 changes: 173 additions & 3 deletions src/test/java/net/snowflake/client/jdbc/FileUploaderLatestIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,17 @@
*/
package net.snowflake.client.jdbc;

import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.*;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.security.NoSuchAlgorithmException;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.*;
import java.util.List;
import java.util.Properties;
import net.snowflake.client.ConditionalIgnoreRule;
Expand Down Expand Up @@ -480,4 +481,173 @@ public void testParseCommandException() throws SQLException {
}
SnowflakeFileTransferAgent.setInjectedFileTransferException(null);
}

@Test
public void testPopulateStatusRowsWithSortOn() throws Exception {
Connection con = null;
try {
con = getConnection();
Statement statement = con.createStatement();
statement.execute("create or replace stage testStage");
statement.execute("set-sf-property sort on");
SFSession sfSession = con.unwrap(SnowflakeConnectionV1.class).getSfSession();

// upload files orders_101.csv and orders_100.csv
String command = "PUT file://" + getFullPathFileInResource("") + "/orders_10*.csv @testStage";
SnowflakeFileTransferAgent sfAgent1 =
new SnowflakeFileTransferAgent(command, sfSession, new SFStatement(sfSession));
sfAgent1.execute(); // upload files

// check that source files were sorted
assertEquals(2, sfAgent1.statusRows.size());
assertEquals("orders_100.csv", sfAgent1.getNextRow().get(0).toString());

String getCommand = "GET @testStage file:///tmp";
SnowflakeFileTransferAgent sfAgent2 =
new SnowflakeFileTransferAgent(getCommand, sfSession, new SFStatement(sfSession));
sfAgent2.execute();
// check that files are sorted on download
assertEquals(2, sfAgent2.statusRows.size());
assertEquals("orders_100.csv.gz", sfAgent2.getNextRow().get(0).toString());
} finally {
if (con != null) {
con.createStatement().execute("DROP STAGE if exists testStage");
con.close();
}
}
}

@Test
public void testListObjectsStorageException() throws Exception {
Connection con = null;
// inject the StorageProviderException
SnowflakeFileTransferAgent.setInjectedFileTransferException(
new StorageProviderException(new Exception("could not list objects")));

try {
con = getConnection();
Statement statement = con.createStatement();
statement.execute("create or replace stage testStage");
SFSession sfSession = con.unwrap(SnowflakeConnectionV1.class).getSfSession();
String command = "PUT file://" + getFullPathFileInResource(TEST_DATA_FILE) + " @testStage";
SnowflakeFileTransferAgent sfAgent =
new SnowflakeFileTransferAgent(command, sfSession, new SFStatement(sfSession));

sfAgent.execute();
} catch (SnowflakeSQLException err) {
Assert.assertEquals(200016, err.getErrorCode());
Assert.assertTrue(err.getMessage().contains("Encountered exception during listObjects"));
} finally {
if (con != null) {
con.createStatement().execute("DROP STAGE if exists testStage");
con.close();
}
}
SnowflakeFileTransferAgent.setInjectedFileTransferException(null);
}

@Test
public void testUploadStreamInterruptedException() throws IOException, SQLException {
final String DEST_PREFIX = TEST_UUID + "/testUploadStream";
// inject the InterruptedException
SnowflakeFileTransferAgent.setInjectedFileTransferException(new InterruptedException());
Connection connection = null;
Statement statement = null;

try {
connection = getConnection();

statement = connection.createStatement();

FileBackedOutputStream outputStream = new FileBackedOutputStream(1000000);
outputStream.write("hello".getBytes(StandardCharsets.UTF_8));
outputStream.flush();

// upload the data to user stage under testUploadStream with name hello.txt
connection
.unwrap(SnowflakeConnection.class)
.uploadStream(
"~", DEST_PREFIX, outputStream.asByteSource().openStream(), "hello.txt", false);

} catch (SnowflakeSQLLoggedException err) {
Assert.assertEquals(200003, err.getErrorCode());
} finally {
if (statement != null) {
statement.execute("rm @~/" + DEST_PREFIX);
statement.close();
}
closeSQLObjects(statement, connection);
}
SnowflakeFileTransferAgent.setInjectedFileTransferException(null);
}

@Test
public void testFileTransferStageInfo() throws SQLException {
Connection con = getConnection();
Statement statement = con.createStatement();
statement.execute("CREATE OR REPLACE STAGE testStage");
SFSession sfSession = con.unwrap(SnowflakeConnectionV1.class).getSfSession();

SnowflakeFileTransferAgent sfAgent =
new SnowflakeFileTransferAgent(PUT_COMMAND, sfSession, new SFStatement(sfSession));

StageInfo stageInfo = sfAgent.getStageInfo();
assertEquals(sfAgent.getStageCredentials(), stageInfo.getCredentials());
assertEquals(sfAgent.getStageLocation(), stageInfo.getLocation());

statement.execute("drop stage if exists testStage");
con.close();
}

@Test
public void testFileTransferMappingFromSourceFile() throws SQLException {
Connection con = getConnection();
Statement statement = con.createStatement();
statement.execute("CREATE OR REPLACE STAGE testStage");
SFSession sfSession = con.unwrap(SnowflakeConnectionV1.class).getSfSession();

String command = "PUT file://" + getFullPathFileInResource("") + "/orders_10*.csv @testStage";
SnowflakeFileTransferAgent sfAgent1 =
new SnowflakeFileTransferAgent(command, sfSession, new SFStatement(sfSession));
sfAgent1.execute();

SnowflakeFileTransferAgent sfAgent2 =
new SnowflakeFileTransferAgent(
"GET @testStage file:///tmp/", sfSession, new SFStatement(sfSession));

assertEquals(2, sfAgent2.getSrcToMaterialsMap().size());
assertEquals(2, sfAgent2.getSrcToPresignedUrlMap().size());

statement.execute("drop stage if exists testStage");
con.close();
}

@Test
public void testUploadFileCallableFileNotFound() throws Exception {
// inject the FileNotFoundException
SnowflakeFileTransferAgent.setInjectedFileTransferException(
new FileNotFoundException("file does not exist"));
Connection connection = null;
Statement statement = null;
try {
connection = getConnection();

statement = connection.createStatement();
statement.execute("CREATE OR REPLACE STAGE testStage");
SFSession sfSession = connection.unwrap(SnowflakeConnectionV1.class).getSfSession();

String command = "PUT file://" + getFullPathFileInResource(TEST_DATA_FILE) + " @testStage";
SnowflakeFileTransferAgent sfAgent =
new SnowflakeFileTransferAgent(command, sfSession, new SFStatement(sfSession));
sfAgent.execute();
} catch (Exception err) {
assertEquals(err.getCause(), instanceOf(FileNotFoundException.class));
} finally {
if (connection != null) {
connection.createStatement().execute("DROP STAGE if exists testStage");
connection.close();
}
}
SnowflakeFileTransferAgent.setInjectedFileTransferException(null);
}
}

0 comments on commit 520dd90

Please sign in to comment.