From 963f869a76db21d1a431904a9e57c7adc0a1930e Mon Sep 17 00:00:00 2001 From: neil Date: Sat, 20 May 2023 15:58:54 -0700 Subject: [PATCH] Rebase with yb::master up to commit d5d857e36f73ade9e69b69503dc04db521278e5b --- .../catalog/yb_catalog/yb_catalog_version.c | 66 ++++++++++++++++++- .../src/backend/executor/ybcModifyTable.c | 28 ++++++++ src/postgres/src/backend/libpq/pqcomm.c | 20 ++++-- .../src/backend/postmaster/postmaster.c | 4 -- src/postgres/src/backend/tcop/postgres.c | 29 ++++++++ src/postgres/src/backend/tcop/pquery.c | 14 ++-- .../src/backend/utils/cache/relcache.c | 13 ++++ src/postgres/src/backend/utils/misc/guc.c | 30 +++++++++ .../src/backend/utils/misc/pg_yb_utils.c | 23 ++++++- .../src/include/catalog/yb_catalog_version.h | 3 +- src/postgres/src/include/libpq/libpq.h | 2 + src/postgres/src/include/pg_yb_utils.h | 5 +- src/postgres/src/include/tcop/pquery.h | 8 +++ 13 files changed, 224 insertions(+), 21 deletions(-) diff --git a/src/postgres/src/backend/catalog/yb_catalog/yb_catalog_version.c b/src/postgres/src/backend/catalog/yb_catalog/yb_catalog_version.c index 3c0b93e635ef..33793159f46b 100644 --- a/src/postgres/src/backend/catalog/yb_catalog/yb_catalog_version.c +++ b/src/postgres/src/backend/catalog/yb_catalog/yb_catalog_version.c @@ -16,6 +16,8 @@ #include "access/htup_details.h" #include "access/sysattr.h" #include "catalog/catalog.h" +#include "catalog/namespace.h" +#include "catalog/pg_authid_d.h" #include "catalog/pg_database.h" #include "catalog/pg_namespace_d.h" #include "catalog/pg_proc.h" @@ -78,12 +80,69 @@ uint64_t YbGetMasterCatalogVersion() /* Modify Catalog Version */ +static void +YbCallSQLIncrementCatalogVersions(bool is_breaking_change) +{ + List* names = + list_make2(makeString("pg_catalog"), + makeString("yb_increment_all_db_catalog_versions")); + FuncCandidateList clist = FuncnameGetCandidates( + names, + -1 /* nargs */, + NIL /* argnames */, + false /* expand_variadic */, + false /* expand_defaults */, + false /* include_out_arguments */, + false /* missing_ok */); + /* We expect exactly one candidate. */ + Assert(clist && clist->next == NULL); + Oid functionId = clist->oid; + FmgrInfo flinfo; + FunctionCallInfoBaseData fcinfo; + fmgr_info(functionId, &flinfo); + InitFunctionCallInfoData(fcinfo, &flinfo, 1, InvalidOid, NULL, NULL); + fcinfo.args[0].value = BoolGetDatum(is_breaking_change); + fcinfo.args[0].isnull = false; + + // Save old values and set new values to enable the call. + bool saved = yb_non_ddl_txn_for_sys_tables_allowed; + yb_non_ddl_txn_for_sys_tables_allowed = true; + Oid save_userid; + int save_sec_context; + GetUserIdAndSecContext(&save_userid, &save_sec_context); + SetUserIdAndSecContext(BOOTSTRAP_SUPERUSERID, + SECURITY_RESTRICTED_OPERATION); + PG_TRY(); + { + FunctionCallInvoke(&fcinfo); + /* Restore old values. */ + yb_non_ddl_txn_for_sys_tables_allowed = saved; + SetUserIdAndSecContext(save_userid, save_sec_context); + } + PG_CATCH(); + { + /* Restore old values. */ + yb_non_ddl_txn_for_sys_tables_allowed = saved; + SetUserIdAndSecContext(save_userid, save_sec_context); + PG_RE_THROW(); + } + PG_END_TRY(); +} + static void YbIncrementMasterDBCatalogVersionTableEntryImpl( - Oid db_oid, bool is_breaking_change) + Oid db_oid, bool is_breaking_change, bool is_global_ddl) { Assert(YbGetCatalogVersionType() == CATALOG_VERSION_CATALOG_TABLE); + if (is_global_ddl) + { + Assert(YBIsDBCatalogVersionMode()); + /* Call yb_increment_all_db_catalog_versions(is_breaking_change). */ + YbCallSQLIncrementCatalogVersions(is_breaking_change); + return; + } + YBCPgStatement update_stmt = NULL; YBCPgTypeAttrs type_attrs = { 0 }; YBCPgExpr yb_expr; @@ -166,7 +225,8 @@ YbIncrementMasterDBCatalogVersionTableEntryImpl( RelationClose(rel); } -bool YbIncrementMasterCatalogVersionTableEntry(bool is_breaking_change) +bool YbIncrementMasterCatalogVersionTableEntry(bool is_breaking_change, + bool is_global_ddl) { if (YbGetCatalogVersionType() != CATALOG_VERSION_CATALOG_TABLE) return false; @@ -175,7 +235,7 @@ bool YbIncrementMasterCatalogVersionTableEntry(bool is_breaking_change) */ YbIncrementMasterDBCatalogVersionTableEntryImpl( YBIsDBCatalogVersionMode() ? MyDatabaseId : Template1DbOid, - is_breaking_change); + is_breaking_change, is_global_ddl); return true; } diff --git a/src/postgres/src/backend/executor/ybcModifyTable.c b/src/postgres/src/backend/executor/ybcModifyTable.c index 4c9932e235e9..1bbea26be6c9 100644 --- a/src/postgres/src/backend/executor/ybcModifyTable.c +++ b/src/postgres/src/backend/executor/ybcModifyTable.c @@ -26,6 +26,10 @@ #include "access/htup_details.h" #include "access/sysattr.h" #include "access/xact.h" +#include "catalog/indexing.h" +#include "catalog/pg_authid_d.h" +#include "catalog/pg_auth_members_d.h" +#include "catalog/pg_tablespace_d.h" #include "catalog/pg_type.h" #include "catalog/pg_yb_role_profile.h" #include "catalog/pg_yb_role_profile_d.h" @@ -203,6 +207,30 @@ static void YBCExecWriteStmt(YBCPgStatement ybc_stmt, // TODO(shane) also update the shared memory catalog version here. YbUpdateCatalogCacheVersion(YbGetCatalogCacheVersion() + 1); } + + if (YBIsDBCatalogVersionMode() && + RelationGetForm(rel)->relisshared && + RelationSupportsSysCache(RelationGetRelid(rel)) && + !(*YBCGetGFlags()->ysql_disable_global_impact_ddl_statements)) + { + /* NOTE: relisshared implies that rel is a system relation. */ + Assert(IsSystemRelation(rel)); + Assert(/* pg_authid */ + RelationGetRelid(rel) == AuthIdRelationId || + RelationGetRelid(rel) == AuthIdRolnameIndexId || + + /* pg_auth_members */ + RelationGetRelid(rel) == AuthMemRelationId || + RelationGetRelid(rel) == AuthMemRoleMemIndexId || + RelationGetRelid(rel) == AuthMemMemRoleIndexId || + + /* pg_database */ + RelationGetRelid(rel) == DatabaseRelationId || + + /* pg_tablespace */ + RelationGetRelid(rel) == TableSpaceRelationId); + YbSetIsGlobalDDL(); + } } /* diff --git a/src/postgres/src/backend/libpq/pqcomm.c b/src/postgres/src/backend/libpq/pqcomm.c index 0e6b773f08a6..40d2c915d2b7 100644 --- a/src/postgres/src/backend/libpq/pqcomm.c +++ b/src/postgres/src/backend/libpq/pqcomm.c @@ -1051,11 +1051,9 @@ pq_getbyte(void) * Same as pq_getbyte() except we don't advance the pointer. * -------------------------------- */ -int -pq_peekbyte(void) +static int +pq_peekbyte_impl(void) { - Assert(PqCommReadingMsg); - while (PqRecvPointer >= PqRecvLength) { if (pq_recvbuf()) /* If nothing in buffer, then recv some */ @@ -1064,6 +1062,20 @@ pq_peekbyte(void) return (unsigned char) PqRecvBuffer[PqRecvPointer]; } +int +pq_peekbyte(void) +{ + Assert(PqCommReadingMsg); + + return pq_peekbyte_impl(); +} + +int +yb_pq_peekbyte_no_msg_reading_status_check(void) +{ + return pq_peekbyte_impl(); +} + /* -------------------------------- * pq_getbyte_if_available - get a single byte from connection, * if available diff --git a/src/postgres/src/backend/postmaster/postmaster.c b/src/postgres/src/backend/postmaster/postmaster.c index 39bc2b561249..bc6e9793ea83 100644 --- a/src/postgres/src/backend/postmaster/postmaster.c +++ b/src/postgres/src/backend/postmaster/postmaster.c @@ -614,10 +614,6 @@ PostmasterMain(int argc, char *argv[]) IsPostmasterEnvironment = true; - if (YBIsEnabledInPostgresEnvVar()) { - YBCStatementTimeoutPtr = &StatementTimeout; - } - /* * Start our win32 signal implementation */ diff --git a/src/postgres/src/backend/tcop/postgres.c b/src/postgres/src/backend/tcop/postgres.c index 36acab2e8223..6096dd2cb413 100644 --- a/src/postgres/src/backend/tcop/postgres.c +++ b/src/postgres/src/backend/tcop/postgres.c @@ -473,6 +473,33 @@ SocketBackend(StringInfo inBuf) */ if (pq_getmessage(inBuf, maxmsglen)) return EOF; /* suitable message already logged */ + + if (IsYugaByteEnabled()) + { + switch(qtype) + { + case 'E': + switch (yb_pg_batch_detection_mechanism) + { + case ASSUME_ALL_BATCH_EXECUTIONS: + YbSetIsBatchedExecution(true); + break; + case DETECT_BY_PEEKING: + if (!YbIsBatchedExecution() && + yb_pq_peekbyte_no_msg_reading_status_check() != + 'S') + YbSetIsBatchedExecution(true); + break; + } + break; + case 'S': + YbSetIsBatchedExecution(false); + break; + default: + break; + } + } + RESUME_CANCEL_INTERRUPTS(); return qtype; @@ -493,7 +520,9 @@ ReadCommand(StringInfo inBuf) if (whereToSendOutput == DestRemote) result = SocketBackend(inBuf); else + { result = InteractiveBackend(inBuf); + } return result; } diff --git a/src/postgres/src/backend/tcop/pquery.c b/src/postgres/src/backend/tcop/pquery.c index 4ae8769b6453..354dda59454a 100644 --- a/src/postgres/src/backend/tcop/pquery.c +++ b/src/postgres/src/backend/tcop/pquery.c @@ -38,6 +38,8 @@ */ Portal ActivePortal = NULL; +int yb_pg_batch_detection_mechanism; + static void ProcessQuery(PlannedStmt *plan, const char *sourceText, ParamListInfo params, @@ -1236,13 +1238,13 @@ PortalRunMulti(Portal portal, if (altdest->mydest == DestRemoteExecute) altdest = None_Receiver; - if (IsYugaByteEnabled()) + if (IsYugaByteEnabled() && + !IsTransactionBlock() && + !YbIsBatchedExecution() && + list_length(portal->stmts) == 1) { - if (!IsTransactionBlock() && list_length(portal->stmts) == 1) - { - PlannedStmt *pstmt = linitial_node(PlannedStmt, portal->stmts); - is_single_row_modify_txn = YBCIsSingleRowModify(pstmt); - } + PlannedStmt *pstmt = linitial_node(PlannedStmt, portal->stmts); + is_single_row_modify_txn = YBCIsSingleRowModify(pstmt); } /* diff --git a/src/postgres/src/backend/utils/cache/relcache.c b/src/postgres/src/backend/utils/cache/relcache.c index c329c73eebae..a5ac30241ee4 100644 --- a/src/postgres/src/backend/utils/cache/relcache.c +++ b/src/postgres/src/backend/utils/cache/relcache.c @@ -1727,6 +1727,12 @@ YBUpdateRelationsAttributes(bool sys_relations_update_required) SysScanDesc scandesc = systable_beginscan( attrel, InvalidOid, false /* indexOk */, NULL, 0, NULL); YbAttrProcessorState state = {0}; + MemoryContext per_tuple_memory_context = + (*YBCGetGFlags()->ysql_disable_per_tuple_memory_context_in_update_relattrs) ? + NULL : AllocSetContextCreate(GetCurrentMemoryContext(), + "PerTupleContext", ALLOCSET_DEFAULT_SIZES); + if (per_tuple_memory_context) + MemoryContextSwitchTo(per_tuple_memory_context); HeapTuple htup; while (HeapTupleIsValid(htup = systable_getnext(scandesc))) { @@ -1736,8 +1742,15 @@ YBUpdateRelationsAttributes(bool sys_relations_update_required) YbStartNewAttrProcessing( &state, sys_relations_update_required, attrel, htup); } + if (per_tuple_memory_context) + MemoryContextReset(per_tuple_memory_context); } YbCompleteAttrProcessing(&state); + if (per_tuple_memory_context) + { + MemoryContextSwitchTo(per_tuple_memory_context->parent); + MemoryContextDelete(per_tuple_memory_context); + } systable_endscan(scandesc); table_close(attrel, AccessShareLock); } diff --git a/src/postgres/src/backend/utils/misc/guc.c b/src/postgres/src/backend/utils/misc/guc.c index cd52ab18e6bd..b4cc0a8a93ac 100644 --- a/src/postgres/src/backend/utils/misc/guc.c +++ b/src/postgres/src/backend/utils/misc/guc.c @@ -118,6 +118,7 @@ /* Yugabyte includes */ #include "commands/copy.h" #include "executor/ybcModifyTable.h" +#include "tcop/pquery.h" #include "pg_yb_utils.h" #ifndef PG_KRB_SRVTAB @@ -582,6 +583,13 @@ static struct config_enum_entry recovery_init_sync_method_options[] = { {NULL, 0, false} }; +const struct config_enum_entry yb_pg_batch_detection_mechanism_options[] = { + {"detect_by_peeking", DETECT_BY_PEEKING, false}, + {"assume_all_batch_executions", ASSUME_ALL_BATCH_EXECUTIONS, false}, + {"ignore_batch_delete_and_update_may_fail", IGNORE_BATCH_DELETE_AND_UPDATE_MAY_FAIL, false}, + {NULL, 0, false} +}; + static struct config_enum_entry shared_memory_options[] = { #ifndef WIN32 {"sysv", SHMEM_TYPE_SYSV, false}, @@ -5720,6 +5728,28 @@ static struct config_enum ConfigureNamesEnum[] = NULL, NULL, NULL }, + { + {"yb_pg_batch_detection_mechanism", PGC_SIGHUP, COMPAT_OPTIONS_CLIENT, + gettext_noop("The drivers use message protocol to communicate " + "with PG. The driver does not inform PG in advance " + "about a Batch execution. We need to identify a batch " + "because in that case the single-shard optimization " + "should be disabled. Postgres drivers pipeline " + "messages and we exploit this to peek the message " + "following 'Execute' to detect a batch. This may " + "lead to some unforeseen bugs, so this GUC provides " + "a way to disable the single-shard optimization " + "completely or go back to the behavior before " + "#16446 was fixed."), + NULL, + GUC_SUPERUSER_ONLY + }, + &yb_pg_batch_detection_mechanism, + DETECT_BY_PEEKING, + yb_pg_batch_detection_mechanism_options, + NULL, NULL, NULL + }, + /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, 0, NULL, NULL, NULL, NULL diff --git a/src/postgres/src/backend/utils/misc/pg_yb_utils.c b/src/postgres/src/backend/utils/misc/pg_yb_utils.c index e038f3f62e79..b87e512c5fe0 100644 --- a/src/postgres/src/backend/utils/misc/pg_yb_utils.c +++ b/src/postgres/src/backend/utils/misc/pg_yb_utils.c @@ -653,6 +653,7 @@ YBInitPostgresBackend( * TODO: do we really need to DB name / username here? */ HandleYBStatus(YBCPgInitSession(db_name ? db_name : user_name)); + YBCSetTimeout(StatementTimeout, NULL); } } @@ -1248,6 +1249,7 @@ typedef struct DdlTransactionState { MemoryContext mem_context; bool is_catalog_version_increment; bool is_breaking_catalog_change; + bool is_global_ddl; NodeTag original_node_tag; } DdlTransactionState; @@ -1315,6 +1317,10 @@ YBGetDdlNestingLevel() return ddl_transaction_state.nesting_level; } +void YbSetIsGlobalDDL() { + ddl_transaction_state.is_global_ddl = true; +} + void YBIncrementDdlNestingLevel(bool is_catalog_version_increment, bool is_breaking_catalog_change) @@ -1361,18 +1367,21 @@ YBDecrementDdlNestingLevel() YBResetEnableNonBreakingDDLMode(); bool is_catalog_version_increment = ddl_transaction_state.is_catalog_version_increment; bool is_breaking_catalog_change = ddl_transaction_state.is_breaking_catalog_change; + bool is_global_ddl = ddl_transaction_state.is_global_ddl; /* - * Reset the two flags to false prior to executing + * Reset these flags to false prior to executing * YbIncrementMasterCatalogVersionTableEntry() such that * even when it throws an exception we still reset the flags. */ ddl_transaction_state.is_catalog_version_increment = false; ddl_transaction_state.is_breaking_catalog_change = false; + ddl_transaction_state.is_global_ddl = false; + const bool increment_done = is_catalog_version_increment && YBCPgHasWriteOperationsInDdlTxnMode() && YbIncrementMasterCatalogVersionTableEntry( - is_breaking_catalog_change); + is_breaking_catalog_change, is_global_ddl); HandleYBStatus(YBCPgExitSeparateDdlTxnMode()); @@ -3303,3 +3312,13 @@ uint32_t YbGetNumberOfDatabases() Assert(num_databases > 0); return num_databases; } + +static bool yb_is_batched_execution = false; + +bool YbIsBatchedExecution() { + return yb_is_batched_execution; +} + +void YbSetIsBatchedExecution(bool value) { + yb_is_batched_execution = value; +} diff --git a/src/postgres/src/include/catalog/yb_catalog_version.h b/src/postgres/src/include/catalog/yb_catalog_version.h index 1883c1da1350..fda951aa3fc1 100644 --- a/src/postgres/src/include/catalog/yb_catalog_version.h +++ b/src/postgres/src/include/catalog/yb_catalog_version.h @@ -35,7 +35,8 @@ extern YbCatalogVersionType yb_catalog_version_type; extern uint64_t YbGetMasterCatalogVersion(); /* Send a request to increment the master catalog version. */ -extern bool YbIncrementMasterCatalogVersionTableEntry(bool is_breaking_change); +extern bool YbIncrementMasterCatalogVersionTableEntry(bool is_breaking_change, + bool is_global_ddl); /* Send a request to create the master catalog version for the given database. */ extern void YbCreateMasterDBCatalogVersionTableEntry(Oid db_oid); diff --git a/src/postgres/src/include/libpq/libpq.h b/src/postgres/src/include/libpq/libpq.h index 4808e27860f1..26566cfc30a2 100644 --- a/src/postgres/src/include/libpq/libpq.h +++ b/src/postgres/src/include/libpq/libpq.h @@ -87,6 +87,8 @@ extern bool pq_check_connection(void); extern bool pq_buffer_has_data(void); extern int pq_putbytes(const char *s, size_t len); +extern int yb_pq_peekbyte_no_msg_reading_status_check(void); + /* * prototypes for functions in be-secure.c */ diff --git a/src/postgres/src/include/pg_yb_utils.h b/src/postgres/src/include/pg_yb_utils.h index fdb161b71a46..fbfbd9c92b2b 100644 --- a/src/postgres/src/include/pg_yb_utils.h +++ b/src/postgres/src/include/pg_yb_utils.h @@ -492,7 +492,6 @@ extern int yb_wait_for_backends_catalog_version_timeout; //------------------------------------------------------------------------------ // GUC variables needed by YB via their YB pointers. extern int StatementTimeout; -extern int *YBCStatementTimeoutPtr; //------------------------------------------------------------------------------ // YB Debug utils. @@ -561,6 +560,7 @@ extern const char* YbBitmapsetToString(Bitmapset *bms); bool YBIsInitDbAlreadyDone(); int YBGetDdlNestingLevel(); +void YbSetIsGlobalDDL(); void YBIncrementDdlNestingLevel(bool is_catalog_version_increment, bool is_breaking_catalog_change); void YBDecrementDdlNestingLevel(); @@ -772,6 +772,9 @@ void GetStatusMsgAndArgumentsByCode( const char **msg_buf, size_t *msg_nargs, const char ***msg_args, const char **detail_buf, size_t *detail_nargs, const char ***detail_args); +bool YbIsBatchedExecution(); +void YbSetIsBatchedExecution(bool value); + #define HandleYBStatus(status) \ HandleYBStatusAtErrorLevel(status, ERROR) diff --git a/src/postgres/src/include/tcop/pquery.h b/src/postgres/src/include/tcop/pquery.h index f9a6882ecb0a..9526f28ace9b 100644 --- a/src/postgres/src/include/tcop/pquery.h +++ b/src/postgres/src/include/tcop/pquery.h @@ -19,6 +19,14 @@ struct PlannedStmt; /* avoid including plannodes.h here */ +enum yb_pg_batch_detection_mechanism_options +{ + DETECT_BY_PEEKING = 0, + ASSUME_ALL_BATCH_EXECUTIONS, + IGNORE_BATCH_DELETE_AND_UPDATE_MAY_FAIL +}; + +extern int yb_pg_batch_detection_mechanism; extern PGDLLIMPORT Portal ActivePortal;