diff --git a/std/cpp/_std/sys/thread/Condition.hx b/std/cpp/_std/sys/thread/Condition.hx new file mode 100644 index 00000000000..484db3c6576 --- /dev/null +++ b/std/cpp/_std/sys/thread/Condition.hx @@ -0,0 +1,33 @@ +package sys.thread; + +@:coreApi +class Condition { + var c:Dynamic; + public function new():Void { + c = untyped __global__.__hxcpp_condition_create(); + } + + public function acquire():Void { + untyped __global__.__hxcpp_condition_acquire(c); + } + + public function tryAcquire():Bool { + return untyped __global__.__hxcpp_condition_try_acquire(c); + } + + public function release():Void { + untyped __global__.__hxcpp_condition_release(c); + } + + public function wait():Void { + untyped __global__.__hxcpp_condition_wait(c); + } + + public function signal():Void { + untyped __global__.__hxcpp_condition_signal(c); + } + + public function broadcast():Void { + untyped __global__.__hxcpp_condition_broadcast(c); + } +} diff --git a/std/cpp/_std/sys/thread/Semaphore.hx b/std/cpp/_std/sys/thread/Semaphore.hx new file mode 100644 index 00000000000..a09774f7bd0 --- /dev/null +++ b/std/cpp/_std/sys/thread/Semaphore.hx @@ -0,0 +1,22 @@ +package sys.thread; + +@:coreApi +class Semaphore { + var m:Dynamic; + + public function new(value:Int) { + m = untyped __global__.__hxcpp_semaphore_create(value); + } + + public function acquire():Void { + untyped __global__.__hxcpp_semaphore_acquire(m); + } + + public function tryAcquire(?timeout:Float):Bool { + return untyped __global__.__hxcpp_semaphore_try_acquire(m, timeout == null ? 0 : (timeout:Float)); + } + + public function release():Void { + untyped __global__.__hxcpp_semaphore_release(m); + } +} diff --git a/std/cpp/cppia/HostClasses.hx b/std/cpp/cppia/HostClasses.hx index 392e403e328..d7a802a00ca 100644 --- a/std/cpp/cppia/HostClasses.hx +++ b/std/cpp/cppia/HostClasses.hx @@ -46,6 +46,8 @@ class HostClasses { "sys.thread.Mutex", "sys.thread.Thread", "sys.thread.Tls", + "sys.thread.Semaphore", + "sys.thread.Condition", "cpp.vm.ExecutionTrace", "cpp.vm.Gc", "cpp.vm.Profiler", diff --git a/std/cs/_std/sys/thread/Condition.hx b/std/cs/_std/sys/thread/Condition.hx new file mode 100644 index 00000000000..52b1f24adac --- /dev/null +++ b/std/cs/_std/sys/thread/Condition.hx @@ -0,0 +1,37 @@ +package sys.thread; + +import cs.system.threading.Monitor; + +@:coreApi +@:access(sys.thread.Mutex) +class Condition { + final object:cs.system.Object; + + public function new():Void { + this.object = new cs.system.Object(); + } + + public function acquire():Void { + Monitor.Enter(object); + } + + public function tryAcquire():Bool { + return Monitor.TryEnter(object); + } + + public function release():Void { + Monitor.Exit(object); + } + + public function wait():Void { + Monitor.Wait(object); + } + + public function signal():Void { + Monitor.Pulse(object); + } + + public function broadcast():Void { + Monitor.PulseAll(object); + } +} diff --git a/std/cs/_std/sys/thread/Semaphore.hx b/std/cs/_std/sys/thread/Semaphore.hx new file mode 100644 index 00000000000..e08064b8a0d --- /dev/null +++ b/std/cs/_std/sys/thread/Semaphore.hx @@ -0,0 +1,22 @@ +package sys.thread; + +@:coreApi +class Semaphore { + final native:cs.system.threading.Semaphore; + + public function new(value:Int):Void { + this.native = new cs.system.threading.Semaphore(value, 0x7FFFFFFF); + } + + public function acquire():Void { + native.WaitOne(); + } + + public function tryAcquire(?timeout:Float):Bool { + return native.WaitOne(timeout == null ? 0 : Std.int(timeout * 1000)); + } + + public function release():Void { + native.Release(); + } +} diff --git a/std/eval/_std/sys/thread/Condition.hx b/std/eval/_std/sys/thread/Condition.hx new file mode 100644 index 00000000000..15b6b182411 --- /dev/null +++ b/std/eval/_std/sys/thread/Condition.hx @@ -0,0 +1,41 @@ +package sys.thread; + +@:coreApi class Condition { + final cond:eval.luv.Condition; + final mutex:eval.luv.Mutex; + + public function new():Void { + cond = eval.luv.Condition.init().resolve(); + mutex = eval.luv.Mutex.init(true).resolve(); + eval.vm.Gc.finalise(destroy, this); + } + + static function destroy(cond:Condition):Void { + cond.cond.destroy(); + cond.mutex.destroy(); + } + + public function acquire():Void { + mutex.lock(); + } + + public function tryAcquire():Bool { + return mutex.tryLock().isOk(); + } + + public function release():Void { + mutex.unlock(); + } + + public function wait():Void { + cond.wait(mutex); + } + + public function signal():Void { + cond.signal(); + } + + public function broadcast():Void { + cond.broadcast(); + } +} diff --git a/std/eval/_std/sys/thread/Semaphore.hx b/std/eval/_std/sys/thread/Semaphore.hx new file mode 100644 index 00000000000..5c75b471528 --- /dev/null +++ b/std/eval/_std/sys/thread/Semaphore.hx @@ -0,0 +1,36 @@ +package sys.thread; + +@:coreApi class Semaphore { + final native:eval.luv.Semaphore; + + public function new(value:Int):Void { + native = eval.luv.Semaphore.init(value).resolve(); + eval.vm.Gc.finalise(destroy, this); + } + + static function destroy(sem:Semaphore):Void { + sem.native.destroy(); + } + + public function acquire():Void { + native.wait(); + } + + public function tryAcquire(?timeout:Float):Bool { + if (timeout == null) { + return native.tryWait().isOk(); + } else { + var t = Sys.time() + timeout; + while (Sys.time() < t) { + if (native.tryWait().isOk()) { + return true; + } + } + return false; + } + } + + public function release():Void { + native.post(); + } +} diff --git a/std/hl/_std/sys/thread/Condition.hx b/std/hl/_std/sys/thread/Condition.hx new file mode 100644 index 00000000000..43f52a34a0d --- /dev/null +++ b/std/hl/_std/sys/thread/Condition.hx @@ -0,0 +1,32 @@ +package sys.thread; + +abstract Condition(hl.Abstract<"hl_condition">) { + public function new():Void { + this = alloc(); + } + + @:hlNative("std", "condition_acquire") + public function acquire():Void {} + + @:hlNative("std", "condition_try_acquire") + public function tryAcquire():Bool { + return false; + } + + @:hlNative("std", "condition_release") + public function release():Void {} + + @:hlNative("std", "condition_wait") + public function wait():Void {} + + @:hlNative("std", "condition_signal") + public function signal():Void {} + + @:hlNative("std", "condition_broadcast") + public function broadcast():Void {} + + @:hlNative("std", "condition_alloc") + static function alloc():hl.Abstract<"hl_condition"> { + return null; + } +} diff --git a/std/hl/_std/sys/thread/Semaphore.hx b/std/hl/_std/sys/thread/Semaphore.hx new file mode 100644 index 00000000000..464e014482f --- /dev/null +++ b/std/hl/_std/sys/thread/Semaphore.hx @@ -0,0 +1,23 @@ +package sys.thread; + +abstract Semaphore(hl.Abstract<"hl_semaphore">) { + public function new(value:Int):Void { + this = alloc(value); + } + + @:hlNative("std", "semaphore_acquire") + public function acquire():Void {} + + @:hlNative("std", "semaphore_release") + public function release():Void {} + + @:hlNative("std", "semaphore_try_acquire") + public function tryAcquire(?timeout:Float):Bool { + return false; + } + + @:hlNative("std", "semaphore_alloc") + static function alloc(value:Int):hl.Abstract<"hl_semaphore"> { + return null; + } +} diff --git a/std/java/_std/sys/thread/Condition.hx b/std/java/_std/sys/thread/Condition.hx new file mode 100644 index 00000000000..fb469d0a0c8 --- /dev/null +++ b/std/java/_std/sys/thread/Condition.hx @@ -0,0 +1,45 @@ +package sys.thread; + +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.Condition as NativeCondition; + +@:access(sys.thread.Mutex) +@:coreApi +@:native('haxe.java.vm.Condition') +class Condition { + final lock:ReentrantLock; + final native:NativeCondition; + + public function new():Void { + this.lock = new ReentrantLock(); + this.native = lock.newCondition(); + } + + public function acquire():Void { + lock.lock(); + } + + public function tryAcquire():Bool { + return this.lock.tryLock(); + } + + public function release():Void { + lock.unlock(); + } + + // without the @:native, you get "java.lang.VerifyError: class sys.thread.Condition overrides final method java.lang.Object.wait()V" on jvm + // and "wait() in Condition cannot override wait() in Object" from javac + + @:native("waitOn") + public function wait():Void { + native.await(); + } + + public function signal():Void { + native.signal(); + } + + public function broadcast():Void { + native.signalAll(); + } +} diff --git a/std/java/_std/sys/thread/Semaphore.hx b/std/java/_std/sys/thread/Semaphore.hx new file mode 100644 index 00000000000..329434af8d1 --- /dev/null +++ b/std/java/_std/sys/thread/Semaphore.hx @@ -0,0 +1,25 @@ +package sys.thread; + +import java.util.concurrent.TimeUnit; + +@:coreApi +@:native('haxe.java.vm.Semaphore') +class Semaphore { + final native:java.util.concurrent.Semaphore; + + public function new(value:Int):Void { + this.native = new java.util.concurrent.Semaphore(value); + } + + public function acquire():Void { + native.acquire(); + } + + public function tryAcquire(?timeout:Float):Bool { + return timeout == null ? native.tryAcquire() : native.tryAcquire(haxe.Int64.fromFloat(timeout * 1000000000),TimeUnit.NANOSECONDS); + } + + public function release():Void { + native.release(); + } +} diff --git a/std/python/_std/sys/thread/Condition.hx b/std/python/_std/sys/thread/Condition.hx new file mode 100644 index 00000000000..1c5a5782947 --- /dev/null +++ b/std/python/_std/sys/thread/Condition.hx @@ -0,0 +1,34 @@ +package sys.thread; + +@:coreApi +class Condition { + final cond:python.lib.threading.Condition; + + public function new():Void { + this.cond = new python.lib.threading.Condition(); + } + + public function acquire():Void { + cond.acquire(); + } + + public function tryAcquire():Bool { + return cond.acquire(false); + } + + public function release():Void { + cond.release(); + } + + public function wait():Void { + cond.wait(); + } + + public function signal():Void { + cond.notify(); + } + + public function broadcast():Void { + cond.notify_all(); + } +} diff --git a/std/python/_std/sys/thread/Semaphore.hx b/std/python/_std/sys/thread/Semaphore.hx new file mode 100644 index 00000000000..8dd588179ba --- /dev/null +++ b/std/python/_std/sys/thread/Semaphore.hx @@ -0,0 +1,24 @@ +package sys.thread; + +import python.lib.threading.Semaphore as NativeSemaphore; + +@:coreApi +class Semaphore { + final semaphore:NativeSemaphore; + + public function new(value:Int):Void { + this.semaphore = new NativeSemaphore(value); + } + + public function acquire():Void { + semaphore.acquire(); + } + + public function tryAcquire(?timeout:Float):Bool { + return timeout == null ? semaphore.acquire(false) : semaphore.acquire(true, timeout); + } + + public function release():Void { + semaphore.release(); + } +} diff --git a/std/sys/thread/Condition.hx b/std/sys/thread/Condition.hx new file mode 100644 index 00000000000..4bad9df1072 --- /dev/null +++ b/std/sys/thread/Condition.hx @@ -0,0 +1,58 @@ +package sys.thread; + +#if (!target.threaded) +#error "This class is not available on this target" +#end + +/** + Creates a new condition variable. + Conditions variables can be used to block one or more threads at the same time, + until another thread modifies a shared variable (the condition) + and signals the condition variable. +**/ +@:coreApi extern class Condition { + /** + Create a new condition variable. + A thread that waits on a newly created condition variable will block. + **/ + function new():Void; + + /** + Acquires the internal mutex. + **/ + function acquire():Void; + + /** + Tries to acquire the internal mutex. + @see `Mutex.tryAcquire` + **/ + function tryAcquire():Bool; + + /*** + Releases the internal mutex. + **/ + function release():Void; + + /** + Atomically releases the mutex and blocks until the condition variable pointed is signaled by a call to + `signal` or to `broadcast`. When the calling thread becomes unblocked it + acquires the internal mutex. + The internal mutex should be locked before this function is called. + **/ + function wait():Void; + + /** + Unblocks one of the threads that are blocked on the + condition variable at the time of the call. If no threads are blocked + on the condition variable at the time of the call, the function does nothing. + **/ + function signal():Void; + + /** + Unblocks all of the threads that are blocked on the + condition variable at the time of the call. If no threads are blocked + on the condition variable at the time of the call, the function does + nothing. + **/ + function broadcast():Void; +} diff --git a/std/sys/thread/Semaphore.hx b/std/sys/thread/Semaphore.hx new file mode 100644 index 00000000000..6d0f474d711 --- /dev/null +++ b/std/sys/thread/Semaphore.hx @@ -0,0 +1,33 @@ +package sys.thread; + +#if (!target.threaded) +#error "This class is not available on this target" +#end +@:coreApi extern class Semaphore { + /** + Creates a new semaphore with an initial value. + **/ + public function new(value:Int):Void; + + /** + Locks the semaphore. + If the value of the semaphore is zero, then the thread will block until it is able to lock the semaphore. + If the value is non-zero, it is decreased by one. + **/ + public function acquire():Void; + + /** + Try to lock the semaphore. + If the value of the semaphore is zero, `false` is returned, else the value is increased. + + If `timeout` is specified, this function will block until the thread is able to acquire the semaphore, or the timout expires. + `timeout` is in seconds. + **/ + public function tryAcquire(?timeout:Float):Bool; + + /** + Release the semaphore. + The value of the semaphore is increased by one. + **/ + public function release():Void; +} diff --git a/tests/threads/src/cases/TestCondition.hx b/tests/threads/src/cases/TestCondition.hx new file mode 100644 index 00000000000..8f3f33a0824 --- /dev/null +++ b/tests/threads/src/cases/TestCondition.hx @@ -0,0 +1,24 @@ +package cases; + +#if !neko +import sys.thread.Condition; +import sys.thread.Thread; +#end + +class TestCondition extends utest.Test { + #if !neko + function test() { + final cond = new Condition(); + final thread = Thread.create(() -> { + Sys.sleep(0.01); + cond.acquire(); + cond.signal(); + cond.release(); + }); + cond.acquire(); + cond.wait(); + cond.release(); + utest.Assert.pass(); + } + #end +} diff --git a/tests/threads/src/cases/TestSemaphore.hx b/tests/threads/src/cases/TestSemaphore.hx new file mode 100644 index 00000000000..f18ce27d4b2 --- /dev/null +++ b/tests/threads/src/cases/TestSemaphore.hx @@ -0,0 +1,21 @@ +package cases; + +#if !neko +import sys.thread.Semaphore; +#end + +class TestSemaphore extends utest.Test { + #if !neko + function test() { + var m = new Semaphore(3); + m.acquire(); + m.acquire(); + isTrue(m.tryAcquire()); + isFalse(m.tryAcquire()); + isFalse(m.tryAcquire(0.1)); + m.release(); + m.release(); + m.release(); + } + #end +}