Skip to content

Commit

Permalink
FairRootGroupGH-478: Optimize topology activation
Browse files Browse the repository at this point in the history
Modified: compress topology files before broadcasting to agents. Significantly improves performance for big topology activations. (FairRootGroupGH-478)
Modified: Improved performance of the Core transport when transferring binary attachments. (FairRootGroupGH-478)
Fixed: Multiple typos
  • Loading branch information
AnarManafov committed Apr 23, 2024
1 parent a27d9a6 commit f37bd9f
Show file tree
Hide file tree
Showing 42 changed files with 158 additions and 95 deletions.
6 changes: 6 additions & 0 deletions ReleaseNotes.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# DDS Release Notes

## v3.9 (2024-04-23)

- DDS general
- Modified: compress topology files before broadcasting to agents. Significantly improves performance for big topology activations. (GH-478)
- Modified: Improved performance of the Core transport when transferring binary attachments. (GH-478)

## v3.8 (2024-01-19)

- DDS general
Expand Down
2 changes: 1 addition & 1 deletion dds-agent-cmd/src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ int main(int argc, char* argv[])
LOG(log_stdout) << "Files will be downloaded to \"~/.DDS/sessions/" << sid << "/log/agents\"";
break;
default:
LOG(log_stderr) << "Uknown command.";
LOG(log_stderr) << "Unknown command.";
return EXIT_FAILURE;
}

Expand Down
42 changes: 27 additions & 15 deletions dds-agent/src/CommanderChannel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,10 @@ CCommanderChannel::CCommanderChannel(boost::asio::io_context& _service,
});

// Check free disk space (GH-392)
// Report avaliable disk space at start
// Report available disk space at start
uintmax_t nAvailable{ 0 };
isLowDiskSpace(&nAvailable);
LOG(info) << "Avaliable disk space: " << dds::misc::HumanReadable{ nAvailable };
LOG(info) << "Available disk space: " << dds::misc::HumanReadable{ nAvailable };
// create async timer
m_resourceMonitorTimer = make_unique<timer_t>(_service);
startResourceMonitor(_service, chrono::seconds(30));
Expand Down Expand Up @@ -166,7 +166,7 @@ bool CCommanderChannel::on_cmdREPLY(SCommandAttachmentImpl<cmdREPLY>::ptr_t _att
{
if (_attachment->m_statusCode == (uint16_t)SReplyCmd::EStatusCode::OK)
{
LOG(info) << "SM: Handshake successfull. PHID: " << this->m_protocolHeaderID;
LOG(info) << "SM: Handshake successful. PHID: " << this->m_protocolHeaderID;
return true;
}
else if (_attachment->m_statusCode == (uint16_t)SReplyCmd::EStatusCode::ERROR)
Expand Down Expand Up @@ -322,6 +322,18 @@ bool CCommanderChannel::on_cmdBINARY_ATTACHMENT_RECEIVED(
fs::rename(_attachment->m_receivedFilePath, destFilePath);
LOG(info) << "Received new topology file: " << destFilePath.generic_string();

// Decompressing the topology file
if (destFilePath.extension() == ".gz")
{
const fs::path gzipPath{ bp::search_path("gzip") };
stringstream ssCmd;
ssCmd << gzipPath.string() << " -d " << destFilePath;
string output;
execute(ssCmd.str(), chrono::seconds(60), &output);
// remove ".gz" extension
destFilePath = destFilePath.stem();
}

// Activating new topology
CTopoCore::Ptr_t topo{ make_shared<CTopoCore>() };
// Topology already validated on the commander, no need to validate it again
Expand Down Expand Up @@ -518,12 +530,12 @@ bool CCommanderChannel::on_cmdASSIGN_USER_TASK(SCommandAttachmentImpl<cmdASSIGN_
if (slot->m_collectionIndex != numeric_limits<uint32_t>::max())
ba::replace_all(slot->m_sUsrExe, "%collectionIndex%", to_string(slot->m_collectionIndex));

// If the user task was transfered, than replace "%DDS_DEFAULT_TASK_PATH%" with the real path
// If the user task was transferred, than replace "%DDS_DEFAULT_TASK_PATH%" with the real path
fs::path dir(CUserDefaults::instance().getSlotsRootDir());
dir /= to_string(_sender.m_ID);
dir += fs::path::preferred_separator;
ba::replace_all(slot->m_sUsrExe, "%DDS_DEFAULT_TASK_PATH%", dir.generic_string());
// If the user custom environment was transfered, than replace "%DDS_DEFAULT_TASK_PATH%" with the real path
// If the user custom environment was transferred, than replace "%DDS_DEFAULT_TASK_PATH%" with the real path
ba::replace_all(slot->m_sUsrEnv, "%DDS_DEFAULT_TASK_PATH%", dir.generic_string());

// Revoke drain of the write queue to start accept messages
Expand All @@ -550,7 +562,7 @@ bool CCommanderChannel::on_cmdASSIGN_USER_TASK(SCommandAttachmentImpl<cmdASSIGN_
pathAsset = dir;
pathAsset /= assetFileName.str();

// If local asset exists, we will overwrite it. No skiping.
// If local asset exists, we will overwrite it. No skipping.

slot->m_taskAssets.push_back(pathAsset);
break;
Expand Down Expand Up @@ -753,7 +765,7 @@ bool CCommanderChannel::on_cmdSTOP_USER_TASK(SCommandAttachmentImpl<cmdSTOP_USER
slot->m_pid,
[this, id]()
{
// Once child termionation is finished, send User task "Done" to the commander
// Once child termination is finished, send User task "Done" to the commander
pushMsg<cmdREPLY>(
SReplyCmd("Done", (uint16_t)SReplyCmd::EStatusCode::OK, 0, cmdSTOP_USER_TASK), id);
});
Expand Down Expand Up @@ -895,7 +907,7 @@ void CCommanderChannel::onNewUserTask(uint64_t _slotID, pid_t _pid)
if (WIFEXITED(status))
{
// NOTE: We are using a bash wrapper script for user tasks.
// According to bash, the exist status of child processes can be interpreted in the folloiwing
// According to bash, the exist status of child processes can be interpreted in the following
// way:
// - For the shell’s purposes, a command which exits with a zero exit status has succeeded.
// - A non-zero exit status indicates failure.
Expand Down Expand Up @@ -1001,8 +1013,8 @@ void CCommanderChannel::terminateChildrenProcesses(
enumChildProcesses(mainPid, vecChildren);

// the mainPid is never included to the list
// In case of the agent the reseaon is obviouse.
// In case of a task, since it is running via the DDS task wrapper it will exit autoamticlaly once children are out
// In case of the agent the reason is obvious.
// In case of a task, since it is running via the DDS task wrapper it will exit automatically once children are out
string sChildren;
pidContainer_t pidChildren;
for (const auto& i : vecChildren)
Expand Down Expand Up @@ -1088,7 +1100,7 @@ void CCommanderChannel::terminateChildrenProcesses(
else
{
// kill all child process of tasks if there are any
// We do it before terminating tasks to give parenrt task processes a change to read state of children -
// We do it before terminating tasks to give the parent task processes a chance to read state of children -
// otherwise we will get zombies if user tasks don't manage their children properly
LOG(info) << "Timeout is reached. Sending unconditional kill signal to all existing child processes...";
for (auto const& pid : _children)
Expand Down Expand Up @@ -1142,7 +1154,7 @@ void CCommanderChannel::taskExited(uint64_t _slotID, int _exitCode)
slot->m_pid = 0;
slot->m_taskID = 0;

// Drainning the Intercom write queue
// Draining the Intercom write queue
m_intercomChannel->drainWriteQueue(true, _slotID);

// Notify DDS commander
Expand All @@ -1151,7 +1163,7 @@ void CCommanderChannel::taskExited(uint64_t _slotID, int _exitCode)
catch (exception& _e)
{
LOG(fatal) << "Failed to remove user task on slot " << _slotID << " from the list of children: " << _e.what();
LOG(error) << "Can't send TASK_DONE. The coresponding slot is missing";
LOG(error) << "Can't send TASK_DONE. The corrsponding slot is missing";
}
}

Expand All @@ -1176,7 +1188,7 @@ void CCommanderChannel::stopChannel()
terminateChildrenProcesses(0, [&waitCondition]() { waitCondition.notifyAll(); });

// wait for termination to finish
// either it is finsihed or we procceed in 30 sec in anyway
// either it is finished or we proceed in 30 sec in anyway
waitCondition.waitUntil(std::chrono::system_clock::now() + std::chrono::seconds(30));

if (m_intercomChannel)
Expand Down Expand Up @@ -1350,7 +1362,7 @@ bool CCommanderChannel::isLowDiskSpace(uintmax_t* _available)
}
catch (exception& _e)
{
LOG(error) << "Failed getting avaliable disk space: " << _e.what();
LOG(error) << "Failed to get available disk space: " << _e.what();
}
return false;
}
2 changes: 1 addition & 1 deletion dds-agent/src/CommanderChannel.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ namespace dds
/// The function first sends a graceful SIGTERM to all children. After a defined timeout (5 sec) an
/// unconditional SIGKILL is sent.
/// \param[in] _parentPid The pid of the parent process, which children needs to
/// \param[in] _onCompleteSlot is a callback fucntion. It's called when termination of all child process is
/// \param[in] _onCompleteSlot is a callback function. It's called when termination of all child process is
/// finished.
void terminateChildrenProcesses(pid_t _parentPid, const terminateChildrenOnComplete_t& _onCompleteSlot);
void terminateChildrenProcesses(timerPtr_t& _timer,
Expand Down
4 changes: 2 additions & 2 deletions dds-commander/src/AgentChannel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ bool CAgentChannel::on_cmdREPLY_HOST_INFO(SCommandAttachmentImpl<cmdREPLY_HOST_I
<< "] has successfully connected. Startup time: " << m_info.m_startUpTime.count() << " ms.";

// Request agent to add Task Slots
// We get the number of slots from the agent. On submit each agent is assiugned to a fixed number of slots. Then
// when agent is up, we requast tyhe agent to actully active each slot.
// We get the number of slots from the agent. On submit each agent is assigned to a fixed number of slots. Then
// when agent is up, we request the agent to actually active each slot.
LOG(info) << "Requesting " << _attachment->m_slots << " task slots from " << m_info.m_id;
for (size_t i = 0; i < _attachment->m_slots; ++i)
{
Expand Down
68 changes: 53 additions & 15 deletions dds-commander/src/ConnectionManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ void CConnectionManager::broadcastUpdateTopologyAndWait(weakChannelInfo_t::conta
dds::tools_api::SProgressResponseData progress(_cmd, 0, m_updateTopology.m_nofRequests, 0);
sendCustomCommandResponse(_channel, progress.toJSON());

// Broadcast message or binary to agents
// Broadcast a message or a binary to agents
size_t index = 0;
for (auto& agent : _agents)
{
Expand Down Expand Up @@ -713,10 +713,10 @@ void CConnectionManager::on_cmdUSER_TASK_DONE(const SSenderInfo& _sender,
}

// MARK: ToolsAPI - onTaskDone
// send task done ToolsAPI event to registred channels. A channel, whcih is expired of filed should be removed from
// send task done ToolsAPI event to registered channels. A channel, which is expired of filed should be removed from
// the list.
lock_guard<mutex> lock(m_mtxOnTaskDoneSubscribers);
// The loop always recalclates the end() iterator since we might delete expired elelemts from the list
// The loop always recalculates the end() iterator since we might delete expired elemets from the list
for (auto iter = m_onTaskDoneSubscribers.begin(); iter != m_onTaskDoneSubscribers.end(); ++iter)
{
if (auto ch = iter->first.lock())
Expand All @@ -726,7 +726,7 @@ void CConnectionManager::on_cmdUSER_TASK_DONE(const SSenderInfo& _sender,
response.m_taskID = _attachment->m_taskID;
response.m_exitCode = (WIFEXITED(_attachment->m_exitCode) ? WEXITSTATUS(_attachment->m_exitCode) : 0);
// NOTE: We are using a bash wrapper script for user tasks.
// According to bash, the exist status of child processes can be interpreted in the folloiwing way:
// According to bash, the exist status of child processes can be interpreted in the following way:
// - For the shell’s purposes, a command which exits with a zero exit status has succeeded.
// - A non-zero exit status indicates failure.
// This seemingly counter-intuitive scheme is used so there is one well-defined way to indicate success
Expand All @@ -744,7 +744,7 @@ void CConnectionManager::on_cmdUSER_TASK_DONE(const SSenderInfo& _sender,
}
else
{
// channel is expiored - removing it from the list
// channel is expired - removing it from the list
m_onTaskDoneSubscribers.erase(iter);
}
}
Expand All @@ -754,7 +754,7 @@ void CConnectionManager::on_cmdGET_PROP_LIST(const SSenderInfo& /*_sender*/,
SCommandAttachmentImpl<cmdGET_PROP_LIST>::ptr_t /*_attachment*/,
CAgentChannel::weakConnectionPtr_t /*_channel*/)
{
// FIXME: This command desn't work without CKeyValueManager
// FIXME: This command doesn't work without CKeyValueManager
}

void CConnectionManager::on_cmdGET_PROP_VALUES(const SSenderInfo& /*_sender*/,
Expand Down Expand Up @@ -823,7 +823,7 @@ void CConnectionManager::on_cmdCUSTOM_CMD(const SSenderInfo& _sender,
else
{
LOG(error) << "Failed to deliver. Channel is missing. CUSTOM_CMD senderID: " << _sender.m_ID
<< "; attachemnt: " << *_attachment;
<< "; attachment: " << *_attachment;
}
}
catch (boost::bad_lexical_cast&)
Expand Down Expand Up @@ -923,7 +923,7 @@ void CConnectionManager::on_cmdCUSTOM_CMD(const SSenderInfo& _sender,
ptr->pushMsg<cmdCUSTOM_CMD>(*_attachment, v.m_protocolHeaderID);
}

// Debug msg: there are too many of such messages if tasks intensivly use CC
// Debug msg: there are too many of such messages if tasks intensively use CC
/* stringstream ss;
ss << "Send custom command to " << channels.size() << " channels." << endl;
Expand Down Expand Up @@ -988,7 +988,7 @@ void CConnectionManager::processToolsAPIRequests(const SCustomCmdCmd& _cmd, CAge
else if (tag == "onTaskDone")
{
SOnTaskDoneRequestData info(data);
// add the given channel (_channel) to the list, which will be allerted whenever a task is exited
// add the given channel (_channel) to the list, which will be alerted whenever a task is exited
lock_guard<mutex> lock(m_mtxOnTaskDoneSubscribers);
m_onTaskDoneSubscribers.push_back({ _channel, info });
}
Expand Down Expand Up @@ -1036,7 +1036,7 @@ void CConnectionManager::submitAgents(const dds::tools_api::SSubmitRequestData&
m_SubmitAgents.m_requestID = _submitInfo.m_requestID;
m_SubmitAgents.zeroCounters();

// Generating a submissin ID
// Generating a submission ID
const boost::uuids::uuid submissionID{ boost::uuids::random_generator()() };
const string sSubmissionID{ boost::lexical_cast<std::string>(submissionID) };
LOG(info) << "Initializing an agent submit request with submissionID = " << sSubmissionID;
Expand All @@ -1051,7 +1051,7 @@ void CConnectionManager::submitAgents(const dds::tools_api::SSubmitRequestData&

// Create / re-pack WN package
// Include inline script, if present.
// For ssh plug-in inline script has a higher priorety, than the sxcript provided via the submit command
// For ssh plug-in inline script has a higher priority, than the script provided via the submit command
// (--env-config). Only the ssh plug-in supports it.
bool bNeedCustomEnv{ false };
string scriptFileName(pathWrkPackageDir.string());
Expand Down Expand Up @@ -1089,7 +1089,7 @@ void CConnectionManager::submitAgents(const dds::tools_api::SSubmitRequestData&
// pack worker package
sendToolsAPIMsg(_channel, _submitInfo.m_requestID, "Creating new worker package...", EMsgSeverity::info);

// Use a lightweightpackage when possible
// Use a lightweight package when possible
_createWnPkg(bNeedCustomEnv,
(_submitInfo.m_rms == "localhost"),
_submitInfo.m_slots,
Expand Down Expand Up @@ -1130,7 +1130,7 @@ void CConnectionManager::submitAgents(const dds::tools_api::SSubmitRequestData&
try
{
// Execute RMS plug-in and don't wait for it.
// Omnce plug-in is up it will connect back to the commander.
// Once plug-in is up it will connect back to the commander.
// We will report to user if it won't connect.
execute(ssCmd.str());
}
Expand Down Expand Up @@ -1258,7 +1258,7 @@ void CConnectionManager::updateTopology(const dds::tools_api::STopologyRequestDa
//
if (removedTasks.size() > 0)
{
LOG(info) << "Stoppoing removed tasks";
LOG(info) << "Stopping removed tasks";
weakChannelInfo_t::container_t agents;
// FIXME: Needs to be reviewed for the current architecture
// {
Expand Down Expand Up @@ -1338,8 +1338,46 @@ void CConnectionManager::updateTopology(const dds::tools_api::STopologyRequestDa
if (allAgents.size() == 0)
throw runtime_error("There are no active agents.");

string topFileDestName{ "topology.xml" };
fs::path copyTopoFile;
try
{
// compressing the topology.
// In some production cases the orig. file was more than 20MB. Sending such size to hundreds of agents
// might be resource consuming. In case of ALICE prod it was 300 agents (175K task slots) and 20 MB topo
// file.
LOG(info) << "Topology file uncompressed size: "
<< dds::misc::HumanReadable{ fs::file_size(topologyFile) } << " Compressing topology file...";
// make a copy of the orig. file
// The copy is located in the commander's working directory
const fs::path wrkDir(user_defaults_api::CUserDefaults::instance().getWrkDir());
copyTopoFile = wrkDir;
copyTopoFile /= "topology_agent_copy.xml";
fs::copy_file(topologyFile, copyTopoFile);
// compressing
const fs::path gzipPath{ bp::search_path("gzip") };
stringstream ssCmd;
ssCmd << gzipPath.string() << " -9 " << copyTopoFile;
string output;
execute(ssCmd.str(), chrono::seconds(60), &output);
copyTopoFile += ".gz";
topologyFile = copyTopoFile.string();
LOG(info) << "Topology file compressed size: "
<< dds::misc::HumanReadable{ fs::file_size(topologyFile) };
topFileDestName += ".gz";
}
catch (exception& e)
{
LOG(error) << "Failed to compress topology file. " << e.what();
LOG(info) << "Sending uncompressed topology...";
}

LOG(info) << "Broadcasting topology update with a file: " << topologyFile;
broadcastUpdateTopologyAndWait<cmdUPDATE_TOPOLOGY>(
allAgents, _channel, "Updating topology for agents...", topologyFile, "topology.xml");
allAgents, _channel, "Updating topology for agents...", topologyFile, topFileDestName);

if (!copyTopoFile.empty())
fs::remove(copyTopoFile);
}

//
Expand Down
2 changes: 1 addition & 1 deletion dds-commander/src/Scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ void CScheduler::makeScheduleImpl(CTopoCore& _topology,
m_schedule.clear();

size_t nofChannels{ _channels.size() };
// Map pair<host name, worker id> to vector of channel indeces.
// Map pair<host name, worker id> to vector of channel indexes.
// This is needed in order to reduce number of regex matches and speed up scheduling.
hostToChannelMap_t hostToChannelMap;
for (size_t iChannel = 0; iChannel < nofChannels; ++iChannel)
Expand Down
Loading

0 comments on commit f37bd9f

Please sign in to comment.