feat(c/driver/postgresql): TimestampTz write #868

Merged: 8 commits, Jul 10, 2023
Changes from 5 commits
82 changes: 65 additions & 17 deletions c/driver/postgresql/statement.cc
@@ -145,6 +145,9 @@ struct BindStream {
// XXX: this assumes fixed-length fields only - will need more
// consideration to deal with variable-length fields

bool has_tz_field = false;
std::string tz_setting;
Comment on lines +148 to +149
Member

agh, I was going to ask for this to be std::optional...but we can't do that


struct ArrowError na_error;

explicit BindStream(struct ArrowArrayStream&& bind) {
@@ -217,12 +220,6 @@ struct BindStream {
param_lengths[i] = 0;
break;
case ArrowType::NANOARROW_TYPE_TIMESTAMP:
if (strcmp("", bind_schema_fields[i].timezone)) {
SetError(error, "[libpq] Field #%" PRIi64 "%s%s%s",
static_cast<int64_t>(i + 1), " (\"", bind_schema->children[i]->name,
"\") has unsupported type code timestamp with timezone");
return ADBC_STATUS_NOT_IMPLEMENTED;
}
type_id = PostgresTypeId::kTimestamp;
param_lengths[i] = 8;
break;
@@ -255,6 +252,45 @@ struct BindStream {

AdbcStatusCode Prepare(PGconn* conn, const std::string& query,
struct AdbcError* error) {
// tz-aware timestamps require special handling to set the timezone to UTC
// prior to sending over the binary protocol; must be reset after execute
for (int64_t col = 0; col < bind_schema->n_children; col++) {
if ((bind_schema_fields[col].type == ArrowType::NANOARROW_TYPE_TIMESTAMP) &&
(strcmp("", bind_schema_fields[col].timezone))) {
has_tz_field = true;

PGresult* begin_result = PQexec(conn, "BEGIN");
Member

Don't we need to hoist this out of the loop? Or else, only do this if has_tz_field was previously false. Otherwise, it looks like we'll try to begin the transaction twice.

(Also: I wonder if we shouldn't always begin/end a transaction when dealing with multiple bind parameters...)

Member

Also, how does this interact with autocommit turned off?

Contributor Author

I have the break at the end of the loop to make sure this only happens once, but that could be signaled more clearly. We could even just set has_tz_field within the loop, then check if (has_tz_field) right after it before potentially starting a transaction.
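
The restructuring described in this comment could look roughly like the sketch below. It is not the code that was merged: `FieldInfo`, `HasTzField`, and `SetupCommands` are hypothetical names, and the SQL statements are only collected into a vector instead of being sent with `PQexec`, so that the example stands alone.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-in for the per-column metadata in bind_schema_fields.
struct FieldInfo {
  bool is_timestamp;
  std::string timezone;  // empty means a naive timestamp
};

// Step 1: scan the columns and only record whether any of them is tz-aware.
bool HasTzField(const std::vector<FieldInfo>& fields) {
  for (const FieldInfo& field : fields) {
    if (field.is_timestamp && !field.timezone.empty()) return true;
  }
  return false;
}

// Step 2: perform the one-time setup after the scan, outside the loop.
// The strings are the statements the diff sends via PQexec; this sketch
// only returns them (assumption made to keep the example free of libpq).
std::vector<std::string> SetupCommands(const std::vector<FieldInfo>& fields) {
  std::vector<std::string> commands;
  if (HasTzField(fields)) {
    commands.push_back("BEGIN");
    commands.push_back("SELECT current_setting('TIMEZONE')");
    commands.push_back("SET TIME ZONE 'UTC'");
  }
  return commands;
}
```

With two tz-aware columns the setup still runs once, which is the property the `break` in the diff is guarding.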

Contributor Author

As far as autocommit goes, I do think you are right that this is murky. I'm not sure how best to resolve it yet either: beginning and ending the transaction here within the BindStream class is probably at odds with most of the autocommit handling being managed via the connection.

Are there tests already for autocommit in the ADBC suite? I was looking for a reference on how those are expected to behave. I wasn't sure if I was overlooking them or if it is something that hasn't been heavily implemented yet.

Member

Ah, I missed that little break at the end.

There are some simple tests that test the options, but not really the semantics.

At least here, I think you 'just' need to check whether autocommit is off. If so, there's no need to BEGIN (you're already in a transaction), and you can still commit at the end (though maybe semantically we shouldn't? This isn't a feature that's normally available in other APIs, so I'm not sure what people expect).

Contributor Author

Any ideas on how to best forward the autocommit setting to the BindStream class? Right now I only see that as a private member of the connection, so it is pretty far removed from this

Member

A statement already holds a connection, so we can just add a getter to the PostgresConnection class
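
One way the getter suggestion might be sketched is shown below. The class here is a trimmed-down, hypothetical stand-in for `PostgresConnection`; the getter name `autocommit()`, the `TxnPlan` struct, and `PlanTimezoneTransaction` are assumptions made for illustration. The thread leaves open whether to commit when autocommit is off; this sketch picks the conservative option of leaving the commit to the caller, which is only one possible reading.

```cpp
#include <cassert>

// Hypothetical, trimmed-down connection: only the autocommit flag is modeled.
class PostgresConnection {
 public:
  explicit PostgresConnection(bool autocommit) : autocommit_(autocommit) {}

  // Getter along the lines suggested in the review; the name is an assumption.
  bool autocommit() const { return autocommit_; }

 private:
  bool autocommit_;
};

// Which transaction-control statements the bind step should issue itself.
struct TxnPlan {
  bool begin;
  bool commit;
};

// With autocommit on, the bind step owns a short transaction around the
// timezone change. With autocommit off, a transaction is already open, so no
// BEGIN is sent and (in this sketch) the COMMIT is left to the caller.
TxnPlan PlanTimezoneTransaction(const PostgresConnection& conn, bool has_tz_field) {
  if (!has_tz_field) return {false, false};
  const bool owns_transaction = conn.autocommit();
  return {owns_transaction, owns_transaction};
}
```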

if (PQresultStatus(begin_result) != PGRES_COMMAND_OK) {
SetError(error, "[libpq] Failed to begin transaction for timezone data: %s",
PQerrorMessage(conn));
PQclear(begin_result);
return ADBC_STATUS_IO;
}
PQclear(begin_result);

PGresult* get_tz_result = PQexec(conn, "SELECT current_setting('TIMEZONE')");
if (PQresultStatus(get_tz_result) != PGRES_TUPLES_OK) {
SetError(error, "[libpq] Could not query current timezone: %s",
PQerrorMessage(conn));
PQclear(get_tz_result);
return ADBC_STATUS_IO;
}

tz_setting = std::string(PQgetvalue(get_tz_result, 0, 0));
PQclear(get_tz_result);

PGresult* set_utc_result = PQexec(conn, "SET TIME ZONE 'UTC'");
if (PQresultStatus(set_utc_result) != PGRES_COMMAND_OK) {
SetError(error, "[libpq] Failed to set time zone to UTC: %s",
PQerrorMessage(conn));
PQclear(set_utc_result);
return ADBC_STATUS_IO;
}
PQclear(set_utc_result);
break;
}
}

PGresult* result = PQprepare(conn, /*stmtName=*/"", query.c_str(),
/*nParams=*/bind_schema->n_children, param_types.data());
if (PQresultStatus(result) != PGRES_COMMAND_OK) {
@@ -349,12 +385,6 @@ struct BindStream {
}
case ArrowType::NANOARROW_TYPE_TIMESTAMP: {
int64_t val = array_view->children[col]->buffer_views[1].data.as_int64[row];
if (strcmp("", bind_schema_fields[col].timezone)) {
SetError(error, "[libpq] Column #%" PRIi64 "%s%s%s", col + 1, " (\"",
PQfname(result, col),
"\") has unsupported type code timestamp with timezone");
return ADBC_STATUS_NOT_IMPLEMENTED;
}

// 2000-01-01 00:00:00.000000 in microseconds
constexpr int64_t kPostgresTimestampEpoch = 946684800000000;
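
For readers unfamiliar with the constant above: PostgreSQL's binary timestamp format counts microseconds from 2000-01-01, while Arrow timestamps count from the Unix epoch in a per-column unit. The following standalone sketch shows the arithmetic. `TimeUnit` and `ToPostgresMicros` are hypothetical names, the unit scaling is an assumption about code not shown in this hunk, and overflow is deliberately not checked.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical mirror of the Arrow time units, so the sketch is standalone.
enum class TimeUnit { kSecond, kMilli, kMicro, kNano };

// 2000-01-01 00:00:00.000000 UTC in microseconds since the Unix epoch.
constexpr int64_t kPostgresTimestampEpoch = 946684800000000;
static_assert(kPostgresTimestampEpoch == INT64_C(946684800) * 1000000,
              "946684800 is the Unix time (seconds) of 2000-01-01T00:00:00Z");

// Convert an Arrow timestamp value to microseconds since the PostgreSQL epoch.
// No overflow checking; nanosecond values are truncated toward zero.
int64_t ToPostgresMicros(int64_t value, TimeUnit unit) {
  int64_t micros = value;
  switch (unit) {
    case TimeUnit::kSecond:
      micros = value * 1000000;
      break;
    case TimeUnit::kMilli:
      micros = value * 1000;
      break;
    case TimeUnit::kMicro:
      break;
    case TimeUnit::kNano:
      micros = value / 1000;
      break;
  }
  return micros - kPostgresTimestampEpoch;
}
```

Under these assumptions, `ToPostgresMicros(946684800, TimeUnit::kSecond)` is 0, since that value is exactly the PostgreSQL epoch.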
@@ -418,6 +448,26 @@ struct BindStream {
PQclear(result);
}
if (rows_affected) *rows_affected += array->length;

if (has_tz_field) {
std::string reset_query = "SET TIME ZONE '" + tz_setting + "'";
PGresult* reset_tz_result = PQexec(conn, reset_query.c_str());
if (PQresultStatus(reset_tz_result) != PGRES_COMMAND_OK) {
SetError(error, "[libpq] Failed to reset time zone: %s", PQerrorMessage(conn));
PQclear(reset_tz_result);
return ADBC_STATUS_IO;
}
PQclear(reset_tz_result);

PGresult* commit_result = PQexec(conn, "COMMIT");
if (PQresultStatus(commit_result) != PGRES_COMMAND_OK) {
SetError(error, "[libpq] Failed to commit transaction: %s",
PQerrorMessage(conn));
PQclear(commit_result);
return ADBC_STATUS_IO;
}
PQclear(commit_result);
}
}
return ADBC_STATUS_OK;
}
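
The save/set/reset sequence in the hunks above only restores the session timezone on the success path. As an illustration of one alternative, and not what this PR does, a scope guard could issue the reset on every exit. Everything named here is hypothetical: `ExecFn` stands in for a wrapper around `PQexec`, `PQresultStatus`, and `PQclear`, and `QuoteLiteral` is a simplified quoting helper that assumes `standard_conforming_strings` is on (libpq's `PQescapeLiteral` is the real facility for this).

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>

// Hypothetical callback that runs one SQL command and reports success.
using ExecFn = std::function<bool(const std::string&)>;

// Double any single quote so the value can sit inside a SQL string literal.
// Simplified: assumes standard_conforming_strings is on.
std::string QuoteLiteral(const std::string& value) {
  std::string out = "'";
  for (char c : value) {
    if (c == '\'') out += '\'';
    out += c;
  }
  out += "'";
  return out;
}

// Restores the saved session timezone when the guard goes out of scope,
// including on early returns. The result of the reset is ignored here; a
// real driver would need some way to surface that failure.
class TimeZoneGuard {
 public:
  TimeZoneGuard(ExecFn exec, std::string saved_tz)
      : exec_(std::move(exec)), saved_tz_(std::move(saved_tz)) {}
  TimeZoneGuard(const TimeZoneGuard&) = delete;
  TimeZoneGuard& operator=(const TimeZoneGuard&) = delete;

  ~TimeZoneGuard() {
    if (exec_) exec_("SET TIME ZONE " + QuoteLiteral(saved_tz_));
  }

 private:
  ExecFn exec_;
  std::string saved_tz_;
};
```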
@@ -664,12 +714,10 @@ AdbcStatusCode PostgresStatement::CreateBulkTable(
break;
case ArrowType::NANOARROW_TYPE_TIMESTAMP:
if (strcmp("", source_schema_fields[i].timezone)) {
SetError(error, "[libpq] Field #%" PRIi64 "%s%s%s", static_cast<int64_t>(i + 1),
" (\"", source_schema.children[i]->name,
"\") has unsupported type for ingestion timestamp with timezone");
return ADBC_STATUS_NOT_IMPLEMENTED;
create += " TIMESTAMPTZ";
} else {
create += " TIMESTAMP";
}
create += " TIMESTAMP";
break;
default:
SetError(error, "%s%" PRIu64 "%s%s%s%s", "[libpq] Field #",
3 changes: 3 additions & 0 deletions c/driver/sqlite/sqlite_test.cc
@@ -174,6 +174,9 @@ class SqliteStatementTest : public ::testing::Test,
void TestSqlIngestTimestamp() {
GTEST_SKIP() << "Cannot ingest TIMESTAMP (not implemented)";
}
void TestSqlIngestTimestampTz() {
GTEST_SKIP() << "Cannot ingest TIMESTAMP WITH TIMEZONE (not implemented)";
}

protected:
SqliteQuirks quirks_;
3 changes: 3 additions & 0 deletions c/driver_manager/adbc_driver_manager_test.cc
@@ -229,6 +229,9 @@ class SqliteStatementTest : public ::testing::Test,
void TestSqlIngestTimestamp() {
GTEST_SKIP() << "Cannot ingest TIMESTAMP (not implemented)";
}
void TestSqlIngestTimestampTz() {
GTEST_SKIP() << "Cannot ingest TIMESTAMP WITH TIMEZONE (not implemented)";
}

protected:
SqliteQuirks quirks_;
33 changes: 26 additions & 7 deletions c/validation/adbc_validation.cc
@@ -1095,7 +1095,7 @@ void StatementTest::TestSqlIngestBinary() {
}

template <enum ArrowTimeUnit TU>
void StatementTest::TestSqlIngestTemporalType() {
void StatementTest::TestSqlIngestTemporalType(const char* timezone) {
if (!quirks()->supports_bulk_ingest()) {
GTEST_SKIP();
}
@@ -1112,8 +1112,7 @@ void StatementTest::TestSqlIngestTemporalType() {
// changes to allow for various time units to be tested
ArrowSchemaInit(&schema.value);
ArrowSchemaSetTypeStruct(&schema.value, 1);
ArrowSchemaSetTypeDateTime(schema->children[0], NANOARROW_TYPE_TIMESTAMP, TU,
/*timezone=*/nullptr);
ArrowSchemaSetTypeDateTime(schema->children[0], NANOARROW_TYPE_TIMESTAMP, TU, timezone);
ArrowSchemaSetName(schema->children[0], "col");
ASSERT_THAT(MakeBatch<int64_t>(&schema.value, &array.value, &na_error, values),
IsOkErrno());
@@ -1144,6 +1143,10 @@ void StatementTest::TestSqlIngestTemporalType() {
::testing::AnyOf(::testing::Eq(values.size()), ::testing::Eq(-1)));

ASSERT_NO_FATAL_FAILURE(reader.GetSchema());

// postgres does not receive/store/send the timezone, just the UTC integer
// value; we may still want to update CompareSchema to explicitly check for UTC
// with TIMESTAMP WITH TIMEZONE and naive for TIMESTAMP
ASSERT_NO_FATAL_FAILURE(CompareSchema(&reader.schema.value,
{{"col", NANOARROW_TYPE_TIMESTAMP, NULLABLE}}));

@@ -1165,10 +1168,26 @@ void StatementTest::TestSqlIngestTemporalType() {
}

void StatementTest::TestSqlIngestTimestamp() {
ASSERT_NO_FATAL_FAILURE(TestSqlIngestTemporalType<NANOARROW_TIME_UNIT_SECOND>());
ASSERT_NO_FATAL_FAILURE(TestSqlIngestTemporalType<NANOARROW_TIME_UNIT_MICRO>());
ASSERT_NO_FATAL_FAILURE(TestSqlIngestTemporalType<NANOARROW_TIME_UNIT_MILLI>());
ASSERT_NO_FATAL_FAILURE(TestSqlIngestTemporalType<NANOARROW_TIME_UNIT_NANO>());
ASSERT_NO_FATAL_FAILURE(TestSqlIngestTemporalType<NANOARROW_TIME_UNIT_SECOND>(nullptr));
ASSERT_NO_FATAL_FAILURE(TestSqlIngestTemporalType<NANOARROW_TIME_UNIT_MICRO>(nullptr));
ASSERT_NO_FATAL_FAILURE(TestSqlIngestTemporalType<NANOARROW_TIME_UNIT_MILLI>(nullptr));
ASSERT_NO_FATAL_FAILURE(TestSqlIngestTemporalType<NANOARROW_TIME_UNIT_NANO>(nullptr));
}

void StatementTest::TestSqlIngestTimestampTz() {
ASSERT_NO_FATAL_FAILURE(TestSqlIngestTemporalType<NANOARROW_TIME_UNIT_SECOND>("UTC"));
ASSERT_NO_FATAL_FAILURE(TestSqlIngestTemporalType<NANOARROW_TIME_UNIT_MICRO>("UTC"));
ASSERT_NO_FATAL_FAILURE(TestSqlIngestTemporalType<NANOARROW_TIME_UNIT_MILLI>("UTC"));
ASSERT_NO_FATAL_FAILURE(TestSqlIngestTemporalType<NANOARROW_TIME_UNIT_NANO>("UTC"));

ASSERT_NO_FATAL_FAILURE(
TestSqlIngestTemporalType<NANOARROW_TIME_UNIT_SECOND>("America/Los_Angeles"));
ASSERT_NO_FATAL_FAILURE(
TestSqlIngestTemporalType<NANOARROW_TIME_UNIT_MICRO>("America/Los_Angeles"));
ASSERT_NO_FATAL_FAILURE(
TestSqlIngestTemporalType<NANOARROW_TIME_UNIT_MILLI>("America/Los_Angeles"));
ASSERT_NO_FATAL_FAILURE(
TestSqlIngestTemporalType<NANOARROW_TIME_UNIT_NANO>("America/Los_Angeles"));
}

void StatementTest::TestSqlIngestAppend() {
4 changes: 3 additions & 1 deletion c/validation/adbc_validation.h
@@ -231,6 +231,7 @@ class StatementTest {

// Temporal
void TestSqlIngestTimestamp();
void TestSqlIngestTimestampTz();

// ---- End Type-specific tests ----------------

@@ -274,7 +275,7 @@ class StatementTest {
void TestSqlIngestNumericType(ArrowType type);

template <enum ArrowTimeUnit TU>
void TestSqlIngestTemporalType();
void TestSqlIngestTemporalType(const char* timezone);
};

#define ADBCV_TEST_STATEMENT(FIXTURE) \
@@ -295,6 +296,7 @@ class StatementTest {
TEST_F(FIXTURE, SqlIngestString) { TestSqlIngestString(); } \
TEST_F(FIXTURE, SqlIngestBinary) { TestSqlIngestBinary(); } \
TEST_F(FIXTURE, SqlIngestTimestamp) { TestSqlIngestTimestamp(); } \
TEST_F(FIXTURE, SqlIngestTimestampTz) { TestSqlIngestTimestampTz(); } \
TEST_F(FIXTURE, SqlIngestAppend) { TestSqlIngestAppend(); } \
TEST_F(FIXTURE, SqlIngestErrors) { TestSqlIngestErrors(); } \
TEST_F(FIXTURE, SqlIngestMultipleConnections) { TestSqlIngestMultipleConnections(); } \