Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-426 #432

Merged
merged 3 commits into from
Apr 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions ReleaseNotes.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ Fixed: skip bad or non-session directories/files when performing clean and list

### dds-submit
Added: Users can specify a GroupName tag for each submission. This tag will be assigned to agents and can be used as a requirement in topologies. (GH-407)
Added: Users can provide a Submission Tag (--submission-tag). DDS RMS plug-ins will use this tag to name RMS jobs and directories. (GH-426)
Added: The command learned a new argument --env-config/-e. It can be used to define a custom environment script for each agent. (GH-430)

### dds-topology
Added: A new groupName requirement. It can be used on tasks and collections. (GH-407)
Expand All @@ -26,6 +28,7 @@ Added: Support for SessionID (GH-411)

### dds-slurm-plugin
Added: Support for SessionID (GH-411)
Modified: Replace array job submission with nodes requirement. (GH-430)

### dds-localhost-plugin
Added: Support for SessionID (GH-411)
Expand Down
31 changes: 24 additions & 7 deletions dds-commander/src/ConnectionManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1034,31 +1034,47 @@ void CConnectionManager::submitAgents(const dds::tools_api::SSubmitRequestData&
fs::create_directories(pathWorkDirLocalFiles);

// Create / re-pack WN package
// Include inline script if present
// Only the ssh plug-in supports it.
string inlineShellScripCmds;
// Include inline script, if present.
// For the ssh plug-in, the inline script has a higher priority than the script provided via the submit command
// (--env-config). Only the ssh plug-in supports inline scripts.
bool bNeedCustomEnv{ false };
string scriptFileName(pathWrkPackageDir.string());
scriptFileName += "user_worker_env.sh";
smart_path(&scriptFileName);
if (_submitInfo.m_rms == "ssh" && !_submitInfo.m_config.empty())
{
string inlineShellScripCmds;
inlineShellScripCmds = CSSHConfigFile(_submitInfo.m_config).getBash();
LOG(info)
<< "Agent submitter config contains an inline shell script. It will be injected it into wrk. package";

string scriptFileName(pathWrkPackageDir.string());
scriptFileName += "user_worker_env.sh";
smart_path(&scriptFileName);
ofstream f_script(scriptFileName.c_str());
if (!f_script.is_open())
throw runtime_error("Can't open for writing: " + scriptFileName);

f_script << inlineShellScripCmds;
f_script.close();
bNeedCustomEnv = !inlineShellScripCmds.empty();
}
else if (!_submitInfo.m_envCfgFilePath.empty()) // Use the environment script provided by the user
{
if (!fs::exists(_submitInfo.m_envCfgFilePath))
{
LOG(error) << "Can't find custom envrionment script: " << _submitInfo.m_envCfgFilePath;
}
else
{
LOG(info) << "Adding using environment script to the WN package: " << _submitInfo.m_envCfgFilePath;
fs::copy(_submitInfo.m_envCfgFilePath, scriptFileName);
bNeedCustomEnv = true;
}
}

// pack worker package
sendToolsAPIMsg(_channel, _submitInfo.m_requestID, "Creating new worker package...", EMsgSeverity::info);

// Use a lightweight package when possible
_createWnPkg(!inlineShellScripCmds.empty(),
_createWnPkg(bNeedCustomEnv,
(_submitInfo.m_rms == "localhost"),
_submitInfo.m_slots,
_submitInfo.m_groupName,
Expand All @@ -1072,6 +1088,7 @@ void CConnectionManager::submitAgents(const dds::tools_api::SSubmitRequestData&
submitRequest.m_slots = _submitInfo.m_slots;
submitRequest.m_wrkPackagePath = CUserDefaults::instance().getWrkScriptPath(sSubmissionID);
submitRequest.m_groupName = _submitInfo.m_groupName;
submitRequest.m_submissionTag = _submitInfo.m_submissionTag;
m_SubmitAgents.m_strInitialSubmitRequest = submitRequest.toJSON();

string sPluginInfoMsg("RMS plug-in: ");
Expand Down
5 changes: 3 additions & 2 deletions dds-intercom-lib/src/Intercom.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,11 @@ namespace dds

uint32_t m_nInstances; ///< A number of instances.
uint32_t m_slots; ///< A number of task slots.
std::string m_cfgFilePath; ///< Path to the configuration file.
std::string m_cfgFilePath; ///< A path to the configuration file.
std::string m_id; ///< ID for communication with DDS commander.
std::string m_wrkPackagePath; ///< A full path of the agent worker package, which needs to be deployed.
std::string m_groupName; /// < Agent group name
std::string m_groupName; ///< Agent group name
std::string m_submissionTag; ///< Submission tag. It can be used by RMS to name dds jobs and directories.
};

/// \brief Structure holds information of message notification.
Expand Down
10 changes: 6 additions & 4 deletions dds-intercom-lib/src/dds_rms_plugin_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ std::string SSubmit::toJSON()
pt.put<string>("dds.plug-in.submit.cfgFilePath", m_cfgFilePath);
pt.put<string>("dds.plug-in.submit.wrkPackagePath", m_wrkPackagePath);
pt.put<string>("dds.plug-in.submit.groupName", m_groupName);
pt.put<string>("dds.plug-in.submit.submissionTag", m_submissionTag);

stringstream json;
write_json(json, pt);
Expand All @@ -83,13 +84,14 @@ void SSubmit::fromPT(const boost::property_tree::ptree& _pt)
m_wrkPackagePath = pt.get<string>("submit.wrkPackagePath", "");
m_id = pt.get<string>("id");
m_groupName = pt.get<string>("submit.groupName", "");
m_submissionTag = pt.get<string>("submit.submissionTag", "");
}

bool SSubmit::operator==(const SSubmit& val) const
bool SSubmit::operator==(const SSubmit& _val) const
{
return (m_id == val.m_id) && (m_nInstances == val.m_nInstances) && (m_slots == val.m_slots) &&
(m_cfgFilePath == val.m_cfgFilePath) && (m_wrkPackagePath == val.m_wrkPackagePath) &&
(m_groupName == val.m_groupName);
return (m_id == _val.m_id) && (m_nInstances == _val.m_nInstances) && (m_slots == _val.m_slots) &&
(m_cfgFilePath == _val.m_cfgFilePath) && (m_wrkPackagePath == _val.m_wrkPackagePath) &&
(m_groupName == _val.m_groupName) && (m_submissionTag == _val.m_submissionTag);
}

///////////////////////////////////
Expand Down
8 changes: 6 additions & 2 deletions dds-intercom-lib/tests/Test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,10 @@ BOOST_AUTO_TEST_CASE(test_protocol_parser_1)
<< "\"submit\":"
<< "{"
<< "\"nInstances\": 11,"
<< "\"cfgFilePath\": \"/path/to/cfg/dds_plugin.cfg\""
<< "\"wrkPackagePath\": \"/path/to/cfg/DDSWorker\""
<< "\"cfgFilePath\": \"/path/to/cfg/dds_plugin.cfg\","
<< "\"wrkPackagePath\": \"/path/to/cfg/DDSWorker\","
<< "\"groupName\": \"TestGroup\","
<< "\"submissionTag\": \"TestSubmissionTag\","
<< "},"
<< "\"message\":"
<< "{"
Expand All @@ -79,6 +81,8 @@ BOOST_AUTO_TEST_CASE(test_protocol_parser_1)
BOOST_CHECK(_submit.m_cfgFilePath == "/path/to/cfg/dds_plugin.cfg");
BOOST_CHECK(_submit.m_wrkPackagePath == "/path/to/cfg/DDSWorker");
BOOST_CHECK(_submit.m_id == "plug-in-id");
BOOST_CHECK(_submit.m_groupName == "TestGroup");
BOOST_CHECK(_submit.m_submissionTag == "TestSubmissionTag");
});

parser.onMessage(
Expand Down
50 changes: 47 additions & 3 deletions dds-submit/src/Options.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "Version.h"

namespace bpo = boost::program_options;
namespace fs = boost::filesystem;

namespace dds
{
Expand All @@ -36,6 +37,8 @@ namespace dds
bool m_bListPlugins{ false };
boost::uuids::uuid m_sid = boost::uuids::nil_uuid();
std::string m_groupName;
std::string m_submissionTag;
std::string m_envCfgFilePath;
} SOptions_t;
//=============================================================================
inline std::ostream& operator<<(std::ostream& _stream, const SOptions& val)
Expand All @@ -62,9 +65,15 @@ namespace dds
"management system plug-in. Use "
"\"--list\" to find out names "
"of available RMS plug-ins.");
options.add_options()("config,c",
bpo::value<std::string>(&_options->m_sCfgFile),
"A plug-in's configuration file. It can be used to provide additional RMS options.");
options.add_options()(
"config,c",
bpo::value<std::string>(&_options->m_sCfgFile),
"A plug-in's configuration file. It can be used to provide additional RMS options. It should contain "
"only RMS options. To define custom environment per agent, use --env-config.");
options.add_options()("env-config,e",
bpo::value<std::string>(&_options->m_envCfgFilePath),
"A path to a user enironment script. Will be execeuted once per agent (valid for all "
"task slots of the agent).");
options.add_options()("path",
bpo::value<std::string>(&_options->m_sPath),
"A plug-in's directory search path. It can be used for external RMS plug-ins.");
Expand All @@ -78,6 +87,11 @@ namespace dds
options.add_options()("group-name,g",
bpo::value<std::string>(&_options->m_groupName)->default_value("common"),
"Defines a group name of agents of this submission. Default: \"common\"");
options.add_options()(
"submission-tag,t",
bpo::value<std::string>(&_options->m_submissionTag)->default_value("dds_agent_job"),
"It can be used to define a submission tag. DDS RMS plug-ins will use this tag to name DDS RMS jobs "
"and directories they create on the worker nodes. Default: \"dds_agent_job\"");

// Parsing command-line
bpo::variables_map vm;
Expand All @@ -86,8 +100,10 @@ namespace dds

dds::misc::conflicting_options(vm, "list", "rms");
dds::misc::conflicting_options(vm, "list", "config");
dds::misc::conflicting_options(vm, "list", "env-config");
dds::misc::conflicting_options(vm, "list", "slots");
dds::misc::conflicting_options(vm, "list", "group-name");
dds::misc::conflicting_options(vm, "list", "submission-tag");

// check for non-defaulted arguments
bpo::variables_map::const_iterator found =
Expand Down Expand Up @@ -131,6 +147,34 @@ namespace dds
}
}

if (vm.count("submission-tag"))
{
const unsigned int submissionTagLimit{ 256 };
const std::string submissionTagNotAllowedSymb{ " `\"@#%^&*()+=[]{};:\\|,.<>/$!?\t\r" };
if (_options->m_submissionTag.empty() ||
_options->m_submissionTag.find_first_of(submissionTagNotAllowedSymb) != std::string::npos ||
_options->m_submissionTag.size() > submissionTagLimit)
{
LOG(dds::misc::log_stderr)
<< "The submission-tag option can't be empty or longer than " << submissionTagLimit
<< " symbols and should not contain whitespaces or any special character such as: "
<< submissionTagNotAllowedSymb;
return false;
}
}

if (vm.count("env-config"))
{
fs::path envCfg{ _options->m_envCfgFilePath };
if (!fs::exists(envCfg))
{
LOG(dds::misc::log_stderr) << "Can't find environment configuration file: " << envCfg.native();
return false;
}
envCfg = fs::canonical(envCfg);
_options->m_envCfgFilePath = envCfg.native();
}

// RMS plug-ins are always lower cased
boost::to_lower(_options->m_sRMS);

Expand Down
2 changes: 2 additions & 0 deletions dds-submit/src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ int main(int argc, char* argv[])
requestInfo.m_slots = options.m_slots;
requestInfo.m_pluginPath = options.m_sPath;
requestInfo.m_groupName = options.m_groupName;
requestInfo.m_submissionTag = options.m_submissionTag;
requestInfo.m_envCfgFilePath = options.m_envCfgFilePath;
SSubmitRequest::ptr_t requestPtr = SSubmitRequest::makeRequest(requestInfo);

requestPtr->setMessageCallback(
Expand Down
11 changes: 9 additions & 2 deletions dds-tools-lib/src/ToolsProtocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,8 @@ void SSubmitRequestData::_toPT(boost::property_tree::ptree& _pt) const
_pt.put<string>("rms", m_rms);
_pt.put<string>("pluginPath", m_pluginPath);
_pt.put<string>("groupName", m_groupName);
_pt.put<string>("submissionTag", m_submissionTag);
_pt.put<string>("envCfgFilePath", m_envCfgFilePath);
}

void SSubmitRequestData::_fromPT(const boost::property_tree::ptree& _pt)
Expand All @@ -189,13 +191,16 @@ void SSubmitRequestData::_fromPT(const boost::property_tree::ptree& _pt)
m_rms = _pt.get<string>("rms", "");
m_pluginPath = _pt.get<string>("pluginPath", "");
m_groupName = _pt.get<string>("groupName", "");
m_submissionTag = _pt.get<string>("submissionTag", "");
m_envCfgFilePath = _pt.get<string>("envCfgFilePath", "");
}

bool SSubmitRequestData::operator==(const SSubmitRequestData& _val) const
{
return (SBaseData::operator==(_val) && m_rms == _val.m_rms && m_instances == _val.m_instances &&
m_slots == _val.m_slots && m_config == _val.m_config && m_pluginPath == _val.m_pluginPath &&
m_groupName == _val.m_groupName);
m_groupName == _val.m_groupName && m_submissionTag == _val.m_submissionTag &&
m_envCfgFilePath == _val.m_envCfgFilePath);
}

// We need to put function implementation in the same "dds::tools_api" namespace as a friend function declaration.
Expand All @@ -209,7 +214,9 @@ namespace dds
{
return _os << _data.defaultToString() << "; instances: " << _data.m_instances
<< "; slots: " << _data.m_slots << "; config: " << _data.m_config << "; rms: " << _data.m_rms
<< "; pluginPath: " << _data.m_pluginPath << "; groupName: " << _data.m_groupName;
<< "; pluginPath: " << _data.m_pluginPath << "; groupName: " << _data.m_groupName
<< "; submissionTag: " << _data.m_submissionTag
<< "; envCfgFilePath: " << _data.m_envCfgFilePath;
}
} // namespace tools_api
} // namespace dds
Expand Down
15 changes: 9 additions & 6 deletions dds-tools-lib/src/ToolsProtocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,15 @@ namespace dds
SSubmitRequestData();
SSubmitRequestData(const boost::property_tree::ptree& _pt);

std::string m_rms; ///< RMS.
uint32_t m_instances = 0; ///< Number of instances.
uint32_t m_slots = 0; /// < Number of task slots.
std::string m_config; ///< Path to the configuration file.
std::string m_pluginPath; ///< Optional. A plug-in's directory search path
std::string m_groupName; ///< A group name of agents.
std::string m_rms; ///< RMS.
uint32_t m_instances = 0; ///< Number of instances.
uint32_t m_slots = 0; ///< Number of task slots.
std::string m_config; ///< Path to the configuration file.
std::string m_pluginPath; ///< Optional. A plug-in's directory search path
std::string m_groupName; ///< A group name of agents.
std::string m_submissionTag; ///< A Submission Tag
std::string m_envCfgFilePath; ///< A path to a user environment script. Will be executed once per agent
///< (valid for all task slots of the agent)

private:
friend SBaseData<SSubmitRequestData>;
Expand Down
2 changes: 1 addition & 1 deletion etc/DDSWorker.sh.in
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ PKG_VERSION=$(cat $WD/version)

# execute user's script if present
if [ -r $USER_SCRIPT ]; then
logMsg "Sourcing a user defined environment script..."
logMsg "Sourcing the user defined environment script..."
source $USER_SCRIPT
logMsg "Current environment: "
env
Expand Down
3 changes: 1 addition & 2 deletions plugins/dds-submit-slurm/src/dds-submit-slurm-worker
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,7 @@ logMsg()
logMsg "Submitting DDS Job on the SLURM cluster..."
# Set execute access for job.slurm
chmod +x $RMS_SANDBOX/job.slurm
# --chdir overrides the current working directory
JOB_ID=$(sbatch --job-name dds_agent_ --chdir $RMS_SANDBOX $RMS_SANDBOX/job.slurm)
JOB_ID=$(sbatch $RMS_SANDBOX/job.slurm)
if (( $? != 0 )) ; then
logMsg "Error: Failed to submit SLURM job ($?)"
exit $?
Expand Down
18 changes: 7 additions & 11 deletions plugins/dds-submit-slurm/src/job.slurm.in
Original file line number Diff line number Diff line change
@@ -1,21 +1,17 @@
#!/usr/bin/env bash

#DDS_NEED_ARRAY
#DDS_CPU_PER_AGENT
#SBATCH --nodes=%DDS_NINSTANCES%
#SBATCH --no-kill
#SBATCH --ntasks-per-node=1

#SBATCH --cpus-per-task=%DDS_NSLOTS%

#SBATCH --job-name=%DDS_SUBMISSION_TAG%
#SBATCH --chdir=%DDS_JOB_ROOT_WRK_DIR%

#DDS_USER_OPTIONS

# create working dir
eval JOB_WRK_DIR=%DDS_AGENT_ROOT_WRK_DIR%/${SLURM_ARRAY_JOB_ID}_${SLURM_ARRAY_TASK_ID}
mkdir -p $JOB_WRK_DIR
cd $JOB_WRK_DIR

# copy DDS Scout script into the working dir.
cp %DDS_SCOUT% $JOB_WRK_DIR/

# execute DDS Scout
./DDSWorker.sh
srun --output=slurm-%j-%N.out /usr/bin/env bash -c 'eval JOB_WRK_DIR=%DDS_AGENT_ROOT_WRK_DIR%/${SLURM_JOB_NAME}_${SLURM_JOBID}_${SLURMD_NODENAME}; mkdir -p $JOB_WRK_DIR; cd $JOB_WRK_DIR; cp %DDS_SCOUT% $JOB_WRK_DIR/; ./DDSWorker.sh'

exit 0
Loading