Skip to content

Commit

Permalink
supported multiple dependencies
Browse files Browse the repository at this point in the history
  • Loading branch information
justanhduc committed Feb 6, 2021
1 parent 7ed9ea8 commit 8163767
Show file tree
Hide file tree
Showing 6 changed files with 190 additions and 102 deletions.
6 changes: 5 additions & 1 deletion client.c
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ void c_new_job() {
m.u.newjob.label_size = 0;
m.u.newjob.store_output = command_line.store_output;
m.u.newjob.do_depend = command_line.do_depend;
m.u.newjob.depend_on = command_line.depend_on;
m.u.newjob.depend_on_size = command_line.depend_on_size;
m.u.newjob.should_keep_finished = command_line.should_keep_finished;
m.u.newjob.command_size = strlen(new_command) + 1; /* add null */
m.u.newjob.wait_enqueuing = command_line.wait_enqueuing;
Expand All @@ -88,6 +88,10 @@ void c_new_job() {
/* Send the message */
send_msg(server_socket, &m);

/* send dependencies */
if (command_line.do_depend)
send_ints(server_socket, command_line.depend_on, command_line.depend_on_size);

/* Send the command */
send_bytes(server_socket, new_command, m.u.newjob.command_size);

Expand Down
177 changes: 97 additions & 80 deletions jobs.c
Original file line number Diff line number Diff line change
Expand Up @@ -438,76 +438,88 @@ int s_newjob(int s, struct Msg *m) {
p->notify_errorlevel_to = 0;
p->notify_errorlevel_to_size = 0;
p->do_depend = m->u.newjob.do_depend;
p->depend_on = -1; /* By default. May be overriden in the next conditions */
p->depend_on_size = m->u.newjob.depend_on_size;

/* this error level here is used internally to decide whether a job should be run or not
* so it only matters whether the error level is 0 or not.
* thus, summing the absolute error levels of all dependencies is sufficient.*/
p->dependency_errorlevel = 0;
warning("here\n");
if (m->u.newjob.do_depend == 1) {
int *depend_on;
p->depend_on = (int*) malloc(sizeof(int));
depend_on = recv_ints(s);
/* Depend on the last queued job. */

/* As we already have 'p' in the queue,
* neglect it during the find_last_jobid_in_queue() */
if (m->u.newjob.depend_on == -1) {
p->depend_on = find_last_jobid_in_queue(p->jobid);

/* We don't trust the last jobid in the queue (running or queued)
* if it's not the last added job. In that case, let
* the next control flow handle it as if it could not
* do_depend on any still queued job. */
if (last_finished_jobid > p->depend_on)
p->depend_on = -1;

/* If it's queued still without result, let it know
* its result to p when it finishes. */
if (p->depend_on != -1) {
for (int idx = 0; idx < p->depend_on_size; idx++) {
/* As we already have 'p' in the queue,
* neglect it during the find_last_jobid_in_queue() */
if (depend_on[idx] == -1) {
p->depend_on[idx] = find_last_jobid_in_queue(p->jobid);

/* We don't trust the last jobid in the queue (running or queued)
* if it's not the last added job. In that case, let
* the next control flow handle it as if it could not
* do_depend on any still queued job. */
if (last_finished_jobid > p->depend_on[idx])
p->depend_on[idx] = -1;

/* If it's queued still without result, let it know
* its result to p when it finishes. */
if (p->depend_on[idx] != -1) {
struct Job *depended_job;
depended_job = findjob(p->depend_on[idx]);
if (depended_job != 0)
add_notify_errorlevel_to(depended_job, p->jobid);
else
warning("The jobid %i is queued to do_depend on the jobid %i"
" suddenly non existent in the queue", p->jobid,
p->depend_on[idx]);
} else /* Otherwise take the finished job, or the last_errorlevel */
{
if (depend_on[idx] == -1) {
int ljobid = find_last_stored_jobid_finished();
p->depend_on[idx] = ljobid;

/* If we have a newer result stored, use it */
/* NOTE:
* Reading this now, I don't know how ljobid can be
* greater than last_finished_jobid */
if (last_finished_jobid < ljobid) {
struct Job *parent;
parent = find_finished_job(ljobid);
if (!parent)
error("jobid %i suddenly disappeared from the finished list",
ljobid);
p->dependency_errorlevel += abs(parent->result.errorlevel);
} else
p->dependency_errorlevel += abs(last_errorlevel);
}
}
} else {
/* The user decided what's the job this new job depends on */
struct Job *depended_job;
depended_job = findjob(p->depend_on);

p->depend_on[idx] = depend_on[idx];

depended_job = findjob(p->depend_on[idx]);
if (depended_job != 0)
add_notify_errorlevel_to(depended_job, p->jobid);
else
warning("The jobid %i is queued to do_depend on the jobid %i"
" suddenly non existent in the queue", p->jobid,
p->depend_on);
} else /* Otherwise take the finished job, or the last_errorlevel */
{
if (m->u.newjob.depend_on == -1) {
int ljobid = find_last_stored_jobid_finished();
p->depend_on = ljobid;

/* If we have a newer result stored, use it */
/* NOTE:
* Reading this now, I don't know how ljobid can be
* greater than last_finished_jobid */
if (last_finished_jobid < ljobid) {
struct Job *parent;
parent = find_finished_job(ljobid);
if (!parent)
error("jobid %i suddenly disappeared from the finished list",
ljobid);
p->dependency_errorlevel = parent->result.errorlevel;
} else
p->dependency_errorlevel = last_errorlevel;
}
}
} else {
/* The user decided what's the job this new job depends on */
struct Job *depended_job;

p->depend_on = m->u.newjob.depend_on;

depended_job = findjob(p->depend_on);
if (depended_job != 0)
add_notify_errorlevel_to(depended_job, p->jobid);
else {
struct Job *parent;
parent = find_finished_job(p->depend_on);
if (parent) {
p->dependency_errorlevel = parent->result.errorlevel;
} else {
/* We consider as if the job not found
didn't finish well */
p->dependency_errorlevel = -1;
else {
struct Job *parent;
parent = find_finished_job(p->depend_on[idx]);
if (parent) {
p->dependency_errorlevel += abs(parent->result.errorlevel);
} else {
/* We consider as if the job not found
didn't finish well */
p->dependency_errorlevel += 1;
}
}
}
}
}
free(depend_on);
} else
p->depend_on = NULL; /* By default. May be overriden in the next conditions */


pinfo_init(&p->info);
Expand Down Expand Up @@ -550,7 +562,6 @@ int s_newjob(int s, struct Msg *m) {
"Environment:\n%s", ptr);
free(ptr);
}

return p->jobid;
}

Expand Down Expand Up @@ -640,17 +651,23 @@ int next_run_job() {
}
}

if (p->depend_on >= 0) {
struct Job *do_depend_job = get_job(p->depend_on);
/* We won't try to run any job do_depending on an unfinished
* job */
if (do_depend_job != NULL &&
(do_depend_job->state == QUEUED || do_depend_job->state == RUNNING ||
do_depend_job->state == ALLOCATING)) {
/* Next try */
p = p->next;
continue;
if (p->do_depend) {
int ready = 1;
for (int i = 0; i < p->depend_on_size; i++) {
struct Job *do_depend_job = get_job(p->depend_on[i]);
/* We won't try to run any job do_depending on an unfinished
* job */
if (do_depend_job != NULL &&
(do_depend_job->state == QUEUED || do_depend_job->state == RUNNING ||
do_depend_job->state == ALLOCATING)) {
/* Next try */
p = p->next;
ready = 0;
break;
}
}
if (ready != 1)
continue;
}

if (free_slots >= p->num_slots) {
Expand Down Expand Up @@ -711,8 +728,6 @@ static void new_finished_job(struct Job *j) {
}
p->next = j;
p->next->next = 0;

return;
}

static int job_is_in_state(int jobid, enum Jobstate state) {
Expand Down Expand Up @@ -909,8 +924,12 @@ void s_job_info(int s, int jobid) {
send_msg(s, &m);
pinfo_dump(&p->info, s);
fd_nprintf(s, 100, "Command: ");
if (p->depend_on != -1)
fd_nprintf(s, 100, "[%i]&& ", p->depend_on);
if (p->depend_on) {
fd_nprintf(s, 100, "[%i,", p->depend_on[0]);
for (int i = 1; i < p->depend_on_size; i++)
fd_nprintf(s, 100, ",%i", p->depend_on[i]);
fd_nprintf(s, 100, "]&& ");
}
write(s, p->command, strlen(p->command));
fd_nprintf(s, 100, "\n");
fd_nprintf(s, 100, "Slots required: %i\n", p->num_slots);
Expand Down Expand Up @@ -1010,7 +1029,7 @@ void notify_errorlevel(struct Job *p) {
struct Job *notified;
notified = get_job(p->notify_errorlevel_to[i]);
if (notified) {
notified->dependency_errorlevel = p->result.errorlevel;
notified->dependency_errorlevel += abs(p->result.errorlevel);
}
}
}
Expand Down Expand Up @@ -1375,8 +1394,6 @@ void s_move_urgent(int s, int jobid) {
tmp1->next = p->next;
p->next = firstjob->next;
firstjob->next = p;


send_urgent_ok(s);
}

Expand Down
38 changes: 28 additions & 10 deletions list.c
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ static char *print_noresult(const struct Job *p) {
const char *output_filename;
int maxlen;
char *line;
/* 18 chars should suffice for a string like "[int]&& " */
char dependstr[18] = "";
/* 20 chars should suffice for a string like "[int,int,..]&& " */
char dependstr[20] = "";

jobstate = jstate2string(p->state);
output_filename = ofilename_shown(p);
Expand All @@ -104,10 +104,19 @@ static char *print_noresult(const struct Job *p) {
maxlen += 3 + strlen(p->label);
if (p->do_depend) {
maxlen += sizeof(dependstr);
if (p->depend_on == -1)
snprintf(dependstr, sizeof(dependstr), "&& ");
int pos = 0;
if (p->depend_on[0] == -1)
pos += snprintf(&dependstr[pos], sizeof(dependstr), "[ ");
else
snprintf(dependstr, sizeof(dependstr), "[%i]&& ", p->depend_on);
pos += snprintf(&dependstr[pos], sizeof(dependstr), "[%i", p->depend_on[0]);

for (int i = 1; i < p->depend_on_size; i++) {
if (p->depend_on[i] == -1)
pos += snprintf(&dependstr[pos], sizeof(dependstr), ", ");
else
pos += snprintf(&dependstr[pos], sizeof(dependstr), ",%i", p->depend_on[i]);
}
pos += snprintf(&dependstr[pos], sizeof(dependstr), "]&& ");
}

line = (char *) malloc(maxlen);
Expand Down Expand Up @@ -144,8 +153,8 @@ static char *print_result(const struct Job *p) {
int maxlen;
char *line;
const char *output_filename;
/* 18 chars should suffice for a string like "[int]&& " */
char dependstr[18] = "";
/* 20 chars should suffice for a string like "[int,int,..]&& " */
char dependstr[20] = "";
float real_ms = p->result.real_ms;
char *unit = "s";

Expand All @@ -159,10 +168,19 @@ static char *print_result(const struct Job *p) {
maxlen += 3 + strlen(p->label);
if (p->do_depend) {
maxlen += sizeof(dependstr);
if (p->depend_on == -1)
snprintf(dependstr, sizeof(dependstr), "&&");
int pos = 0;
if (p->depend_on[0] == -1)
pos += snprintf(&dependstr[pos], sizeof(dependstr), "[ ");
else
snprintf(dependstr, sizeof(dependstr), "[%i]&&", p->depend_on);
pos += snprintf(&dependstr[pos], sizeof(dependstr), "[%i", p->depend_on[0]);

for (int i = 1; i < p->depend_on_size; i++) {
if (p->depend_on[i] == -1)
pos += snprintf(&dependstr[pos], sizeof(dependstr), ", ");
else
pos += snprintf(&dependstr[pos], sizeof(dependstr), ",%i", p->depend_on[i]);
}
pos += snprintf(&dependstr[pos], sizeof(dependstr), "]&& ");
}

line = (char *) malloc(maxlen);
Expand Down
24 changes: 20 additions & 4 deletions main.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ static void default_command_line() {
command_line.send_output_by_mail = 0;
command_line.label = 0;
command_line.do_depend = 0;
command_line.depend_on = -1; /* -1 means depend on previous */
command_line.depend_on = NULL; /* -1 means depend on previous */
command_line.max_slots = 1;
command_line.wait_enqueuing = 1;
command_line.stderr_apart = 0;
Expand Down Expand Up @@ -76,6 +76,16 @@ static int get_two_jobs(const char *str, int *j1, int *j2) {
return 1;
}

int strtok_int(char* str, char* delim, int* ids) {
int count = 0;
char *ptr = strtok(str, delim);
while(ptr != NULL) {
ids[count++] = atoi(ptr);
ptr = strtok(NULL, delim);
}
return count;
}

static struct option longOptions[] = {
{"get_label", optional_argument, NULL, 'a'},
{"count_running", no_argument, NULL, 'R'},
Expand Down Expand Up @@ -133,7 +143,9 @@ void parse_opts(int argc, char **argv) {
break;
case 'd':
command_line.do_depend = 1;
command_line.depend_on = -1;
command_line.depend_on = (int*) malloc(sizeof(int));
command_line.depend_on_size = 1;
command_line.depend_on[0] = -1;
break;
case 'V':
command_line.request = c_SHOW_VERSION;
Expand Down Expand Up @@ -224,11 +236,13 @@ void parse_opts(int argc, char **argv) {
break;
case 'D':
command_line.do_depend = 1;
command_line.depend_on = atoi(optarg);
command_line.depend_on = (int*) malloc(strlen(optarg) * sizeof(int));
command_line.depend_on_size = strtok_int(optarg, ",", command_line.depend_on);
break;
case 'W':
command_line.do_depend = 1;
command_line.depend_on = atoi(optarg);
command_line.depend_on = (int*) malloc(strlen(optarg) * sizeof(int));
command_line.depend_on_size = strtok_int(optarg, ",", command_line.depend_on);
command_line.require_elevel = 1;
break;
case 'U':
Expand Down Expand Up @@ -621,6 +635,8 @@ int main(int argc, char **argv) {
if (command_line.need_server) {
close(server_socket);
}
free(command_line.depend_on);
free(command_line.label);

return errorlevel;
}
Loading

0 comments on commit 8163767

Please sign in to comment.