[YottaDB#136] Reset connection-related state soon after connection gets reset; not doing so could affect the new connection when it gets established

This is an issue observed from a test failure (in the merge/tp_stress subtest) during internal
testing. The primary failure symptom was an assert failure in the source server

  %GTM-F-ASSERT, Assert failed in sr_unix/gtmsource_process_ops.c line 1167 for expression (FALSE)

The source server assert failed because it was expecting a REPL_INSTINFO message
(msgtype == 29) but instead got a REPL_CMP2UNCMP message (msgtype == 25).

The merge/tp_stress subtest induces a REPL_CMP2UNCMP situation using white-box code so the fact
that the receiver sent a REPL_CMP2UNCMP message is not surprising. But the timing of the send
is where the problem is.

The issue is that the need to send a CMP2UNCMP message is tracked in a static
variable "send_cmp2uncmp", which was set to TRUE (in gtmrecv_poll_actions.c) but never
cleared before moving on to the new connection. This leftover static caused
the incorrect send and, in turn, the assert failure in the source server. In pro, the
source server would instead have issued an error message, closed the connection, and opened a new
connection with the receiver server. So the user would not notice any issue because of this
(in the sense that the source server would not go down), even though evidence of an
unnecessary disconnect/reconnect sequence would be visible in the source server log.

As part of the connection reset, we currently reset a few static variables of the old connection
but not the send_cmp2uncmp variable. A few other statics are in use too, and all of them are
best cleared whenever a connection gets reset.

That is how this fix proceeds. All statics related to the current connection are bundled up
in a conn_state_t structure maintained in the "curr_conn_state" static variable. Any time
a connection gets reset, the entire structure gets cleared.
nars1 committed Jan 26, 2018
1 parent 57bf3f4 commit 1a17969
Showing 1 changed file with 72 additions and 56 deletions.
128 changes: 72 additions & 56 deletions sr_unix/gtmrecv_poll_actions.c
@@ -3,6 +3,9 @@
* Copyright (c) 2008-2017 Fidelity National Information *
* Services, Inc. and/or its subsidiaries. All rights reserved. *
* *
* Copyright (c) 2018 YottaDB LLC. and/or its subsidiaries. *
* All rights reserved. *
* *
* This source code contains the intellectual property *
* of its copyright holder(s), and is made available *
* under a license. If you do not know the terms of *
@@ -84,18 +87,27 @@ enum
STOP_POLL
};


/* The below structure defines variables that maintain state of the current connection (e.g. if an xoff needs to be sent etc.)
* All these need to be cleared if/when the current connection gets closed.
*/
typedef struct
{
boolean_t send_xoff;
boolean_t xoff_sent;
seq_num send_seqno;
boolean_t log_draining_msg;
boolean_t send_badtrans;
boolean_t send_cmp2uncmp;
boolean_t upd_shut_too_early_logged;
} conn_state_t;

int gtmrecv_poll_actions1(int *pending_data_len, int *buff_unprocessed, unsigned char *buffp)
{
static int report_cnt = 1;
static int next_report_at = 1;
static boolean_t send_xoff = FALSE;
static boolean_t xoff_sent = FALSE;
static seq_num send_seqno;
static boolean_t log_draining_msg = FALSE;
static boolean_t send_badtrans = FALSE;
static boolean_t send_cmp2uncmp = FALSE;
static boolean_t upd_shut_too_early_logged = FALSE;
static time_t last_reap_time = 0;
static conn_state_t curr_conn_state;
repl_msg_t xoff_msg;
repl_badtrans_msg_t bad_trans_msg;
boolean_t alert = FALSE, info = FALSE;
@@ -159,7 +171,7 @@ int gtmrecv_poll_actions1(int *pending_data_len, int *buff_unprocessed, unsigned
"INFO : Update process not running due to user initiated shutdown\n");
if (1 == report_cnt)
{
send_xoff = TRUE;
curr_conn_state.send_xoff = TRUE;
recvpool_ctl->old_jnl_seqno = recvpool_ctl->jnl_seqno;
recvpool_ctl->jnl_seqno = 0;
/* Even though we have identified that the update process is NOT alive, a waitpid on the update
@@ -200,42 +212,42 @@ int gtmrecv_poll_actions1(int *pending_data_len, int *buff_unprocessed, unsigned
* REPL_XOFF source side and drain the replication pipe
*/
onln_rlbk_flg_set = TRUE;
send_xoff = TRUE;
} else if (!send_cmp2uncmp && gtmrecv_send_cmp2uncmp)
curr_conn_state.send_xoff = TRUE;
} else if (!curr_conn_state.send_cmp2uncmp && gtmrecv_send_cmp2uncmp)
{
send_xoff = TRUE;
send_seqno = recvpool_ctl->jnl_seqno;
send_cmp2uncmp = TRUE;
} else if (!send_badtrans && upd_proc_local->bad_trans)
curr_conn_state.send_xoff = TRUE;
curr_conn_state.send_seqno = recvpool_ctl->jnl_seqno;
curr_conn_state.send_cmp2uncmp = TRUE;
} else if (!curr_conn_state.send_badtrans && upd_proc_local->bad_trans)
{
send_xoff = TRUE;
send_seqno = upd_proc_local->read_jnl_seqno;
send_badtrans = TRUE;
curr_conn_state.send_xoff = TRUE;
curr_conn_state.send_seqno = upd_proc_local->read_jnl_seqno;
curr_conn_state.send_badtrans = TRUE;
bad_trans_detected = TRUE;
} else if (!upd_proc_local->bad_trans && send_badtrans && 1 != report_cnt)
} else if (!upd_proc_local->bad_trans && curr_conn_state.send_badtrans && (1 != report_cnt))
{
send_badtrans = FALSE;
curr_conn_state.send_badtrans = FALSE;
bad_trans_detected = FALSE;
}
if (send_xoff && !xoff_sent)
if (curr_conn_state.send_xoff && !curr_conn_state.xoff_sent)
{ /* Send XOFF_ACK_ME if the receiver has a connection to the source. Do not attempt to send it if we dont even
* know the endianness of the remote side. In that case, we are guaranteed no initial handshake occurred and
* so no point sending the XOFF too. This saves us lots of trouble in case of cross-endian replication connections.
*/
assert((FD_INVALID != gtmrecv_sock_fd) || repl_connection_reset);
if ((FD_INVALID != gtmrecv_sock_fd) && remote_side->endianness_known)
{
send_seqno = upd_proc_local->read_jnl_seqno;
curr_conn_state.send_seqno = upd_proc_local->read_jnl_seqno;
if (!remote_side->cross_endian)
{
xoff_msg.type = REPL_XOFF_ACK_ME;
xoff_msg.len = MIN_REPL_MSGLEN;
memcpy((uchar_ptr_t)&xoff_msg.msg[0], (uchar_ptr_t)&send_seqno, SIZEOF(seq_num));
memcpy((uchar_ptr_t)&xoff_msg.msg[0], (uchar_ptr_t)&curr_conn_state.send_seqno, SIZEOF(seq_num));
} else
{
xoff_msg.type = GTM_BYTESWAP_32(REPL_XOFF_ACK_ME);
xoff_msg.len = GTM_BYTESWAP_32(MIN_REPL_MSGLEN);
temp_send_seqno = GTM_BYTESWAP_64(send_seqno);
temp_send_seqno = GTM_BYTESWAP_64(curr_conn_state.send_seqno);
memcpy((uchar_ptr_t)&xoff_msg.msg[0], (uchar_ptr_t)&temp_send_seqno, SIZEOF(seq_num));
}
REPL_SEND_LOOP(gtmrecv_sock_fd, &xoff_msg, MIN_REPL_MSGLEN, REPL_POLL_NOWAIT)
@@ -248,8 +260,8 @@ int gtmrecv_poll_actions1(int *pending_data_len, int *buff_unprocessed, unsigned
"Status = %d ; %s\n", status, STRERROR(status));
repl_close(&gtmrecv_sock_fd);
repl_connection_reset = TRUE;
xoff_sent = FALSE;
send_badtrans = FALSE;
/* Now that current connection is lost, clear all statics maintained for this connection */
memset(&curr_conn_state, 0, SIZEOF(curr_conn_state));

} else if (EREPL_SEND == repl_errno)
rts_error_csa(CSA_ARG(NULL) VARLSTCNT(7) ERR_REPLCOMM, 0, ERR_TEXT, 2,
@@ -264,27 +276,27 @@ int gtmrecv_poll_actions1(int *pending_data_len, int *buff_unprocessed, unsigned
}
} else
{
xoff_sent = TRUE;
log_draining_msg = TRUE;
curr_conn_state.xoff_sent = TRUE;
curr_conn_state.log_draining_msg = TRUE;
}
repl_log(gtmrecv_log_fp, TRUE, TRUE, "REPL_XOFF_ACK_ME sent due to upd shutdown/crash or bad trans "
"or ONLINE_ROLLBACK\n");
send_xoff = FALSE;
curr_conn_state.send_xoff = FALSE;
} else
{ /* Connection has been lost OR initial handshake needs to happen again, so no point sending XOFF/BADTRANS */
send_xoff = FALSE;
send_badtrans = FALSE;
curr_conn_state.send_xoff = FALSE;
curr_conn_state.send_badtrans = FALSE;
}
}
/* Drain pipe */
if (xoff_sent)
if (curr_conn_state.xoff_sent)
{
if (log_draining_msg)
if (curr_conn_state.log_draining_msg)
{ /* avoid multiple logs per instance */
repl_log(gtmrecv_log_fp, TRUE, TRUE, "REPL INFO - Draining replication pipe due to %s\n",
send_cmp2uncmp ? "CMP2UNCMP" : (send_badtrans ? "BAD_TRANS" :
curr_conn_state.send_cmp2uncmp ? "CMP2UNCMP" : (curr_conn_state.send_badtrans ? "BAD_TRANS" :
(onln_rlbk_flg_set ? "ONLINE_ROLLBACK" : "UPD shutdown/crash")));
log_draining_msg = FALSE;
curr_conn_state.log_draining_msg = FALSE;
}
if (0 != *buff_unprocessed)
{ /* Throw away the current contents of the buffer */
@@ -357,8 +369,8 @@ int gtmrecv_poll_actions1(int *pending_data_len, int *buff_unprocessed, unsigned
{
repl_log(gtmrecv_log_fp, TRUE, TRUE,
"REPL INFO - XOFF_ACK received. Drained replication pipe completely\n");
upd_shut_too_early_logged = FALSE;
xoff_sent = FALSE;
curr_conn_state.upd_shut_too_early_logged = FALSE;
curr_conn_state.xoff_sent = FALSE;
return_status = STOP_POLL;
}
} else if (SS_NORMAL == status)
@@ -380,11 +392,11 @@ int gtmrecv_poll_actions1(int *pending_data_len, int *buff_unprocessed, unsigned
; /* Empty Body */
}
*buff_unprocessed = 0; *pending_data_len = 0;
if (SS_NORMAL == status && info && !upd_shut_too_early_logged)
if (SS_NORMAL == status && info && !curr_conn_state.upd_shut_too_early_logged)
{
repl_log(gtmrecv_log_fp, TRUE, TRUE, "ALERT : User initiated shutdown of Update Process done "
"when there was data in the replication pipe\n");
upd_shut_too_early_logged = TRUE;
curr_conn_state.upd_shut_too_early_logged = TRUE;
}
return_status = CONTINUE_POLL;
}
@@ -398,8 +410,8 @@ int gtmrecv_poll_actions1(int *pending_data_len, int *buff_unprocessed, unsigned
"Status = %d ; %s\n", status, STRERROR(status));
repl_close(&gtmrecv_sock_fd);
repl_connection_reset = TRUE;
xoff_sent = FALSE;
send_badtrans = FALSE;
/* Now that current connection is lost, clear all statics maintained for this connection */
memset(&curr_conn_state, 0, SIZEOF(curr_conn_state));
return_status = STOP_POLL;
} else
rts_error_csa(CSA_ARG(NULL) VARLSTCNT(7) ERR_REPLCOMM, 0, ERR_TEXT, 2,
@@ -417,33 +429,35 @@ int gtmrecv_poll_actions1(int *pending_data_len, int *buff_unprocessed, unsigned
* the endianness of the other side. If not, no point in sending one anyways and saves us trouble in
* case of cross-endian replication connections.
*/
if ((STOP_POLL == return_status) && (send_badtrans || send_cmp2uncmp)
if ((STOP_POLL == return_status) && (curr_conn_state.send_badtrans || curr_conn_state.send_cmp2uncmp)
&& (FD_INVALID != gtmrecv_sock_fd) && remote_side->endianness_known)
{ /* Send REPL_BADTRANS or REPL_CMP2UNCMP message */
if (!remote_side->cross_endian)
{
bad_trans_msg.type = send_cmp2uncmp ? REPL_CMP2UNCMP : REPL_BADTRANS;
bad_trans_msg.type = curr_conn_state.send_cmp2uncmp ? REPL_CMP2UNCMP : REPL_BADTRANS;
bad_trans_msg.len = MIN_REPL_MSGLEN;
bad_trans_msg.start_seqno = send_seqno;
bad_trans_msg.start_seqno = curr_conn_state.send_seqno;
} else
{
bad_trans_msg.type = send_cmp2uncmp ? GTM_BYTESWAP_32(REPL_CMP2UNCMP) : GTM_BYTESWAP_32(REPL_BADTRANS);
bad_trans_msg.type = curr_conn_state.send_cmp2uncmp ? GTM_BYTESWAP_32(REPL_CMP2UNCMP) : GTM_BYTESWAP_32(REPL_BADTRANS);
bad_trans_msg.len = GTM_BYTESWAP_32(MIN_REPL_MSGLEN);
bad_trans_msg.start_seqno = GTM_BYTESWAP_64(send_seqno);
bad_trans_msg.start_seqno = GTM_BYTESWAP_64(curr_conn_state.send_seqno);
}
REPL_SEND_LOOP(gtmrecv_sock_fd, &bad_trans_msg, bad_trans_msg.len, REPL_POLL_NOWAIT)
; /* Empty Body */
if (SS_NORMAL == status)
{
if (send_cmp2uncmp)
repl_log(gtmrecv_log_fp, TRUE, TRUE, "REPL_CMP2UNCMP message sent with seqno %llu\n", send_seqno);
if (curr_conn_state.send_cmp2uncmp)
repl_log(gtmrecv_log_fp, TRUE, TRUE, "REPL_CMP2UNCMP message sent with seqno %llu\n",
curr_conn_state.send_seqno);
else
repl_log(gtmrecv_log_fp, TRUE, TRUE, "REPL_BADTRANS message sent with seqno %llu\n", send_seqno);
repl_log(gtmrecv_log_fp, TRUE, TRUE, "REPL_BADTRANS message sent with seqno %llu\n",
curr_conn_state.send_seqno);
} else
{
if (REPL_CONN_RESET(status) && EREPL_SEND == repl_errno)
{
if (send_cmp2uncmp)
if (curr_conn_state.send_cmp2uncmp)
{
repl_log(gtmrecv_log_fp, TRUE, TRUE, "Connection reset while sending REPL_CMP2UNCMP. "
"Status = %d ; %s\n", status, STRERROR(status));
@@ -454,6 +468,8 @@ int gtmrecv_poll_actions1(int *pending_data_len, int *buff_unprocessed, unsigned
}
repl_close(&gtmrecv_sock_fd);
repl_connection_reset = TRUE;
/* Now that current connection is lost, clear all statics maintained for this connection */
memset(&curr_conn_state, 0, SIZEOF(curr_conn_state));
return_status = STOP_POLL;
} else if (EREPL_SEND == repl_errno)
rts_error_csa(CSA_ARG(NULL) VARLSTCNT(7) ERR_REPLCOMM, 0, ERR_TEXT, 2,
@@ -465,15 +481,15 @@ int gtmrecv_poll_actions1(int *pending_data_len, int *buff_unprocessed, unsigned
LEN_AND_LIT("Error sending REPL_BADTRANS/REPL_CMP2UNCMP. Error in select"), status);
}
}
send_badtrans = FALSE;
if (send_cmp2uncmp)
curr_conn_state.send_badtrans = FALSE;
if (curr_conn_state.send_cmp2uncmp)
{
REPL_DPRINT1("gtmrecv_poll_actions : Setting gtmrecv_wait_for_jnl_seqno to TRUE because this receiver"
"server requested a fall-back from compressed to uncompressed operation\n");
gtmrecv_wait_for_jnl_seqno = TRUE;/* set this to TRUE to break out and go back to a fresh "do_main_loop" */
gtmrecv_bad_trans_sent = TRUE;
gtmrecv_send_cmp2uncmp = FALSE;
send_cmp2uncmp = FALSE;
curr_conn_state.send_cmp2uncmp = FALSE;
}
}
if ((upd_proc_local->bad_trans && bad_trans_detected) || onln_rlbk_flg_set
@@ -519,11 +535,11 @@ int gtmrecv_poll_actions1(int *pending_data_len, int *buff_unprocessed, unsigned
"upd restart\n");
gtmrecv_wait_for_jnl_seqno = TRUE;
report_cnt = next_report_at = 1;
if (send_xoff && (FD_INVALID == gtmrecv_sock_fd))
if (curr_conn_state.send_xoff && (FD_INVALID == gtmrecv_sock_fd))
{
/* Update start command was issued before connection was established,
* no point in sending XOFF. */
send_xoff = FALSE;
curr_conn_state.send_xoff = FALSE;
}
} else
{
@@ -543,8 +559,8 @@ int gtmrecv_poll_actions1(int *pending_data_len, int *buff_unprocessed, unsigned
recvpool_ctl->jnl_seqno);
repl_close(&gtmrecv_sock_fd);
repl_connection_reset = TRUE;
xoff_sent = FALSE;
send_badtrans = FALSE;
/* Now that current connection is lost, clear all statics maintained for this connection */
memset(&curr_conn_state, 0, SIZEOF(curr_conn_state));
upd_proc_local->onln_rlbk_flg = FALSE;
/* Before restarting afresh, sync the online rollback cycles. This way any future grab_lock that
* we do after restarting should not realize an unhandled online rollback. For receiver, it is