Skip to content

Commit

Permalink
[#18930][#18933][#18931] YSQL: Enable CRUD syntax for Publication
Browse files Browse the repository at this point in the history
Summary:
This diff enables the CRUD syntax for Publications in YSQL. This change is part of the project to add a YSQL API for CDC via the PG logical replication mechanism.

In PG, a Publication determines which tables should be streamed via logical replication. A user can choose to create a Publication on:
1. All tables of a database
2. All tables of a schema
3. List of tables explicitly provided in the command

In YB, a Publication will be equivalent to the configuration of a CDC stream i.e. it will specify which tables the user is interested in streaming via CDC.

The information about Publication will be kept in PG system catalog tables and the schema will be **same** as PG:
1. pg_publication: Metadata of a Publication
2. pg_publication_rel: (puboid, reloid) pair for each Publication-Table pair
3. pg_publication_tables: View over pg_publication_rel which expands the list of tables if the Publication was created for all the tables (future) of a database

The only difference between the YSQL and PG semantics are due to a few CDC limitations.

**Limitations**
- CDC does not support choosing a subset of records (insert, update, delete, truncate). So we return an error with `ERRCODE_FEATURE_NOT_SUPPORTED` error code if the user tries to specify that either during `Create` or `Alter`
- CDC does not support tables without user defined primary key. This is handled differently depending on whether the Publication is being created on all tables (`FOR ALL TABLES`) or an explicit list (`FOR TABLE t1, t2`) is provided
    - `FOR ALL TABLES`: These tables are skipped. We also filter them out in the `pg_publication_tables` view
    - `FOR TABLE t1, t2, ...`: We return an `ERRCODE_FEATURE_NOT_SUPPORTED` error

**Upgrade\Rollback safety:**
These changes cannot be rolled back. As a result, all the commands are disabled during upgrade using an autoflag `yb_enable_replication_commands` (LocalPersisted) and will only be enabled once the user has committed to the new version.

The autoflag is `LocalPersisted` since the YSQL API changes would modify the yb-master sys-catalog information for CDC stream in the future for Replication Slot. So for consistency, we will disable all CDC YSQL API commands during upgrade.
Jira: DB-7781, DB-7784, DB-7782

Test Plan:
New regress tests

`./yb_build.sh --java-test 'org.yb.pgsql.TestPgRegressPublication'`

Reviewers: skumar, aagrawal, asrinivasan, dsrinivasan

Reviewed By: dsrinivasan

Subscribers: hsunder, jason, yql

Differential Revision: https://phorge.dev.yugabyte.com/D28721
  • Loading branch information
dr0pdb committed Sep 27, 2023
1 parent fb98e56 commit db1432d
Show file tree
Hide file tree
Showing 18 changed files with 699 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright (c) YugabyteDB, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.
//
package org.yb.pgsql;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.yb.YBTestRunner;

/**
* Runs the pg_regress publication-related tests on YB code.
*/
@RunWith(value = YBTestRunner.class)
public class TestPgRegressPublication extends BasePgSQLTest {
@Override
public int getTestMethodTimeoutSec() {
return 1800;
}

@Test
public void testPgRegressPublication() throws Exception {
runPgRegressTest("yb_publication_schedule");
}
}
14 changes: 14 additions & 0 deletions src/postgres/src/backend/catalog/index.c
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,20 @@ relationHasPrimaryKey(Relation rel)
return result;
}

/*
* YBRelationHasPrimaryKey
* See whether an existing relation has a primary key.
*
* Caller must have suitable lock on the relation.
*
* Note: It is just a wrapper over the relationHasPrimaryKey function above.
*/
bool
YBRelationHasPrimaryKey(Relation rel)
{
return relationHasPrimaryKey(rel);
}

/*
* index_check_primary_key
* Apply special checks needed before creating a PRIMARY KEY index
Expand Down
60 changes: 60 additions & 0 deletions src/postgres/src/backend/catalog/pg_publication.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@
#include "utils/rel.h"
#include "utils/syscache.h"

/* YB includes. */
#include "pg_yb_utils.h"

static Datum yb_pg_relation_is_publishable(PG_FUNCTION_ARGS, Oid relid);

/*
* Check if relation can be in given publication and throws appropriate
* error if not.
Expand Down Expand Up @@ -82,6 +87,13 @@ check_publication_add_relation(Relation targetrel)
errmsg("table \"%s\" cannot be replicated",
RelationGetRelationName(targetrel)),
errdetail("Temporary and unlogged relations cannot be replicated.")));

if (IsYugaByteEnabled() && !YBRelationHasPrimaryKey(targetrel))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("table \"%s\" cannot be replicated",
RelationGetRelationName(targetrel)),
errdetail("Replicating tables without primary key is not yet supported.")));
}

/*
Expand Down Expand Up @@ -129,6 +141,11 @@ pg_relation_is_publishable(PG_FUNCTION_ARGS)
HeapTuple tuple;
bool result;

if (IsYugaByteEnabled())
{
return yb_pg_relation_is_publishable(fcinfo, relid);
}

tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
if (!tuple)
PG_RETURN_NULL();
Expand Down Expand Up @@ -340,6 +357,19 @@ GetAllTablesPublicationRelations(void)
Oid relid = HeapTupleGetOid(tuple);
Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);

if (IsYugaByteEnabled())
{
Relation rel;

rel = heap_open(relid, AccessShareLock);
if (yb_is_publishable_relation(rel))
result = lappend_oid(result, relid);
heap_close(rel, AccessShareLock);

/* Skip the below is_publishable_class call. */
continue;
}

if (is_publishable_class(relid, relForm))
result = lappend_oid(result, relid);
}
Expand Down Expand Up @@ -497,3 +527,33 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)

SRF_RETURN_DONE(funcctx);
}

static Datum
yb_pg_relation_is_publishable(PG_FUNCTION_ARGS, Oid relid)
{
Relation rel;
HeapTuple tuple;
bool result;

tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
if (!tuple)
PG_RETURN_NULL();

rel = heap_open(relid, AccessShareLock);
result = yb_is_publishable_relation(rel);
heap_close(rel, AccessShareLock);

ReleaseSysCache(tuple);
PG_RETURN_BOOL(result);
}

/*
* Similar to is_publishable_relation with additional check for user defined
* primary key.
*/
bool
yb_is_publishable_relation(Relation rel)
{
return is_publishable_class(RelationGetRelid(rel), rel->rd_rel) &&
YBRelationHasPrimaryKey(rel);
}
57 changes: 57 additions & 0 deletions src/postgres/src/backend/commands/publicationcmds.c
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@
#include "utils/syscache.h"
#include "utils/varlena.h"

/* YB includes. */
#include "catalog/index.h"
#include "pg_yb_utils.h"

/* Same as MAXNUMMESSAGES in sinvaladt.c */
#define MAX_RELCACHE_INVAL_MSGS 4096

Expand Down Expand Up @@ -140,6 +144,13 @@ parse_publication_options(List *options,
ObjectAddress
CreatePublication(CreatePublicationStmt *stmt)
{
if (IsYugaByteEnabled() && !yb_enable_replication_commands)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("CreatePublication is unavailable"),
errdetail("yb_enable_replication_commands is false or a "
"system upgrade is in progress")));

Relation rel;
ObjectAddress myself;
Oid puboid;
Expand Down Expand Up @@ -190,6 +201,14 @@ CreatePublication(CreatePublicationStmt *stmt)
&publish_update, &publish_delete,
&publish_truncate);

if (IsYugaByteEnabled() && !(publish_insert && publish_update &&
publish_delete && publish_truncate))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("Publishing only a subset of DML commands is not yet supported"),
errhint("See https://github.com/yugabyte/yugabyte-db/issues/"
"19250. React with thumbs up to raise its priority")));

values[Anum_pg_publication_puballtables - 1] =
BoolGetDatum(stmt->for_all_tables);
values[Anum_pg_publication_pubinsert - 1] =
Expand Down Expand Up @@ -254,6 +273,14 @@ AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel,
&publish_update, &publish_delete,
&publish_truncate);

if (IsYugaByteEnabled() && !(publish_insert && publish_update &&
publish_delete && publish_truncate))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("Publishing only a subset of DML commands is not yet supported"),
errhint("See https://github.com/yugabyte/yugabyte-db/issues/"
"19250. React with thumbs up to raise its priority")));

/* Everything ok, form a new tuple. */
memset(values, 0, sizeof(values));
memset(nulls, false, sizeof(nulls));
Expand Down Expand Up @@ -401,6 +428,12 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
void
AlterPublication(AlterPublicationStmt *stmt)
{
if (IsYugaByteEnabled() && !yb_enable_replication_commands)
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("AlterPublication is unavailable"),
errdetail("yb_enable_replication_commands is false or a"
" system upgrade is in progress")));

Relation rel;
HeapTuple tup;

Expand Down Expand Up @@ -436,6 +469,12 @@ AlterPublication(AlterPublicationStmt *stmt)
void
RemovePublicationById(Oid pubid)
{
if (IsYugaByteEnabled() && !yb_enable_replication_commands)
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("RemovePublicationById is unavailable"),
errdetail("yb_enable_replication_commands is false or a"
" system upgrade is in progress")));

Relation rel;
HeapTuple tup;

Expand All @@ -459,6 +498,12 @@ RemovePublicationById(Oid pubid)
void
RemovePublicationRelById(Oid proid)
{
if (IsYugaByteEnabled() && !yb_enable_replication_commands)
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("RemovePublicationRelById is unavailable"),
errdetail("yb_enable_replication_commands is false or a"
" system upgrade is in progress")));

Relation rel;
HeapTuple tup;
Form_pg_publication_rel pubrel;
Expand Down Expand Up @@ -701,6 +746,12 @@ AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
ObjectAddress
AlterPublicationOwner(const char *name, Oid newOwnerId)
{
if (IsYugaByteEnabled() && !yb_enable_replication_commands)
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("AlterPublicationOwner is unavailable"),
errdetail("yb_enable_replication_commands is false or a"
" system upgrade is in progress")));

Oid subid;
HeapTuple tup;
Relation rel;
Expand Down Expand Up @@ -734,6 +785,12 @@ AlterPublicationOwner(const char *name, Oid newOwnerId)
void
AlterPublicationOwner_oid(Oid subid, Oid newOwnerId)
{
if (IsYugaByteEnabled() && !yb_enable_replication_commands)
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("AlterPublicationOwner_oid is unavailable"),
errdetail("yb_enable_replication_commands is false or a"
" system upgrade is in progress")));

HeapTuple tup;
Relation rel;

Expand Down
12 changes: 3 additions & 9 deletions src/postgres/src/backend/parser/gram.y
Original file line number Diff line number Diff line change
Expand Up @@ -918,6 +918,7 @@ stmt :
| AlterOpFamilyStmt
| AlterOwnerStmt
| AlterPolicyStmt
| AlterPublicationStmt
| AlterRoleSetStmt
| AlterRoleStmt
| AlterSeqStmt
Expand All @@ -939,6 +940,7 @@ stmt :
| CreateOpClassStmt
| CreateOpFamilyStmt
| CreatePolicyStmt
| CreatePublicationStmt
| CreateRoleStmt
| CreateSchemaStmt
| CreateStatsStmt
Expand Down Expand Up @@ -1019,14 +1021,12 @@ stmt :
| AlterSystemStmt { parser_ybc_not_support(@1, "This statement"); }
| AlterTblSpcStmt { parser_ybc_signal_unsupported(@1, "This statement", 1153); }
| AlterCompositeTypeStmt { parser_ybc_not_support(@1, "This statement"); }
| AlterPublicationStmt { parser_ybc_not_support(@1, "This statement"); }
| AlterSubscriptionStmt { parser_ybc_not_support(@1, "This statement"); }
| AlterTSDictionaryStmt { parser_ybc_not_support(@1, "This statement"); }
| ClusterStmt { parser_ybc_not_support(@1, "This statement"); }
| CreateAmStmt { parser_ybc_not_support(@1, "This statement"); }
| CreateAssertStmt { parser_ybc_not_support(@1, "This statement"); }
| CreateConversionStmt { parser_ybc_not_support(@1, "This statement"); }
| CreatePublicationStmt { parser_ybc_not_support(@1, "This statement"); }
| CreateSubscriptionStmt { parser_ybc_not_support(@1, "This statement"); }
| CreateTransformStmt { parser_ybc_not_support(@1, "This statement"); }
| DropAssertStmt { parser_ybc_not_support(@1, "This statement"); }
Expand Down Expand Up @@ -6923,7 +6923,7 @@ drop_type_name:
{
$$ = OBJECT_FDW;
}
| PUBLICATION { parser_ybc_not_support(@1, "DROP PUBLICATION"); $$ = OBJECT_PUBLICATION; }
| PUBLICATION { $$ = OBJECT_PUBLICATION; }
| SCHEMA { $$ = OBJECT_SCHEMA; }
| SERVER { $$ = OBJECT_FOREIGN_SERVER; }
| TABLEGROUP
Expand Down Expand Up @@ -9448,7 +9448,6 @@ RenameStmt: ALTER AGGREGATE aggregate_with_argtypes RENAME TO name
}
| ALTER PUBLICATION name RENAME TO name
{
parser_ybc_not_support(@1, "ALTER PUBLICATION");
RenameStmt *n = makeNode(RenameStmt);
n->renameType = OBJECT_PUBLICATION;
n->object = (Node *) makeString($3);
Expand Down Expand Up @@ -10437,7 +10436,6 @@ AlterOwnerStmt: ALTER AGGREGATE aggregate_with_argtypes OWNER TO RoleSpec
CreatePublicationStmt:
CREATE PUBLICATION name opt_publication_for_tables opt_definition
{
parser_ybc_not_support(@1, "CREATE PUBLICATION");
CreatePublicationStmt *n = makeNode(CreatePublicationStmt);
n->pubname = $3;
n->options = $5;
Expand Down Expand Up @@ -10486,15 +10484,13 @@ publication_for_tables:
AlterPublicationStmt:
ALTER PUBLICATION name SET definition
{
parser_ybc_not_support(@1, "ALTER PUBLICATION <name>");
AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
n->pubname = $3;
n->options = $5;
$$ = (Node *)n;
}
| ALTER PUBLICATION name ADD_P TABLE relation_expr_list
{
parser_ybc_not_support(@1, "ALTER PUBLICATION ADD TABLE");
AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
n->pubname = $3;
n->tables = $6;
Expand All @@ -10503,7 +10499,6 @@ AlterPublicationStmt:
}
| ALTER PUBLICATION name SET TABLE relation_expr_list
{
parser_ybc_not_support(@1, "ALTER PUBLICATION SET TABLE");
AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
n->pubname = $3;
n->tables = $6;
Expand All @@ -10512,7 +10507,6 @@ AlterPublicationStmt:
}
| ALTER PUBLICATION name DROP TABLE relation_expr_list
{
parser_ybc_not_support(@1, "ALTER PUBLICATION DROP TABLE");
AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
n->pubname = $3;
n->tables = $6;
Expand Down
11 changes: 11 additions & 0 deletions src/postgres/src/backend/utils/misc/guc.c
Original file line number Diff line number Diff line change
Expand Up @@ -2071,6 +2071,17 @@ static struct config_bool ConfigureNamesBool[] =
NULL, NULL, NULL
},

{
{"yb_enable_replication_commands", PGC_SUSET, REPLICATION,
gettext_noop("Enable the replication commands for Publication and Replication Slots."),
NULL,
GUC_NOT_IN_SAMPLE
},
&yb_enable_replication_commands,
true,
NULL, NULL, NULL
},

{
{"ysql_upgrade_mode", PGC_SUSET, DEVELOPER_OPTIONS,
gettext_noop("Enter a special mode designed specifically for YSQL cluster upgrades. "
Expand Down
2 changes: 2 additions & 0 deletions src/postgres/src/include/catalog/index.h
Original file line number Diff line number Diff line change
Expand Up @@ -190,4 +190,6 @@ typedef enum
YB_INDEX_PERM_INDEX_UNUSED = 12,
} YBIndexPermissions;

extern bool YBRelationHasPrimaryKey(Relation rel);

#endif /* INDEX_H */
2 changes: 2 additions & 0 deletions src/postgres/src/include/catalog/pg_publication.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,6 @@ extern char *get_publication_name(Oid pubid);

extern Datum pg_get_publication_tables(PG_FUNCTION_ARGS);

extern bool yb_is_publishable_relation(Relation rel);

#endif /* PG_PUBLICATION_H */
Loading

0 comments on commit db1432d

Please sign in to comment.