-
Notifications
You must be signed in to change notification settings - Fork 82
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Thread-safety of clingo_ground_callback #412
Comments
A control object should not be accessed simultaneously from different threads. However, two threads can access two different control objects in parallel. There is shared state, the symbols, but access to them is locked and thread safe. I do not see how it is possible that two different callbacks can be mixed up. They are not stored in global state. |
Hi again. I managed to reproduce the issue with the minimal working example below. Essentially, what I'm doing is creating a separate I compiled the minimal working example with What's interesting is, if one compiles with
appears again. What am I doing wrong here? Thanks #include <clingo.h>
#include <stdio.h>
#include <stdbool.h>
#include <pthread.h>
#include <string.h>
/********************** Start of model printing functions *********************/
#include <stdlib.h>
#include <inttypes.h>
typedef struct model_buffer {
clingo_symbol_t *symbols;
size_t symbols_n;
char *string;
size_t string_n;
} model_buffer_t;
void free_model_buffer(model_buffer_t *buf) {
if (buf->symbols) {
free(buf->symbols);
buf->symbols = NULL;
buf->symbols_n = 0;
}
if (buf->string) {
free(buf->string);
buf->string = NULL;
buf->string_n = 0;
}
}
bool print_symbol(clingo_symbol_t symbol, model_buffer_t *buf) {
bool ret = true;
char *string;
size_t n;
// determine size of the string representation of the next symbol in the model
if (!clingo_symbol_to_string_size(symbol, &n)) { goto error; }
if (buf->string_n < n) {
// allocate required memory to hold the symbol's string
if (!(string = (char*)realloc(buf->string, sizeof(*buf->string) * n))) {
clingo_set_error(clingo_error_bad_alloc, "could not allocate memory for symbol's string");
goto error;
}
buf->string = string;
buf->string_n = n;
}
// retrieve the symbol's string
if (!clingo_symbol_to_string(symbol, buf->string, n)) { goto error; }
printf("%s", buf->string);
goto out;
error:
ret = false;
out:
return ret;
}
bool print_model(const clingo_model_t *model, model_buffer_t *buf, const char *label, clingo_show_type_bitset_t show) {
bool ret = true;
clingo_symbol_t *symbols;
size_t n;
const clingo_symbol_t *it, *ie;
// determine the number of (shown) symbols in the model
if (!clingo_model_symbols_size(model, show, &n)) { goto error; }
// allocate required memory to hold all the symbols
if (buf->symbols_n < n) {
if (!(symbols = (clingo_symbol_t*)malloc(sizeof(*buf->symbols) * n))) {
clingo_set_error(clingo_error_bad_alloc, "could not allocate memory for atoms");
goto error;
}
buf->symbols = symbols;
buf->symbols_n = n;
}
// retrieve the symbols in the model
if (!clingo_model_symbols(model, show, buf->symbols, n)) { goto error; }
printf("%s:", label);
for (it = buf->symbols, ie = buf->symbols + n; it != ie; ++it) {
printf(" ");
if (!print_symbol(*it, buf)) { goto error; }
}
printf("\n");
goto out;
error:
ret = false;
out:
return ret;
}
bool print_solution(const clingo_model_t *model) {
static pthread_mutex_t mu = PTHREAD_MUTEX_INITIALIZER;
bool ret = true;
uint64_t number;
clingo_model_type_t type;
const char *type_string = "";
model_buffer_t data = {NULL, 0, NULL, 0};
// get model type
if (!clingo_model_type(model, &type)) { goto error; }
switch ((enum clingo_model_type_e)type) {
case clingo_model_type_stable_model: { type_string = "Stable model"; break; }
case clingo_model_type_brave_consequences: { type_string = "Brave consequences"; break; }
case clingo_model_type_cautious_consequences: { type_string = "Cautious consequences"; break; }
}
// get running number of model
if (!clingo_model_number(model, &number)) { goto error; }
pthread_mutex_lock(&mu);
printf("%s %" PRIu64 ":\n", type_string, number);
if (!print_model(model, &data, " shown", clingo_show_type_shown)) { goto error; }
if (!print_model(model, &data, " atoms", clingo_show_type_atoms)) { goto error; }
if (!print_model(model, &data, " terms", clingo_show_type_terms)) { goto error; }
if (!print_model(model, &data, " ~atoms", clingo_show_type_complement
| clingo_show_type_atoms)) { goto error; }
pthread_mutex_unlock(&mu);
goto out;
error:
ret = false;
out:
free_model_buffer(&data);
return ret;
}
/********************** End of model printing functions *********************/
#ifndef N_F
/* Number of facts a(1), ..., a(N_F). */
#define N_F 5
#endif
#ifndef N_T
/* Number of threads to be run in parallel. */
#define N_T 4
#endif
/* Max length of atoms a(i). */
#define ATOM_LEN 8
/* Default clingo ground parts. */
const clingo_part_t GROUND_DEFAULT_PARTS[] = {{"base", NULL, 0}};
/* Increment the choice vector by one. */
bool incr_choice(bool choice[N_F], int i) {
if (i == N_F-1 || incr_choice(choice, i+1)) return !(choice[i] = !choice[i]);
return 0;
}
/* This logger ignores clingo_warning_atom_undefined. */
void logger(clingo_warning_t code, const char *msg, void *_) {
if (code == clingo_warning_atom_undefined) return;
printf("clingo | error code %d: %s\n", code, msg);
}
/* This external function is a no-op. Return the argument as is to be injected. */
bool call(const clingo_location_t *loc, const char *name, const clingo_symbol_t *args,
size_t argc, void *data, clingo_symbol_callback_t sym_cb, void *sym_data) {
return sym_cb(args, argc, sym_data);
}
/* Create a new control, mock a grounding with callback. */
clingo_control_t* new_ctrl(bool choice[N_F], char F[N_F][ATOM_LEN]) {
bool ok = false;
clingo_control_t *C = NULL;
if (!clingo_control_new(NULL, 0, logger, NULL, 20, &C)) return NULL;
/* Add rules with variables with an external function named call. */
if (!clingo_control_add(C, "base", NULL, 0,
"f(@call(X)) :- a(X), X \\ 2 = 0.\n"
"g(@call(X)) :- a(X), X \\ 2 = 1.\n")) goto cleanup;
/* Add facts according to the current choice. */
for (int i = 0; i < N_F; ++i)
if (choice[i])
if (!clingo_control_add(C, "base", NULL, 0, F[i])) goto cleanup;
/* Call the grounder. */
if (!clingo_control_ground(C, GROUND_DEFAULT_PARTS, 1, call, NULL)) goto cleanup;
ok = true;
cleanup:
if (!ok) clingo_control_free(C);
return C;
}
/* Thread data. */
typedef struct {
bool choice[N_F]; /* Array of each fact being chosen. */
char F[N_F][ATOM_LEN]; /* Strings of facts a(i). */
pthread_mutex_t *wakeup; /* Address to wakeup mutex. */
pthread_cond_t *avail; /* Address to avail condition variable. */
bool *busy_procs; /* Address to table of busy threads. */
int pid; /* Thread ID. */
} data_t;
/* Mockup of a thread function running some task over the models of a program. */
void* routine(void *data) {
data_t *D = (data_t*) data;
clingo_control_t *C = new_ctrl(D->choice, D->F);
int N;
bool ok = false;
if (!C) return NULL;
/* Run solve and iterate models. */ {
clingo_solve_handle_t *hdl;
const clingo_model_t *M;
if (!clingo_control_solve(C, clingo_solve_mode_yield, NULL, 0, NULL, NULL, &hdl)) goto error;
for (N = 0; true; ++N) {
if (!clingo_solve_handle_resume(hdl)) goto error;
if (!clingo_solve_handle_model(hdl, &M)) goto error;
#ifdef PRINT
if (M) print_solution(M);
else break;
#else
if (!M) break;
#endif
}
if (!clingo_solve_handle_close(hdl)) goto error;
}
ok = true;
goto cleanup;
error:
ok = false;
cleanup:
clingo_control_free(C);
if (!ok) puts("Error!");
/* Thread is finished. Notify the busy_procs table and set it as available. */
pthread_mutex_lock(D->wakeup);
D->busy_procs[D->pid] = false;
pthread_cond_signal(D->avail);
pthread_mutex_unlock(D->wakeup);
return NULL;
}
/* Retrieve a free thread's ID. */
int retr_free_proc(bool *busy_procs, pthread_mutex_t *wakeup, pthread_cond_t *avail) {
int i, id = -1;
pthread_mutex_lock(wakeup);
/* Keep looking for an available ID. */
while (true) {
for (i = 0, id = -1; i < N_T; ++i) if (!busy_procs[i]) { id = i; break; }
if (id != -1) break;
pthread_cond_wait(avail, wakeup);
}
/* Found one. Set it as busy and return it. */
busy_procs[id] = true;
pthread_mutex_unlock(wakeup);
return id;
}
int main() {
char F[N_F][ATOM_LEN] = {0}; /* Facts a(0), ..., a(N_F). */
bool choice[N_F] = {0}; /* Choice of picking each a(i) (or not). */
pthread_t threads[N_T]; /* Thread pool. */
data_t D[N_T] = {0}; /* Data for each thread. */
bool busy_procs[N_T] = {0}; /* Whether the i-th thread is busy. */
pthread_mutex_t wakeup = PTHREAD_MUTEX_INITIALIZER; /* Mutex for accessing busy_procs. */
pthread_cond_t avail = PTHREAD_COND_INITIALIZER; /* Signal whether the thread is available. */
/* Write out each a(i) fact. */
for (int i = 0; i < N_F; ++i) sprintf(F[i], "a(%d).", i);
/* Initialize thread data. */
for (int i = 0; i < N_T; ++i) {
D[i].avail = &avail; D[i].wakeup = &wakeup;
D[i].busy_procs = busy_procs; D[i].pid = i;
}
/* Iterate through all possible combinations of N_F binary variables. */
do {
int id = retr_free_proc(busy_procs, &wakeup, &avail);
/* Copy the current combination to the thread's data. */
for (int i = 0; i < N_F; ++i) {
D[id].choice[i] = choice[i];
memcpy(D[id].F[i], F[i], ATOM_LEN*sizeof(char));
}
/* Assign thread id to this task. */
pthread_create(&threads[id], NULL, routine, (void*) &D[id]);
} while(!incr_choice(choice, 0));
/* Wait for threads to finish. */
for (int i = 0; i < N_T; ++i) pthread_join(threads[i], NULL);
return 0;
} |
Thanks, I'll have a look. Update: This is indeed a bug in clingo. There is a global struct involved that is changed when calling ground. It should be relatively easy to avoid changing this struct. I'll link a PR once I start working on this. |
Awesome, thanks! |
The problem should be fixed in the linked PR above. Thanks for providing the MWE. I would not have found this bug without an example. |
Hi,
I was wondering whether calling
clingo_control_ground
with aclingo_ground_callback_t
was supposed to be thread-safe. I'm running severalclingo_control_t
's in parallel, one for each thread, and grounding with aclingo_ground_callback_t
with no memory sharing of programs between threads. I'm getting non-deterministic segfaults together with(where
externalfunc
is the external function to be injected inclingo_ground_callback_t
) errors.To be more precise on this (seemingly) thread-safety problem itself,$T_1$ and $T_2$ , each with its own $f_1$ and $f_2$ . When running in parallel, it seems $f_2$ for $T_1$ instead of $f_1$ (and vice-versa).
gdb
points toclingo_ground_callback_t
being called on the wrong thread. For instance, suppose I have two threadsclingo_control_t
, and callbacksclingo_control_ground
sometimes callsAdding mutex locks before and after
clingo_control_ground
calls seems to fix this issue.I tried to write a minimal working example of this problem but I couldn't replicate the error, which is weird since it seemed to be thread-safe in the MWE but not in my use-case (even though the issue was fixed with mutex locks in my code). I've been debugging this issue for weeks now. Please help me keep my sanity. :)
Thanks
The text was updated successfully, but these errors were encountered: