Skip to content

Commit

Permalink
Upgrade Snowflake to 2.1.2 - Try with System.getenv() to get privateK…
Browse files Browse the repository at this point in the history
…ey and Pass for fixing Its
  • Loading branch information
EduardHantig committed Feb 2, 2024
1 parent ae72fc2 commit ebc416b
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package com.snowflake.kafka.connector;

import com.snowflake.client.jdbc.SnowflakeDriver;
import com.snowflake.kafka.connector.internal.InternalUtils;
import com.snowflake.kafka.connector.internal.SnowflakeURL;
import com.streamkap.common.test.TestUtils;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import java.util.Properties;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.transforms.ReplaceField;
import org.junit.jupiter.api.Test;
Expand All @@ -25,11 +29,26 @@ public SnowflakeStreamkapSinkIT() throws Exception {
"account:_account,all:_all,alter:_alter,and:_and,any:_any,as:_as,between:_between,by:_by,case:_case,cast:_cast,check:_check,column:_column,connect:_connect,connection:_connection,constraint:_constraint,create:_create,cross:_cross,current:_current,current_date:_current_date,current_time:_current_time,current_timestamp:_current_timestamp,current_user:_current_user,database:_database,delete:_delete,distinct:_distinct,drop:_drop,else:_else,exists:_exists,false:_false,following:_following,for:_for,from:_from,full:_full,grant:_grant,group:_group,gscluster:_gscluster,having:_having,ilike:_ilike,in:_in,increment:_increment,inner:_inner,insert:_insert,intersect:_intersect,into:_into,is:_is,issue:_issue,join:_join,lateral:_lateral,left:_left,like:_like,localtime:_localtime,localtimestamp:_localtimestamp,minus:_minus,natural:_natural,not:_not,null:_null,of:_of,on:_on,or:_or,order:_order,organization:_organization,qualify:_qualify,regexp:_regexp,revoke:_revoke,right:_right,rlike:_rlike,row:_row,rows:_rows,sample:_sample,schema:_schema,select:_select,set:_set,some:_some,start:_start,table:_table,tablesample:_tablesample,then:_then,to:_to,trigger:_trigger,true:_true,try_cast:_try_cast,union:_union,unique:_unique,update:_update,using:_using,values:_values,view:_view,when:_when,whenever:_whenever,where:_where,with:_with");
renameAmbigiousFields.configure(config);

Map<String, String> conf = new HashMap<>();
conf.put(Utils.SF_USER, "STREAMKAP_USER_JUNIT");
conf.put(Utils.SF_DATABASE, "JUNIT");
conf.put(Utils.SF_SCHEMA, "JUNIT");
conf.put(Utils.SF_URL, "sab25080.prod3.us-west-2.aws.snowflakecomputing.com");
conf.put(Utils.SF_WAREHOUSE, "STREAMKAP_WH");
conf.put(Utils.SF_PRIVATE_KEY, System.getenv("SNOWFLAKE_KEY"));
conf.put(Utils.PRIVATE_KEY_PASSPHRASE, System.getenv("SNOWFLAKE_KEY_PASS"));
conf.put(Utils.NAME, "TEST_CONNECTOR");
conf.put(Utils.TASK_ID, "");

SnowflakeURL url = new SnowflakeURL(conf.get(Utils.SF_URL));

Properties properties = InternalUtils.createProperties(conf, url);

super.init(new TestUtils() {
@Override
protected Connection createCon() {
try {
return com.snowflake.kafka.connector.internal.TestUtils.generateConnectionToSnowflakeWithEncryptedKey();
return new SnowflakeDriver().connect(url.getJdbcUrl(), properties);
} catch (Exception e) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
package com.snowflake.kafka.connector;

import com.snowflake.client.jdbc.SnowflakeDriver;
import com.snowflake.kafka.connector.internal.InternalUtils;
import com.snowflake.kafka.connector.internal.SnowflakeURL;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import java.util.Properties;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.transforms.ReplaceField;
import org.junit.jupiter.api.Test;

import com.snowflake.kafka.connector.internal.TestUtils;
import com.streamkap.common.test.sink.StreamkapSinkITBase;

public class StreamkapITSnowflake extends StreamkapSinkITBase<SnowflakeSinkTask> {
Expand All @@ -25,11 +28,26 @@ public StreamkapITSnowflake() throws Exception {
"account:_account,all:_all,alter:_alter,and:_and,any:_any,as:_as,between:_between,by:_by,case:_case,cast:_cast,check:_check,column:_column,connect:_connect,connection:_connection,constraint:_constraint,create:_create,cross:_cross,current:_current,current_date:_current_date,current_time:_current_time,current_timestamp:_current_timestamp,current_user:_current_user,database:_database,delete:_delete,distinct:_distinct,drop:_drop,else:_else,exists:_exists,false:_false,following:_following,for:_for,from:_from,full:_full,grant:_grant,group:_group,gscluster:_gscluster,having:_having,ilike:_ilike,in:_in,increment:_increment,inner:_inner,insert:_insert,intersect:_intersect,into:_into,is:_is,issue:_issue,join:_join,lateral:_lateral,left:_left,like:_like,localtime:_localtime,localtimestamp:_localtimestamp,minus:_minus,natural:_natural,not:_not,null:_null,of:_of,on:_on,or:_or,order:_order,organization:_organization,qualify:_qualify,regexp:_regexp,revoke:_revoke,right:_right,rlike:_rlike,row:_row,rows:_rows,sample:_sample,schema:_schema,select:_select,set:_set,some:_some,start:_start,table:_table,tablesample:_tablesample,then:_then,to:_to,trigger:_trigger,true:_true,try_cast:_try_cast,union:_union,unique:_unique,update:_update,using:_using,values:_values,view:_view,when:_when,whenever:_whenever,where:_where,with:_with");
renameAmbigiousFields.configure(config);

Map<String, String> conf = new HashMap<>();
conf.put(Utils.SF_USER, "STREAMKAP_USER_JUNIT");
conf.put(Utils.SF_DATABASE, "JUNIT");
conf.put(Utils.SF_SCHEMA, "JUNIT");
conf.put(Utils.SF_URL, "sab25080.prod3.us-west-2.aws.snowflakecomputing.com");
conf.put(Utils.SF_WAREHOUSE, "STREAMKAP_WH");
conf.put(Utils.SF_PRIVATE_KEY, System.getenv("SNOWFLAKE_KEY"));
conf.put(Utils.PRIVATE_KEY_PASSPHRASE, System.getenv("SNOWFLAKE_KEY_PASS"));
conf.put(Utils.NAME, "TEST_CONNECTOR");
conf.put(Utils.TASK_ID, "");

SnowflakeURL url = new SnowflakeURL(conf.get(Utils.SF_URL));

Properties properties = InternalUtils.createProperties(conf, url);

super.init(new com.streamkap.common.test.TestUtils() {
@Override
protected Connection createCon() {
try {
return TestUtils.generateConnectionToSnowflakeWithEncryptedKey();
return new SnowflakeDriver().connect(url.getJdbcUrl(), properties);
} catch (Exception e) {
return null;
}
Expand Down

0 comments on commit ebc416b

Please sign in to comment.