Skip to content

Commit

Permalink
Merge branch 'mysql-5.7' into mysql-trunk
Browse files Browse the repository at this point in the history
  • Loading branch information
Ole-Hjalmar Kristensen authored and Ole-Hjalmar Kristensen committed Aug 17, 2017
2 parents 983aabb + 194ff94 commit 69ff70a
Show file tree
Hide file tree
Showing 10 changed files with 123 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,11 @@
#include "synode_no.h"
#include "xcom_vp_str.h"

define_xdr_funcs(synode_no)
define_xdr_funcs(app_data_ptr)

static app_data_list nextp(app_data_list l);
static unsigned long msg_count(app_data_ptr a);

/**
Debug a single app_data struct.
Expand Down Expand Up @@ -358,7 +362,7 @@ void follow(app_data_list l, app_data_ptr p) {
/**
Count the number of messages in a list.
*/
unsigned long msg_count(app_data_ptr a) {
static unsigned long msg_count(app_data_ptr a) {
unsigned long n = 0;
while (a) {
n++;
Expand All @@ -367,7 +371,6 @@ unsigned long msg_count(app_data_ptr a) {
return n;
}

define_xdr_funcs(app_data_ptr)

/* {{{ Message constructors */

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ app_data_ptr new_exit();
app_data_ptr new_nodes(u_int n, node_address *names, cargo_type cargo);
app_data_ptr new_reset(cargo_type type);

d_xdr_funcs(app_data_ptr) unsigned long msg_count(app_data_ptr a);
void _replace_app_data_list(app_data_list target, app_data_ptr source);
char *dbg_app_data(app_data_ptr a);
void follow(app_data_list l, app_data_ptr p);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,7 @@ extern "C" {
#define SY_FMT_DEF "%x %" PRIu64 " %u"
#define SY_MEM(s) (s).group_id, (uint64_t)(s).msgno, (s).node

d_xdr_funcs(synode_no)

int synode_eq(synode_no x, synode_no y);
int synode_eq(synode_no x, synode_no y);
int synode_gt(synode_no x, synode_no y);
int synode_lt(synode_no x, synode_no y);
static const synode_no null_synode = NULL_SYNODE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,14 @@
#include "task_net.h"
#include "task_os.h"
#include "xcom_cfg.h"
#ifndef _WIN32
#ifndef USE_SELECT
#include <poll.h>
#endif
#endif

#include "retry.h"
#include "xdr_utils.h"

extern char *pax_op_to_str(int x);

Expand All @@ -89,17 +92,33 @@ struct iotasks {
linkage tasks; /* OHKFIX Should be one each for read and write */
};
#else
typedef struct {
u_int pollfd_array_len;
pollfd *pollfd_array_val;
} pollfd_array;

typedef task_env* task_env_p;

typedef struct {
u_int task_env_p_array_len;
task_env_p *task_env_p_array_val;
} task_env_p_array;

define_xdr_funcs(pollfd)
define_xdr_funcs(task_env_p)

struct iotasks {
int nwait;
struct pollfd fd[MAXFILES];
task_env *tasks[MAXFILES];
pollfd_array fd;
task_env_p_array tasks;
};
#endif
int task_errno = 0;
static task_env *extract_first_delayed();
static task_env *task_ref(task_env *t);
static task_env *task_unref(task_env *t);
static void wake_all_io();
static void task_sys_deinit();

/* Return time as seconds */
static double _now = 0.0;
Expand Down Expand Up @@ -506,6 +525,7 @@ static void task_delete(task_env *t) {
#if 1
free(deactivate(t)); /* Deactivate and free task */
#else
deactivate(t);
link_into(&t->l, &free_tasks);
#endif
active_tasks--;
Expand Down Expand Up @@ -536,7 +556,7 @@ task_env *task_deactivate(task_env *t) { return deactivate(t); }
/* Set terminate flag and activate task */
task_env *task_terminate(task_env *t) {
if (t) {
MAY_DBG(FN; PTREXP(t); STREXP(t->name));
DBGOUT(FN; PTREXP(t); STREXP(t->name); NDBG(t->refcnt, d));
t->terminate = KILL; /* Set terminate flag */
activate(t); /* and get it running */
}
Expand Down Expand Up @@ -577,6 +597,11 @@ static void iotasks_init(iotasks *iot) {
link_init(&iot->tasks, type_hash("task_env"));
}

static void iotasks_deinit(iotasks *iot)
{
DBGOUT(FN);
}

#if TASK_DBUG_ON
static void poll_debug() MY_ATTRIBUTE((unused));
static void poll_debug() {
Expand Down Expand Up @@ -706,14 +731,20 @@ void remove_and_wakeup(int fd) {
}

#else
static int active_io() { return iot.nwait > 0; }
static void iotasks_init(iotasks *iot)
{
DBGOUT(FN);
iot->nwait = 0;
init_pollfd_array(&iot->fd);
init_task_env_p_array(&iot->tasks);
}

static void iotasks_init(iotasks *iot) {
int i;
static void iotasks_deinit(iotasks *iot)
{
DBGOUT(FN);
iot->nwait = 0;
for (i = 0; i < MAXFILES; i++) {
iot->tasks[i] = 0;
}
free_pollfd_array(&iot->fd);
free_task_env_p_array(&iot->tasks);
}

#if TASK_DBUG_ON
Expand All @@ -729,26 +760,27 @@ static void poll_debug() {
}
#endif

static void poll_wakeup(int i) {
activate(task_unref(iot.tasks[i]));
iot.tasks[i] = NULL;
static void poll_wakeup(int i)
{
activate(task_unref(get_task_env_p(&iot.tasks,i)));
set_task_env_p(&iot.tasks, NULL,i);
iot.nwait--; /* Shrink array of pollfds */
iot.fd[i] = iot.fd[iot.nwait];
iot.tasks[i] = iot.tasks[iot.nwait];
set_pollfd(&iot.fd, get_pollfd(&iot.fd,iot.nwait),i);
set_task_env_p(&iot.tasks, get_task_env_p(&iot.tasks,iot.nwait),i);
}

static int poll_wait(int ms) {
result nfds = {0, 0};
int wake = 0;

/* Wait at most ms milliseconds */
MAY_DBG(FN; NDBG(ms, d));
int wake = 0;
if (ms < 0 || ms > 1000) ms = 1000; /* Wait at most 1000 ms */
SET_OS_ERR(0);
while ((nfds.val = poll(iot.fd, iot.nwait, ms)) == -1) {
nfds.err = to_errno(GET_OS_ERR);
if (nfds.err != SOCK_EINTR) {
task_dump_err(nfds.err);
while ((nfds.val = poll(iot.fd.pollfd_array_val, iot.nwait, ms)) == -1) {
nfds.funerr = to_errno(GET_OS_ERR);
if (nfds.funerr != SOCK_EINTR) {
task_dump_err(nfds.funerr);
MAY_DBG(FN; STRLIT("poll failed"));
abort();
}
Expand All @@ -760,11 +792,12 @@ static int poll_wait(int ms) {
int interrupt = 0;
while (i < iot.nwait) {
interrupt =
(iot.tasks[i]->time != 0.0 && iot.tasks[i]->time < task_now());
(get_task_env_p(&iot.tasks,i)->time != 0.0 &&
get_task_env_p(&iot.tasks,i)->time < task_now());
if (interrupt || /* timeout ? */
iot.fd[i].revents) {
get_pollfd(&iot.fd,i).revents) {
/* if(iot.fd[i].revents & POLLERR) abort(); */
iot.tasks[i]->interrupt = interrupt;
get_task_env_p(&iot.tasks,i)->interrupt = interrupt;
poll_wakeup(i);
wake = 1;
} else {
Expand All @@ -779,28 +812,37 @@ static void add_fd(task_env *t, int fd, int op) {
int events = 'r' == op ? POLLIN | POLLRDNORM : POLLOUT;
MAY_DBG(FN; PTREXP(t); NDBG(fd, d); NDBG(op, d));
assert(fd >= 0);
assert(fd < MAXFILES);
t->waitfd = fd;
deactivate(t);
task_ref(t);
iot.tasks[iot.nwait] = t;
iot.fd[iot.nwait].fd = fd;
iot.fd[iot.nwait].events = events;
iot.fd[iot.nwait].revents = 0;
set_task_env_p(&iot.tasks, t, iot.nwait);
{
pollfd x;
x.fd = fd;
x.events = events;
x.revents = 0;
set_pollfd(&iot.fd, x, iot.nwait);
}
iot.nwait++;
}

void unpoll(int i) {
assert(i < MAXFILES);
iot.tasks[i] = NULL;
iot.fd[i].fd = -1;
task_unref(get_task_env_p(&iot.tasks, i));
set_task_env_p(&iot.tasks, NULL,i);
{
pollfd x;
x.fd = -1;
x.events = 0;
x.revents = 0;
set_pollfd(&iot.fd, x, i);
}
}

static void wake_all_io() {
int i;
for (i = 0; i < iot.nwait; i++) {
activate(get_task_env_p(&iot.tasks,i));
unpoll(i);
activate(task_unref(iot.tasks[i]));
}
iot.nwait = 0;
}
Expand All @@ -809,7 +851,7 @@ void remove_and_wakeup(int fd) {
int i = 0;
MAY_DBG(FN; NDBG(fd, d));
while (i < iot.nwait) {
if (iot.fd[i].fd == fd) {
if (get_pollfd(&iot.fd,i).fd == fd) {
poll_wakeup(i);
} else {
i++;
Expand All @@ -819,36 +861,6 @@ void remove_and_wakeup(int fd) {

#endif
task_env *stack = NULL;
/* Locks needed to get atomic reads and writes by protecting file descriptors
while the task has been suspended by wait_io */
static task_env *io_wait_locks[MAXFILES][2];
/* purecov: begin deadcode */
int is_locked(int fd) { return io_wait_locks[fd][0] || io_wait_locks[fd][1]; }

int lock_fd(int fd, task_env *t, int lock) {
if (fd < 0) return 0;
lock = lock != 'r';
if (io_wait_locks[fd][lock]) {
DBGOUT(FN; NDBG(fd, d); PTREXP(t); STRLIT(" failed"));
return 0;
} else {
io_wait_locks[fd][lock] = t;
return 1;
}
}

int unlock_fd(int fd, task_env *t, int lock) {
if (fd < 0) return 0;
lock = lock != 'r';
if (io_wait_locks[fd][lock] != t) {
DBGOUT(FN; NDBG(fd, d); PTREXP(t); STRLIT(" failed"));
return 0;
} else {
io_wait_locks[fd][lock] = NULL;
return 1;
}
}
/* purecov: end */

task_env *wait_io(task_env *t, int fd, int op) {
t->time = 0.0;
Expand Down Expand Up @@ -1158,6 +1170,7 @@ void task_loop() {
idle_time += seconds() - time;
}
}
task_sys_deinit();
}

static int init_sockaddr(char *server, struct sockaddr_in *sock_addr,
Expand Down Expand Up @@ -1471,6 +1484,13 @@ void task_sys_init() {
/* task_new(statistics_task, null_arg, "statistics_task", 1); */
}


static void task_sys_deinit()
{
DBGOUT(FN);
iotasks_deinit(&iot);
}

/* purecov: begin deadcode */
int is_running(task_env *t) { return t && t->terminate == RUN; }
/* purecov: end */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ extern "C" {
Nonblocking IO and event handling need to be rewritten for each new OS.
*/

#define USE_SELECT
/* #define USE_SELECT */
#ifdef TASK_EVENT_TRACE
void add_base_event(double when, char const *file, int state);
#define ADD_BASE_EVENT \
Expand Down Expand Up @@ -262,8 +262,6 @@ struct task_queue {
};
typedef struct task_queue task_queue;

#define MAXFILES MAXTASKS

#define _ep ((struct env *)(stack->sp->ptr))

#define TASK_ALLOC(pool, type) (task_allocate(pool, (unsigned int)sizeof(type)))
Expand All @@ -272,7 +270,7 @@ typedef struct task_queue task_queue;
#define TASK_DEBUG(x) \
if (stack->debug) { \
DBGOUT(FN; STRLIT(x " task "); PTREXP((void *)stack); \
STRLIT(stack->name)); \
STRLIT(stack->name); NDBG(stack->sp->state,d));\
}
#else
#define TASK_DEBUG(x)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,17 @@ static inline int hard_select_err(int err) {
return err != 0 && from_errno(err) != WSAEINTR;
}


#if(_WIN32_WINNT < 0x0600)
#error "Need _WIN32_WINNT >= 0x0600"
#endif

typedef ULONG nfds_t;
typedef struct pollfd pollfd;
static inline int poll(pollfd * fds, nfds_t nfds, int timeout) {
return WSAPoll(fds, nfds, timeout);
}

#else
#include <errno.h>
#include <netdb.h>
Expand Down Expand Up @@ -87,6 +98,8 @@ static inline int hard_select_err(int err) {
return from_errno(err) != 0 && from_errno(err) != EINTR;
}

typedef struct pollfd pollfd;

#endif

extern void remove_and_wakeup(int fd);
Expand Down
Loading

0 comments on commit 69ff70a

Please sign in to comment.