From c690f0007dfc83496d342add011f3b283fd963ce Mon Sep 17 00:00:00 2001 From: Fedor Indutny Date: Wed, 21 Aug 2013 01:43:09 +0400 Subject: [PATCH 1/3] fsevents: use shared FSEventStream It seems that number of simultaneously opened FSEventStreams is limited on OSX (i.e. you can have only fixed number of them on one running system), getting past through this limit will cause `FSEventStreamCreate` to return false and write following message to stderr: (CarbonCore.framework) FSEventStreamStart: register_with_server: ERROR: f2d_register_rpc() => (null) (-21) To prevent this, we must use only one shared FSEventStream with a paths for all uv_fsevent_t handles, and then filter out events for each handle using this paths again. See https://github.com/joyent/node/issues/5463 --- include/uv-darwin.h | 10 +- src/unix/darwin.c | 2 +- src/unix/fsevents.c | 392 ++++++++++++++++++++++++++++++-------------- src/unix/kqueue.c | 1 - 4 files changed, 278 insertions(+), 127 deletions(-) diff --git a/include/uv-darwin.h b/include/uv-darwin.h index 43b261f5fb..dcdd42ba6d 100644 --- a/include/uv-darwin.h +++ b/include/uv-darwin.h @@ -36,8 +36,8 @@ #define UV_PLATFORM_LOOP_FIELDS \ uv_thread_t cf_thread; \ - void* cf_cb; \ - void* cf_loop; \ + void* _cf_reserved; \ + void* cf_state; \ uv_mutex_t cf_mutex; \ uv_sem_t cf_sem; \ void* cf_signals[2]; \ @@ -47,10 +47,10 @@ char* realpath; \ int realpath_len; \ int cf_flags; \ - void* cf_eventstream; \ + void* cf_event; \ uv_async_t* cf_cb; \ - void* cf_events[2]; \ - uv_sem_t cf_sem; \ + void* cf_member[2]; \ + uv_sem_t _cf_reserved; \ uv_mutex_t cf_mutex; \ #define UV_STREAM_PRIVATE_PLATFORM_FIELDS \ diff --git a/src/unix/darwin.c b/src/unix/darwin.c index 8a9b4bab63..a03ef2a9e0 100644 --- a/src/unix/darwin.c +++ b/src/unix/darwin.c @@ -38,7 +38,7 @@ int uv__platform_loop_init(uv_loop_t* loop, int default_loop) { - loop->cf_loop = NULL; + loop->cf_state = NULL; if (uv__kqueue_init(loop)) return -errno; diff --git a/src/unix/fsevents.c b/src/unix/fsevents.c index 79ad198bae..2b0b1af973 100644 --- a/src/unix/fsevents.c +++ b/src/unix/fsevents.c @@ -49,8 +49,20 @@ void uv__fsevents_loop_delete(uv_loop_t* loop) { typedef struct uv__fsevents_event_s uv__fsevents_event_t; typedef struct uv__cf_loop_signal_s uv__cf_loop_signal_t; +typedef struct uv__cf_loop_state_s uv__cf_loop_state_t; typedef void (*cf_loop_signal_cb)(void* arg); +struct uv__cf_loop_state_s { + CFRunLoopRef loop; + CFRunLoopSourceRef signal_source; + volatile int fsevent_need_reschedule; + FSEventStreamRef fsevent_stream; + uv_sem_t fsevent_sem; + uv_mutex_t fsevent_mutex; + void* fsevent_handles[2]; + int fsevent_handle_count; +}; + struct uv__cf_loop_signal_s { cf_loop_signal_cb cb; QUEUE member; @@ -59,10 +71,26 @@ struct uv__cf_loop_signal_s { struct uv__fsevents_event_s { int events; - QUEUE member; + void* next; char path[1]; }; +static const int kFSEventsModified = kFSEventStreamEventFlagItemFinderInfoMod | + kFSEventStreamEventFlagItemModified | + kFSEventStreamEventFlagItemInodeMetaMod | + kFSEventStreamEventFlagItemChangeOwner | + kFSEventStreamEventFlagItemXattrMod; +static const int kFSEventsRenamed = kFSEventStreamEventFlagItemCreated | + kFSEventStreamEventFlagItemRemoved | + kFSEventStreamEventFlagItemRenamed; +static const int kFSEventsSystem = kFSEventStreamEventFlagUserDropped | + kFSEventStreamEventFlagKernelDropped | + kFSEventStreamEventFlagEventIdsWrapped | + kFSEventStreamEventFlagHistoryDone | + kFSEventStreamEventFlagMount | + kFSEventStreamEventFlagUnmount | + kFSEventStreamEventFlagRootChanged; + /* Forward declarations */ static void uv__cf_loop_cb(void* arg); static void* uv__cf_loop_runner(void* arg); @@ -72,26 +100,21 @@ static void uv__cf_loop_signal(uv_loop_t* loop, #define UV__FSEVENTS_WALK(handle, block) \ { \ - QUEUE* curr; \ - QUEUE split_head; \ uv__fsevents_event_t* event; \ + uv__fsevents_event_t* next; \ uv_mutex_lock(&(handle)->cf_mutex); \ - QUEUE_INIT(&split_head); \ - if (!QUEUE_EMPTY(&(handle)->cf_events)) { \ - QUEUE* split_pos = QUEUE_HEAD(&(handle)->cf_events); \ - QUEUE_SPLIT(&(handle)->cf_events, split_pos, &split_head); \ - } \ + event = (handle)->cf_event; \ + (handle)->cf_event = NULL; \ uv_mutex_unlock(&(handle)->cf_mutex); \ - while (!QUEUE_EMPTY(&split_head)) { \ - curr = QUEUE_HEAD(&split_head); \ + while (event != NULL) { \ /* Invoke callback */ \ - event = QUEUE_DATA(curr, uv__fsevents_event_t, member); \ - QUEUE_REMOVE(curr); \ /* Invoke block code, but only if handle wasn't closed */ \ if (((handle)->flags & (UV_CLOSING | UV_CLOSED)) == 0) \ block \ /* Free allocated data */ \ + next = event->next; \ free(event); \ + event = next; \ } \ } @@ -125,42 +148,35 @@ static void uv__fsevents_event_cb(ConstFSEventStreamRef streamRef, char* path; char* pos; uv_fs_event_t* handle; + QUEUE* q; + uv_loop_t* loop; + uv__cf_loop_state_t* state; uv__fsevents_event_t* event; - QUEUE add_list; - int kFSEventsModified; - int kFSEventsRenamed; - - kFSEventsModified = kFSEventStreamEventFlagItemFinderInfoMod | - kFSEventStreamEventFlagItemModified | - kFSEventStreamEventFlagItemInodeMetaMod | - kFSEventStreamEventFlagItemChangeOwner | - kFSEventStreamEventFlagItemXattrMod; - kFSEventsRenamed = kFSEventStreamEventFlagItemCreated | - kFSEventStreamEventFlagItemRemoved | - kFSEventStreamEventFlagItemRenamed; - - handle = info; + uv__fsevents_event_t* tail; + + loop = info; + state = loop->cf_state; + assert(state != NULL); paths = eventPaths; - QUEUE_INIT(&add_list); - - for (i = 0; i < numEvents; i++) { - /* Ignore system events */ - if (eventFlags[i] & (kFSEventStreamEventFlagUserDropped | - kFSEventStreamEventFlagKernelDropped | - kFSEventStreamEventFlagEventIdsWrapped | - kFSEventStreamEventFlagHistoryDone | - kFSEventStreamEventFlagMount | - kFSEventStreamEventFlagUnmount | - kFSEventStreamEventFlagRootChanged)) { - continue; - } - /* TODO: Report errors */ - path = paths[i]; - len = strlen(path); + /* For each handle */ + QUEUE_FOREACH(q, &state->fsevent_handles) { + handle = QUEUE_DATA(q, uv_fs_event_t, cf_member); + tail = NULL; + + /* Process and filter out events */ + for (i = 0; i < numEvents; i++) { + /* Ignore system events */ + if (eventFlags[i] & kFSEventsSystem) + continue; + + path = paths[i]; + len = strlen(path); + + /* Filter out paths that are outside hanlde's request */ + if (strstr(path, handle->realpath) != path) + continue; - /* Remove absolute path prefix */ - if (strstr(path, handle->realpath) == path) { path += handle->realpath_len; len -= handle->realpath_len; @@ -169,79 +185,79 @@ static void uv__fsevents_event_cb(ConstFSEventStreamRef streamRef, path++; len--; } - } #ifdef MAC_OS_X_VERSION_10_7 - /* Ignore events with path equal to directory itself */ - if (len == 0) - continue; + /* Ignore events with path equal to directory itself */ + if (len == 0) + continue; #endif /* MAC_OS_X_VERSION_10_7 */ - /* Do not emit events from subdirectories (without option set) */ - pos = strchr(path, '/'); - if ((handle->cf_flags & UV_FS_EVENT_RECURSIVE) == 0 && - pos != NULL && - pos != path + 1) - continue; + /* Do not emit events from subdirectories (without option set) */ + pos = strchr(path, '/'); + if ((handle->cf_flags & UV_FS_EVENT_RECURSIVE) == 0 && + pos != NULL && + pos != path + 1) + continue; #ifndef MAC_OS_X_VERSION_10_7 - path = ""; - len = 0; + path = ""; + len = 0; #endif /* MAC_OS_X_VERSION_10_7 */ - event = malloc(sizeof(*event) + len); - if (event == NULL) - break; + event = malloc(sizeof(*event) + len); + if (event == NULL) + break; - memcpy(event->path, path, len + 1); + memset(event, 0, sizeof(*event)); + memcpy(event->path, path, len + 1); - if ((eventFlags[i] & kFSEventsModified) != 0 && - (eventFlags[i] & kFSEventsRenamed) == 0) - event->events = UV_CHANGE; - else - event->events = UV_RENAME; + if ((eventFlags[i] & kFSEventsModified) != 0 && + (eventFlags[i] & kFSEventsRenamed) == 0) + event->events = UV_CHANGE; + else + event->events = UV_RENAME; - QUEUE_INSERT_TAIL(&add_list, &event->member); - } - uv_mutex_lock(&handle->cf_mutex); - QUEUE_ADD(&handle->cf_events, &add_list); - uv_mutex_unlock(&handle->cf_mutex); + if (tail != NULL) + tail->next = event; + tail = event; + } - uv_async_send(handle->cf_cb); + if (tail != NULL) { + uv_mutex_lock(&handle->cf_mutex); + tail->next = handle->cf_event; + handle->cf_event = tail; + uv_mutex_unlock(&handle->cf_mutex); + + uv_async_send(handle->cf_cb); + } + } } -static void uv__fsevents_schedule(void* arg) { - uv_fs_event_t* handle; +static void uv__fsevents_create_stream(uv_loop_t* loop, CFArrayRef paths) { + uv__cf_loop_state_t* state; FSEventStreamContext ctx; FSEventStreamRef ref; - CFStringRef path; - CFArrayRef paths; CFAbsoluteTime latency; FSEventStreamCreateFlags flags; - handle = arg; - /* Initialize context */ ctx.version = 0; - ctx.info = handle; + ctx.info = loop; ctx.retain = NULL; ctx.release = NULL; ctx.copyDescription = NULL; - /* Initialize paths array */ - path = CFStringCreateWithCString(NULL, - handle->filename, - CFStringGetSystemEncoding()); - assert(path != NULL); - paths = CFArrayCreate(NULL, (const void**)&path, 1, NULL); - assert(paths != NULL); - latency = 0.15; /* Set appropriate flags */ flags = kFSEventStreamCreateFlagFileEvents; + /* + * NOTE: While we definitely could remember last event id, + * this will not be a right solution here. Because we might be watching + * different paths after reschedule. + */ ref = FSEventStreamCreate(NULL, &uv__fsevents_event_cb, &ctx, @@ -250,43 +266,113 @@ static void uv__fsevents_schedule(void* arg) { latency, flags); assert(ref != NULL); - handle->cf_eventstream = ref; - FSEventStreamScheduleWithRunLoop(handle->cf_eventstream, - handle->loop->cf_loop, + state = loop->cf_state; + FSEventStreamScheduleWithRunLoop(ref, + state->loop, kCFRunLoopDefaultMode); - if (!FSEventStreamStart(handle->cf_eventstream)) + if (!FSEventStreamStart(ref)) abort(); + + state->fsevent_stream = ref; } -static void uv__fsevents_unschedule(void* arg) { - uv_fs_event_t* handle; +static void uv__fsevents_destroy_stream(uv_loop_t* loop) { + uv__cf_loop_state_t* state; - handle = arg; + state = loop->cf_state; + + if (state->fsevent_stream == NULL) + return; + + /* Flush all accumulated events */ + FSEventStreamFlushSync(state->fsevent_stream); /* Stop emitting events */ - FSEventStreamStop(handle->cf_eventstream); + FSEventStreamStop(state->fsevent_stream); /* Release stream */ - FSEventStreamInvalidate(handle->cf_eventstream); - FSEventStreamRelease(handle->cf_eventstream); - handle->cf_eventstream = NULL; + FSEventStreamInvalidate(state->fsevent_stream); + FSEventStreamRelease(state->fsevent_stream); + state->fsevent_stream = NULL; +} + + +static void uv__fsevents_reschedule(void* arg) { + uv__cf_loop_state_t* state; + uv_fs_event_t* handle; + QUEUE* q; + uv_fs_event_t* curr; + CFArrayRef cf_paths; + CFStringRef* paths; + int i; + int path_count; + + handle = arg; + state = handle->loop->cf_state; + + /* Optimization to prevent O(n^2) time spent when starting to watch + * many files simultaneously + */ + if (!state->fsevent_need_reschedule) + return; + state->fsevent_need_reschedule = 0; + + /* Destroy previous FSEventStream */ + uv__fsevents_destroy_stream(handle->loop); + + /* Create list of all watched paths */ + uv_mutex_lock(&state->fsevent_mutex); + path_count = state->fsevent_handle_count; + paths = malloc(sizeof(*paths) * path_count); + if (paths == NULL) + abort(); + + q = &state->fsevent_handles; + for (i = 0; i < path_count; i++) { + q = QUEUE_NEXT(q); + assert(q != &state->fsevent_handles); + curr = QUEUE_DATA(q, uv_fs_event_t, cf_member); + + assert(curr->realpath != NULL); + paths[i] = CFStringCreateWithCString(NULL, + curr->realpath, + CFStringGetSystemEncoding()); + if (paths[i] == NULL) + abort(); + } + uv_mutex_unlock(&state->fsevent_mutex); + + if (path_count != 0) { + /* Create new FSEventStream */ + cf_paths = CFArrayCreate(NULL, (const void**) paths, path_count, NULL); + if (cf_paths == NULL) + abort(); + uv__fsevents_create_stream(handle->loop, cf_paths); + } /* Notify main thread that we're done here */ - uv_sem_post(&handle->cf_sem); + if ((handle->flags & (UV_CLOSING | UV_CLOSED)) != 0) + uv_sem_post(&state->fsevent_sem); } static int uv__fsevents_loop_init(uv_loop_t* loop) { CFRunLoopSourceContext ctx; + uv__cf_loop_state_t* state; pthread_attr_t attr_storage; pthread_attr_t* attr; int err; - if (loop->cf_loop != NULL) + if (loop->cf_state != NULL) return 0; + state = malloc(sizeof(*state)); + if (state == NULL) + return -ENOMEM; + memset(state, 0, sizeof(*state)); + err = uv_mutex_init(&loop->cf_mutex); if (err) return err; @@ -296,10 +382,27 @@ static int uv__fsevents_loop_init(uv_loop_t* loop) { goto fail_sem_init; QUEUE_INIT(&loop->cf_signals); + + err = uv_sem_init(&state->fsevent_sem, 0); + if (err) + goto fail_fsevent_sem_init; + + err = uv_mutex_init(&state->fsevent_mutex); + if (err) + goto fail_fsevent_mutex_init; + + QUEUE_INIT(&state->fsevent_handles); + state->fsevent_need_reschedule = 0; + state->fsevent_handle_count = 0; + memset(&ctx, 0, sizeof(ctx)); ctx.info = loop; ctx.perform = uv__cf_loop_cb; - loop->cf_cb = CFRunLoopSourceCreate(NULL, 0, &ctx); + state->signal_source = CFRunLoopSourceCreate(NULL, 0, &ctx); + if (state->signal_source == NULL) { + err = -ENOMEM; + goto fail_signal_source_create; + } /* In the unlikely event that pthread_attr_init() fails, create the thread * with the default stack size. We'll use a little more address space but @@ -313,6 +416,8 @@ static int uv__fsevents_loop_init(uv_loop_t* loop) { if (pthread_attr_setstacksize(attr, 3 * PTHREAD_STACK_MIN)) abort(); + loop->cf_state = state; + /* uv_thread_t is an alias for pthread_t. */ err = -pthread_create(&loop->cf_thread, attr, uv__cf_loop_runner, loop); @@ -324,23 +429,33 @@ static int uv__fsevents_loop_init(uv_loop_t* loop) { /* Synchronize threads */ uv_sem_wait(&loop->cf_sem); - assert(loop->cf_loop != NULL); return 0; fail_thread_create: + loop->cf_state = NULL; + +fail_signal_source_create: + uv_mutex_destroy(&state->fsevent_mutex); + +fail_fsevent_mutex_init: + uv_sem_destroy(&state->fsevent_sem); + +fail_fsevent_sem_init: uv_sem_destroy(&loop->cf_sem); fail_sem_init: uv_mutex_destroy(&loop->cf_mutex); + free(state); return err; } void uv__fsevents_loop_delete(uv_loop_t* loop) { uv__cf_loop_signal_t* s; + uv__cf_loop_state_t* state; QUEUE* q; - if (loop->cf_loop == NULL) + if (loop->cf_state == NULL) return; uv__cf_loop_signal(loop, NULL, NULL); @@ -355,24 +470,34 @@ void uv__fsevents_loop_delete(uv_loop_t* loop) { QUEUE_REMOVE(q); free(s); } + + /* Destroy state */ + state = loop->cf_state; + uv_sem_destroy(&state->fsevent_sem); + uv_mutex_destroy(&state->fsevent_mutex); + CFRelease(state->signal_source); + free(state); + loop->cf_state = NULL; } static void* uv__cf_loop_runner(void* arg) { uv_loop_t* loop; + uv__cf_loop_state_t* state; loop = arg; - loop->cf_loop = CFRunLoopGetCurrent(); + state = loop->cf_state; + state->loop = CFRunLoopGetCurrent(); - CFRunLoopAddSource(loop->cf_loop, - loop->cf_cb, + CFRunLoopAddSource(state->loop, + state->signal_source, kCFRunLoopDefaultMode); uv_sem_post(&loop->cf_sem); CFRunLoopRun(); - CFRunLoopRemoveSource(loop->cf_loop, - loop->cf_cb, + CFRunLoopRemoveSource(state->loop, + state->signal_source, kCFRunLoopDefaultMode); return NULL; @@ -381,11 +506,13 @@ static void* uv__cf_loop_runner(void* arg) { static void uv__cf_loop_cb(void* arg) { uv_loop_t* loop; + uv__cf_loop_state_t* state; QUEUE* item; QUEUE split_head; uv__cf_loop_signal_t* s; loop = arg; + state = loop->cf_state; uv_mutex_lock(&loop->cf_mutex); QUEUE_INIT(&split_head); @@ -402,7 +529,7 @@ static void uv__cf_loop_cb(void* arg) { /* This was a termination signal */ if (s->cb == NULL) - CFRunLoopStop(loop->cf_loop); + CFRunLoopStop(state->loop); else s->cb(s->arg); @@ -414,6 +541,7 @@ static void uv__cf_loop_cb(void* arg) { void uv__cf_loop_signal(uv_loop_t* loop, cf_loop_signal_cb cb, void* arg) { uv__cf_loop_signal_t* item; + uv__cf_loop_state_t* state; item = malloc(sizeof(*item)); /* XXX: Fail */ @@ -427,14 +555,16 @@ void uv__cf_loop_signal(uv_loop_t* loop, cf_loop_signal_cb cb, void* arg) { QUEUE_INSERT_TAIL(&loop->cf_signals, &item->member); uv_mutex_unlock(&loop->cf_mutex); - assert(loop->cf_loop != NULL); - CFRunLoopSourceSignal(loop->cf_cb); - CFRunLoopWakeUp(loop->cf_loop); + state = loop->cf_state; + assert(state != NULL); + CFRunLoopSourceSignal(state->signal_source); + CFRunLoopWakeUp(state->loop); } int uv__fsevents_init(uv_fs_event_t* handle) { int err; + uv__cf_loop_state_t* state; err = uv__fsevents_loop_init(handle->loop); if (err) @@ -442,10 +572,13 @@ int uv__fsevents_init(uv_fs_event_t* handle) { /* Get absolute path to file */ handle->realpath = realpath(handle->filename, NULL); - if (handle->realpath != NULL) - handle->realpath_len = strlen(handle->realpath); + if (handle->realpath == NULL) + return -errno; + handle->realpath_len = strlen(handle->realpath); + + /* Initialize signly-linked list */ + handle->cf_event = NULL; - handle->cf_eventstream = NULL; /* * Events will occur in other thread. * Initialize callback for getting them back into event loop's thread @@ -459,24 +592,44 @@ int uv__fsevents_init(uv_fs_event_t* handle) { handle->cf_cb->flags |= UV__HANDLE_INTERNAL; uv_unref((uv_handle_t*) handle->cf_cb); + /* TODO(indutny): check return value */ uv_mutex_init(&handle->cf_mutex); - uv_sem_init(&handle->cf_sem, 0); - QUEUE_INIT(&handle->cf_events); - uv__cf_loop_signal(handle->loop, uv__fsevents_schedule, handle); + /* Insert handle into the list */ + state = handle->loop->cf_state; + uv_mutex_lock(&state->fsevent_mutex); + QUEUE_INIT(&handle->cf_member); + QUEUE_INSERT_TAIL(&state->fsevent_handles, &handle->cf_member); + state->fsevent_handle_count++; + state->fsevent_need_reschedule = 1; + uv_mutex_unlock(&state->fsevent_mutex); + + /* Reschedule FSEventStream */ + uv__cf_loop_signal(handle->loop, uv__fsevents_reschedule, handle); return 0; } int uv__fsevents_close(uv_fs_event_t* handle) { + uv__cf_loop_state_t* state; + if (handle->cf_cb == NULL) return -EINVAL; - uv__cf_loop_signal(handle->loop, uv__fsevents_unschedule, handle); + /* Remove handle from the list */ + state = handle->loop->cf_state; + uv_mutex_lock(&state->fsevent_mutex); + QUEUE_REMOVE(&handle->cf_member); + state->fsevent_handle_count--; + state->fsevent_need_reschedule = 1; + uv_mutex_unlock(&state->fsevent_mutex); + + /* Reschedule FSEventStream */ + uv__cf_loop_signal(handle->loop, uv__fsevents_reschedule, handle); /* Wait for deinitialization */ - uv_sem_wait(&handle->cf_sem); + uv_sem_wait(&state->fsevent_sem); uv_close((uv_handle_t*) handle->cf_cb, (uv_close_cb) free); handle->cf_cb = NULL; @@ -487,7 +640,6 @@ int uv__fsevents_close(uv_fs_event_t* handle) { }) uv_mutex_destroy(&handle->cf_mutex); - uv_sem_destroy(&handle->cf_sem); free(handle->realpath); handle->realpath = NULL; handle->realpath_len = 0; diff --git a/src/unix/kqueue.c b/src/unix/kqueue.c index 2c68bf36f2..391ab615de 100644 --- a/src/unix/kqueue.c +++ b/src/unix/kqueue.c @@ -320,7 +320,6 @@ int uv_fs_event_init(uv_loop_t* loop, #if defined(__APPLE__) /* Nullify field to perform checks later */ handle->cf_cb = NULL; - handle->cf_eventstream = NULL; handle->realpath = NULL; handle->realpath_len = 0; handle->cf_flags = flags; From 3261ec1e23ea1e4470d551908a456ae18aebb99e Mon Sep 17 00:00:00 2001 From: Fedor Indutny Date: Wed, 21 Aug 2013 21:06:04 +0400 Subject: [PATCH 2/3] fsevents: rename WALK => PROCESS It isn't really walking through events, because it changes (resets) them. Per @bnoordhuis and @tjfontaine suggestion. --- src/unix/fsevents.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/unix/fsevents.c b/src/unix/fsevents.c index 2b0b1af973..b7cc488dcf 100644 --- a/src/unix/fsevents.c +++ b/src/unix/fsevents.c @@ -98,7 +98,7 @@ static void uv__cf_loop_signal(uv_loop_t* loop, cf_loop_signal_cb cb, void* arg); -#define UV__FSEVENTS_WALK(handle, block) \ +#define UV__FSEVENTS_PROCESS(handle, block) \ { \ uv__fsevents_event_t* event; \ uv__fsevents_event_t* next; \ @@ -124,7 +124,7 @@ static void uv__fsevents_cb(uv_async_t* cb, int status) { handle = cb->data; - UV__FSEVENTS_WALK(handle, { + UV__FSEVENTS_PROCESS(handle, { if (handle->event_watcher.fd != -1) handle->cb(handle, event->path[0] ? event->path : NULL, event->events, 0); }); @@ -635,7 +635,7 @@ int uv__fsevents_close(uv_fs_event_t* handle) { handle->cf_cb = NULL; /* Free data in queue */ - UV__FSEVENTS_WALK(handle, { + UV__FSEVENTS_PROCESS(handle, { /* NOP */ }) From df6309a37cf8a60216194e1de683a40322b4ae09 Mon Sep 17 00:00:00 2001 From: Fedor Indutny Date: Thu, 22 Aug 2013 00:06:47 +0400 Subject: [PATCH 3/3] fixes --- src/unix/fsevents.c | 143 +++++++++++++++++++++++++++----------------- 1 file changed, 89 insertions(+), 54 deletions(-) diff --git a/src/unix/fsevents.c b/src/unix/fsevents.c index b7cc488dcf..5196b9f1b0 100644 --- a/src/unix/fsevents.c +++ b/src/unix/fsevents.c @@ -50,7 +50,6 @@ void uv__fsevents_loop_delete(uv_loop_t* loop) { typedef struct uv__fsevents_event_s uv__fsevents_event_t; typedef struct uv__cf_loop_signal_s uv__cf_loop_signal_t; typedef struct uv__cf_loop_state_s uv__cf_loop_state_t; -typedef void (*cf_loop_signal_cb)(void* arg); struct uv__cf_loop_state_s { CFRunLoopRef loop; @@ -64,7 +63,7 @@ struct uv__cf_loop_state_s { }; struct uv__cf_loop_signal_s { - cf_loop_signal_cb cb; + int last; QUEUE member; void* arg; }; @@ -94,9 +93,7 @@ static const int kFSEventsSystem = kFSEventStreamEventFlagUserDropped | /* Forward declarations */ static void uv__cf_loop_cb(void* arg); static void* uv__cf_loop_runner(void* arg); -static void uv__cf_loop_signal(uv_loop_t* loop, - cf_loop_signal_cb cb, - void* arg); +static int uv__cf_loop_signal(uv_loop_t* loop, int last, void* arg); #define UV__FSEVENTS_PROCESS(handle, block) \ { \ @@ -109,7 +106,7 @@ static void uv__cf_loop_signal(uv_loop_t* loop, while (event != NULL) { \ /* Invoke callback */ \ /* Invoke block code, but only if handle wasn't closed */ \ - if (((handle)->flags & (UV_CLOSING | UV_CLOSED)) == 0) \ + if (!uv__is_closing((handle))) \ block \ /* Free allocated data */ \ next = event->next; \ @@ -119,6 +116,7 @@ static void uv__cf_loop_signal(uv_loop_t* loop, } +/* Runs in UV loop's thread, when there're events to report to handle */ static void uv__fsevents_cb(uv_async_t* cb, int status) { uv_fs_event_t* handle; @@ -129,13 +127,12 @@ static void uv__fsevents_cb(uv_async_t* cb, int status) { handle->cb(handle, event->path[0] ? event->path : NULL, event->events, 0); }); - if ((handle->flags & (UV_CLOSING | UV_CLOSED)) == 0 && - handle->event_watcher.fd == -1) { + if (!uv__is_closing(handle) && handle->event_watcher.fd == -1) uv__fsevents_close(handle); - } } +/* Runs in CF loop, when there're events in FSEventStream */ static void uv__fsevents_event_cb(ConstFSEventStreamRef streamRef, void* info, size_t numEvents, @@ -173,8 +170,8 @@ static void uv__fsevents_event_cb(ConstFSEventStreamRef streamRef, path = paths[i]; len = strlen(path); - /* Filter out paths that are outside hanlde's request */ - if (strstr(path, handle->realpath) != path) + /* Filter out paths that are outside handle's request */ + if (strncmp(path, handle->realpath, handle->realpath_len) != 0) continue; path += handle->realpath_len; @@ -193,11 +190,11 @@ static void uv__fsevents_event_cb(ConstFSEventStreamRef streamRef, #endif /* MAC_OS_X_VERSION_10_7 */ /* Do not emit events from subdirectories (without option set) */ - pos = strchr(path, '/'); - if ((handle->cf_flags & UV_FS_EVENT_RECURSIVE) == 0 && - pos != NULL && - pos != path + 1) - continue; + if ((handle->cf_flags & UV_FS_EVENT_RECURSIVE) == 0) { + pos = strchr(path, '/'); + if (pos != NULL && pos != path + 1) + continue; + } #ifndef MAC_OS_X_VERSION_10_7 path = ""; @@ -234,6 +231,7 @@ static void uv__fsevents_event_cb(ConstFSEventStreamRef streamRef, } +/* Runs in CF loop */ static void uv__fsevents_create_stream(uv_loop_t* loop, CFArrayRef paths) { uv__cf_loop_state_t* state; FSEventStreamContext ctx; @@ -254,9 +252,10 @@ static void uv__fsevents_create_stream(uv_loop_t* loop, CFArrayRef paths) { flags = kFSEventStreamCreateFlagFileEvents; /* - * NOTE: While we definitely could remember last event id, - * this will not be a right solution here. Because we might be watching - * different paths after reschedule. + * NOTE: It might sound like a good idea to remember last seen StreamEventId, + * but in reality one dir might have last StreamEventId less than, the other + * that is being watched now. Which will cause FSEventStream API to report + * changes to files from the past. */ ref = FSEventStreamCreate(NULL, &uv__fsevents_event_cb, @@ -278,6 +277,7 @@ static void uv__fsevents_create_stream(uv_loop_t* loop, CFArrayRef paths) { } +/* Runs in CF loop */ static void uv__fsevents_destroy_stream(uv_loop_t* loop) { uv__cf_loop_state_t* state; @@ -299,6 +299,7 @@ static void uv__fsevents_destroy_stream(uv_loop_t* loop) { } +/* Runs in CF loop, when there're new fsevent handles to add to stream */ static void uv__fsevents_reschedule(void* arg) { uv__cf_loop_state_t* state; uv_fs_event_t* handle; @@ -325,22 +326,24 @@ static void uv__fsevents_reschedule(void* arg) { /* Create list of all watched paths */ uv_mutex_lock(&state->fsevent_mutex); path_count = state->fsevent_handle_count; - paths = malloc(sizeof(*paths) * path_count); - if (paths == NULL) - abort(); - - q = &state->fsevent_handles; - for (i = 0; i < path_count; i++) { - q = QUEUE_NEXT(q); - assert(q != &state->fsevent_handles); - curr = QUEUE_DATA(q, uv_fs_event_t, cf_member); - - assert(curr->realpath != NULL); - paths[i] = CFStringCreateWithCString(NULL, - curr->realpath, - CFStringGetSystemEncoding()); - if (paths[i] == NULL) + if (path_count != 0) { + paths = malloc(sizeof(*paths) * path_count); + if (paths == NULL) abort(); + + q = &state->fsevent_handles; + for (i = 0; i < path_count; i++) { + q = QUEUE_NEXT(q); + assert(q != &state->fsevent_handles); + curr = QUEUE_DATA(q, uv_fs_event_t, cf_member); + + assert(curr->realpath != NULL); + paths[i] = CFStringCreateWithCString(NULL, + curr->realpath, + CFStringGetSystemEncoding()); + if (paths[i] == NULL) + abort(); + } } uv_mutex_unlock(&state->fsevent_mutex); @@ -352,12 +355,16 @@ static void uv__fsevents_reschedule(void* arg) { uv__fsevents_create_stream(handle->loop, cf_paths); } - /* Notify main thread that we're done here */ - if ((handle->flags & (UV_CLOSING | UV_CLOSED)) != 0) + /* + * Main thread will block until the removal of handle from the list, + * we must tell it when we're ready + */ + if (uv__is_closing(handle)) uv_sem_post(&state->fsevent_sem); } +/* Runs in UV loop */ static int uv__fsevents_loop_init(uv_loop_t* loop) { CFRunLoopSourceContext ctx; uv__cf_loop_state_t* state; @@ -368,10 +375,9 @@ static int uv__fsevents_loop_init(uv_loop_t* loop) { if (loop->cf_state != NULL) return 0; - state = malloc(sizeof(*state)); + state = calloc(1, sizeof(*state)); if (state == NULL) return -ENOMEM; - memset(state, 0, sizeof(*state)); err = uv_mutex_init(&loop->cf_mutex); if (err) @@ -450,6 +456,7 @@ static int uv__fsevents_loop_init(uv_loop_t* loop) { } +/* Runs in UV loop */ void uv__fsevents_loop_delete(uv_loop_t* loop) { uv__cf_loop_signal_t* s; uv__cf_loop_state_t* state; @@ -458,7 +465,9 @@ void uv__fsevents_loop_delete(uv_loop_t* loop) { if (loop->cf_state == NULL) return; - uv__cf_loop_signal(loop, NULL, NULL); + if (uv__cf_loop_signal(loop, 1, NULL) != 0) + abort(); + uv_thread_join(&loop->cf_thread); uv_sem_destroy(&loop->cf_sem); uv_mutex_destroy(&loop->cf_mutex); @@ -481,6 +490,7 @@ void uv__fsevents_loop_delete(uv_loop_t* loop) { } +/* Runs in CF loop. This is a CF loop's body */ static void* uv__cf_loop_runner(void* arg) { uv_loop_t* loop; uv__cf_loop_state_t* state; @@ -504,6 +514,7 @@ static void* uv__cf_loop_runner(void* arg) { } +/* Runs in CF loop, executed after `uv__cf_loop_signal()` */ static void uv__cf_loop_cb(void* arg) { uv_loop_t* loop; uv__cf_loop_state_t* state; @@ -513,9 +524,9 @@ static void uv__cf_loop_cb(void* arg) { loop = arg; state = loop->cf_state; + QUEUE_INIT(&split_head); uv_mutex_lock(&loop->cf_mutex); - QUEUE_INIT(&split_head); if (!QUEUE_EMPTY(&loop->cf_signals)) { QUEUE* split_pos = QUEUE_HEAD(&loop->cf_signals); QUEUE_SPLIT(&loop->cf_signals, split_pos, &split_head); @@ -528,10 +539,10 @@ static void uv__cf_loop_cb(void* arg) { s = QUEUE_DATA(item, uv__cf_loop_signal_t, member); /* This was a termination signal */ - if (s->cb == NULL) + if (s->last) CFRunLoopStop(state->loop); else - s->cb(s->arg); + uv__fsevents_reschedule(s->arg); QUEUE_REMOVE(item); free(s); @@ -539,17 +550,17 @@ static void uv__cf_loop_cb(void* arg) { } -void uv__cf_loop_signal(uv_loop_t* loop, cf_loop_signal_cb cb, void* arg) { +/* Runs in UV loop to notify CF loop */ +int uv__cf_loop_signal(uv_loop_t* loop, int last, void* arg) { uv__cf_loop_signal_t* item; uv__cf_loop_state_t* state; item = malloc(sizeof(*item)); - /* XXX: Fail */ if (item == NULL) - abort(); + return -ENOMEM; item->arg = arg; - item->cb = cb; + item->last = last; uv_mutex_lock(&loop->cf_mutex); QUEUE_INSERT_TAIL(&loop->cf_signals, &item->member); @@ -559,9 +570,12 @@ void uv__cf_loop_signal(uv_loop_t* loop, cf_loop_signal_cb cb, void* arg) { assert(state != NULL); CFRunLoopSourceSignal(state->signal_source); CFRunLoopWakeUp(state->loop); + + return 0; } +/* Runs in UV loop to initialize handle */ int uv__fsevents_init(uv_fs_event_t* handle) { int err; uv__cf_loop_state_t* state; @@ -576,7 +590,7 @@ int uv__fsevents_init(uv_fs_event_t* handle) { return -errno; handle->realpath_len = strlen(handle->realpath); - /* Initialize signly-linked list */ + /* Initialize singly-linked list */ handle->cf_event = NULL; /* @@ -584,34 +598,53 @@ int uv__fsevents_init(uv_fs_event_t* handle) { * Initialize callback for getting them back into event loop's thread */ handle->cf_cb = malloc(sizeof(*handle->cf_cb)); - if (handle->cf_cb == NULL) - return -ENOMEM; + if (handle->cf_cb == NULL) { + err = -ENOMEM; + goto fail_cf_cb_malloc; + } handle->cf_cb->data = handle; uv_async_init(handle->loop, handle->cf_cb, uv__fsevents_cb); handle->cf_cb->flags |= UV__HANDLE_INTERNAL; uv_unref((uv_handle_t*) handle->cf_cb); - /* TODO(indutny): check return value */ - uv_mutex_init(&handle->cf_mutex); + err = uv_mutex_init(&handle->cf_mutex); + if (err) + goto fail_cf_mutex_init; /* Insert handle into the list */ state = handle->loop->cf_state; uv_mutex_lock(&state->fsevent_mutex); - QUEUE_INIT(&handle->cf_member); QUEUE_INSERT_TAIL(&state->fsevent_handles, &handle->cf_member); state->fsevent_handle_count++; state->fsevent_need_reschedule = 1; uv_mutex_unlock(&state->fsevent_mutex); /* Reschedule FSEventStream */ - uv__cf_loop_signal(handle->loop, uv__fsevents_reschedule, handle); + err = uv__cf_loop_signal(handle->loop, 0, handle); + if (err) + goto fail_loop_signal; return 0; + +fail_loop_signal: + uv_mutex_destroy(&handle->cf_mutex); + +fail_cf_mutex_init: + free(handle->cf_cb); + handle->cf_cb = NULL; + +fail_cf_cb_malloc: + free(handle->realpath); + handle->realpath = NULL; + + return err; } +/* Runs in UV loop to de-initialize handle */ int uv__fsevents_close(uv_fs_event_t* handle) { + int err; uv__cf_loop_state_t* state; if (handle->cf_cb == NULL) @@ -626,7 +659,9 @@ int uv__fsevents_close(uv_fs_event_t* handle) { uv_mutex_unlock(&state->fsevent_mutex); /* Reschedule FSEventStream */ - uv__cf_loop_signal(handle->loop, uv__fsevents_reschedule, handle); + err = uv__cf_loop_signal(handle->loop, 0, handle); + if (err) + return -err; /* Wait for deinitialization */ uv_sem_wait(&state->fsevent_sem);