Skip to content

Commit

Permalink
encode parameters in http, bump version to 1.3
Browse files Browse the repository at this point in the history
  • Loading branch information
ildus committed Nov 30, 2020
1 parent a482ab2 commit fc4f6de
Show file tree
Hide file tree
Showing 11 changed files with 205 additions and 51 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
cmake_minimum_required(VERSION 3.5 FATAL_ERROR)
project(clickhouse_fdw VERSION 1.2.0 LANGUAGES C)
project(clickhouse_fdw VERSION 1.3.0 LANGUAGES C)

set(CMAKE_COLOR_MAKEFILE ON)
set(CMAKE_EXPORT_COMPILE_COMMANDS 1)
Expand Down
4 changes: 3 additions & 1 deletion src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,15 @@ execute_process(
set(sql_out "${CMAKE_BINARY_DIR}/clickhouse_fdw--${EXT_VERSION}.sql")
set(sql_migration_11 "${CMAKE_CURRENT_SOURCE_DIR}/sql/clickhouse_fdw--1.0--1.1.sql")
set(sql_migration_12 "${CMAKE_CURRENT_SOURCE_DIR}/sql/clickhouse_fdw--1.1--1.2.sql")
set(sql_migration_13 "${CMAKE_CURRENT_SOURCE_DIR}/sql/clickhouse_fdw--1.2--1.3.sql")

add_custom_command(
OUTPUT ${sql_out}
COMMAND cat ${CMAKE_CURRENT_SOURCE_DIR}/sql/init.sql ${CMAKE_CURRENT_SOURCE_DIR}/sql/functions.sql> ${sql_out}
DEPENDS sql/init.sql sql/functions.sql
)
add_custom_target(clickhouse_fdw_sql
ALL DEPENDS ${sql_out} ${sql_migration_11} ${sql_migration_12})
ALL DEPENDS ${sql_out} ${sql_migration_11} ${sql_migration_12} ${sql_migration_13})
add_dependencies(clickhouse_fdw clickhouse_fdw_sql)

#------------------------------------------------------------------------------
Expand All @@ -92,6 +93,7 @@ set (install_files
"${sql_out}"
"${sql_migration_11}"
"${sql_migration_12}"
"${sql_migration_13}"
"${CMAKE_SOURCE_DIR}/src/clickhouse_fdw.control"
)

Expand Down
3 changes: 2 additions & 1 deletion src/clickhouse_fdw.c
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,8 @@ clickhousedb_raw_query(PG_FUNCTION_ARGS)
char *connstring = TextDatumGetCString(PG_GETARG_TEXT_P(1)),
*query = TextDatumGetCString(PG_GETARG_TEXT_P(0));

ch_connection conn = chfdw_http_connect(connstring);
ch_connection_details *details = connstring_parse(connstring);
ch_connection conn = chfdw_http_connect(details);
ch_cursor *cursor = conn.methods->simple_query(conn.conn, query);
text *res = chfdw_http_fetch_raw_data(cursor);

Expand Down
2 changes: 1 addition & 1 deletion src/clickhouse_fdw.control
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
comment = 'foreign-data wrapper for remote ClickHouse servers'
default_version = '1.2'
default_version = '1.3'
module_pathname = '$libdir/clickhouse_fdw'
relocatable = true
170 changes: 141 additions & 29 deletions src/connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "miscadmin.h"
#include "pgstat.h"
#include "storage/latch.h"
#include "utils/builtins.h"
#include "utils/hsearch.h"
#include "utils/inval.h"
#include "utils/memutils.h"
Expand Down Expand Up @@ -52,21 +53,7 @@ clickhouse_connect(ForeignServer *server, UserMapping *user)

if (strcmp(driver, "http") == 0)
{
ch_connection conn;
char *connstring;

if (details.username && details.password)
connstring = psprintf("http://%s:%s@%s:%d/", details.username,
details.password, details.host, details.port);
else if (details.username)
connstring = psprintf("http://%s@%s:%d/", details.username,
details.host, details.port);
else
connstring = psprintf("http://%s:%d/", details.host, details.port);

conn = chfdw_http_connect(connstring);
pfree(connstring);
return conn;
return chfdw_http_connect(&details);
}
else if (strcmp(driver, "binary") == 0)
{
Expand Down Expand Up @@ -97,17 +84,17 @@ chfdw_get_connection(UserMapping *user)
/* allocate ConnectionHash in the cache context */
ctl.hcxt = CacheMemoryContext;
ConnectionHash = hash_create("clickhousedb_fdw connections", 8,
&ctl,
HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
&ctl,
HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);

/*
* Register some callback functions that manage connection cleanup.
* This should be done just once in each backend.
*/
CacheRegisterSyscacheCallback(FOREIGNSERVEROID,
chfdw_inval_callback, (Datum) 0);
chfdw_inval_callback, (Datum) 0);
CacheRegisterSyscacheCallback(USERMAPPINGOID,
chfdw_inval_callback, (Datum) 0);
chfdw_inval_callback, (Datum) 0);
}

/* Create hash key for the entry. Assume no pad bytes in key struct */
Expand Down Expand Up @@ -155,18 +142,18 @@ chfdw_get_connection(UserMapping *user)
/* Reset all transient state fields, to be sure all are clean */
entry->invalidated = false;
entry->server_hashvalue =
GetSysCacheHashValue1(FOREIGNSERVEROID,
ObjectIdGetDatum(server->serverid));
GetSysCacheHashValue1(FOREIGNSERVEROID,
ObjectIdGetDatum(server->serverid));
entry->mapping_hashvalue =
GetSysCacheHashValue1(USERMAPPINGOID,
ObjectIdGetDatum(user->umid));
GetSysCacheHashValue1(USERMAPPINGOID,
ObjectIdGetDatum(user->umid));

/* Now try to make the connection */
entry->gate = clickhouse_connect(server, user);

elog(DEBUG3,
"new clickhousedb_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
entry->gate.conn, server->servername, user->umid, user->userid);
"new clickhousedb_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
entry->gate.conn, server->servername, user->umid, user->userid);
}

return entry->gate;
Expand Down Expand Up @@ -205,12 +192,137 @@ chfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue)

/* hashvalue == 0 means a cache reset, must clear all state */
if (hashvalue == 0 ||
(cacheid == FOREIGNSERVEROID &&
entry->server_hashvalue == hashvalue) ||
(cacheid == USERMAPPINGOID &&
entry->mapping_hashvalue == hashvalue))
(cacheid == FOREIGNSERVEROID &&
entry->server_hashvalue == hashvalue) ||
(cacheid == USERMAPPINGOID &&
entry->mapping_hashvalue == hashvalue))
{
entry->invalidated = true;
}
}
}


ch_connection_details *
connstring_parse(const char *connstring)
{
char *pname;
char *pval;
char *buf;
char *cp;
char *cp2;
ch_connection_details *details = palloc0(sizeof(ch_connection_details));

/* Need a modifiable copy of the input string */
if ((buf = pstrdup(connstring)) == NULL)
return NULL;

cp = buf;

while (*cp)
{
/* Skip blanks before the parameter name */
if (isspace((unsigned char) *cp))
{
cp++;
continue;
}

/* Get the parameter name */
pname = cp;
while (*cp)
{
if (*cp == '=')
break;
if (isspace((unsigned char) *cp))
{
*cp++ = '\0';
while (*cp)
{
if (!isspace((unsigned char) *cp))
break;
cp++;
}
break;
}
cp++;
}

/* Check that there is a following '=' */
if (*cp != '=')
elog(ERROR, "missing \"=\" after \"%s\" in connection info string", pname);

*cp++ = '\0';

/* Skip blanks after the '=' */
while (*cp)
{
if (!isspace((unsigned char) *cp))
break;
cp++;
}

/* Get the parameter value */
pval = cp;

if (*cp != '\'')
{
cp2 = pval;
while (*cp)
{
if (isspace((unsigned char) *cp))
{
*cp++ = '\0';
break;
}
if (*cp == '\\')
{
cp++;
if (*cp != '\0')
*cp2++ = *cp++;
}
else
*cp2++ = *cp++;
}
*cp2 = '\0';
}
else
{
cp2 = pval;
cp++;
for (;;)
{
if (*cp == '\0')
elog(ERROR, "unterminated quoted string in connection info string");

if (*cp == '\\')
{
cp++;
if (*cp != '\0')
*cp2++ = *cp++;
continue;
}
if (*cp == '\'')
{
*cp2 = '\0';
cp++;
break;
}
*cp2++ = *cp++;
}
}

if (strcmp(pname, "host") == 0) {
details->host = pstrdup(pval);
} else if (strcmp(pname, "port") == 0) {
details->port = pg_strtoint32(pval);
} else if (strcmp(pname, "username") == 0) {
details->username = pstrdup(pval);
} else if (strcmp(pname, "password") == 0) {
details->password = pstrdup(pval);
}
}

pfree(buf);
return details;
}
56 changes: 44 additions & 12 deletions src/http.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,36 +50,68 @@ size_t write_data(void *contents, size_t size, size_t nmemb, void *userp)
return realsize;
}

ch_http_connection_t *ch_http_connection(char *connstring)
ch_http_connection_t *ch_http_connection(char *host, int port, char *username, char *password)
{
int n;
char *connstring = NULL;
size_t len = 20; /* all symbols from url string + some extra */

curl_error_happened = false;
ch_http_connection_t *conn = calloc(sizeof(ch_http_connection_t), 1);
if (conn == NULL)
{
curl_error_happened = true;
snprintf(curl_error_buffer, CURL_ERROR_SIZE, "OOM");
return NULL;
}
if (!conn)
goto cleanup;

conn->curl = curl_easy_init();
if (!conn->curl)
goto cleanup;

conn->base_url = strdup(connstring);
if (conn->base_url == NULL)
len += strlen(host) + snprintf(NULL, 0, "%d", port);

if (username) {
username = curl_easy_escape(conn->curl, username, 0);
len += strlen(username);
}

if (password) {
password = curl_easy_escape(conn->curl, password, 0);
len += strlen(password);
}

connstring = calloc(len, 1);
if (!connstring)
goto cleanup;

if (username && password)
{
n = snprintf(connstring, len, "http://%s:%s@%s:%d/", username, password, host, port);
curl_free(username);
curl_free(password);
}
else if (username)
{
n = snprintf(connstring, len, "http://%s@%s:%d/", username, host, port);
curl_free(username);
}
else
n = snprintf(connstring, len, "http://%s:%d/", host, port);

if (n < 0)
goto cleanup;

conn->base_url = connstring;
conn->base_url_len = strlen(conn->base_url);

return conn;

cleanup:
snprintf(curl_error_buffer, CURL_ERROR_SIZE, "OOM");
curl_error_happened = true;
if (conn->base_url)
free(conn->base_url);
if (connstring)
free(connstring);

if (conn)
free(conn);

free(conn);
return NULL;
}

Expand Down
2 changes: 1 addition & 1 deletion src/include/clickhouse_http.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ typedef struct {

void ch_http_init(int verbose, uint32_t query_id_prefix);
void ch_http_set_progress_func(void *progressfunc);
ch_http_connection_t *ch_http_connection(char *connstring);
ch_http_connection_t *ch_http_connection(char *, int, char *, char *);
void ch_http_close(ch_http_connection_t *conn);
ch_http_response_t *ch_http_simple_query(ch_http_connection_t *conn, const char *query);
char *ch_http_last_error(void);
Expand Down
5 changes: 3 additions & 2 deletions src/include/clickhousedb_fdw.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,9 @@ typedef struct {
char *password;
char *dbname;
} ch_connection_details;

ch_connection chfdw_http_connect(char *connstring);

ch_connection_details *connstring_parse(const char *connstring);
ch_connection chfdw_http_connect(ch_connection_details *details);
ch_connection chfdw_binary_connect(ch_connection_details *details);
text *chfdw_http_fetch_raw_data(ch_cursor *cursor);
List *chfdw_construct_create_tables(ImportForeignSchemaStmt *stmt, ForeignServer *server);
Expand Down
5 changes: 3 additions & 2 deletions src/pglink.c
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,11 @@ static bool is_canceled(void)
}

ch_connection
chfdw_http_connect(char *connstring)
chfdw_http_connect(ch_connection_details *details)
{
ch_connection res;
ch_http_connection_t *conn = ch_http_connection(connstring);
ch_http_connection_t *conn = ch_http_connection(details->host, details->port,
details->username, details->password);
if (!initialized)
{
initialized = true;
Expand Down
5 changes: 5 additions & 0 deletions src/sql/clickhouse_fdw--1.2--1.3.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
DROP FUNCTION clickhousedb_raw_query(TEXT, TEXT);
CREATE FUNCTION clickhousedb_raw_query(TEXT, TEXT DEFAULT 'host=localhost port=8123')
RETURNS TEXT
AS 'MODULE_PATHNAME'
LANGUAGE C STRICT;
2 changes: 1 addition & 1 deletion src/sql/init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ RETURNS fdw_handler
AS 'MODULE_PATHNAME'
LANGUAGE C STRICT;

CREATE FUNCTION clickhousedb_raw_query(TEXT, TEXT DEFAULT 'http://localhost:8123')
CREATE FUNCTION clickhousedb_raw_query(TEXT, TEXT DEFAULT 'host=localhost port=8123')
RETURNS TEXT
AS 'MODULE_PATHNAME'
LANGUAGE C STRICT;
Expand Down

0 comments on commit fc4f6de

Please sign in to comment.