Skip to content

Commit

Permalink
feat(c/driver/postgresql): implement ADBC 1.1.0 features (#852)
Browse files Browse the repository at this point in the history
- ADBC_INFO_DRIVER_ADBC_VERSION
- StatementExecuteSchema (#318)
- ADBC_CONNECTION_OPTION_CURRENT_{CATALOG, DB_SCHEMA} (#319)
- Cancellation
- Get/SetOption
- New ingest modes
  • Loading branch information
lidavidm committed Aug 10, 2023
1 parent 49266ad commit 2c67736
Show file tree
Hide file tree
Showing 26 changed files with 1,752 additions and 209 deletions.
12 changes: 12 additions & 0 deletions adbc.h
Original file line number Diff line number Diff line change
Expand Up @@ -1022,6 +1022,7 @@ AdbcStatusCode AdbcDatabaseNew(struct AdbcDatabase* database, struct AdbcError*
/// \param[out] error An optional location to return an error
/// message if necessary.
/// \return ADBC_STATUS_NOT_FOUND if the option is not recognized.
ADBC_EXPORT
AdbcStatusCode AdbcDatabaseGetOption(struct AdbcDatabase* database, const char* key,
char* value, size_t* length,
struct AdbcError* error);
Expand Down Expand Up @@ -1062,6 +1063,7 @@ AdbcStatusCode AdbcDatabaseGetOption(struct AdbcDatabase* database, const char*
/// \param[out] error An optional location to return an error
/// message if necessary.
/// \return ADBC_STATUS_NOT_FOUND if the option is not recognized.
ADBC_EXPORT
AdbcStatusCode AdbcDatabaseGetOptionBytes(struct AdbcDatabase* database, const char* key,
uint8_t* value, size_t* length,
struct AdbcError* error);
Expand All @@ -1086,6 +1088,7 @@ AdbcStatusCode AdbcDatabaseGetOptionBytes(struct AdbcDatabase* database, const c
/// \param[out] error An optional location to return an error
/// message if necessary.
/// \return ADBC_STATUS_NOT_FOUND if the option is not recognized.
ADBC_EXPORT
AdbcStatusCode AdbcDatabaseGetOptionDouble(struct AdbcDatabase* database, const char* key,
double* value, struct AdbcError* error);

Expand All @@ -1109,6 +1112,7 @@ AdbcStatusCode AdbcDatabaseGetOptionDouble(struct AdbcDatabase* database, const
/// \param[out] error An optional location to return an error
/// message if necessary.
/// \return ADBC_STATUS_NOT_FOUND if the option is not recognized.
ADBC_EXPORT
AdbcStatusCode AdbcDatabaseGetOptionInt(struct AdbcDatabase* database, const char* key,
int64_t* value, struct AdbcError* error);

Expand Down Expand Up @@ -1518,6 +1522,7 @@ AdbcStatusCode AdbcConnectionGetObjects(struct AdbcConnection* connection, int d
/// \param[out] error An optional location to return an error
/// message if necessary.
/// \return ADBC_STATUS_NOT_FOUND if the option is not recognized.
ADBC_EXPORT
AdbcStatusCode AdbcConnectionGetOption(struct AdbcConnection* connection, const char* key,
char* value, size_t* length,
struct AdbcError* error);
Expand Down Expand Up @@ -1558,6 +1563,7 @@ AdbcStatusCode AdbcConnectionGetOption(struct AdbcConnection* connection, const
/// \param[out] error An optional location to return an error
/// message if necessary.
/// \return ADBC_STATUS_NOT_FOUND if the option is not recognized.
ADBC_EXPORT
AdbcStatusCode AdbcConnectionGetOptionBytes(struct AdbcConnection* connection,
const char* key, uint8_t* value,
size_t* length, struct AdbcError* error);
Expand All @@ -1582,6 +1588,7 @@ AdbcStatusCode AdbcConnectionGetOptionBytes(struct AdbcConnection* connection,
/// \param[out] error An optional location to return an error
/// message if necessary.
/// \return ADBC_STATUS_NOT_FOUND if the option is not recognized.
ADBC_EXPORT
AdbcStatusCode AdbcConnectionGetOptionInt(struct AdbcConnection* connection,
const char* key, int64_t* value,
struct AdbcError* error);
Expand All @@ -1606,6 +1613,7 @@ AdbcStatusCode AdbcConnectionGetOptionInt(struct AdbcConnection* connection,
/// \param[out] error An optional location to return an error
/// message if necessary.
/// \return ADBC_STATUS_NOT_FOUND if the option is not recognized.
ADBC_EXPORT
AdbcStatusCode AdbcConnectionGetOptionDouble(struct AdbcConnection* connection,
const char* key, double* value,
struct AdbcError* error);
Expand Down Expand Up @@ -2005,6 +2013,7 @@ AdbcStatusCode AdbcStatementCancel(struct AdbcStatement* statement,
/// \param[out] error An optional location to return an error
/// message if necessary.
/// \return ADBC_STATUS_NOT_FOUND if the option is not recognized.
ADBC_EXPORT
AdbcStatusCode AdbcStatementGetOption(struct AdbcStatement* statement, const char* key,
char* value, size_t* length,
struct AdbcError* error);
Expand Down Expand Up @@ -2045,6 +2054,7 @@ AdbcStatusCode AdbcStatementGetOption(struct AdbcStatement* statement, const cha
/// \param[out] error An optional location to return an error
/// message if necessary.
/// \return ADBC_STATUS_NOT_FOUND if the option is not recognized.
ADBC_EXPORT
AdbcStatusCode AdbcStatementGetOptionBytes(struct AdbcStatement* statement,
const char* key, uint8_t* value,
size_t* length, struct AdbcError* error);
Expand All @@ -2069,6 +2079,7 @@ AdbcStatusCode AdbcStatementGetOptionBytes(struct AdbcStatement* statement,
/// \param[out] error An optional location to return an error
/// message if necessary.
/// \return ADBC_STATUS_NOT_FOUND if the option is not recognized.
ADBC_EXPORT
AdbcStatusCode AdbcStatementGetOptionInt(struct AdbcStatement* statement, const char* key,
int64_t* value, struct AdbcError* error);

Expand All @@ -2092,6 +2103,7 @@ AdbcStatusCode AdbcStatementGetOptionInt(struct AdbcStatement* statement, const
/// \param[out] error An optional location to return an error
/// message if necessary.
/// \return ADBC_STATUS_NOT_FOUND if the option is not recognized.
ADBC_EXPORT
AdbcStatusCode AdbcStatementGetOptionDouble(struct AdbcStatement* statement,
const char* key, double* value,
struct AdbcError* error);
Expand Down
13 changes: 13 additions & 0 deletions c/driver/common/utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,19 @@ AdbcStatusCode AdbcConnectionGetInfoAppendString(struct ArrowArray* array,
return ADBC_STATUS_OK;
}

AdbcStatusCode AdbcConnectionGetInfoAppendInt(struct ArrowArray* array,
uint32_t info_code, int64_t info_value,
struct AdbcError* error) {
CHECK_NA(INTERNAL, ArrowArrayAppendUInt(array->children[0], info_code), error);
// Append to type variant
CHECK_NA(INTERNAL, ArrowArrayAppendInt(array->children[1]->children[2], info_value),
error);
// Append type code/offset
CHECK_NA(INTERNAL, ArrowArrayFinishUnionElement(array->children[1], /*type_id=*/2),
error);
return ADBC_STATUS_OK;
}

AdbcStatusCode AdbcInitConnectionObjectsSchema(struct ArrowSchema* schema,
struct AdbcError* error) {
ArrowSchemaInit(schema);
Expand Down
3 changes: 3 additions & 0 deletions c/driver/common/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ AdbcStatusCode AdbcConnectionGetInfoAppendString(struct ArrowArray* array,
uint32_t info_code,
const char* info_value,
struct AdbcError* error);
AdbcStatusCode AdbcConnectionGetInfoAppendInt(struct ArrowArray* array,
uint32_t info_code, int64_t info_value,
struct AdbcError* error);

AdbcStatusCode AdbcInitConnectionObjectsSchema(struct ArrowSchema* schema,
struct AdbcError* error);
Expand Down
3 changes: 2 additions & 1 deletion c/driver/flightsql/dremio_flightsql_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ class DremioFlightSqlQuirks : public adbc_validation::DriverQuirks {
}

std::string BindParameter(int index) const override { return "?"; }
bool supports_bulk_ingest(const char* /*mode*/) const override { return false; }
bool supports_concurrent_statements() const override { return true; }
bool supports_transactions() const override { return false; }
bool supports_get_sql_info() const override { return false; }
bool supports_get_objects() const override { return true; }
bool supports_bulk_ingest() const override { return false; }
bool supports_partitioned_data() const override { return true; }
bool supports_dynamic_parameter_binding() const override { return false; }
};
Expand Down Expand Up @@ -87,6 +87,7 @@ class DremioFlightSqlStatementTest : public ::testing::Test,
void SetUp() override { ASSERT_NO_FATAL_FAILURE(SetUpTest()); }
void TearDown() override { ASSERT_NO_FATAL_FAILURE(TearDownTest()); }

void TestResultInvalidation() { GTEST_SKIP() << "Dremio generates a CANCELLED"; }
void TestSqlIngestTableEscaping() { GTEST_SKIP() << "Table escaping not implemented"; }

protected:
Expand Down
3 changes: 2 additions & 1 deletion c/driver/flightsql/sqlite_flightsql_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ class SqliteFlightSqlQuirks : public adbc_validation::DriverQuirks {
}

std::string BindParameter(int index) const override { return "?"; }

bool supports_bulk_ingest(const char* /*mode*/) const override { return false; }
bool supports_concurrent_statements() const override { return true; }
bool supports_transactions() const override { return false; }
bool supports_get_sql_info() const override { return true; }
Expand All @@ -113,7 +115,6 @@ class SqliteFlightSqlQuirks : public adbc_validation::DriverQuirks {
}
}
bool supports_get_objects() const override { return true; }
bool supports_bulk_ingest() const override { return false; }
bool supports_partitioned_data() const override { return true; }
bool supports_dynamic_parameter_binding() const override { return true; }
};
Expand Down
111 changes: 105 additions & 6 deletions c/driver/postgresql/connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@
namespace {

static const uint32_t kSupportedInfoCodes[] = {
ADBC_INFO_VENDOR_NAME, ADBC_INFO_VENDOR_VERSION, ADBC_INFO_DRIVER_NAME,
ADBC_INFO_DRIVER_VERSION, ADBC_INFO_DRIVER_ARROW_VERSION,
ADBC_INFO_VENDOR_NAME, ADBC_INFO_VENDOR_VERSION,
ADBC_INFO_DRIVER_NAME, ADBC_INFO_DRIVER_VERSION,
ADBC_INFO_DRIVER_ARROW_VERSION, ADBC_INFO_DRIVER_ADBC_VERSION,
};

static const std::unordered_map<std::string, std::string> kPgTableTypes = {
Expand Down Expand Up @@ -114,8 +115,10 @@ class PqResultHelper {
result_ = PQexecPrepared(conn_, "", param_values_.size(), param_c_strs.data(), NULL,
NULL, 0);

if (PQresultStatus(result_) != PGRES_TUPLES_OK) {
SetError(error_, "[libpq] Failed to execute query: %s", PQerrorMessage(conn_));
ExecStatusType status = PQresultStatus(result_);
if (status != PGRES_TUPLES_OK && status != PGRES_COMMAND_OK) {
SetError(error_, "[libpq] Failed to execute query '%s': %s", query_.c_str(),
PQerrorMessage(conn_));
return ADBC_STATUS_IO;
}

Expand Down Expand Up @@ -729,6 +732,20 @@ class PqGetObjectsHelper {

namespace adbcpq {

AdbcStatusCode PostgresConnection::Cancel(struct AdbcError* error) {
// > errbuf must be a char array of size errbufsize (the recommended size is
// > 256 bytes).
// https://www.postgresql.org/docs/current/libpq-cancel.html
char errbuf[256];
// > The return value is 1 if the cancel request was successfully dispatched
// > and 0 if not.
if (PQcancel(cancel_, errbuf, sizeof(errbuf)) != 1) {
SetError(error, "[libpq] Failed to cancel operation: %s", errbuf);
return ADBC_STATUS_UNKNOWN;
}
return ADBC_STATUS_OK;
}

AdbcStatusCode PostgresConnection::Commit(struct AdbcError* error) {
if (autocommit_) {
SetError(error, "%s", "[libpq] Cannot commit when autocommit is enabled");
Expand Down Expand Up @@ -776,6 +793,10 @@ AdbcStatusCode PostgresConnectionGetInfoImpl(const uint32_t* info_codes,
RAISE_ADBC(AdbcConnectionGetInfoAppendString(array, info_codes[i],
NANOARROW_VERSION, error));
break;
case ADBC_INFO_DRIVER_ADBC_VERSION:
RAISE_ADBC(AdbcConnectionGetInfoAppendInt(array, info_codes[i],
ADBC_VERSION_1_1_0, error));
break;
default:
// Ignore
continue;
Expand Down Expand Up @@ -840,6 +861,47 @@ AdbcStatusCode PostgresConnection::GetObjects(
return BatchToArrayStream(&array, &schema, out, error);
}

AdbcStatusCode PostgresConnection::GetOption(const char* option, char* value,
size_t* length, struct AdbcError* error) {
std::string output;
if (std::strcmp(option, ADBC_CONNECTION_OPTION_CURRENT_CATALOG) == 0) {
output = PQdb(conn_);
} else if (std::strcmp(option, ADBC_CONNECTION_OPTION_CURRENT_DB_SCHEMA) == 0) {
PqResultHelper result_helper{conn_, "SELECT CURRENT_SCHEMA", {}, error};
RAISE_ADBC(result_helper.Prepare());
RAISE_ADBC(result_helper.Execute());
auto it = result_helper.begin();
if (it == result_helper.end()) {
SetError(error, "[libpq] PostgreSQL returned no rows for 'SELECT CURRENT_SCHEMA'");
return ADBC_STATUS_INTERNAL;
}
output = (*it)[0].data;
} else if (std::strcmp(option, ADBC_CONNECTION_OPTION_AUTOCOMMIT) == 0) {
output = autocommit_ ? ADBC_OPTION_VALUE_ENABLED : ADBC_OPTION_VALUE_DISABLED;
} else {
return ADBC_STATUS_NOT_FOUND;
}

if (output.size() + 1 <= *length) {
std::memcpy(value, output.c_str(), output.size() + 1);
}
*length = output.size() + 1;
return ADBC_STATUS_OK;
}
AdbcStatusCode PostgresConnection::GetOptionBytes(const char* option, uint8_t* value,
size_t* length,
struct AdbcError* error) {
return ADBC_STATUS_NOT_FOUND;
}
AdbcStatusCode PostgresConnection::GetOptionInt(const char* option, int64_t* value,
struct AdbcError* error) {
return ADBC_STATUS_NOT_FOUND;
}
AdbcStatusCode PostgresConnection::GetOptionDouble(const char* option, double* value,
struct AdbcError* error) {
return ADBC_STATUS_NOT_FOUND;
}

AdbcStatusCode PostgresConnection::GetTableSchema(const char* catalog,
const char* db_schema,
const char* table_name,
Expand Down Expand Up @@ -964,16 +1026,26 @@ AdbcStatusCode PostgresConnection::GetTableTypes(struct AdbcConnection* connecti
AdbcStatusCode PostgresConnection::Init(struct AdbcDatabase* database,
struct AdbcError* error) {
if (!database || !database->private_data) {
SetError(error, "%s", "[libpq] Must provide an initialized AdbcDatabase");
SetError(error, "[libpq] Must provide an initialized AdbcDatabase");
return ADBC_STATUS_INVALID_ARGUMENT;
}
database_ =
*reinterpret_cast<std::shared_ptr<PostgresDatabase>*>(database->private_data);
type_resolver_ = database_->type_resolver();
return database_->Connect(&conn_, error);
RAISE_ADBC(database_->Connect(&conn_, error));
cancel_ = PQgetCancel(conn_);
if (!cancel_) {
SetError(error, "[libpq] Could not initialize PGcancel");
return ADBC_STATUS_UNKNOWN;
}
return ADBC_STATUS_OK;
}

AdbcStatusCode PostgresConnection::Release(struct AdbcError* error) {
if (cancel_) {
PQfreeCancel(cancel_);
cancel_ = nullptr;
}
if (conn_) {
return database_->Disconnect(&conn_, error);
}
Expand Down Expand Up @@ -1023,8 +1095,35 @@ AdbcStatusCode PostgresConnection::SetOption(const char* key, const char* value,
autocommit_ = autocommit;
}
return ADBC_STATUS_OK;
} else if (std::strcmp(key, ADBC_CONNECTION_OPTION_CURRENT_DB_SCHEMA) == 0) {
// PostgreSQL doesn't accept a parameter here
PqResultHelper result_helper{
conn_, std::string("SET search_path TO ") + value, {}, error};
RAISE_ADBC(result_helper.Prepare());
RAISE_ADBC(result_helper.Execute());
return ADBC_STATUS_OK;
}
SetError(error, "%s%s", "[libpq] Unknown option ", key);
return ADBC_STATUS_NOT_IMPLEMENTED;
}

AdbcStatusCode PostgresConnection::SetOptionBytes(const char* key, const uint8_t* value,
size_t length,
struct AdbcError* error) {
SetError(error, "%s%s", "[libpq] Unknown option ", key);
return ADBC_STATUS_NOT_IMPLEMENTED;
}

AdbcStatusCode PostgresConnection::SetOptionDouble(const char* key, double value,
struct AdbcError* error) {
SetError(error, "%s%s", "[libpq] Unknown option ", key);
return ADBC_STATUS_NOT_IMPLEMENTED;
}

AdbcStatusCode PostgresConnection::SetOptionInt(const char* key, int64_t value,
struct AdbcError* error) {
SetError(error, "%s%s", "[libpq] Unknown option ", key);
return ADBC_STATUS_NOT_IMPLEMENTED;
}

} // namespace adbcpq
17 changes: 16 additions & 1 deletion c/driver/postgresql/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ namespace adbcpq {
class PostgresDatabase;
class PostgresConnection {
public:
PostgresConnection() : database_(nullptr), conn_(nullptr), autocommit_(true) {}
PostgresConnection()
: database_(nullptr), conn_(nullptr), cancel_(nullptr), autocommit_(true) {}

AdbcStatusCode Cancel(struct AdbcError* error);
AdbcStatusCode Commit(struct AdbcError* error);
AdbcStatusCode GetInfo(struct AdbcConnection* connection, uint32_t* info_codes,
size_t info_codes_length, struct ArrowArrayStream* out,
Expand All @@ -40,6 +42,14 @@ class PostgresConnection {
const char* table_name, const char** table_types,
const char* column_name, struct ArrowArrayStream* out,
struct AdbcError* error);
AdbcStatusCode GetOption(const char* option, char* value, size_t* length,
struct AdbcError* error);
AdbcStatusCode GetOptionBytes(const char* option, uint8_t* value, size_t* length,
struct AdbcError* error);
AdbcStatusCode GetOptionDouble(const char* option, double* value,
struct AdbcError* error);
AdbcStatusCode GetOptionInt(const char* option, int64_t* value,
struct AdbcError* error);
AdbcStatusCode GetTableSchema(const char* catalog, const char* db_schema,
const char* table_name, struct ArrowSchema* schema,
struct AdbcError* error);
Expand All @@ -49,6 +59,10 @@ class PostgresConnection {
AdbcStatusCode Release(struct AdbcError* error);
AdbcStatusCode Rollback(struct AdbcError* error);
AdbcStatusCode SetOption(const char* key, const char* value, struct AdbcError* error);
AdbcStatusCode SetOptionBytes(const char* key, const uint8_t* value, size_t length,
struct AdbcError* error);
AdbcStatusCode SetOptionDouble(const char* key, double value, struct AdbcError* error);
AdbcStatusCode SetOptionInt(const char* key, int64_t value, struct AdbcError* error);

PGconn* conn() const { return conn_; }
const std::shared_ptr<PostgresTypeResolver>& type_resolver() const {
Expand All @@ -60,6 +74,7 @@ class PostgresConnection {
std::shared_ptr<PostgresDatabase> database_;
std::shared_ptr<PostgresTypeResolver> type_resolver_;
PGconn* conn_;
PGcancel* cancel_;
bool autocommit_;
};
} // namespace adbcpq
Loading

0 comments on commit 2c67736

Please sign in to comment.