Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use FSEventStreamSetDispatchQueue instead of deprecated FSEventStreamScheduleWithRunLoop #62

Merged
merged 5 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions fsevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func DeviceForPath(path string) (int32, error) {
// ...
type EventStream struct {
stream fsEventStreamRef
rlref cfRunLoopRef
qref fsDispatchQueueRef
hasFinalizer bool
registryID uintptr
uuid string
Expand Down Expand Up @@ -164,8 +164,9 @@ func (es *EventStream) Flush(sync bool) {
// Stop stops listening to the event stream.
func (es *EventStream) Stop() {
if es.stream != nil {
stop(es.stream, es.rlref)
stop(es.stream, es.qref)
es.stream = nil
es.qref = nil
}

// Remove eventstream from the registry
Expand Down
5 changes: 5 additions & 0 deletions fsevents_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ func TestMany(t *testing.T) {
t.Fatal(err)
}

var lock sync.Mutex
events := make(map[string]EventFlags, 810)

wait := make(chan struct{})
Expand All @@ -235,13 +236,17 @@ func TestMany(t *testing.T) {
for {
select {
case msg := <-es.Events:
lock.Lock()

for _, event := range msg {
if _, ok := events[event.Path]; !ok {
events[event.Path] = event.Flags
} else {
events[event.Path] = events[event.Path].set(event.Flags)
}
}

lock.Unlock()
case <-time.After(3 * time.Second):
wait <- struct{}{}
}
Expand Down
50 changes: 23 additions & 27 deletions wrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ static FSEventStreamRef EventStreamCreate(FSEventStreamContext * context, uintpt
context->info = (void*) info;
return FSEventStreamCreate(NULL, (FSEventStreamCallback) fsevtCallback, context, paths, since, latency, flags);
}

static void DispatchQueueRelease(dispatch_queue_t queue) {
dispatch_release(queue);
}
*/
import "C"
import (
Expand Down Expand Up @@ -278,6 +282,8 @@ func fsevtCallback(stream C.FSEventStreamRef, info uintptr, numEvents C.size_t,
es.Events <- events
}

type fsDispatchQueueRef C.dispatch_queue_t

// fsEventStreamRef wraps C.FSEventStreamRef
type fsEventStreamRef C.FSEventStreamRef

Expand Down Expand Up @@ -349,9 +355,6 @@ func copyCFString(cfs C.CFStringRef) C.CFStringRef {
return C.CFStringCreateCopy(C.kCFAllocatorDefault, cfs)
}

// cfRunLoopRef wraps C.CFRunLoopRef
type cfRunLoopRef C.CFRunLoopRef

// EventIDForDeviceBeforeTime returns an event ID before a given time.
func EventIDForDeviceBeforeTime(dev int32, before time.Time) uint64 {
tm := C.CFAbsoluteTime(before.Unix())
Expand Down Expand Up @@ -429,26 +432,20 @@ func (es *EventStream) start(paths []string, callbackInfo uintptr) error {

es.stream = setupStream(paths, es.Flags, callbackInfo, since, es.Latency, es.Device)

started := make(chan error)

go func() {
runtime.LockOSThread()
es.rlref = cfRunLoopRef(C.CFRunLoopGetCurrent())
C.CFRetain(C.CFTypeRef(es.rlref))
C.FSEventStreamScheduleWithRunLoop(es.stream, C.CFRunLoopRef(es.rlref), C.kCFRunLoopDefaultMode)
if C.FSEventStreamStart(es.stream) == 0 {
// cleanup stream and runloop
C.FSEventStreamInvalidate(es.stream)
C.FSEventStreamRelease(es.stream)
C.CFRelease(C.CFTypeRef(es.rlref))
es.stream = nil
started <- fmt.Errorf("failed to start eventstream")
close(started)
return
}
close(started)
C.CFRunLoopRun()
}()
es.qref = fsDispatchQueueRef(C.dispatch_queue_create(nil, nil))
C.FSEventStreamSetDispatchQueue(es.stream, es.qref)

if C.FSEventStreamStart(es.stream) == 0 {
// cleanup stream
C.FSEventStreamInvalidate(es.stream)
C.FSEventStreamRelease(es.stream)
es.stream = nil

C.DispatchQueueRelease(es.qref)
es.qref = nil

return fmt.Errorf("failed to start eventstream")
}

if !es.hasFinalizer {
// TODO: There is no guarantee this run before program exit
Expand All @@ -457,7 +454,7 @@ func (es *EventStream) start(paths []string, callbackInfo uintptr) error {
es.hasFinalizer = true
}

return <-started
return nil
}

func finalizer(es *EventStream) {
Expand All @@ -476,10 +473,9 @@ func flush(stream fsEventStreamRef, sync bool) {
}

// stop requests fsevents stops streaming events
func stop(stream fsEventStreamRef, rlref cfRunLoopRef) {
func stop(stream fsEventStreamRef, qref fsDispatchQueueRef) {
C.FSEventStreamStop(stream)
C.FSEventStreamInvalidate(stream)
C.FSEventStreamRelease(stream)
C.CFRunLoopStop(C.CFRunLoopRef(rlref))
C.CFRelease(C.CFTypeRef(rlref))
C.DispatchQueueRelease(qref)
}
Loading