Skip to content

Commit

Permalink
use FSEventStreamSetDispatchQueue instead of deprecated FSEventStream…
Browse files Browse the repository at this point in the history
…ScheduleWithRunLoop (#62)
  • Loading branch information
emanuel-skrenkovic committed May 14, 2024
1 parent 1e71d45 commit f73112e
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 29 deletions.
5 changes: 3 additions & 2 deletions fsevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func DeviceForPath(path string) (int32, error) {
// ...
type EventStream struct {
stream fsEventStreamRef
rlref cfRunLoopRef
qref fsDispatchQueueRef
hasFinalizer bool
registryID uintptr
uuid string
Expand Down Expand Up @@ -164,8 +164,9 @@ func (es *EventStream) Flush(sync bool) {
// Stop stops listening to the event stream.
func (es *EventStream) Stop() {
if es.stream != nil {
stop(es.stream, es.rlref)
stop(es.stream, es.qref)
es.stream = nil
es.qref = nil
}

// Remove eventstream from the registry
Expand Down
5 changes: 5 additions & 0 deletions fsevents_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ func TestMany(t *testing.T) {
t.Fatal(err)
}

var lock sync.Mutex
events := make(map[string]EventFlags, 810)

wait := make(chan struct{})
Expand All @@ -235,13 +236,17 @@ func TestMany(t *testing.T) {
for {
select {
case msg := <-es.Events:
lock.Lock()

for _, event := range msg {
if _, ok := events[event.Path]; !ok {
events[event.Path] = event.Flags
} else {
events[event.Path] = events[event.Path].set(event.Flags)
}
}

lock.Unlock()
case <-time.After(3 * time.Second):
wait <- struct{}{}
}
Expand Down
50 changes: 23 additions & 27 deletions wrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ static FSEventStreamRef EventStreamCreate(FSEventStreamContext * context, uintpt
context->info = (void*) info;
return FSEventStreamCreate(NULL, (FSEventStreamCallback) fsevtCallback, context, paths, since, latency, flags);
}
static void DispatchQueueRelease(dispatch_queue_t queue) {
dispatch_release(queue);
}
*/
import "C"
import (
Expand Down Expand Up @@ -278,6 +282,8 @@ func fsevtCallback(stream C.FSEventStreamRef, info uintptr, numEvents C.size_t,
es.Events <- events
}

type fsDispatchQueueRef C.dispatch_queue_t

// fsEventStreamRef wraps C.FSEventStreamRef
type fsEventStreamRef C.FSEventStreamRef

Expand Down Expand Up @@ -349,9 +355,6 @@ func copyCFString(cfs C.CFStringRef) C.CFStringRef {
return C.CFStringCreateCopy(C.kCFAllocatorDefault, cfs)
}

// cfRunLoopRef wraps C.CFRunLoopRef
type cfRunLoopRef C.CFRunLoopRef

// EventIDForDeviceBeforeTime returns an event ID before a given time.
func EventIDForDeviceBeforeTime(dev int32, before time.Time) uint64 {
tm := C.CFAbsoluteTime(before.Unix())
Expand Down Expand Up @@ -429,26 +432,20 @@ func (es *EventStream) start(paths []string, callbackInfo uintptr) error {

es.stream = setupStream(paths, es.Flags, callbackInfo, since, es.Latency, es.Device)

started := make(chan error)

go func() {
runtime.LockOSThread()
es.rlref = cfRunLoopRef(C.CFRunLoopGetCurrent())
C.CFRetain(C.CFTypeRef(es.rlref))
C.FSEventStreamScheduleWithRunLoop(es.stream, C.CFRunLoopRef(es.rlref), C.kCFRunLoopDefaultMode)
if C.FSEventStreamStart(es.stream) == 0 {
// cleanup stream and runloop
C.FSEventStreamInvalidate(es.stream)
C.FSEventStreamRelease(es.stream)
C.CFRelease(C.CFTypeRef(es.rlref))
es.stream = nil
started <- fmt.Errorf("failed to start eventstream")
close(started)
return
}
close(started)
C.CFRunLoopRun()
}()
es.qref = fsDispatchQueueRef(C.dispatch_queue_create(nil, nil))
C.FSEventStreamSetDispatchQueue(es.stream, es.qref)

if C.FSEventStreamStart(es.stream) == 0 {
// cleanup stream
C.FSEventStreamInvalidate(es.stream)
C.FSEventStreamRelease(es.stream)
es.stream = nil

C.DispatchQueueRelease(es.qref)
es.qref = nil

return fmt.Errorf("failed to start eventstream")
}

if !es.hasFinalizer {
// TODO: There is no guarantee this run before program exit
Expand All @@ -457,7 +454,7 @@ func (es *EventStream) start(paths []string, callbackInfo uintptr) error {
es.hasFinalizer = true
}

return <-started
return nil
}

func finalizer(es *EventStream) {
Expand All @@ -476,10 +473,9 @@ func flush(stream fsEventStreamRef, sync bool) {
}

// stop requests fsevents stops streaming events
func stop(stream fsEventStreamRef, rlref cfRunLoopRef) {
func stop(stream fsEventStreamRef, qref fsDispatchQueueRef) {
C.FSEventStreamStop(stream)
C.FSEventStreamInvalidate(stream)
C.FSEventStreamRelease(stream)
C.CFRunLoopStop(C.CFRunLoopRef(rlref))
C.CFRelease(C.CFTypeRef(rlref))
C.DispatchQueueRelease(qref)
}

0 comments on commit f73112e

Please sign in to comment.