Skip to content

Commit

Permalink
Move generating of PUT & COPY INTO queries into SnowflakeQueryBuilder…
Browse files Browse the repository at this point in the history
…, update tests
  • Loading branch information
romantmb committed Mar 13, 2024
1 parent d89c438 commit f7ba548
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 87 deletions.
12 changes: 6 additions & 6 deletions composer.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion phpstan.neon
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
parameters:
checkMissingIterableValueType: false
ignoreErrors:
- '#Call to an undefined method Symfony\\Component\\Config\\Definition\\Builder\\NodeDefinition::arrayPrototype\(\)\.#'
70 changes: 62 additions & 8 deletions src/Writer/SnowflakeQueryBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,6 @@ public function createQueryStatement(
);
}

/**
 * Builds the statement that drops a stage if it exists.
 *
 * @param Connection $connection used only to quote the stage identifier
 * @param string $stageName unquoted stage name
 * @return string the DROP STAGE IF EXISTS statement for the quoted stage
 */
public function dropStageStatement(Connection $connection, string $stageName): string
{
    $quotedStage = $connection->quoteIdentifier($stageName);

    return sprintf('DROP STAGE IF EXISTS %s', $quotedStage);
}

public function tableExistsQueryStatement(Connection $connection, string $tableName): string
{
return sprintf(
Expand All @@ -73,6 +65,68 @@ public function addPrimaryKeyQueryStatement(Connection $connection, string $tabl
);
}

/**
 * Builds the multi-statement script that uploads a local file to the internal stage.
 *
 * The script consists of optional USE WAREHOUSE, mandatory USE DATABASE,
 * optional USE SCHEMA and finally the PUT statement, one statement per line.
 * The warehouse and schema statements are emitted only when the database
 * config reports them as present and their value is truthy.
 *
 * @param Connection $connection used only to quote identifiers
 * @param string $tableFilePath local path of the file, inserted verbatim after "file://"
 * @param string $tmpTableName name used as the target path under "@~/", inserted verbatim
 * @return string the statements joined by "\n" and trimmed
 */
public function putFileQueryStatement(Connection $connection, string $tableFilePath, string $tmpTableName): string
{
    $config = $this->databaseConfig;

    $warehouseName = null;
    if ($config->hasWarehouse()) {
        $warehouseName = $config->getWarehouse();
    }

    $databaseName = $config->getDatabase();

    $schemaName = null;
    if ($config->hasSchema()) {
        $schemaName = $config->getSchema();
    }

    $statements = [];

    if ($warehouseName) {
        $statements[] = sprintf('USE WAREHOUSE %s;', $connection->quoteIdentifier($warehouseName));
    }

    $statements[] = sprintf('USE DATABASE %s;', $connection->quoteIdentifier($databaseName));

    if ($schemaName) {
        // The schema is always qualified with the database it belongs to.
        $quotedDatabase = $connection->quoteIdentifier($databaseName);
        $quotedSchema = $connection->quoteIdentifier($schemaName);
        $statements[] = sprintf('USE SCHEMA %s.%s;', $quotedDatabase, $quotedSchema);
    }

    $statements[] = sprintf('PUT file://%s @~/%s;', $tableFilePath, $tmpTableName);

    $script = implode("\n", $statements);

    return trim($script);
}

/**
 * Builds the COPY INTO statement that loads a staged CSV file into a table.
 *
 * The target table is "<schema>.<tmpTableName>" (both quoted), the source is
 * "@~/<tmpTableName>", and the column list is made of the quoted DB names
 * of the given items.
 *
 * @param Connection $connection used to quote identifiers and string literals
 * @param string $tmpTableName name of the target table and of the staged path
 * @param array<ItemConfig> $items columns to load, in the order they are listed
 * @return string the COPY INTO statement, including surrounding newlines
 */
public function copyIntoTableQueryStatement(Connection $connection, string $tmpTableName, array $items): string
{
    // CSV file format options; the literal values are quoted by the connection.
    $formatOptions = ['SKIP_HEADER = 1'];
    $formatOptions[] = sprintf('FIELD_DELIMITER = %s', $connection->quote(','));
    $formatOptions[] = sprintf('FIELD_OPTIONALLY_ENCLOSED_BY = %s', $connection->quote('"'));
    $formatOptions[] = sprintf('ESCAPE_UNENCLOSED_FIELD = %s', $connection->quote('\\'));
    $formatOptions[] = sprintf('COMPRESSION = %s', $connection->quote('GZIP'));

    $quotedSchema = $connection->quoteIdentifier($this->databaseConfig->getSchema());
    $quotedTable = $connection->quoteIdentifier($tmpTableName);
    $qualifiedTable = sprintf('%s.%s', $quotedSchema, $quotedTable);

    $quoteColumn = function (ItemConfig $column) use ($connection) {
        return $connection->quoteIdentifier($column->getDbName());
    };

    $quotedColumns = [];
    foreach ($items as $item) {
        $quotedColumns[] = $quoteColumn($item);
    }

    $template = '
COPY INTO %s(%s)
FROM @~/%s
FILE_FORMAT = (TYPE=CSV %s)
;
';

    return sprintf(
        $template,
        $qualifiedTable,
        implode(', ', $quotedColumns),
        $tmpTableName,
        implode(' ', $formatOptions),
    );
}

public function upsertUpdateRowsQueryStatement(
Connection $connection,
ExportConfig $exportConfig,
Expand Down
71 changes: 3 additions & 68 deletions src/Writer/SnowflakeWriteAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public function writeData(string $tableName, ExportConfig $exportConfig): void

// Copy from internal stage to staging table
$this->logger->info(sprintf('Copying data from internal stage to staging table "%s"', $tableName));
$query = $this->generateCopyQuery($exportConfig, $tableName, $items);
$query = $this->queryBuilder->copyIntoTableQueryStatement($this->connection, $tableName, $items);
$this->logger->debug($query);
$this->connection->exec($query);
} finally {
Expand All @@ -71,7 +71,8 @@ public function writeData(string $tableName, ExportConfig $exportConfig): void

private function putIntoInternalStage(ExportConfig $exportConfig, string $tmpTableName): void
{
$putSql = $this->generatePutQuery($exportConfig, $tmpTableName);
$putSql = $this->queryBuilder
->putFileQueryStatement($this->connection, $exportConfig->getTableFilePath(), $tmpTableName);

$sqlFile = $this->tempDir->createTmpFile('snowsql.sql');
file_put_contents($sqlFile->getPathname(), $putSql);
Expand Down Expand Up @@ -105,72 +106,6 @@ private function cleanupInternalStage(string $tmpTableName): void
$this->connection->exec($sql);
}

/**
 * Builds the multi-statement script that uploads the exported table file to the internal stage.
 *
 * Emits optional USE WAREHOUSE, mandatory USE DATABASE, optional USE SCHEMA
 * and the PUT statement, one per line. Warehouse and schema statements are
 * added only when the database config reports them and their value is truthy.
 *
 * @param ExportConfig $exportConfig supplies the database config and the local file path
 * @param string $tmpTableName name used as the target path under "@~/", inserted verbatim
 * @return string the statements joined by "\n" and trimmed
 */
public function generatePutQuery(ExportConfig $exportConfig, string $tmpTableName): string
{
    /** @var SnowflakeDatabaseConfig $dbConfig */
    $dbConfig = $exportConfig->getDatabaseConfig();

    $warehouseName = null;
    if ($dbConfig->hasWarehouse()) {
        $warehouseName = $dbConfig->getWarehouse();
    }

    $databaseName = $dbConfig->getDatabase();

    $schemaName = null;
    if ($dbConfig->hasSchema()) {
        $schemaName = $dbConfig->getSchema();
    }

    $statements = [];

    if ($warehouseName) {
        $statements[] = sprintf('USE WAREHOUSE %s;', $this->quoteIdentifier($warehouseName));
    }

    $statements[] = sprintf('USE DATABASE %s;', $this->quoteIdentifier($databaseName));

    if ($schemaName) {
        // The schema is always qualified with the database it belongs to.
        $quotedDatabase = $this->quoteIdentifier($databaseName);
        $quotedSchema = $this->quoteIdentifier($schemaName);
        $statements[] = sprintf('USE SCHEMA %s.%s;', $quotedDatabase, $quotedSchema);
    }

    $statements[] = sprintf('PUT file://%s @~/%s;', $exportConfig->getTableFilePath(), $tmpTableName);

    $script = implode("\n", $statements);

    return trim($script);
}

/**
 * Builds the COPY INTO statement that loads a staged CSV file into a table.
 *
 * The target table is "<schema>.<tmpTableName>" (both quoted), the source is
 * "@~/<tmpTableName>", and the column list is made of the quoted DB names
 * of the given items.
 *
 * @param ExportConfig $exportConfig supplies the database config the schema is read from
 * @param string $tmpTableName name of the target table and of the staged path
 * @param array<ItemConfig> $items columns to load
 * @return string the COPY INTO statement, including surrounding newlines
 */
public function generateCopyQuery(ExportConfig $exportConfig, string $tmpTableName, array $items): string
{
    // CSV file format options; the literal values are quoted by this adapter.
    $formatOptions = ['SKIP_HEADER = 1'];
    $formatOptions[] = sprintf('FIELD_DELIMITER = %s', $this->quote(','));
    $formatOptions[] = sprintf('FIELD_OPTIONALLY_ENCLOSED_BY = %s', $this->quote('"'));
    $formatOptions[] = sprintf('ESCAPE_UNENCLOSED_FIELD = %s', $this->quote('\\'));
    $formatOptions[] = sprintf('COMPRESSION = %s', $this->quote('GZIP'));

    $quotedSchema = $this->quoteIdentifier($exportConfig->getDatabaseConfig()->getSchema());
    $quotedTable = $this->quoteIdentifier($tmpTableName);
    $qualifiedTable = sprintf('%s.%s', $quotedSchema, $quotedTable);

    $quotedColumns = $this->quoteManyIdentifiers($items, fn(ItemConfig $column) => $column->getDbName());

    $template = '
COPY INTO %s(%s)
FROM @~/%s
FILE_FORMAT = (TYPE=CSV %s)
;
';

    return sprintf(
        $template,
        $qualifiedTable,
        implode(', ', $quotedColumns),
        $tmpTableName,
        implode(' ', $formatOptions),
    );
}

public function upsert(ExportConfig $exportConfig, string $stageTableName): void
{
$this->logger->info(sprintf('Upserting data to table "%s"', $exportConfig->getDbName()));
Expand Down
17 changes: 13 additions & 4 deletions tests/phpunit/SnowflakeTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -341,8 +341,12 @@ public function queryTaggingProvider(): array
public function testGeneratePutQuery(): void
{
$config = $this->getConfig('simple');
$adapter = $this->getWriteAdapter($config);
$exportConfig = $this->getExportConfig($config);
$connection = $this->getConnection($config);

/** @var SnowflakeDatabaseConfig $databaseConfig */
$databaseConfig = $exportConfig->getDatabaseConfig();
$queryBuilder = new SnowflakeQueryBuilder($databaseConfig);

$schema = $config['parameters']['db']['schema'];
$database = $config['parameters']['db']['database'];
Expand All @@ -353,7 +357,8 @@ public function testGeneratePutQuery(): void
USE SCHEMA \"$database\".\"$schema\";
PUT file:///code/tests/phpunit/in/tables/simple.csv @~/simple_temp;";

$actual = $adapter->generatePutQuery($exportConfig, 'simple_temp');
$tableFilePath = $exportConfig->getTableFilePath();
$actual = $queryBuilder->putFileQueryStatement($connection, $tableFilePath, 'simple_temp');

Assert::assertSame($expected, $actual);
}
Expand All @@ -364,8 +369,12 @@ public function testGeneratePutQuery(): void
public function testGenerateCopyQuery(): void
{
$config = $this->getConfig('simple');
$adapter = $this->getWriteAdapter($config);
$exportConfig = $this->getExportConfig($config);
$connection = $this->getConnection($config);

/** @var SnowflakeDatabaseConfig $databaseConfig */
$databaseConfig = $exportConfig->getDatabaseConfig();
$queryBuilder = new SnowflakeQueryBuilder($databaseConfig);

$schema = $config['parameters']['db']['schema'];

Expand All @@ -376,7 +385,7 @@ public function testGenerateCopyQuery(): void
;
";

$actual = $adapter->generateCopyQuery($exportConfig, 'simple_temp', $exportConfig->getItems());
$actual = $queryBuilder->copyIntoTableQueryStatement($connection, 'simple_temp', $exportConfig->getItems());

Assert::assertSame($expected, $actual);
}
Expand Down

0 comments on commit f7ba548

Please sign in to comment.