Skip to content

Commit

Permalink
FairRootGroupGH-442: Add the flags support for Submit requests
Browse files Browse the repository at this point in the history
dds-submit: Added: The command learned a new argument --enable-overbooking. The flag instructs DDS RMS plug-ing to not specify any CPU requirement for RMS jobs. (FairRootGroupGH-442)
dds-tools-api: Added: SSubmitRequestData supports flags. See SSubmitRequestData::setFlag and SSubmitRequestData::ESubmitRequestFlags. (FairRootGroupGH-442)
dds-slurm-plugin: Modified: The #SBATCH --cpus-per-task=%DDS_NSLOTS% requirment is now can be disiabled by providing the "enable-overbooking" flag (ToolsAPI or dds-submit). (FairRootGroupGH-442)
  • Loading branch information
AnarManafov committed Jun 2, 2022
1 parent 49fd88c commit 6d30600
Show file tree
Hide file tree
Showing 12 changed files with 113 additions and 21 deletions.
4 changes: 3 additions & 1 deletion ReleaseNotes.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ Added: Users can specify a GroupName tag for each submission. This tag will be a
Added: Users can provide a Submission Tag (--submission-tag). DDS RMS plug-ins will use this tag to name RMS jobs and directories. (GH-426)
Added: The command learned a new argument --env-config/-e. It can be used to define a custom environment script for each agent. (GH-430)
Added: The command learned a new argument --min-instances. It can be used to provide the minimum number of agents to spawn. (GH-434)
Added: The command learned a new argument --enable-overbooking. The flag instructs DDS RMS plug-ing to not specify any CPU requirement for RMS jobs. (GH-442)

### dds-topology
Fixed: Stability improvements.
Expand All @@ -46,7 +47,7 @@ Added: Support for SubmissionID (GH-411)
Added: Support of minimum number of agents to spawn. (GH-434)
Modified: Replace array job submission with nodes requirement. (GH-430)
Modified: Remove #SBATCH --ntasks-per-node=1. (GH-444)
Modified: Remove #SBATCH --cpus-per-task=%DDS_NSLOTS%. (GH-442)
Modified: The #SBATCH --cpus-per-task=%DDS_NSLOTS% requirment is now can be disiabled by providing the "enable-overbooking" flag (ToolsAPI or dds-submit). (GH-442)

### dds-localhost-plugin
Added: Support for SubmissionID (GH-411)
Expand All @@ -56,6 +57,7 @@ Modified: Logs of user processes which use Tools API are moved now to the DDS ro
Modified: CSession::waitForNumAgents is renamed to CSession::waitForNumSlots. (GH-439)
Added: An ability to unsubscribe from either individual events or all events of requests. (GH-382)
Added: SAgentInfoResponseData provides the agent group name. (GH-415)
Added: SSubmitRequestData supports flags. See SSubmitRequestData::setFlag and SSubmitRequestData::ESubmitRequestFlags. (GH-442)

### dds-user-defaults
Modified: Bumo the version to 0.5
Expand Down
1 change: 1 addition & 0 deletions dds-commander/src/ConnectionManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1102,6 +1102,7 @@ void CConnectionManager::submitAgents(const dds::tools_api::SSubmitRequestData&
submitRequest.m_nInstances = _submitInfo.m_instances;
submitRequest.m_nMinInstances = _submitInfo.m_minInstances;
submitRequest.m_slots = _submitInfo.m_slots;
submitRequest.m_flags = _submitInfo.m_flags;
submitRequest.m_wrkPackagePath = CUserDefaults::instance().getWrkScriptPath(sSubmissionID);
submitRequest.m_groupName = _submitInfo.m_groupName;
submitRequest.m_submissionTag = _submitInfo.m_submissionTag;
Expand Down
1 change: 1 addition & 0 deletions dds-intercom-lib/src/Intercom.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ namespace dds
uint32_t m_nInstances; ///< A number of instances.
uint32_t m_nMinInstances; ///< A minimum number of instances.
uint32_t m_slots; ///< A number of task slots.
uint32_t m_flags; ///< Additional flags, see SSubmitRequestData::ESubmitRequestFlags
std::string m_cfgFilePath; ///< A path to the configuration file.
std::string m_id; ///< ID for communication with DDS commander.
std::string m_wrkPackagePath; ///< A full path of the agent worker package, which needs to be deployed.
Expand Down
20 changes: 11 additions & 9 deletions dds-intercom-lib/src/dds_rms_plugin_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,10 @@ std::string SSubmit::toJSON()
ptree pt;

pt.put<string>("dds.plug-in.id", m_id);
pt.put<int>("dds.plug-in.submit.nInstances", m_nInstances);
pt.put<int>("dds.plug-in.submit.nMinInstances", m_nMinInstances);
pt.put<int>("dds.plug-in.submit.slots", m_slots);
pt.put<uint32_t>("dds.plug-in.submit.nInstances", m_nInstances);
pt.put<uint32_t>("dds.plug-in.submit.nMinInstances", m_nMinInstances);
pt.put<uint32_t>("dds.plug-in.submit.slots", m_slots);
pt.put<uint32_t>("dds.plug-in.submit.flags", m_flags);
pt.put<string>("dds.plug-in.submit.cfgFilePath", m_cfgFilePath);
pt.put<string>("dds.plug-in.submit.wrkPackagePath", m_wrkPackagePath);
pt.put<string>("dds.plug-in.submit.groupName", m_groupName);
Expand All @@ -79,9 +80,10 @@ void SSubmit::fromJSON(const std::string& _json)
void SSubmit::fromPT(const boost::property_tree::ptree& _pt)
{
const ptree& pt = _pt.get_child("dds.plug-in");
m_nInstances = pt.get<int>("submit.nInstances", 0);
m_nMinInstances = pt.get<int>("submit.nMinInstances", 0);
m_slots = pt.get<int>("submit.slots", 0);
m_nInstances = pt.get<uint32_t>("submit.nInstances", 0);
m_nMinInstances = pt.get<uint32_t>("submit.nMinInstances", 0);
m_slots = pt.get<uint32_t>("submit.slots", 0);
m_flags = pt.get<uint32_t>("submit.flags", 0);
m_cfgFilePath = pt.get<string>("submit.cfgFilePath", "");
m_wrkPackagePath = pt.get<string>("submit.wrkPackagePath", "");
m_id = pt.get<string>("id");
Expand All @@ -92,9 +94,9 @@ void SSubmit::fromPT(const boost::property_tree::ptree& _pt)
bool SSubmit::operator==(const SSubmit& _val) const
{
return (m_id == _val.m_id) && (m_nInstances == _val.m_nInstances) && (m_slots == _val.m_slots) &&
(m_cfgFilePath == _val.m_cfgFilePath) && (m_wrkPackagePath == _val.m_wrkPackagePath) &&
(m_groupName == _val.m_groupName) && (m_submissionTag == _val.m_submissionTag) &&
(m_nMinInstances == _val.m_nMinInstances);
(m_flags == _val.m_flags) && (m_cfgFilePath == _val.m_cfgFilePath) &&
(m_wrkPackagePath == _val.m_wrkPackagePath) && (m_groupName == _val.m_groupName) &&
(m_submissionTag == _val.m_submissionTag) && (m_nMinInstances == _val.m_nMinInstances);
}

///////////////////////////////////
Expand Down
17 changes: 17 additions & 0 deletions dds-submit/src/Options.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "ProtocolCommands.h"
#include "SubmitCmd.h"
#include "SysHelper.h"
#include "ToolsProtocol.h"
#include "Version.h"

namespace bpo = boost::program_options;
Expand All @@ -35,6 +36,7 @@ namespace dds
size_t m_number{ 0 };
size_t m_minInstances{ 0 };
size_t m_slots{ 0 };
uint32_t m_flags{ 0 };
bool m_bListPlugins{ false };
boost::uuids::uuid m_sid = boost::uuids::nil_uuid();
std::string m_groupName;
Expand Down Expand Up @@ -97,6 +99,13 @@ namespace dds
"It can be used to define a submission tag. DDS RMS plug-ins will use this tag to name DDS RMS jobs "
"and directories they create on the worker nodes.");

options.add_options()(
"enable-overbooking",
bpo::bool_switch()->default_value(false),
"The flag instructs DDS RMS plug-in to not specify any CPU requirement for RMS jobs. For example, the "
"SLURM plug-in will not add the \"#SBATCH --cpus-per-task\" option to the job script. Otherwise "
"DDS will try to require as many CPU per agent as tasks slots.");

// Parsing command-line
bpo::variables_map vm;
bpo::store(bpo::command_line_parser(_argc, _argv).options(options).run(), vm);
Expand Down Expand Up @@ -185,6 +194,14 @@ namespace dds
return false;
}

// Flags
dds::tools_api::SSubmitRequestData::flagContainer_t flags;
dds::tools_api::SSubmitRequestData::setFlag(
&flags,
dds::tools_api::SSubmitRequestData::ESubmitRequestFlags::enable_overbooking,
vm["enable-overbooking"].as<bool>());
_options->m_flags = flags.to_ulong();

// RMS plug-ins are always lower cased
boost::to_lower(_options->m_sRMS);

Expand Down
1 change: 1 addition & 0 deletions dds-submit/src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ int main(int argc, char* argv[])
requestInfo.m_instances = options.m_number;
requestInfo.m_minInstances = options.m_minInstances;
requestInfo.m_slots = options.m_slots;
requestInfo.m_flags = options.m_flags;
requestInfo.m_pluginPath = options.m_sPath;
requestInfo.m_groupName = options.m_groupName;
requestInfo.m_submissionTag = options.m_submissionTag;
Expand Down
50 changes: 40 additions & 10 deletions dds-tools-lib/src/ToolsProtocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,11 +171,36 @@ SSubmitRequestData::SSubmitRequestData(const boost::property_tree::ptree& _pt)
fromPT(_pt);
}

void SSubmitRequestData::setFlag(const ESubmitRequestFlags& _flag, bool _value)
{
m_flagContainer.set((uint32_t)_flag, _value);
m_flags = m_flagContainer.to_ulong();
}

void SSubmitRequestData::setFlag(flagContainer_t* _flagContainer, const ESubmitRequestFlags& _flag, bool _value)
{
if (_flagContainer == nullptr)
return;

_flagContainer->set((uint32_t)_flag, _value);
}
bool SSubmitRequestData::isFlagEnabled(const ESubmitRequestFlags& _flag) const
{
return (m_flagContainer.test((uint32_t)_flag));
}

bool SSubmitRequestData::isFlagEnabled(const uint32_t& _flagContainer, const ESubmitRequestFlags& _flag)
{
flagContainer_t container{ _flagContainer };
return (container.test(((uint32_t)_flag)));
}

void SSubmitRequestData::_toPT(boost::property_tree::ptree& _pt) const
{
_pt.put<int>("instances", m_instances);
_pt.put<int>("minInstances", m_minInstances);
_pt.put<int>("slots", m_slots);
_pt.put<uint32_t>("instances", m_instances);
_pt.put<uint32_t>("minInstances", m_minInstances);
_pt.put<uint32_t>("slots", m_slots);
_pt.put<uint32_t>("flags", m_flags);
_pt.put<string>("config", m_config);
_pt.put<string>("rms", m_rms);
_pt.put<string>("pluginPath", m_pluginPath);
Expand All @@ -186,9 +211,13 @@ void SSubmitRequestData::_toPT(boost::property_tree::ptree& _pt) const

void SSubmitRequestData::_fromPT(const boost::property_tree::ptree& _pt)
{
m_instances = _pt.get<int>("instances", 0);
m_minInstances = _pt.get<int>("minInstances", 0);
m_slots = _pt.get<int>("slots", 0);
m_instances = _pt.get<uint32_t>("instances", 0);
m_minInstances = _pt.get<uint32_t>("minInstances", 0);
m_slots = _pt.get<uint32_t>("slots", 0);

m_flags = _pt.get<uint32_t>("flags", 0);
m_flagContainer = { m_flags };

m_config = _pt.get<string>("config", "");
m_rms = _pt.get<string>("rms", "");
m_pluginPath = _pt.get<string>("pluginPath", "");
Expand All @@ -200,9 +229,10 @@ void SSubmitRequestData::_fromPT(const boost::property_tree::ptree& _pt)
bool SSubmitRequestData::operator==(const SSubmitRequestData& _val) const
{
return (SBaseData::operator==(_val) && m_rms == _val.m_rms && m_instances == _val.m_instances &&
m_slots == _val.m_slots && m_config == _val.m_config && m_pluginPath == _val.m_pluginPath &&
m_groupName == _val.m_groupName && m_submissionTag == _val.m_submissionTag &&
m_envCfgFilePath == _val.m_envCfgFilePath && m_minInstances == _val.m_minInstances);
m_slots == _val.m_slots && m_flags == _val.m_flags && m_config == _val.m_config &&
m_pluginPath == _val.m_pluginPath && m_groupName == _val.m_groupName &&
m_submissionTag == _val.m_submissionTag && m_envCfgFilePath == _val.m_envCfgFilePath &&
m_minInstances == _val.m_minInstances);
}

// We need to put function implementation in the same "dds::tools_api" namespace as a friend function declaration.
Expand All @@ -216,7 +246,7 @@ namespace dds
{
return _os << _data.defaultToString() << "; instances: " << _data.m_instances
<< "; minInstances: " << _data.m_minInstances << "; slots: " << _data.m_slots
<< "; config: " << _data.m_config << "; rms: " << _data.m_rms
<< "; falgs: " << _data.m_flags << "; config: " << _data.m_config << "; rms: " << _data.m_rms
<< "; pluginPath: " << _data.m_pluginPath << "; groupName: " << _data.m_groupName
<< "; submissionTag: " << _data.m_submissionTag
<< "; envCfgFilePath: " << _data.m_envCfgFilePath;
Expand Down
22 changes: 21 additions & 1 deletion dds-tools-lib/src/ToolsProtocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,30 @@ namespace dds
/// \brief Structure holds information of a submit request.
struct SSubmitRequestData : SBaseRequestData<SSubmitRequestData>
{
/// \brief Additional flags of the SSubmitRequestData
/// Uee SSubmitRequestData::setFlag to set flags.
enum class ESubmitRequestFlags : uint32_t
{
enable_overbooking,
//----------
size_value
};

using flagContainer_t = std::bitset<(uint32_t)ESubmitRequestFlags::size_value>;

SSubmitRequestData();
SSubmitRequestData(const boost::property_tree::ptree& _pt);

void setFlag(const ESubmitRequestFlags& _flag, bool _value);
static void setFlag(flagContainer_t* _flagContainer, const ESubmitRequestFlags& _flag, bool _value);
bool isFlagEnabled(const ESubmitRequestFlags& _flag) const;
static bool isFlagEnabled(const uint32_t& _flagContainer, const ESubmitRequestFlags& _flag);

std::string m_rms; ///< RMS.
uint32_t m_instances = 0; ///< A number of instances.
uint32_t m_minInstances = 0; ///< A minimum number of instances.
uint32_t m_slots = 0; /// < Number of task slots.
uint32_t m_slots = 0; ///< Number of task slots.
uint32_t m_flags = 0; ///< Additional flags, see SSubmitRequestData::ESubmitRequestFlags
std::string m_config; ///< A path to the configuration file.
std::string m_pluginPath; ///< Optional. A plug-in's directory search path
std::string m_groupName; ///< A group name of agents.
Expand All @@ -102,6 +119,9 @@ namespace dds
bool operator==(const SSubmitRequestData& _val) const;
/// \brief Ostream operator.
friend std::ostream& operator<<(std::ostream& _os, const SSubmitRequestData& _data);

private:
flagContainer_t m_flagContainer;
};

/// \brief Request class of submit.
Expand Down
2 changes: 2 additions & 0 deletions dds-tools-lib/tests/TestProtocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ BOOST_AUTO_TEST_CASE(test_dds_tools_protocol)
testData.m_config = "string";
testData.m_pluginPath = "string";
testData.m_requestID = 123;
testData.setFlag(SSubmitRequestData::ESubmitRequestFlags::enable_overbooking, true);

SSubmitRequestData data(child.second);

Expand All @@ -104,6 +105,7 @@ BOOST_AUTO_TEST_CASE(test_dds_tools_protocol)
ss << data;

BOOST_CHECK(data == testData);
BOOST_CHECK(data.isFlagEnabled(SSubmitRequestData::ESubmitRequestFlags::enable_overbooking));
}
else if (tag == "topology")
{
Expand Down
1 change: 1 addition & 0 deletions plugins/dds-submit-slurm/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ target_link_libraries(${PROJECT_NAME}
PUBLIC
dds_misc_lib
dds_user_defaults_lib
dds_tools_lib
dds_intercom_lib
dds_pipe_log_engine_lib
Boost::boost
Expand Down
2 changes: 2 additions & 0 deletions plugins/dds-submit-slurm/src/job.slurm.in
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
#SBATCH --job-name=%DDS_SUBMISSION_TAG%
#SBATCH --chdir=%DDS_JOB_ROOT_WRK_DIR%

#DDS_AGENT_CPU_REQUIREMENT

#DDS_USER_OPTIONS

# execute DDS Scout
Expand Down
13 changes: 13 additions & 0 deletions plugins/dds-submit-slurm/src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "PipeLogEngine.h"
#include "Process.h"
#include "SysHelper.h"
#include "ToolsProtocol.h"
#include "UserDefaults.h"

using namespace std;
Expand Down Expand Up @@ -160,6 +161,18 @@ int main(int argc, char* argv[])
// Replace %DDS_NSLOTS%
boost::replace_all(sSrcScript, "%DDS_NSLOTS%", to_string(_submit.m_slots));

// #DDS_AGENT_CPU_REQUIREMENT
if (!dds::tools_api::SSubmitRequestData::isFlagEnabled(
_submit.m_flags,
dds::tools_api::SSubmitRequestData::ESubmitRequestFlags::enable_overbooking))
{
stringstream ss;
ss << "#SBATCH --cpus-per-task=" << _submit.m_slots;
boost::replace_all(sSrcScript, "#DDS_AGENT_CPU_REQUIREMENT", ss.str());
}
else
boost::replace_all(sSrcScript, "#DDS_AGENT_CPU_REQUIREMENT", "");

// Replace %DDS_SUBMISSION_TAG%
boost::replace_all(sSrcScript, "%DDS_SUBMISSION_TAG%", _submit.m_submissionTag);

Expand Down

0 comments on commit 6d30600

Please sign in to comment.