Skip to content

Commit

Permalink
Support for system catalogs
Browse files Browse the repository at this point in the history
Full history: 699813b

refactor `dbListTables()` with `list_tables()`, now orders result by `table_type` and `table_name`
refactor `dbExistsTable()` with `list_tables()`
refactor `dbListObjects()` with `list_tables()`
merge `find_table()` code into `list_fields()`
`find_table()` isn't used anywhere else anymore (e.g. `exists_table()`)
simplify the "get current_schemas() as table" code
pass full `id` to `list_fields()`
align `dbExistsTable()` with `dbListFields()`
add some comments and whitespace
simplify `where_schema` in `list_tables()`
align `where_table` with `where_schema` in `list_tables()`
add `system_catalogs` argument to `dbConnect()`
add materialized view tests
`list_tables()`: query system catalogs if available
`list_fields()`: query system catalogs if available
  • Loading branch information
dpprdan authored and krlmlr committed May 1, 2024
1 parent 0f2c805 commit 3bc2409
Show file tree
Hide file tree
Showing 9 changed files with 221 additions and 21 deletions.
1 change: 1 addition & 0 deletions R/PqConnection.R
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ setClass("PqConnection",
slots = list(
ptr = "externalptr",
bigint = "character",
system_catalogs = "logical",
timezone = "character",
timezone_out = "character",
typnames = "data.frame"
Expand Down
14 changes: 12 additions & 2 deletions R/dbConnect_PqDriver.R
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,13 @@
#' @param check_interrupts Should user interrupts be checked during the query execution (before
#' first row of data is available)? Setting to `TRUE` allows interruption of queries
#' running too long.
#' @param system_catalogs Should `dbList*()` functions query the [`system
#' catalogs`](https://www.postgresql.org/docs/current/catalogs.html) (`TRUE`)
#' or the
#' [`information_schema`](https://www.postgresql.org/docs/current/information-schema.html)?
#' The `information_schema` does not contain PostgreSQL-specific information,
#' in particular [Materialized
#' Views](https://www.postgresql.org/docs/current/sql-creatematerializedview.html).
#' @param timezone Sets the timezone for the connection. The default is `"UTC"`.
#' If `NULL` then no timezone is set, which defaults to the server's time zone.
#' @param timezone_out The time zone returned to R, defaults to `timezone`.
Expand All @@ -60,7 +67,8 @@
dbConnect_PqDriver <- function(drv, dbname = NULL,
host = NULL, port = NULL, password = NULL, user = NULL, service = NULL, ...,
bigint = c("integer64", "integer", "numeric", "character"),
check_interrupts = FALSE, timezone = "UTC", timezone_out = NULL) {
check_interrupts = FALSE, system_catalogs = TRUE,
timezone = "UTC", timezone_out = NULL) {
opts <- unlist(list(
dbname = dbname, user = user, password = password,
host = host, port = as.character(port), service = service, client_encoding = "utf8", ...
Expand All @@ -70,6 +78,7 @@ dbConnect_PqDriver <- function(drv, dbname = NULL,
}
bigint <- match.arg(bigint)
stopifnot(is.logical(check_interrupts), all(!is.na(check_interrupts)), length(check_interrupts) == 1)
stopifnot(is.logical(system_catalogs))
if (!is.null(timezone)) {
stopifnot(is.character(timezone), all(!is.na(timezone)), length(timezone) == 1)
}
Expand All @@ -85,7 +94,8 @@ dbConnect_PqDriver <- function(drv, dbname = NULL,

# timezone is set later
conn <- new("PqConnection",
ptr = ptr, bigint = bigint, timezone = character(), typnames = data.frame()
ptr = ptr, bigint = bigint, system_catalogs = system_catalogs,
timezone = character(), typnames = data.frame()
)
on.exit(dbDisconnect(conn))

Expand Down
17 changes: 16 additions & 1 deletion R/dbConnect_RedshiftDriver.R
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,22 @@ dbConnect_RedshiftDriver <- function(drv, dbname = NULL,
host = NULL, port = NULL, password = NULL, user = NULL, service = NULL, ...,
bigint = c("integer64", "integer", "numeric", "character"),
check_interrupts = FALSE, timezone = "UTC") {
new("RedshiftConnection", callNextMethod())
new("RedshiftConnection",
callNextMethod(
drv = drv,
dbname = dbname,
host = host,
port = port,
password = password,
user = user,
service = service,
...,
bigint = bigint,
check_interrupts = check_interrupts,
system_catalogs = FALSE,
timezone = timezone
)
)
}

#' @rdname Redshift
Expand Down
14 changes: 7 additions & 7 deletions R/dbListObjects_PqConnection_ANY.R
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ dbListObjects_PqConnection_ANY <- function(conn, prefix = NULL, ...) {
}
query <- paste0(
"SELECT ", null_varchar, " AS schema, table_name AS table FROM ( \n",
list_tables(conn = conn, order_by = "table_type, table_name"),
list_tables(conn = conn, order_by = "table_type, table_name"),
") as table_query \n",
"UNION ALL\n",
"SELECT DISTINCT table_schema AS schema, ", null_varchar, " AS table FROM ( \n",
list_tables(conn = conn, where_schema = "true"),
list_tables(conn = conn, where_schema = "true"),
") as schema_query;"
)
} else {
Expand All @@ -38,11 +38,11 @@ dbListObjects_PqConnection_ANY <- function(conn, prefix = NULL, ...) {
)
query <- paste0(
"SELECT table_schema AS schema, table_name AS table FROM ( \n",
list_tables(
conn = conn,
where_schema = where_schema,
order_by = "table_type, table_name"
),
list_tables(
conn = conn,
where_schema = where_schema,
order_by = "table_type, table_name"
),
") as table_query"
)
}
Expand Down
94 changes: 83 additions & 11 deletions R/tables.R
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,21 @@ db_append_table <- function(conn, name, value, copy, warn) {

list_tables <- function(conn, where_schema = NULL, where_table = NULL, order_by = NULL) {

query <- paste0(
# information_schema.table docs: https://www.postgresql.org/docs/current/infoschema-tables.html
"SELECT table_schema, table_name \n",
"FROM information_schema.tables \n",
"WHERE TRUE \n" # dummy clause to be able to add additional ones with `AND`
)
if (conn@system_catalogs) {
query <- paste0(
"SELECT table_schema, table_name \n",
"FROM ( ", list_tables_from_system_catalog(), ") AS schema_tables \n",
"WHERE TRUE \n"
)
} else {
query <- paste0(
# information_schema.table docs:
# https://www.postgresql.org/docs/current/infoschema-tables.html
"SELECT table_schema, table_name \n",
"FROM information_schema.tables \n",
"WHERE TRUE \n" # dummy clause to be able to add additional ones with `AND`
)
}

if (is.null(where_schema)) {
# `true` in `current_schemas(true)` is necessary to get temporary tables
Expand All @@ -147,6 +156,38 @@ list_tables <- function(conn, where_schema = NULL, where_table = NULL, order_by
query
}

list_tables_from_system_catalog <- function() {
  # Builds a sub-select over the system catalogs that imitates (parts of)
  # information_schema.tables, but additionally reports materialized views.
  # Returns a single SQL string exposing the columns table_schema, table_name
  # and table_type.
  #
  # pg_class vs. information_schema: https://stackoverflow.com/a/24089729
  # pg_class docs: https://www.postgresql.org/docs/current/catalog-pg-class.html

  select_columns <-
    "SELECT n.nspname AS table_schema, cl.relname AS table_name, \n"

  # Translate the relation kind into a table_type label. The check for the
  # session's own temporary schema comes first, so it wins over the kind.
  table_type_case <- paste0(
    " CASE\n",
    "WHEN (n.oid = pg_my_temp_schema()) THEN 'LOCAL TEMPORARY'\n",
    "WHEN (cl.relkind IN ('r', 'p')) THEN 'BASE TABLE'\n",
    "WHEN (cl.relkind = 'v') THEN 'VIEW'\n",
    "WHEN (cl.relkind = 'f') THEN 'FOREIGN'\n",
    "WHEN (cl.relkind = 'm') THEN 'MATVIEW'\n",
    "ELSE NULL\n",
    "END AS table_type \n"
  )

  from_clause <- paste0(
    "FROM pg_class AS cl \n",
    "JOIN pg_namespace AS n ON cl.relnamespace = n.oid \n"
  )

  # Relation kinds kept: r = ordinary table, v = view, m = materialized view,
  # f = foreign table, p = partitioned table.
  # Individual table partitions and other sessions' temp schemas are excluded.
  relation_filter <- paste0(
    "WHERE (cl.relkind IN ('r', 'v', 'm', 'f', 'p')) \n",
    " AND NOT cl.relispartition \n",
    " AND (NOT pg_is_other_temp_schema(n.oid)) \n"
  )

  # Only objects (relations) which the current user may access are returned.
  # https://www.postgresql.org/docs/current/functions-info.html
  privilege_filter <- paste0(
    " AND (pg_has_role(cl.relowner, 'USAGE') \n",
    " OR has_table_privilege(cl.oid, 'SELECT, INSERT, UPDATE, DELETE, TRUNCATE, REFERENCES, TRIGGER') \n",
    " OR has_any_column_privilege(cl.oid, 'SELECT, INSERT, UPDATE, REFERENCES') \n",
    " ) \n"
  )

  paste0(
    select_columns,
    table_type_case,
    from_clause,
    relation_filter,
    privilege_filter
  )
}

exists_table <- function(conn, id) {
name <- id@name
stopifnot("table" %in% names(name))
Expand All @@ -168,6 +209,14 @@ exists_table <- function(conn, id) {
}

list_fields <- function(conn, id) {
  # Picks the column-listing backend according to the connection's
  # `system_catalogs` slot: the system-catalog implementation when it is
  # TRUE, the information_schema implementation otherwise.
  backend <- if (conn@system_catalogs) {
    list_fields_from_system_catalog
  } else {
    list_fields_from_info_schema
  }
  backend(conn, id)
}

list_fields_from_info_schema <- function(conn, id) {
name <- id@name

is_redshift <- is(conn, "RedshiftConnection")
Expand All @@ -185,7 +234,7 @@ list_fields <- function(conn, id) {
# as there cannot be multiple tables with the same name in a single schema
only_first <- FALSE

# or we have to look the table up in the schemas on the search path
# or we have to look the table up in the schemas on the search path
} else if (is_redshift) {
# A variant of the Postgres version that uses CTEs and generate_series()
# instead of generate_subscripts(), the latter is not supported on Redshift
Expand All @@ -211,10 +260,10 @@ list_fields <- function(conn, id) {
# How to unnest `current_schemas(true)` array with element number (works since v9.4):
# https://stackoverflow.com/a/8767450/2114932
query <- paste0(
"(",
"SELECT * FROM unnest(current_schemas(true)) WITH ORDINALITY AS tbl(table_schema, nr) \n",
"WHERE table_schema != 'pg_catalog'",
") schemas_on_path"
"(",
"SELECT * FROM unnest(current_schemas(true)) WITH ORDINALITY AS tbl(table_schema, nr) \n",
"WHERE table_schema != 'pg_catalog'",
") schemas_on_path"
)
only_first <- TRUE
}
Expand Down Expand Up @@ -252,6 +301,29 @@ list_fields <- function(conn, id) {
fields
}

list_fields_from_system_catalog <- function(conn, id) {
  # Returns the column names of the relation identified by `id`, in column
  # order, by querying `pg_attribute`.
  #
  # conn: connection used for quoting and for running the query.
  # id:   object whose `@name` is a named character vector with a "table"
  #       component and an optional "schema" component.
  #
  # Raises an error ("Table ... not found.") if exists_table() reports that
  # the relation does not exist.
  if (!exists_table(conn, id)) {
    stop("Table ", dbQuoteIdentifier(conn, id), " not found.", call. = FALSE)
  }

  # From here on exists_table() has confirmed the relation; only the schema
  # and table components are used to address it.
  name_parts <- id@name[c("schema", "table")]
  name_parts <- unname(name_parts[!is.na(name_parts)])

  # The text handed to `::regclass` is parsed like an SQL identifier:
  # unquoted parts are folded to lower case and dots separate schema from
  # table. Quote every component as an identifier first, so that mixed-case
  # names and names containing dots or other special characters resolve to
  # the intended relation, then wrap the result in a string literal.
  qualified_name <- paste(
    as.character(dbQuoteIdentifier(conn, name_parts)),
    collapse = "."
  )
  regclass_literal <- dbQuoteString(conn, qualified_name)

  # cast to `regclass` resolves the table name according to the current
  # `search_path` https://dba.stackexchange.com/a/75124
  query <- paste0(
    "SELECT attname \n",
    "FROM pg_attribute \n",
    "WHERE attrelid = ", regclass_literal, "::regclass \n",
    # attnum > 0 and NOT attisdropped restrict the result to live
    # user-visible columns
    " AND attnum > 0 \n",
    " AND NOT attisdropped \n",
    "ORDER BY attnum;"
  )
  dbGetQuery(conn, query)[[1]]
}

find_temp_schema <- function(conn, fail_if_missing = TRUE) {
if (!is.na(connection_get_temp_schema(conn@ptr)))
return(connection_get_temp_schema(conn@ptr))
Expand Down
7 changes: 7 additions & 0 deletions man/Postgres.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 24 additions & 0 deletions tests/testthat/helper-with_matview.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#' Run an expression with a materialized view in place and clean up afterwards.
#'
#' The materialized view is created (if it does not exist yet) before `expr`
#' is evaluated and dropped again when the function exits, whether `expr`
#' succeeded or failed.
#'
#' @param con PqConnection. The database connection.
#' @param matview character. The materialized view name. It is pasted into the
#'   SQL statements as is, so it must already be a valid (optionally
#'   schema-qualified) identifier.
#' @param query character. A SELECT, TABLE, or VALUES command to populate the
#'   materialized view.
#' @param expr expression. The R expression to execute.
#' @return the return value of the \code{expr}.
#' @seealso https://www.postgresql.org/docs/current/sql-creatematerializedview.html
with_matview <- function(con, matview, query, expr) {
  # Register the cleanup before creating the view; `IF EXISTS` keeps the
  # drop harmless if the creation below fails.
  on.exit(
    DBI::dbExecute(con, paste0("DROP MATERIALIZED VIEW IF EXISTS ", matview)),
    add = TRUE
  )
  DBI::dbExecute(
    con,
    paste0(
      "CREATE MATERIALIZED VIEW IF NOT EXISTS ", matview,
      " AS ", query
    )
  )

  # Evaluate the lazily passed expression while the view exists.
  force(expr)
}
14 changes: 14 additions & 0 deletions tests/testthat/helper-with_schema.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#' Evaluate an expression while a schema exists, dropping the schema afterwards.
#'
#' The schema is created (if it does not exist yet) before `expr` is
#' evaluated and dropped again when the function exits.
#'
#' @param con PqConnection. The database connection.
#' @param schema character. The schema name; it is pasted into the SQL
#'   statements as is.
#' @param expr expression. The R expression to execute.
#' @return the return value of the \code{expr}.
with_schema <- function(con, schema, expr) {
  # Small local runner: append the schema name to a DDL prefix and execute it.
  run_schema_ddl <- function(ddl_prefix) {
    DBI::dbExecute(con, paste0(ddl_prefix, schema))
  }

  # Cleanup is registered before the schema is created.
  on.exit(run_schema_ddl("DROP SCHEMA IF EXISTS "), add = TRUE)
  run_schema_ddl("CREATE SCHEMA IF NOT EXISTS ")

  force(expr)
}
57 changes: 57 additions & 0 deletions tests/testthat/test-list_matviews.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# Materialized views must be visible through dbExistsTable(), dbListTables(),
# dbListObjects() and dbListFields() when the connection queries the system
# catalogs.
if (postgresHasDefault()) {
  with_database_connection(con = postgresDefault(system_catalogs = TRUE), {

    skip_if(
      is(con, "RedshiftConnection"),
      "Redshift doesn't expose system catalogs"
    )
    skip_if_not(
      dbExistsTable(con, Id(schema = "pg_catalog", table = "pg_class")),
      "`pg_catalog`.`pg_class` not available"
    )

    test_that("Materialized View is listed", {
      with_matview(con, "matview1", "SELECT 1 AS col1", {

        expect_true(dbExistsTable(con, "matview1"))
        expect_true("matview1" %in% dbListTables(con))

        # Compare quoted identifiers so that the Id objects returned by
        # dbListObjects() can be matched against a plain name.
        listed <- vapply(
          dbListObjects(con)$table,
          function(tbl) dbQuoteIdentifier(con, tbl),
          character(1)
        )
        expected <- dbQuoteIdentifier(con, "matview1")
        expect_true(expected %in% listed)

        expect_true("col1" %in% dbListFields(con, "matview1"))
      })
    })

    test_that("Materialized View in custom schema is listed", {
      with_schema(con, "matschema1", {
        matview_id <- Id(schema = "matschema1", table = "matview1")

        with_matview(con, "matschema1.matview1", "SELECT 1 AS col1", {

          expect_true(dbExistsTable(con, matview_id))

          # Restrict the listing to the custom schema via `prefix`.
          listed <- vapply(
            dbListObjects(con, prefix = Id(schema = "matschema1"))$table,
            function(tbl) dbQuoteIdentifier(con, tbl),
            character(1)
          )
          expected <- dbQuoteIdentifier(con, matview_id)
          expect_true(expected %in% listed)

          expect_true("col1" %in% dbListFields(con, matview_id))
        })
      })
    })
  })
}

0 comments on commit 3bc2409

Please sign in to comment.