Skip to content

Commit

Permalink
Fix crash on corrupt MQTT packets
Browse files Browse the repository at this point in the history
When clients send certain corrupt MQTT packets, this was detected very
quickly and caused the thread to close and abandon the client while the
main loop was still busy registering it. This race condition could cause
the client destructor to be called in the main thread, instead of the
worker thread. That in turn, dereferenced nullptrs, trying to obtain
thread global objects.

It's fixed by MOVING the client to the thread, instead of copying.

Detected by Benedikt Brandmaier using rusty-FUME.
  • Loading branch information
halfgaar committed Aug 10, 2023
1 parent dbf02b9 commit eb3acf8
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 7 deletions.
7 changes: 6 additions & 1 deletion flashmqtestclient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,12 @@ void FlashMQTestClient::connectClient(ProtocolVersion protocolVersion, bool clea
this->client = std::make_shared<Client>(sockfd, testServerWorkerThreadData, nullptr, false, false, reinterpret_cast<struct sockaddr*>(&servaddr), settings);
this->client->setClientProperties(protocolVersion, clientid, "user", false, 60);

testServerWorkerThreadData->giveClient(this->client);
{
// Hack to make it work with the rvalue argument. Because our test client retains ownership of 'client', we can get away
// with this. See git what caused this change.
std::shared_ptr<Client> dummyToMoveFrom = this->client;
testServerWorkerThreadData->giveClient(std::move(dummyToMoveFrom));
}

// This gets called in the test client's worker thread, but the STL container's minimal thread safety should be enough: only list manipulation is
// mutexed, elements within are not.
Expand Down
2 changes: 1 addition & 1 deletion mainapp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -667,7 +667,7 @@ void MainApp::start()

std::shared_ptr<Client> client = std::make_shared<Client>(fd, thread_data, clientSSL, listener->websocket, listener->isHaProxy(), addr, settings);

thread_data->giveClient(client);
thread_data->giveClient(std::move(client));

globalStats->socketConnects.inc();
}
Expand Down
8 changes: 4 additions & 4 deletions threaddata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -425,17 +425,17 @@ void ThreadData::removeQueuedClients()
}
}

void ThreadData::giveClient(std::shared_ptr<Client> client)
void ThreadData::giveClient(std::shared_ptr<Client> &&client)
{
const int fd = client->getFd();

queueClientNextKeepAliveCheckLocked(client, false);

{
std::lock_guard<std::mutex> locker(clients_by_fd_mutex);
clients_by_fd[fd] = client;
clients_by_fd[fd] = std::move(client); // We must give up ownership here, to avoid calling the client destructor in the main thread.
}

queueClientNextKeepAliveCheckLocked(client, false);

struct epoll_event ev;
memset(&ev, 0, sizeof (struct epoll_event));
ev.data.fd = fd;
Expand Down
2 changes: 1 addition & 1 deletion threaddata.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ class ThreadData

void start(thread_f f);

void giveClient(std::shared_ptr<Client> client);
void giveClient(std::shared_ptr<Client> &&client);
std::shared_ptr<Client> getClient(int fd);
void removeClientQueued(const std::shared_ptr<Client> &client);
void removeClientQueued(int fd);
Expand Down

0 comments on commit eb3acf8

Please sign in to comment.