From ebc416b99dad2ebe7b5d840c0e84752455feacc3 Mon Sep 17 00:00:00 2001 From: EduardHantig Date: Fri, 2 Feb 2024 14:52:15 +0200 Subject: [PATCH] Upgrade Snowflake to 2.1.2 - Try with System.getenv() to get privateKey and Pass for fixing Its --- .../connector/SnowflakeStreamkapSinkIT.java | 21 +++++++++++++++++- .../kafka/connector/StreamkapITSnowflake.java | 22 +++++++++++++++++-- 2 files changed, 40 insertions(+), 3 deletions(-) diff --git a/src/test/java/com/snowflake/kafka/connector/SnowflakeStreamkapSinkIT.java b/src/test/java/com/snowflake/kafka/connector/SnowflakeStreamkapSinkIT.java index 49a2931ce..e26938200 100644 --- a/src/test/java/com/snowflake/kafka/connector/SnowflakeStreamkapSinkIT.java +++ b/src/test/java/com/snowflake/kafka/connector/SnowflakeStreamkapSinkIT.java @@ -1,5 +1,8 @@ package com.snowflake.kafka.connector; +import com.snowflake.client.jdbc.SnowflakeDriver; +import com.snowflake.kafka.connector.internal.InternalUtils; +import com.snowflake.kafka.connector.internal.SnowflakeURL; import com.streamkap.common.test.TestUtils; import java.sql.Connection; import java.sql.SQLException; @@ -7,6 +10,7 @@ import java.util.List; import java.util.Map; +import java.util.Properties; import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.transforms.ReplaceField; import org.junit.jupiter.api.Test; @@ -25,11 +29,26 @@ public SnowflakeStreamkapSinkIT() throws Exception { "account:_account,all:_all,alter:_alter,and:_and,any:_any,as:_as,between:_between,by:_by,case:_case,cast:_cast,check:_check,column:_column,connect:_connect,connection:_connection,constraint:_constraint,create:_create,cross:_cross,current:_current,current_date:_current_date,current_time:_current_time,current_timestamp:_current_timestamp,current_user:_current_user,database:_database,delete:_delete,distinct:_distinct,drop:_drop,else:_else,exists:_exists,false:_false,following:_following,for:_for,from:_from,full:_full,grant:_grant,group:_group,gscluster:_gscluster,having:_having,ilike:_ilike,in:_in,increment:_increment,inner:_inner,insert:_insert,intersect:_intersect,into:_into,is:_is,issue:_issue,join:_join,lateral:_lateral,left:_left,like:_like,localtime:_localtime,localtimestamp:_localtimestamp,minus:_minus,natural:_natural,not:_not,null:_null,of:_of,on:_on,or:_or,order:_order,organization:_organization,qualify:_qualify,regexp:_regexp,revoke:_revoke,right:_right,rlike:_rlike,row:_row,rows:_rows,sample:_sample,schema:_schema,select:_select,set:_set,some:_some,start:_start,table:_table,tablesample:_tablesample,then:_then,to:_to,trigger:_trigger,true:_true,try_cast:_try_cast,union:_union,unique:_unique,update:_update,using:_using,values:_values,view:_view,when:_when,whenever:_whenever,where:_where,with:_with"); renameAmbigiousFields.configure(config); + Map conf = new HashMap<>(); + conf.put(Utils.SF_USER, "STREAMKAP_USER_JUNIT"); + conf.put(Utils.SF_DATABASE, "JUNIT"); + conf.put(Utils.SF_SCHEMA, "JUNIT"); + conf.put(Utils.SF_URL, "sab25080.prod3.us-west-2.aws.snowflakecomputing.com"); + conf.put(Utils.SF_WAREHOUSE, "STREAMKAP_WH"); + conf.put(Utils.SF_PRIVATE_KEY, System.getenv("SNOWFLAKE_KEY")); + conf.put(Utils.PRIVATE_KEY_PASSPHRASE, System.getenv("SNOWFLAKE_KEY_PASS")); + conf.put(Utils.NAME, "TEST_CONNECTOR"); + conf.put(Utils.TASK_ID, ""); + + SnowflakeURL url = new SnowflakeURL(conf.get(Utils.SF_URL)); + + Properties properties = InternalUtils.createProperties(conf, url); + super.init(new TestUtils() { @Override protected Connection createCon() { try { - return com.snowflake.kafka.connector.internal.TestUtils.generateConnectionToSnowflakeWithEncryptedKey(); + return new SnowflakeDriver().connect(url.getJdbcUrl(), properties); } catch (Exception e) { return null; } diff --git a/src/test/java/com/snowflake/kafka/connector/StreamkapITSnowflake.java b/src/test/java/com/snowflake/kafka/connector/StreamkapITSnowflake.java index f2829d8bc..36302713e 100644 --- a/src/test/java/com/snowflake/kafka/connector/StreamkapITSnowflake.java +++ b/src/test/java/com/snowflake/kafka/connector/StreamkapITSnowflake.java @@ -1,16 +1,19 @@ package com.snowflake.kafka.connector; +import com.snowflake.client.jdbc.SnowflakeDriver; +import com.snowflake.kafka.connector.internal.InternalUtils; +import com.snowflake.kafka.connector.internal.SnowflakeURL; import java.sql.Connection; import java.sql.SQLException; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Properties; import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.transforms.ReplaceField; import org.junit.jupiter.api.Test; -import com.snowflake.kafka.connector.internal.TestUtils; import com.streamkap.common.test.sink.StreamkapSinkITBase; public class StreamkapITSnowflake extends StreamkapSinkITBase { @@ -25,11 +28,26 @@ public StreamkapITSnowflake() throws Exception { "account:_account,all:_all,alter:_alter,and:_and,any:_any,as:_as,between:_between,by:_by,case:_case,cast:_cast,check:_check,column:_column,connect:_connect,connection:_connection,constraint:_constraint,create:_create,cross:_cross,current:_current,current_date:_current_date,current_time:_current_time,current_timestamp:_current_timestamp,current_user:_current_user,database:_database,delete:_delete,distinct:_distinct,drop:_drop,else:_else,exists:_exists,false:_false,following:_following,for:_for,from:_from,full:_full,grant:_grant,group:_group,gscluster:_gscluster,having:_having,ilike:_ilike,in:_in,increment:_increment,inner:_inner,insert:_insert,intersect:_intersect,into:_into,is:_is,issue:_issue,join:_join,lateral:_lateral,left:_left,like:_like,localtime:_localtime,localtimestamp:_localtimestamp,minus:_minus,natural:_natural,not:_not,null:_null,of:_of,on:_on,or:_or,order:_order,organization:_organization,qualify:_qualify,regexp:_regexp,revoke:_revoke,right:_right,rlike:_rlike,row:_row,rows:_rows,sample:_sample,schema:_schema,select:_select,set:_set,some:_some,start:_start,table:_table,tablesample:_tablesample,then:_then,to:_to,trigger:_trigger,true:_true,try_cast:_try_cast,union:_union,unique:_unique,update:_update,using:_using,values:_values,view:_view,when:_when,whenever:_whenever,where:_where,with:_with"); renameAmbigiousFields.configure(config); + Map conf = new HashMap<>(); + conf.put(Utils.SF_USER, "STREAMKAP_USER_JUNIT"); + conf.put(Utils.SF_DATABASE, "JUNIT"); + conf.put(Utils.SF_SCHEMA, "JUNIT"); + conf.put(Utils.SF_URL, "sab25080.prod3.us-west-2.aws.snowflakecomputing.com"); + conf.put(Utils.SF_WAREHOUSE, "STREAMKAP_WH"); + conf.put(Utils.SF_PRIVATE_KEY, System.getenv("SNOWFLAKE_KEY")); + conf.put(Utils.PRIVATE_KEY_PASSPHRASE, System.getenv("SNOWFLAKE_KEY_PASS")); + conf.put(Utils.NAME, "TEST_CONNECTOR"); + conf.put(Utils.TASK_ID, ""); + + SnowflakeURL url = new SnowflakeURL(conf.get(Utils.SF_URL)); + + Properties properties = InternalUtils.createProperties(conf, url); + super.init(new com.streamkap.common.test.TestUtils() { @Override protected Connection createCon() { try { - return TestUtils.generateConnectionToSnowflakeWithEncryptedKey(); + return new SnowflakeDriver().connect(url.getJdbcUrl(), properties); } catch (Exception e) { return null; }