diff --git a/src/net/src/net_pubsub.cc b/src/net/src/net_pubsub.cc index ca9bc2f788..6354d16790 100644 --- a/src/net/src/net_pubsub.cc +++ b/src/net/src/net_pubsub.cc @@ -455,21 +455,22 @@ void* PubSubThread::ThreadMain() { auto it = pubsub_channel_.find(channel); if (it != pubsub_channel_.end()) { for (size_t i = 0; i < it->second.size(); i++) { - if (!IsReady(it->second[i]->fd())) { + auto& conn = it->second[i]; + if (!IsReady(conn->fd())) { continue; } std::string resp = ConstructPublishResp(it->first, channel, msg, false); - it->second[i]->WriteResp(resp); - WriteStatus write_status = it->second[i]->SendReply(); + conn->WriteResp(resp); + WriteStatus write_status = conn->SendReply(); if (write_status == kWriteHalf) { - net_multiplexer_->NetModEvent(it->second[i]->fd(), kReadable, kWritable); + net_multiplexer_->NetModEvent(conn->fd(), kReadable, kWritable); } else if (write_status == kWriteError) { channel_mutex_.unlock(); - MoveConnOut(it->second[i]); + MoveConnOut(conn); channel_mutex_.lock(); - CloseFd(it->second[i]); + CloseFd(conn); } else if (write_status == kWriteAll) { receivers++; } @@ -483,21 +484,22 @@ void* PubSubThread::ThreadMain() { if (pstd::stringmatchlen(it.first.c_str(), static_cast(it.first.size()), channel.c_str(), static_cast(channel.size()), 0)) { for (size_t i = 0; i < it.second.size(); i++) { - if (!IsReady(it.second[i]->fd())) { + auto& conn = it.second[i]; + if (!IsReady(conn->fd())) { continue; } std::string resp = ConstructPublishResp(it.first, channel, msg, true); - it.second[i]->WriteResp(resp); - WriteStatus write_status = it.second[i]->SendReply(); + conn->WriteResp(resp); + WriteStatus write_status = conn->SendReply(); if (write_status == kWriteHalf) { - net_multiplexer_->NetModEvent(it.second[i]->fd(), kReadable, kWritable); + net_multiplexer_->NetModEvent(conn->fd(), kReadable, kWritable); } else if (write_status == kWriteError) { pattern_mutex_.unlock(); - MoveConnOut(it.second[i]); + MoveConnOut(conn); pattern_mutex_.lock(); - CloseFd(it.second[i]); + CloseFd(conn); } else if (write_status == kWriteAll) { receivers++; } @@ -523,6 +525,7 @@ void* PubSubThread::ThreadMain() { net_multiplexer_->NetDelEvent(pfe->fd, 0); continue; } else { + in_conn = iter->second->conn; } }