Skip to content

Commit

Permalink
feat(c/driver/postgresql,python): implement error_details spec
Browse files Browse the repository at this point in the history
Fixes #939.
Fixes #942.
  • Loading branch information
lidavidm committed Jul 28, 2023
1 parent 65d785d commit 15bc1e7
Show file tree
Hide file tree
Showing 11 changed files with 292 additions and 142 deletions.
10 changes: 7 additions & 3 deletions c/driver/common/utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ static void ReleaseError(struct AdbcError* error) {
}

void SetError(struct AdbcError* error, const char* format, ...) {
va_list args;
va_start(args, format);
SetErrorVariadic(error, format, args);
va_end(args);
}

void SetErrorVariadic(struct AdbcError* error, const char* format, va_list args) {
if (!error) return;
if (error->release) {
// TODO: combine the errors if possible
Expand All @@ -44,10 +51,7 @@ void SetError(struct AdbcError* error, const char* format, ...) {

error->release = &ReleaseError;

va_list args;
va_start(args, format);
vsnprintf(error->message, kErrorBufferSize, format, args);
va_end(args);
}

struct SingleBatchArrayStream {
Expand Down
3 changes: 3 additions & 0 deletions c/driver/common/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#pragma once

#include <stdarg.h>
#include <stdbool.h>
#include <stdint.h>

Expand All @@ -39,6 +40,8 @@ extern "C" {
void SetError(struct AdbcError* error, const char* format,
...) ADBC_CHECK_PRINTF_ATTRIBUTE;

void SetErrorVariadic(struct AdbcError* error, const char* format, va_list args);

struct StringBuilder {
char* buffer;
// Not including null terminator
Expand Down
1 change: 1 addition & 0 deletions c/driver/postgresql/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ add_arrow_lib(adbc_driver_postgresql
SOURCES
connection.cc
database.cc
error.cc
postgresql.cc
statement.cc
OUTPUTS
Expand Down
8 changes: 4 additions & 4 deletions c/driver/postgresql/connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ struct PqRecord {
};

// Used by PqResultHelper to provide index-based access to the records within each
// row of a pg_result
// row of a PGresult
class PqResultRow {
public:
PqResultRow(pg_result* result, int row_num) : result_(result), row_num_(row_num) {
PqResultRow(PGresult* result, int row_num) : result_(result), row_num_(row_num) {
ncols_ = PQnfields(result);
}

Expand All @@ -69,7 +69,7 @@ class PqResultRow {
}

private:
pg_result* result_ = nullptr;
PGresult* result_ = nullptr;
int row_num_;
int ncols_;
};
Expand Down Expand Up @@ -167,7 +167,7 @@ class PqResultHelper {
iterator end() { return iterator(*this, NumRows()); }

private:
pg_result* result_ = nullptr;
PGresult* result_ = nullptr;
PGconn* conn_;
std::string query_;
std::vector<std::string> param_values_;
Expand Down
3 changes: 3 additions & 0 deletions c/driver/postgresql/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <adbc.h>
#include <libpq-fe.h>

#include "error.h"
#include "postgres_type.h"

namespace adbcpq {
Expand Down Expand Up @@ -69,10 +70,12 @@ class PostgresConnection {
return type_resolver_;
}
bool autocommit() const { return autocommit_; }
ErrorDetailsState* error_details() { return &error_details_; }

private:
std::shared_ptr<PostgresDatabase> database_;
std::shared_ptr<PostgresTypeResolver> type_resolver_;
ErrorDetailsState error_details_;
PGconn* conn_;
PGcancel* cancel_;
bool autocommit_;
Expand Down
11 changes: 6 additions & 5 deletions c/driver/postgresql/database.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <adbc.h>
#include <libpq-fe.h>
#include <nanoarrow/nanoarrow.h>
#include <postgresql/error.h>

#include "common/utils.h"

Expand Down Expand Up @@ -125,10 +126,10 @@ AdbcStatusCode PostgresDatabase::Disconnect(PGconn** conn, struct AdbcError* err

// Helpers for building the type resolver from queries
static inline int32_t InsertPgAttributeResult(
pg_result* result, const std::shared_ptr<PostgresTypeResolver>& resolver);
PGresult* result, const std::shared_ptr<PostgresTypeResolver>& resolver);

static inline int32_t InsertPgTypeResult(
pg_result* result, const std::shared_ptr<PostgresTypeResolver>& resolver);
PGresult* result, const std::shared_ptr<PostgresTypeResolver>& resolver);

AdbcStatusCode PostgresDatabase::RebuildTypeResolver(struct AdbcError* error) {
PGconn* conn = nullptr;
Expand Down Expand Up @@ -177,7 +178,7 @@ ORDER BY
auto resolver = std::make_shared<PostgresTypeResolver>();

// Insert record type definitions (this includes table schemas)
pg_result* result = PQexec(conn, kColumnsQuery.c_str());
PGresult* result = PQexec(conn, kColumnsQuery.c_str());
ExecStatusType pq_status = PQresultStatus(result);
if (pq_status == PGRES_TUPLES_OK) {
InsertPgAttributeResult(result, resolver);
Expand Down Expand Up @@ -222,7 +223,7 @@ ORDER BY
}

static inline int32_t InsertPgAttributeResult(
pg_result* result, const std::shared_ptr<PostgresTypeResolver>& resolver) {
PGresult* result, const std::shared_ptr<PostgresTypeResolver>& resolver) {
int num_rows = PQntuples(result);
std::vector<std::pair<std::string, uint32_t>> columns;
uint32_t current_type_oid = 0;
Expand Down Expand Up @@ -254,7 +255,7 @@ static inline int32_t InsertPgAttributeResult(
}

static inline int32_t InsertPgTypeResult(
pg_result* result, const std::shared_ptr<PostgresTypeResolver>& resolver) {
PGresult* result, const std::shared_ptr<PostgresTypeResolver>& resolver) {
int num_rows = PQntuples(result);
PostgresTypeResolver::Item item;
int32_t n_added = 0;
Expand Down
63 changes: 40 additions & 23 deletions c/driver/postgresql/statement.cc
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ AdbcStatusCode ResolvePostgresType(const PostgresTypeResolver& type_resolver,

/// Helper to manage bind parameters with a prepared statement
struct BindStream {
ErrorDetailsState* error_details_;
Handle<struct ArrowArrayStream> bind;
Handle<struct ArrowSchema> bind_schema;
struct ArrowSchemaView bind_schema_view;
Expand All @@ -150,7 +151,8 @@ struct BindStream {

struct ArrowError na_error;

explicit BindStream(struct ArrowArrayStream&& bind) {
explicit BindStream(struct ArrowArrayStream&& bind, ErrorDetailsState* error_details)
: error_details_(error_details) {
this->bind.value = std::move(bind);
std::memset(&na_error, 0, sizeof(na_error));
}
Expand Down Expand Up @@ -296,10 +298,11 @@ struct BindStream {
PGresult* result = PQprepare(conn, /*stmtName=*/"", query.c_str(),
/*nParams=*/bind_schema->n_children, param_types.data());
if (PQresultStatus(result) != PGRES_COMMAND_OK) {
SetError(error, "[libpq] Failed to prepare query: %s\nQuery was:%s",
PQerrorMessage(conn), query.c_str());
auto code = error_details_->SetError(
error, result, "[libpq] Failed to prepare query: %s\nQuery was:%s",
PQerrorMessage(conn), query.c_str());
PQclear(result);
return ADBC_STATUS_IO;
return code;
}
PQclear(result);
return ADBC_STATUS_OK;
Expand Down Expand Up @@ -582,6 +585,7 @@ void TupleReader::ResetQuery() {

// Clear the error builder
error_builder_.size = 0;
error_details_->Clear();

row_id_ = -1;
}
Expand Down Expand Up @@ -628,18 +632,14 @@ int TupleReader::GetNext(struct ArrowArray* out) {
result_ = PQgetResult(conn_);
const ExecStatusType pq_status = PQresultStatus(result_);
if (pq_status != PGRES_COMMAND_OK) {
const char* sqlstate = PQresultErrorField(result_, PG_DIAG_SQLSTATE);
StringBuilderAppend(&error_builder_, "[libpq] Query failed [%s]: %s",
PQresStatus(pq_status), PQresultErrorMessage(result_));

if (tmp.release != nullptr) {
tmp.release(&tmp);
}

if (sqlstate != nullptr && std::strcmp(sqlstate, "57014") == 0) {
return ECANCELED;
}
return EIO;
return AdbcStatusCodeToErrno(error_details_->SetDetail(/*error=*/nullptr, result_));
}

ResetQuery();
Expand Down Expand Up @@ -861,7 +861,7 @@ AdbcStatusCode PostgresStatement::ExecutePreparedStatement(
return ADBC_STATUS_NOT_IMPLEMENTED;
}

BindStream bind_stream(std::move(bind_));
BindStream bind_stream(std::move(bind_), &error_details_);
std::memset(&bind_, 0, sizeof(bind_));

RAISE_ADBC(bind_stream.Begin([&]() { return ADBC_STATUS_OK; }, error));
Expand Down Expand Up @@ -974,7 +974,7 @@ AdbcStatusCode PostgresStatement::ExecuteUpdateBulk(int64_t* rows_affected,
return ADBC_STATUS_INVALID_STATE;
}

BindStream bind_stream(std::move(bind_));
BindStream bind_stream(std::move(bind_), &error_details_);
std::memset(&bind_, 0, sizeof(bind_));
RAISE_ADBC(bind_stream.Begin(
[&]() -> AdbcStatusCode {
Expand Down Expand Up @@ -1041,6 +1041,8 @@ AdbcStatusCode PostgresStatement::GetOption(const char* key, char* value, size_t
}
} else if (std::strcmp(key, ADBC_POSTGRESQL_OPTION_BATCH_SIZE_HINT_BYTES) == 0) {
result = std::to_string(reader_.batch_size_hint_bytes_);
} else if (error_details_.GetOption(key, &result)) {
// do nothing
} else {
SetError(error, "[libpq] Unknown statement option '%s'", key);
return ADBC_STATUS_NOT_FOUND;
Expand All @@ -1056,8 +1058,19 @@ AdbcStatusCode PostgresStatement::GetOption(const char* key, char* value, size_t
AdbcStatusCode PostgresStatement::GetOptionBytes(const char* key, uint8_t* value,
size_t* length,
struct AdbcError* error) {
SetError(error, "[libpq] Unknown statement option '%s'", key);
return ADBC_STATUS_NOT_FOUND;
std::string result;
if (error_details_.GetOptionBytes(key, &result)) {
// do nothing
} else {
SetError(error, "[libpq] Unknown statement option '%s'", key);
return ADBC_STATUS_NOT_FOUND;
}

if (result.size() <= *length) {
std::memcpy(value, result.data(), result.size() + 1);
}
*length = static_cast<int64_t>(result.size());
return ADBC_STATUS_OK;
}

AdbcStatusCode PostgresStatement::GetOptionDouble(const char* key, double* value,
Expand All @@ -1072,6 +1085,8 @@ AdbcStatusCode PostgresStatement::GetOptionInt(const char* key, int64_t* value,
if (std::strcmp(key, ADBC_POSTGRESQL_OPTION_BATCH_SIZE_HINT_BYTES) == 0) {
*value = reader_.batch_size_hint_bytes_;
return ADBC_STATUS_OK;
} else if (error_details_.GetOptionInt(key, value)) {
return ADBC_STATUS_OK;
}
SetError(error, "[libpq] Unknown statement option '%s'", key);
return ADBC_STATUS_NOT_FOUND;
Expand Down Expand Up @@ -1175,22 +1190,24 @@ AdbcStatusCode PostgresStatement::SetupReader(struct AdbcError* error) {
PGresult* result = PQprepare(connection_->conn(), /*stmtName=*/"", query_.c_str(),
/*nParams=*/0, nullptr);
if (PQresultStatus(result) != PGRES_COMMAND_OK) {
SetError(error,
"[libpq] Failed to execute query: could not infer schema: failed to "
"prepare query: %s\nQuery was:%s",
PQerrorMessage(connection_->conn()), query_.c_str());
auto code = error_details_.SetError(
error, result,
"[libpq] Failed to execute query: could not infer schema: failed to "
"prepare query: %s\nQuery was: %s",
PQerrorMessage(connection_->conn()), query_.c_str());
PQclear(result);
return ADBC_STATUS_IO;
return code;
}
PQclear(result);
result = PQdescribePrepared(connection_->conn(), /*stmtName=*/"");
if (PQresultStatus(result) != PGRES_COMMAND_OK) {
SetError(error,
"[libpq] Failed to execute query: could not infer schema: failed to "
"describe prepared statement: %s\nQuery was:%s",
PQerrorMessage(connection_->conn()), query_.c_str());
auto code = error_details_.SetError(
error, result,
"[libpq] Failed to execute query: could not infer schema: failed to "
"describe prepared statement: %s\nQuery was: %s",
PQerrorMessage(connection_->conn()), query_.c_str());
PQclear(result);
return ADBC_STATUS_IO;
return code;
}

// Resolve the information from the PGresult into a PostgresType
Expand Down
13 changes: 11 additions & 2 deletions c/driver/postgresql/statement.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <nanoarrow/nanoarrow.h>

#include "common/utils.h"
#include "error.h"
#include "postgres_copy_reader.h"
#include "postgres_type.h"

Expand All @@ -40,8 +41,9 @@ class PostgresStatement;
/// \brief An ArrowArrayStream that reads tuples from a PGresult.
class TupleReader final {
public:
TupleReader(PGconn* conn)
TupleReader(PGconn* conn, ErrorDetailsState* error_details)
: conn_(conn),
error_details_(error_details),
result_(nullptr),
pgbuf_(nullptr),
copy_reader_(nullptr),
Expand Down Expand Up @@ -78,6 +80,7 @@ class TupleReader final {
static void ReleaseTrampoline(struct ArrowArrayStream* self);

PGconn* conn_;
ErrorDetailsState* error_details_;
PGresult* result_;
char* pgbuf_;
struct ArrowBufferView data_;
Expand All @@ -90,7 +93,10 @@ class TupleReader final {
class PostgresStatement {
public:
PostgresStatement()
: connection_(nullptr), query_(), prepared_(false), reader_(nullptr) {
: connection_(nullptr),
query_(),
prepared_(false),
reader_(nullptr, &error_details_) {
std::memset(&bind_, 0, sizeof(bind_));
}

Expand Down Expand Up @@ -136,6 +142,8 @@ class PostgresStatement {
struct AdbcError* error);
AdbcStatusCode SetupReader(struct AdbcError* error);

ErrorDetailsState* error_details() { return &error_details_; }

private:
std::shared_ptr<PostgresTypeResolver> type_resolver_;
std::shared_ptr<PostgresConnection> connection_;
Expand All @@ -158,6 +166,7 @@ class PostgresStatement {
IngestMode mode = IngestMode::kCreate;
} ingest_;

ErrorDetailsState error_details_;
TupleReader reader_;
};
} // namespace adbcpq
Loading

0 comments on commit 15bc1e7

Please sign in to comment.