From 6d30600b3d7bf32025228c8a33d8af044ef30e2a Mon Sep 17 00:00:00 2001 From: Anar Manafov Date: Thu, 2 Jun 2022 15:01:53 +0200 Subject: [PATCH] GH-442: Add the flags support for Submit requests dds-submit: Added: The command learned a new argument --enable-overbooking. The flag instructs DDS RMS plug-in to not specify any CPU requirement for RMS jobs. (GH-442) dds-tools-api: Added: SSubmitRequestData supports flags. See SSubmitRequestData::setFlag and SSubmitRequestData::ESubmitRequestFlags. (GH-442) dds-slurm-plugin: Modified: The #SBATCH --cpus-per-task=%DDS_NSLOTS% requirement can now be disabled by providing the "enable-overbooking" flag (ToolsAPI or dds-submit). (GH-442) --- ReleaseNotes.md | 4 +- dds-commander/src/ConnectionManager.cpp | 1 + dds-intercom-lib/src/Intercom.h | 1 + .../src/dds_rms_plugin_protocol.cpp | 20 ++++---- dds-submit/src/Options.h | 17 +++++++ dds-submit/src/main.cpp | 1 + dds-tools-lib/src/ToolsProtocol.cpp | 50 +++++++++++++++---- dds-tools-lib/src/ToolsProtocol.h | 22 +++++++- dds-tools-lib/tests/TestProtocol.cpp | 2 + plugins/dds-submit-slurm/CMakeLists.txt | 1 + plugins/dds-submit-slurm/src/job.slurm.in | 2 + plugins/dds-submit-slurm/src/main.cpp | 13 +++++ 12 files changed, 113 insertions(+), 21 deletions(-) diff --git a/ReleaseNotes.md b/ReleaseNotes.md index d05da85b..758112c3 100644 --- a/ReleaseNotes.md +++ b/ReleaseNotes.md @@ -29,6 +29,7 @@ Added: Users can specify a GroupName tag for each submission. This tag will be a Added: Users can provide a Submission Tag (--submission-tag). DDS RMS plug-ins will use this tag to name RMS jobs and directories. (GH-426) Added: The command learned a new argument --env-config/-e. It can be used to define a custom environment script for each agent. (GH-430) Added: The command learned a new argument --min-instances. It can be used to provide the minimum number of agents to spawn. (GH-434) +Added: The command learned a new argument --enable-overbooking. 
The flag instructs DDS RMS plug-in to not specify any CPU requirement for RMS jobs. (GH-442) ### dds-topology Fixed: Stability improvements. @@ -46,7 +47,7 @@ Added: Support for SubmissionID (GH-411) Added: Support of minimum number of agents to spawn. (GH-434) Modified: Replace array job submission with nodes requirement. (GH-430) Modified: Remove #SBATCH --ntasks-per-node=1. (GH-444) -Modified: Remove #SBATCH --cpus-per-task=%DDS_NSLOTS%. (GH-442) +Modified: The #SBATCH --cpus-per-task=%DDS_NSLOTS% requirement can now be disabled by providing the "enable-overbooking" flag (ToolsAPI or dds-submit). (GH-442) ### dds-localhost-plugin Added: Support for SubmissionID (GH-411) @@ -56,6 +57,7 @@ Modified: Logs of user processes which use Tools API are moved now to the DDS ro Modified: CSession::waitForNumAgents is renamed to CSession::waitForNumSlots. (GH-439) Added: An ability to unsubscribe from either individual events or all events of requests. (GH-382) Added: SAgentInfoResponseData provides the agent group name. (GH-415) +Added: SSubmitRequestData supports flags. See SSubmitRequestData::setFlag and SSubmitRequestData::ESubmitRequestFlags. 
(GH-442) ### dds-user-defaults Modified: Bumo the version to 0.5 diff --git a/dds-commander/src/ConnectionManager.cpp b/dds-commander/src/ConnectionManager.cpp index f7d589c0..9bd2aa22 100644 --- a/dds-commander/src/ConnectionManager.cpp +++ b/dds-commander/src/ConnectionManager.cpp @@ -1102,6 +1102,7 @@ void CConnectionManager::submitAgents(const dds::tools_api::SSubmitRequestData& submitRequest.m_nInstances = _submitInfo.m_instances; submitRequest.m_nMinInstances = _submitInfo.m_minInstances; submitRequest.m_slots = _submitInfo.m_slots; + submitRequest.m_flags = _submitInfo.m_flags; submitRequest.m_wrkPackagePath = CUserDefaults::instance().getWrkScriptPath(sSubmissionID); submitRequest.m_groupName = _submitInfo.m_groupName; submitRequest.m_submissionTag = _submitInfo.m_submissionTag; diff --git a/dds-intercom-lib/src/Intercom.h b/dds-intercom-lib/src/Intercom.h index 8ec3a6df..597030d6 100644 --- a/dds-intercom-lib/src/Intercom.h +++ b/dds-intercom-lib/src/Intercom.h @@ -166,6 +166,7 @@ namespace dds uint32_t m_nInstances; ///< A number of instances. uint32_t m_nMinInstances; ///< A minimum number of instances. uint32_t m_slots; ///< A number of task slots. + uint32_t m_flags; ///< Additional flags, see SSubmitRequestData::ESubmitRequestFlags std::string m_cfgFilePath; ///< A path to the configuration file. std::string m_id; ///< ID for communication with DDS commander. std::string m_wrkPackagePath; ///< A full path of the agent worker package, which needs to be deployed. 
diff --git a/dds-intercom-lib/src/dds_rms_plugin_protocol.cpp b/dds-intercom-lib/src/dds_rms_plugin_protocol.cpp index b0b4847d..d585d96a 100644 --- a/dds-intercom-lib/src/dds_rms_plugin_protocol.cpp +++ b/dds-intercom-lib/src/dds_rms_plugin_protocol.cpp @@ -54,9 +54,10 @@ std::string SSubmit::toJSON() ptree pt; pt.put("dds.plug-in.id", m_id); - pt.put("dds.plug-in.submit.nInstances", m_nInstances); - pt.put("dds.plug-in.submit.nMinInstances", m_nMinInstances); - pt.put("dds.plug-in.submit.slots", m_slots); + pt.put("dds.plug-in.submit.nInstances", m_nInstances); + pt.put("dds.plug-in.submit.nMinInstances", m_nMinInstances); + pt.put("dds.plug-in.submit.slots", m_slots); + pt.put("dds.plug-in.submit.flags", m_flags); pt.put("dds.plug-in.submit.cfgFilePath", m_cfgFilePath); pt.put("dds.plug-in.submit.wrkPackagePath", m_wrkPackagePath); pt.put("dds.plug-in.submit.groupName", m_groupName); @@ -79,9 +80,10 @@ void SSubmit::fromJSON(const std::string& _json) void SSubmit::fromPT(const boost::property_tree::ptree& _pt) { const ptree& pt = _pt.get_child("dds.plug-in"); - m_nInstances = pt.get("submit.nInstances", 0); - m_nMinInstances = pt.get("submit.nMinInstances", 0); - m_slots = pt.get("submit.slots", 0); + m_nInstances = pt.get("submit.nInstances", 0); + m_nMinInstances = pt.get("submit.nMinInstances", 0); + m_slots = pt.get("submit.slots", 0); + m_flags = pt.get("submit.flags", 0); m_cfgFilePath = pt.get("submit.cfgFilePath", ""); m_wrkPackagePath = pt.get("submit.wrkPackagePath", ""); m_id = pt.get("id"); @@ -92,9 +94,9 @@ void SSubmit::fromPT(const boost::property_tree::ptree& _pt) bool SSubmit::operator==(const SSubmit& _val) const { return (m_id == _val.m_id) && (m_nInstances == _val.m_nInstances) && (m_slots == _val.m_slots) && - (m_cfgFilePath == _val.m_cfgFilePath) && (m_wrkPackagePath == _val.m_wrkPackagePath) && - (m_groupName == _val.m_groupName) && (m_submissionTag == _val.m_submissionTag) && - (m_nMinInstances == _val.m_nMinInstances); + (m_flags == 
_val.m_flags) && (m_cfgFilePath == _val.m_cfgFilePath) && + (m_wrkPackagePath == _val.m_wrkPackagePath) && (m_groupName == _val.m_groupName) && + (m_submissionTag == _val.m_submissionTag) && (m_nMinInstances == _val.m_nMinInstances); } /////////////////////////////////// diff --git a/dds-submit/src/Options.h b/dds-submit/src/Options.h index 7ecb0ad3..751712c0 100644 --- a/dds-submit/src/Options.h +++ b/dds-submit/src/Options.h @@ -17,6 +17,7 @@ #include "ProtocolCommands.h" #include "SubmitCmd.h" #include "SysHelper.h" +#include "ToolsProtocol.h" #include "Version.h" namespace bpo = boost::program_options; @@ -35,6 +36,7 @@ namespace dds size_t m_number{ 0 }; size_t m_minInstances{ 0 }; size_t m_slots{ 0 }; + uint32_t m_flags{ 0 }; bool m_bListPlugins{ false }; boost::uuids::uuid m_sid = boost::uuids::nil_uuid(); std::string m_groupName; @@ -97,6 +99,13 @@ namespace dds "It can be used to define a submission tag. DDS RMS plug-ins will use this tag to name DDS RMS jobs " "and directories they create on the worker nodes."); + options.add_options()( + "enable-overbooking", + bpo::bool_switch()->default_value(false), + "The flag instructs DDS RMS plug-in to not specify any CPU requirement for RMS jobs. For example, the " + "SLURM plug-in will not add the \"#SBATCH --cpus-per-task\" option to the job script. 
Otherwise " + "DDS will try to require as many CPU per agent as tasks slots."); + // Parsing command-line bpo::variables_map vm; bpo::store(bpo::command_line_parser(_argc, _argv).options(options).run(), vm); @@ -185,6 +194,14 @@ namespace dds return false; } + // Flags + dds::tools_api::SSubmitRequestData::flagContainer_t flags; + dds::tools_api::SSubmitRequestData::setFlag( + &flags, + dds::tools_api::SSubmitRequestData::ESubmitRequestFlags::enable_overbooking, + vm["enable-overbooking"].as()); + _options->m_flags = flags.to_ulong(); + // RMS plug-ins are always lower cased boost::to_lower(_options->m_sRMS); diff --git a/dds-submit/src/main.cpp b/dds-submit/src/main.cpp index 2ae50913..09347516 100644 --- a/dds-submit/src/main.cpp +++ b/dds-submit/src/main.cpp @@ -92,6 +92,7 @@ int main(int argc, char* argv[]) requestInfo.m_instances = options.m_number; requestInfo.m_minInstances = options.m_minInstances; requestInfo.m_slots = options.m_slots; + requestInfo.m_flags = options.m_flags; requestInfo.m_pluginPath = options.m_sPath; requestInfo.m_groupName = options.m_groupName; requestInfo.m_submissionTag = options.m_submissionTag; diff --git a/dds-tools-lib/src/ToolsProtocol.cpp b/dds-tools-lib/src/ToolsProtocol.cpp index b7c9dc18..3871a64a 100644 --- a/dds-tools-lib/src/ToolsProtocol.cpp +++ b/dds-tools-lib/src/ToolsProtocol.cpp @@ -171,11 +171,36 @@ SSubmitRequestData::SSubmitRequestData(const boost::property_tree::ptree& _pt) fromPT(_pt); } +void SSubmitRequestData::setFlag(const ESubmitRequestFlags& _flag, bool _value) +{ + m_flagContainer.set((uint32_t)_flag, _value); + m_flags = m_flagContainer.to_ulong(); +} + +void SSubmitRequestData::setFlag(flagContainer_t* _flagContainer, const ESubmitRequestFlags& _flag, bool _value) +{ + if (_flagContainer == nullptr) + return; + + _flagContainer->set((uint32_t)_flag, _value); +} +bool SSubmitRequestData::isFlagEnabled(const ESubmitRequestFlags& _flag) const +{ + return (m_flagContainer.test((uint32_t)_flag)); +} + +bool 
SSubmitRequestData::isFlagEnabled(const uint32_t& _flagContainer, const ESubmitRequestFlags& _flag) +{ + flagContainer_t container{ _flagContainer }; + return (container.test(((uint32_t)_flag))); +} + void SSubmitRequestData::_toPT(boost::property_tree::ptree& _pt) const { - _pt.put("instances", m_instances); - _pt.put("minInstances", m_minInstances); - _pt.put("slots", m_slots); + _pt.put("instances", m_instances); + _pt.put("minInstances", m_minInstances); + _pt.put("slots", m_slots); + _pt.put("flags", m_flags); _pt.put("config", m_config); _pt.put("rms", m_rms); _pt.put("pluginPath", m_pluginPath); @@ -186,9 +211,13 @@ void SSubmitRequestData::_toPT(boost::property_tree::ptree& _pt) const void SSubmitRequestData::_fromPT(const boost::property_tree::ptree& _pt) { - m_instances = _pt.get("instances", 0); - m_minInstances = _pt.get("minInstances", 0); - m_slots = _pt.get("slots", 0); + m_instances = _pt.get("instances", 0); + m_minInstances = _pt.get("minInstances", 0); + m_slots = _pt.get("slots", 0); + + m_flags = _pt.get("flags", 0); + m_flagContainer = { m_flags }; + m_config = _pt.get("config", ""); m_rms = _pt.get("rms", ""); m_pluginPath = _pt.get("pluginPath", ""); @@ -200,9 +229,10 @@ void SSubmitRequestData::_fromPT(const boost::property_tree::ptree& _pt) bool SSubmitRequestData::operator==(const SSubmitRequestData& _val) const { return (SBaseData::operator==(_val) && m_rms == _val.m_rms && m_instances == _val.m_instances && - m_slots == _val.m_slots && m_config == _val.m_config && m_pluginPath == _val.m_pluginPath && - m_groupName == _val.m_groupName && m_submissionTag == _val.m_submissionTag && - m_envCfgFilePath == _val.m_envCfgFilePath && m_minInstances == _val.m_minInstances); + m_slots == _val.m_slots && m_flags == _val.m_flags && m_config == _val.m_config && + m_pluginPath == _val.m_pluginPath && m_groupName == _val.m_groupName && + m_submissionTag == _val.m_submissionTag && m_envCfgFilePath == _val.m_envCfgFilePath && + m_minInstances == 
_val.m_minInstances); } // We need to put function implementation in the same "dds::tools_api" namespace as a friend function declaration. @@ -216,7 +246,7 @@ namespace dds { return _os << _data.defaultToString() << "; instances: " << _data.m_instances << "; minInstances: " << _data.m_minInstances << "; slots: " << _data.m_slots - << "; config: " << _data.m_config << "; rms: " << _data.m_rms + << "; falgs: " << _data.m_flags << "; config: " << _data.m_config << "; rms: " << _data.m_rms << "; pluginPath: " << _data.m_pluginPath << "; groupName: " << _data.m_groupName << "; submissionTag: " << _data.m_submissionTag << "; envCfgFilePath: " << _data.m_envCfgFilePath; diff --git a/dds-tools-lib/src/ToolsProtocol.h b/dds-tools-lib/src/ToolsProtocol.h index 5f75143c..4b21ea66 100644 --- a/dds-tools-lib/src/ToolsProtocol.h +++ b/dds-tools-lib/src/ToolsProtocol.h @@ -77,13 +77,30 @@ namespace dds /// \brief Structure holds information of a submit request. struct SSubmitRequestData : SBaseRequestData { + /// \brief Additional flags of the SSubmitRequestData + /// Uee SSubmitRequestData::setFlag to set flags. + enum class ESubmitRequestFlags : uint32_t + { + enable_overbooking, + //---------- + size_value + }; + + using flagContainer_t = std::bitset<(uint32_t)ESubmitRequestFlags::size_value>; + SSubmitRequestData(); SSubmitRequestData(const boost::property_tree::ptree& _pt); + void setFlag(const ESubmitRequestFlags& _flag, bool _value); + static void setFlag(flagContainer_t* _flagContainer, const ESubmitRequestFlags& _flag, bool _value); + bool isFlagEnabled(const ESubmitRequestFlags& _flag) const; + static bool isFlagEnabled(const uint32_t& _flagContainer, const ESubmitRequestFlags& _flag); + std::string m_rms; ///< RMS. uint32_t m_instances = 0; ///< A number of instances. uint32_t m_minInstances = 0; ///< A minimum number of instances. - uint32_t m_slots = 0; /// < Number of task slots. + uint32_t m_slots = 0; ///< Number of task slots. 
+ uint32_t m_flags = 0; ///< Additional flags, see SSubmitRequestData::ESubmitRequestFlags std::string m_config; ///< A path to the configuration file. std::string m_pluginPath; ///< Optional. A plug-in's directory search path std::string m_groupName; ///< A group name of agents. @@ -102,6 +119,9 @@ namespace dds bool operator==(const SSubmitRequestData& _val) const; /// \brief Ostream operator. friend std::ostream& operator<<(std::ostream& _os, const SSubmitRequestData& _data); + + private: + flagContainer_t m_flagContainer; }; /// \brief Request class of submit. diff --git a/dds-tools-lib/tests/TestProtocol.cpp b/dds-tools-lib/tests/TestProtocol.cpp index b82f527f..83fe95ae 100644 --- a/dds-tools-lib/tests/TestProtocol.cpp +++ b/dds-tools-lib/tests/TestProtocol.cpp @@ -96,6 +96,7 @@ BOOST_AUTO_TEST_CASE(test_dds_tools_protocol) testData.m_config = "string"; testData.m_pluginPath = "string"; testData.m_requestID = 123; + testData.setFlag(SSubmitRequestData::ESubmitRequestFlags::enable_overbooking, true); SSubmitRequestData data(child.second); @@ -104,6 +105,7 @@ BOOST_AUTO_TEST_CASE(test_dds_tools_protocol) ss << data; BOOST_CHECK(data == testData); + BOOST_CHECK(data.isFlagEnabled(SSubmitRequestData::ESubmitRequestFlags::enable_overbooking)); } else if (tag == "topology") { diff --git a/plugins/dds-submit-slurm/CMakeLists.txt b/plugins/dds-submit-slurm/CMakeLists.txt index dc6aef5a..a104f66e 100644 --- a/plugins/dds-submit-slurm/CMakeLists.txt +++ b/plugins/dds-submit-slurm/CMakeLists.txt @@ -9,6 +9,7 @@ target_link_libraries(${PROJECT_NAME} PUBLIC dds_misc_lib dds_user_defaults_lib + dds_tools_lib dds_intercom_lib dds_pipe_log_engine_lib Boost::boost diff --git a/plugins/dds-submit-slurm/src/job.slurm.in b/plugins/dds-submit-slurm/src/job.slurm.in index d10ac580..e119cfb9 100755 --- a/plugins/dds-submit-slurm/src/job.slurm.in +++ b/plugins/dds-submit-slurm/src/job.slurm.in @@ -6,6 +6,8 @@ #SBATCH --job-name=%DDS_SUBMISSION_TAG% #SBATCH 
--chdir=%DDS_JOB_ROOT_WRK_DIR% +#DDS_AGENT_CPU_REQUIREMENT + #DDS_USER_OPTIONS # execute DDS Scout diff --git a/plugins/dds-submit-slurm/src/main.cpp b/plugins/dds-submit-slurm/src/main.cpp index ce128af1..9a938f81 100644 --- a/plugins/dds-submit-slurm/src/main.cpp +++ b/plugins/dds-submit-slurm/src/main.cpp @@ -21,6 +21,7 @@ #include "PipeLogEngine.h" #include "Process.h" #include "SysHelper.h" +#include "ToolsProtocol.h" #include "UserDefaults.h" using namespace std; @@ -160,6 +161,18 @@ int main(int argc, char* argv[]) // Replace %DDS_NSLOTS% boost::replace_all(sSrcScript, "%DDS_NSLOTS%", to_string(_submit.m_slots)); + // #DDS_AGENT_CPU_REQUIREMENT + if (!dds::tools_api::SSubmitRequestData::isFlagEnabled( + _submit.m_flags, + dds::tools_api::SSubmitRequestData::ESubmitRequestFlags::enable_overbooking)) + { + stringstream ss; + ss << "#SBATCH --cpus-per-task=" << _submit.m_slots; + boost::replace_all(sSrcScript, "#DDS_AGENT_CPU_REQUIREMENT", ss.str()); + } + else + boost::replace_all(sSrcScript, "#DDS_AGENT_CPU_REQUIREMENT", ""); + // Replace %DDS_SUBMISSION_TAG% boost::replace_all(sSrcScript, "%DDS_SUBMISSION_TAG%", _submit.m_submissionTag);