Skip to content

Commit

Permalink
Discard deferred and unsent messages on unplanned disconnect.
Browse files Browse the repository at this point in the history
The change causes discard_input_queue() to be called in Accelio's
on_disconnect_event() handler, as well as on mark_down().

Signed-off-by: Matt Benjamin <[email protected]>
  • Loading branch information
Matt Benjamin committed Dec 23, 2014
1 parent 9524391 commit 6a5515a
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 2 deletions.
30 changes: 28 additions & 2 deletions src/msg/xio/XioConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -503,20 +503,46 @@ int XioConnection::flush_input_queue(uint32_t flags) {
int XioConnection::discard_input_queue(uint32_t flags)
{
Message::Queue disc_q;
XioSubmit::Queue deferred_q;

if (! (flags & CState::OP_FLAG_LOCKED))
pthread_spin_lock(&sp);

/* the two send queues contain different objects:
* - anything on the mqueue is a Message
* - anything on the requeue is an XioMsg
*/
Message::Queue::const_iterator i1 = disc_q.end();
disc_q.splice(i1, outgoing.mqueue);

XioSubmit::Queue::const_iterator i2 = deferred_q.end();
deferred_q.splice(i2, outgoing.requeue);

if (! (flags & CState::OP_FLAG_LOCKED))
pthread_spin_unlock(&sp);

// mqueue
int ix, q_size = disc_q.size();
for (ix = 0; ix < q_size; ++ix) {
Message::Queue::iterator q_iter = disc_q.begin();
Message* m = &(*q_iter);
disc_q.erase(q_iter);
m->put();
}

// requeue
q_size = deferred_q.size();
for (ix = 0; ix < q_size; ++ix) {
XioSubmit::Queue::iterator q_iter = deferred_q.begin();
XioSubmit* xs = &(*q_iter);
assert(xs->type == XioSubmit::OUTGOING_MSG);
XioMsg* xmsg = static_cast<XioMsg*>(xs);
deferred_q.erase(q_iter);
// release once for each chained xio_msg
for (ix = 0; ix < int(xmsg->hdr.msg_cnt); ++ix)
xmsg->put();
}

return 0;
}

Expand Down Expand Up @@ -575,8 +601,8 @@ int XioConnection::_mark_down(uint32_t flags)
// Accelio disconnect
xio_disconnect(conn);

// XXX always discrd input--but are we in startup? ie, should mark_down
// be forcing a disconnect?
/* XXX this will almost certainly be called again from
* on_disconnect_event() */
discard_input_queue(flags|CState::OP_FLAG_LOCKED);

if (! (flags & CState::OP_FLAG_LOCKED))
Expand Down
1 change: 1 addition & 0 deletions src/msg/xio/XioConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ class XioConnection : public Connection
int on_disconnect_event() {
connected.set(false);
pthread_spin_lock(&sp);
discard_input_queue(CState::OP_FLAG_LOCKED);
if (!conn)
this->put();
pthread_spin_unlock(&sp);
Expand Down

0 comments on commit 6a5515a

Please sign in to comment.