diff --git a/src/postgres/src/backend/access/transam/xact.c b/src/postgres/src/backend/access/transam/xact.c index f52bc4fef24f..15877f46aa31 100644 --- a/src/postgres/src/backend/access/transam/xact.c +++ b/src/postgres/src/backend/access/transam/xact.c @@ -3053,6 +3053,45 @@ void CommitTransactionCommand(void) { TransactionState s = CurrentTransactionState; + /* Update the session parameter to the shared memory */ + switch (s->blockState) + { + case TBLOCK_END: /* COMMIT received */ + case TBLOCK_STARTED: /* running single-query transaction */ + { + /* every query will be treated as a single transaction */ + /* Copy the session parameter from the local memory to the shared memory */ + ereport(WARNING, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("Updating the shared memory"))); + + YbUpdateSharedMemory(); + YbCleanChangedSessionParameter(); + } + break; + case TBLOCK_BEGIN: + { + ereport(WARNING, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("Reset the changed session parameters"))); + YbCleanChangedSessionParameter(); + break; + } + case TBLOCK_SUBBEGIN: /* starting a subtransaction */ + case TBLOCK_SUBINPROGRESS: /* live subtransaction */ + case TBLOCK_SUBRELEASE: /* RELEASE received */ + case TBLOCK_SUBCOMMIT: /* COMMIT received while TBLOCK_SUBINPROGRESS */ + case TBLOCK_SUBABORT: /* failed subxact, awaiting ROLLBACK */ + case TBLOCK_SUBABORT_END: /* failed subxact, ROLLBACK received */ + case TBLOCK_SUBABORT_PENDING: /* live subxact, ROLLBACK received */ + case TBLOCK_SUBRESTART: /* live subxact, ROLLBACK TO received */ + case TBLOCK_SUBABORT_RESTART: /* failed subxact, ROLLBACK TO received */ + /* do nothing for sub transaction since */ + break; + default: + /* do nothing */ + break; + } switch (s->blockState) { @@ -6214,4 +6253,4 @@ void YbClearCurrentTransactionId() { CurrentTransactionState->transactionId = InvalidTransactionId; MyPgXact->xid = InvalidTransactionId; -} +} \ No newline at end of file diff --git a/src/postgres/src/backend/postmaster/postmaster.c b/src/postgres/src/backend/postmaster/postmaster.c index 47e36654f6bc..7d049f113c16 100644 --- a/src/postgres/src/backend/postmaster/postmaster.c +++ b/src/postgres/src/backend/postmaster/postmaster.c @@ -73,6 +73,7 @@ #include #include #include +#include #include #include #include @@ -127,6 +128,8 @@ #include "utils/dynamic_loader.h" #include "utils/memutils.h" #include "utils/pidfile.h" +#include "utils/guc.h" +#include "utils/guc_tables.h" #include "utils/ps_status.h" #include "utils/timeout.h" #include "utils/varlena.h" @@ -2613,7 +2616,6 @@ ClosePostmasterPorts(bool am_syslogger) #endif } - /* * reset_shared -- reset shared memory and semaphores */ @@ -2629,17 +2631,6 @@ reset_shared(int port) * objects if the postmaster crashes and is restarted. */ CreateSharedMemoryAndSemaphores(port); - - /* - * Create shared Memory for the `SESSION PARAMETERS` - * Each parameter will be having an array of struct `shared_parameter` - * - */ - - /* todo-yb --> add the Isyugabytedb() function */ - int shmid; - shmid=shmget((key_t)2345, 1024, 0666|IPC_CREAT); /* todo-yb --> Check whether it works without session paramerter */ - } diff --git a/src/postgres/src/backend/utils/misc/guc.c b/src/postgres/src/backend/utils/misc/guc.c index d0501f53d0d0..846d1bc1c8d2 100644 --- a/src/postgres/src/backend/utils/misc/guc.c +++ b/src/postgres/src/backend/utils/misc/guc.c @@ -82,6 +82,7 @@ #include "utils/builtins.h" #include "utils/bytea.h" #include "utils/guc_tables.h" +#include "utils/shared_memory.h" #include "utils/float.h" #include "utils/memutils.h" #include "utils/pg_locale.h" @@ -93,7 +94,7 @@ #include "utils/tzparser.h" #include "utils/varlena.h" #include "utils/xml.h" - +#include #include "pg_yb_utils.h" #ifndef PG_KRB_SRVTAB @@ -526,6 +527,8 @@ int tcp_keepalives_idle; int tcp_keepalives_interval; int tcp_keepalives_count; +char yb_session_client_id[20]; +struct list_changed_parameters *yb_changed_session_parameter = NULL; /* * SSL renegotiation was been removed in PostgreSQL 9.5, but we tolerate it * being set to zero (meaning never renegotiate) for backward compatibility. @@ -6230,6 +6233,34 @@ BeginReportingGUCOptions(void) } } +/* + * YbUpdateSessionParameter + * Updates the changes regarding the session parameter + */ +void YbUpdateSessionParameter(const char* parameter_name){ + if(strcmp(yb_session_client_id,"")==0) + return ; /* No client id */ + + /* Update the local parameter */ + + ereport(WARNING, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("Updating the value %s",parameter_name))); + + struct list_changed_parameters *temp; + /* Check whether the session parameter has alredy changed in the on going transaction */ + + for(temp = yb_changed_session_parameter ; temp!=NULL; temp = temp->next) + if(strcmp(temp->parameter_name,parameter_name)==0) + return ; /* the parameter name already exist in the list of changed session parameter */ + + + temp = (struct list_changed_parameters *) malloc(sizeof(struct list_changed_parameters)); + strcpy(temp->parameter_name,parameter_name); + temp->next = yb_changed_session_parameter; + yb_changed_session_parameter = temp; +} + /* * ReportGUCOption: if appropriate, transmit option value to frontend */ @@ -7636,7 +7667,7 @@ set_config_option(const char *name, const char *value, #undef newval } } - + if (changeVal && (record->flags & GUC_REPORT)) ReportGUCOption(record); @@ -8324,6 +8355,83 @@ AlterSystemSetConfigFile(AlterSystemStmt *altersysstmt) LWLockRelease(AutoFileLock); } +/* + * YbHandleSetClientId - Handles SET client_id + * 1. Assign the yb_session_client_id + * 2. Reset all parameters + * + */ +void YbHandleSetClientId(const char * client_id) +{ + int yb_client_id = atoi(client_id); + + ResetAllOptions(); + + /* if the client_id is -1 */ + if(yb_client_id<0) + { + int new_client_id = yb_shmem_get(); + if(new_client_id > 0 ) + ereport(WARNING,(errcode(ERRCODE_PROTOCOL_VIOLATION),errmsg("shmkey=%d",new_client_id))); + }else{ + strcpy(yb_session_client_id,client_id); + /* Get the Key */ + int shared_memory_key; + int array_len; + if( (shared_memory_key = shmget((key_t)yb_client_id, YB_SHMEM_ARR_LEN, 0666)) < 0 ){ + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("Error at shmget"))); + + return ; + } + + char *shared_memory_pointor ; + + if( (shared_memory_pointor = shmat(shared_memory_key, NULL, 0)) == (char *) -1){ + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("Error at shmat"))); + + return ; + } + /* process the header */ + /* client_id */ + Assert(strcmp(client_id,shared_memory_pointor)==0); /* Both values must match */ + shared_memory_pointor+=20; + + /* array_len */ + array_len = atoi(shared_memory_pointor); /* max length of the array */ + shared_memory_pointor+=20; + + /* database */ + ereport(WARNING,(errcode(ERRCODE_PROTOCOL_VIOLATION),errmsg("database =%s",shared_memory_pointor))); + shared_memory_pointor+=20; + + /* role */ + ereport(WARNING,(errcode(ERRCODE_PROTOCOL_VIOLATION),errmsg("role =%s",shared_memory_pointor))); + shared_memory_pointor+=20; + + int i; + for(i=0; iname,"client_id")==0) + { + YbHandleSetClientId(ExtractSetVariableArgs(stmt)); /* Handle the client_id */ + return; + } switch (stmt->kind) { @@ -8362,13 +8479,21 @@ ExecSetVariableStmt(VariableSetStmt *stmt, bool isTopLevel) case VAR_SET_CURRENT: if (stmt->is_local) WarnNoTransactionBlock(isTopLevel, "SET LOCAL"); - (void) set_config_option(stmt->name, + + if(set_config_option(stmt->name, ExtractSetVariableArgs(stmt), (superuser() || YbDbAdminCanSet ? PGC_SUSET : PGC_USERSET), PGC_S_SESSION, - action, true, 0, false); + action, true, 0, false) == 1) + { + YbUpdateSessionParameter(stmt->name); + ereport(WARNING, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("Updating ...."))); + } check_reserved_prefixes(stmt->name); + break; case VAR_SET_MULTI: @@ -8497,12 +8622,18 @@ SetPGVariable(const char *name, List *args, bool is_local) char *argstring = flatten_set_variable_args(name, args); /* Note SET DEFAULT (argstring == NULL) is equivalent to RESET */ - (void) set_config_option(name, + if(set_config_option(name, argstring, (superuser() ? PGC_SUSET : PGC_USERSET), PGC_S_SESSION, is_local ? GUC_ACTION_LOCAL : GUC_ACTION_SET, - true, 0, false); + true, 0, false)==1 && is_local == 0) + { + YbUpdateSessionParameter(name); + ereport(WARNING, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("Updating ...."))); + } } /* @@ -8540,12 +8671,16 @@ set_config_by_name(PG_FUNCTION_ARGS) is_local = PG_GETARG_BOOL(2); /* Note SET DEFAULT (argstring == NULL) is equivalent to RESET */ - (void) set_config_option(name, + if(set_config_option(name, value, (superuser() ? PGC_SUSET : PGC_USERSET), PGC_S_SESSION, is_local ? GUC_ACTION_LOCAL : GUC_ACTION_SET, - true, 0, false); + true, 0, false)==1){ + YbUpdateSessionParameter(name); + ereport(WARNING, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("Updating ....")));} /* get the new current value */ new_value = GetConfigOptionByName(name, NULL, false); @@ -8554,7 +8689,6 @@ set_config_by_name(PG_FUNCTION_ARGS) PG_RETURN_TEXT_P(cstring_to_text(new_value)); } - /* * Common code for DefineCustomXXXVariable subroutines: allocate the * new variable's config struct and fill in generic fields. @@ -10924,6 +11058,114 @@ GUCArrayReset(ArrayType *array) return newarray; } +/* + * YbUpdateSharedMemory + * Copy the changed session parameter value to the shared memory + * 1. Check for `yb_session_client_id` if not present then exit + * 2. Copy the context from local memory to shared memory + * 3. Delete the local context. + * 4. Reset `yb_session_client_id` + * + * NOTE: This function will only be called on `COMMIT` or `SUBCOMMIT`. + */ +void YbUpdateSharedMemory(){ + if(strcmp(yb_session_client_id,"")==0) + { + /* yb_changed_session_parameter can only be present if yb_session_client_id is set */ + Assert(yb_changed_session_parameter==NULL); + return; + } + int client_id = atoi(yb_session_client_id); + ereport(WARNING, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("Processing ..."))); + /* Get the Key */ + + int shared_memory_key; + if( (shared_memory_key = shmget((key_t)client_id, YB_SHAMEM_SIZE, 0666)) < 0 ){ + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("Error at shmget"))); + + return ; + } + + char *list_shared_parameter; + + if( (list_shared_parameter = shmat(shared_memory_key, NULL, 0)) == (char *) -1){ + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("Error at shmat"))); + + return ; + } + int i; + bool found =0 ; + /* debug::-- working fine till here */ + Assert(strcmp(yb_session_client_id,list_shared_parameter)==0); + list_shared_parameter += 20; + + const int array_size = atoi(list_shared_parameter); + list_shared_parameter += 60; /* ignore database and role */ + char *reset_val = list_shared_parameter; + for(struct list_changed_parameters *temp = yb_changed_session_parameter; temp!=NULL; temp=temp->next) + { + list_shared_parameter = reset_val; + char *parameter_name = temp->parameter_name; + + ereport(WARNING, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("Processing %s ", parameter_name))); + + for(i=0; inext; + free(yb_changed_session_parameter); + yb_changed_session_parameter = temp; + } +} + /* * Validate a proposed option setting for GUCArrayAdd/Delete/Reset. * @@ -11949,4 +12191,113 @@ check_backoff_multiplier(double *multiplier, void **extra, GucSource source) return true; } +/* + * yb_shmem_insert + * + * compares the size of the shared memory available and required. + * resize the memory + * compares and insert session parameters and values in the shared_memory + * + * Returns --> + * 0 --> Successfull update + * -1 --> Update failed due to memory issue at shmget + * -2 --> Update failed due to other issues at shmget + * -3 --> Reading failed at shmat + * + */ +int yb_shmem_insert(char **list_parameters, const unsigned int ipc_key ){ + int array_length; + struct yb_shared_parameter *list_shared_parameter; + /* get the list len */ + int shared_memory_key; + if( (shared_memory_key = shmget((key_t)ipc_key, sizeof(int), 0666)) < 0 ){ + perror("Failed to get the memory size: Unable to use shmget:"); + if(errno == YB_SHMEM_ALLOC_ERR_CODE) + return -1; + else + return -2; + + } + int *array_length_ptr; + if( (array_length_ptr = shmat(shared_memory_key, NULL, 0)) == NULL ){ + perror("Failed to READ memory size:"); + return -3 ; + } + array_length = *array_length_ptr; + shmdt(array_length_ptr); + + /* get the list */ + void *shared_mem_pointer; + if( (shared_memory_key = shmget((key_t)ipc_key, sizeof(int)+sizeof(struct yb_shared_parameter)*array_length, 0666)) < 0 ){ + perror("Failed to get the memory size: Unable to use shmget:"); + if(errno == YB_SHMEM_ALLOC_ERR_CODE) + return -1; + else + return -2; + + } + if( (shared_mem_pointer = shmat(shared_memory_key, NULL, 0)) == NULL){ + perror("Failed to READ memory size:"); + return -3; + } + shared_mem_pointer += sizeof(int); /* skip the mem size */ + list_shared_parameter = (struct yb_shared_parameter*) shared_mem_pointer; + if(list_shared_parameter == NULL) + return 0; + /* + * compare the names and consider the upgradation of the parameter (can be optimised ) + */ + /* + * update values that are already present + */ + /* + * resize the shared memory if needed + */ + /* + * add new space + */ + return 1; +} + +int yb_shmem_get() +{ + + /* + * structure of the array size + * | array_len | parameter_name_1 | parameter_value_1 | parameter_name_2 | ..... + * + */ + int shmem_key = KEY_MIN_SHAMEM; + int ipc_key; + char *shared_mem_pointer; + //struct yb_shared_parameter *list_shared_parameter; + /* get the shmem_key */ + while( (ipc_key = shmget((key_t)shmem_key,YB_SHAMEM_SIZE, 0666|IPC_EXCL|IPC_CREAT))<0 ) + { + shmem_key++; + /* it will fail if the key exist */ + } + /* we got a unique key */ + + + /* get the memory pointer*/ + if( (shared_mem_pointer = shmat(ipc_key, NULL, 0)) == NULL){ + perror("Failed to READ memory size:"); + return -3; + } + + /* Header file */ + sprintf(shared_mem_pointer,"%d",shmem_key); + shared_mem_pointer += 20 ; + sprintf(shared_mem_pointer,"%d",YB_SHMEM_ARR_LEN); + shared_mem_pointer += 20 ; + strcpy(shared_mem_pointer,"yugabyte"); /* database */ + shared_mem_pointer += 20 ; + strcpy(shared_mem_pointer,"yugabyte_role"); /* user */ + shared_mem_pointer += 20 ; + + return shmem_key; +} + + #include "guc-file.c" diff --git a/src/postgres/src/include/utils/guc.h b/src/postgres/src/include/utils/guc.h index 466745a9d216..645f6f5f6bb6 100644 --- a/src/postgres/src/include/utils/guc.h +++ b/src/postgres/src/include/utils/guc.h @@ -405,6 +405,8 @@ extern ArrayType *GUCArrayAdd(ArrayType *array, const char *name, const char *va extern ArrayType *GUCArrayDelete(ArrayType *array, const char *name); extern ArrayType *GUCArrayReset(ArrayType *array); +extern void YbUpdateSharedMemory(); +extern void YbCleanChangedSessionParameter(); #ifdef EXEC_BACKEND extern void write_nondefault_variables(GucContext context); extern void read_nondefault_variables(void); @@ -455,4 +457,21 @@ extern void assign_search_path(const char *newval, void *extra); extern bool check_wal_buffers(int *newval, void **extra, GucSource source); extern void assign_xlog_sync_method(int new_sync_method, void *extra); +#define YB_SHMEM_ARR_LEN 10 + +#define YB_SHAMEM_SIZE (40* YB_SHMEM_ARR_LEN +8) /* + * Strarting memory size (in bytes) of shared memory + * for storing the `session_parameter` of a given client + */ + +#define KEY_MIN_SHAMEM 2346 /* Smallest key value that will be used for shared memory */ +#define MAX_PARAM_NAME_LEN 30 /* MAX Length of the parameter name */ +#define MAX_PARAM_VALUE_LEN 20 /* MAX Length of the parameter size */ + +#define YB_SHMEM_ALLOC_ERR_CODE 22 +extern int yb_shmem_create(); /* creates a shared memory block for a client id */ +/* wrapper functions */ +extern int yb_shmem_remove(); /* remove the shared memory block for a client id */ +extern int yb_shmem_get(); /* remove the shared memory block for a client id */ +extern int yb_shmem_insert(char **list_parameters, const unsigned int icp_id ); /* insert a given */ #endif /* GUC_H */ diff --git a/src/postgres/src/include/utils/guc_tables.h b/src/postgres/src/include/utils/guc_tables.h index c2c922985aae..aa93e2d5ca1a 100644 --- a/src/postgres/src/include/utils/guc_tables.h +++ b/src/postgres/src/include/utils/guc_tables.h @@ -13,7 +13,7 @@ */ #ifndef GUC_TABLES_H #define GUC_TABLES_H 1 - +#define SHAMEM_SIZE 7 #include "utils/guc.h" /* @@ -268,6 +268,18 @@ struct config_enum void *reset_extra; }; +struct yb_shared_parameter +{ + char yb_proxy_client_id[6]; /* client_id generated by the yb_proxy */ + char parameter_value[20]; /* value of the parameter */ + char parameter_name[20]; /* value of the parameter */ +}; + +struct list_changed_parameters +{ + char parameter_name[20]; /* name of the session parameter changed */ + struct list_changed_parameters *next; /* next element in the list */ +}; /* constant tables corresponding to enums above and in guc.h */ extern const char *const config_group_names[]; extern const char *const config_type_names[];