Skip to content

Commit

Permalink
fix:MoveConnOut(it->second[i]) will cause CloseFd(it->second[i]) to g…
Browse files Browse the repository at this point in the history
…o out of bounds (#2539)

* fix:moveconn

Co-authored-by: chejinge <[email protected]>
  • Loading branch information
chejinge and brother-jin committed Apr 7, 2024
1 parent 578c356 commit 6218b5e
Showing 1 changed file with 15 additions and 12 deletions.
27 changes: 15 additions & 12 deletions src/net/src/net_pubsub.cc
Original file line number Diff line number Diff line change
Expand Up @@ -455,21 +455,22 @@ void* PubSubThread::ThreadMain() {
auto it = pubsub_channel_.find(channel);
if (it != pubsub_channel_.end()) {
for (size_t i = 0; i < it->second.size(); i++) {
if (!IsReady(it->second[i]->fd())) {
auto& conn = it->second[i];
if (!IsReady(conn->fd())) {
continue;
}
std::string resp = ConstructPublishResp(it->first, channel, msg, false);
it->second[i]->WriteResp(resp);
WriteStatus write_status = it->second[i]->SendReply();
conn->WriteResp(resp);
WriteStatus write_status = conn->SendReply();
if (write_status == kWriteHalf) {
net_multiplexer_->NetModEvent(it->second[i]->fd(), kReadable, kWritable);
net_multiplexer_->NetModEvent(conn->fd(), kReadable, kWritable);
} else if (write_status == kWriteError) {
channel_mutex_.unlock();

MoveConnOut(it->second[i]);
MoveConnOut(conn);

channel_mutex_.lock();
CloseFd(it->second[i]);
CloseFd(conn);
} else if (write_status == kWriteAll) {
receivers++;
}
Expand All @@ -483,21 +484,22 @@ void* PubSubThread::ThreadMain() {
if (pstd::stringmatchlen(it.first.c_str(), static_cast<int32_t>(it.first.size()), channel.c_str(),
static_cast<int32_t>(channel.size()), 0)) {
for (size_t i = 0; i < it.second.size(); i++) {
if (!IsReady(it.second[i]->fd())) {
auto& conn = it.second[i];
if (!IsReady(conn->fd())) {
continue;
}
std::string resp = ConstructPublishResp(it.first, channel, msg, true);
it.second[i]->WriteResp(resp);
WriteStatus write_status = it.second[i]->SendReply();
conn->WriteResp(resp);
WriteStatus write_status = conn->SendReply();
if (write_status == kWriteHalf) {
net_multiplexer_->NetModEvent(it.second[i]->fd(), kReadable, kWritable);
net_multiplexer_->NetModEvent(conn->fd(), kReadable, kWritable);
} else if (write_status == kWriteError) {
pattern_mutex_.unlock();

MoveConnOut(it.second[i]);
MoveConnOut(conn);

pattern_mutex_.lock();
CloseFd(it.second[i]);
CloseFd(conn);
} else if (write_status == kWriteAll) {
receivers++;
}
Expand All @@ -523,6 +525,7 @@ void* PubSubThread::ThreadMain() {
net_multiplexer_->NetDelEvent(pfe->fd, 0);
continue;
} else {

in_conn = iter->second->conn;
}
}
Expand Down

0 comments on commit 6218b5e

Please sign in to comment.