diff --git a/fsevents.go b/fsevents.go index 72ea28c..15c8bb6 100644 --- a/fsevents.go +++ b/fsevents.go @@ -54,7 +54,7 @@ func DeviceForPath(path string) (int32, error) { // ... type EventStream struct { stream fsEventStreamRef - rlref cfRunLoopRef + qref fsDispatchQueueRef hasFinalizer bool registryID uintptr uuid string @@ -164,8 +164,9 @@ func (es *EventStream) Flush(sync bool) { // Stop stops listening to the event stream. func (es *EventStream) Stop() { if es.stream != nil { - stop(es.stream, es.rlref) + stop(es.stream, es.qref) es.stream = nil + es.qref = nil } // Remove eventstream from the registry diff --git a/fsevents_test.go b/fsevents_test.go index 9b1c63c..7302d32 100644 --- a/fsevents_test.go +++ b/fsevents_test.go @@ -227,6 +227,7 @@ func TestMany(t *testing.T) { t.Fatal(err) } + var lock sync.Mutex events := make(map[string]EventFlags, 810) wait := make(chan struct{}) @@ -235,6 +236,8 @@ func TestMany(t *testing.T) { for { select { case msg := <-es.Events: + lock.Lock() + for _, event := range msg { if _, ok := events[event.Path]; !ok { events[event.Path] = event.Flags @@ -242,6 +245,8 @@ func TestMany(t *testing.T) { events[event.Path] = events[event.Path].set(event.Flags) } } + + lock.Unlock() case <-time.After(3 * time.Second): wait <- struct{}{} } diff --git a/wrap.go b/wrap.go index 6b8fe60..84aa6b0 100644 --- a/wrap.go +++ b/wrap.go @@ -22,6 +22,10 @@ static FSEventStreamRef EventStreamCreate(FSEventStreamContext * context, uintpt context->info = (void*) info; return FSEventStreamCreate(NULL, (FSEventStreamCallback) fsevtCallback, context, paths, since, latency, flags); } + +static void DispatchQueueRelease(dispatch_queue_t queue) { + dispatch_release(queue); +} */ import "C" import ( @@ -278,6 +282,8 @@ func fsevtCallback(stream C.FSEventStreamRef, info uintptr, numEvents C.size_t, es.Events <- events } +type fsDispatchQueueRef C.dispatch_queue_t + // fsEventStreamRef wraps C.FSEventStreamRef type fsEventStreamRef C.FSEventStreamRef @@ -349,9 +355,6 @@ func copyCFString(cfs C.CFStringRef) C.CFStringRef { return C.CFStringCreateCopy(C.kCFAllocatorDefault, cfs) } -// cfRunLoopRef wraps C.CFRunLoopRef -type cfRunLoopRef C.CFRunLoopRef - // EventIDForDeviceBeforeTime returns an event ID before a given time. func EventIDForDeviceBeforeTime(dev int32, before time.Time) uint64 { tm := C.CFAbsoluteTime(before.Unix()) @@ -429,26 +432,20 @@ func (es *EventStream) start(paths []string, callbackInfo uintptr) error { es.stream = setupStream(paths, es.Flags, callbackInfo, since, es.Latency, es.Device) - started := make(chan error) - - go func() { - runtime.LockOSThread() - es.rlref = cfRunLoopRef(C.CFRunLoopGetCurrent()) - C.CFRetain(C.CFTypeRef(es.rlref)) - C.FSEventStreamScheduleWithRunLoop(es.stream, C.CFRunLoopRef(es.rlref), C.kCFRunLoopDefaultMode) - if C.FSEventStreamStart(es.stream) == 0 { - // cleanup stream and runloop - C.FSEventStreamInvalidate(es.stream) - C.FSEventStreamRelease(es.stream) - C.CFRelease(C.CFTypeRef(es.rlref)) - es.stream = nil - started <- fmt.Errorf("failed to start eventstream") - close(started) - return - } - close(started) - C.CFRunLoopRun() - }() + es.qref = fsDispatchQueueRef(C.dispatch_queue_create(nil, nil)) + C.FSEventStreamSetDispatchQueue(es.stream, es.qref) + + if C.FSEventStreamStart(es.stream) == 0 { + // cleanup stream + C.FSEventStreamInvalidate(es.stream) + C.FSEventStreamRelease(es.stream) + es.stream = nil + + C.DispatchQueueRelease(es.qref) + es.qref = nil + + return fmt.Errorf("failed to start eventstream") + } if !es.hasFinalizer { // TODO: There is no guarantee this run before program exit @@ -457,7 +454,7 @@ func (es *EventStream) start(paths []string, callbackInfo uintptr) error { es.hasFinalizer = true } - return <-started + return nil } func finalizer(es *EventStream) { @@ -476,10 +473,9 @@ func flush(stream fsEventStreamRef, sync bool) { } // stop requests fsevents stops streaming events -func stop(stream fsEventStreamRef, rlref cfRunLoopRef) { +func stop(stream fsEventStreamRef, qref fsDispatchQueueRef) { C.FSEventStreamStop(stream) C.FSEventStreamInvalidate(stream) C.FSEventStreamRelease(stream) - C.CFRunLoopStop(C.CFRunLoopRef(rlref)) - C.CFRelease(C.CFTypeRef(rlref)) + C.DispatchQueueRelease(qref) }