module concurrency;

import concurrency.stoptoken;
import concurrency.sender;
import concurrency.thread;
import concepts;

/// Returns true when the calling thread is the process' main thread.
///
/// `Thread.getThis()` yields null for a thread that is not attached to the
/// D runtime; such a thread is reported as not being the main thread instead
/// of dereferencing null.
bool isMainThread() @trusted {
  import core.thread : Thread;
  auto current = Thread.getThis();
  return current !is null && current.isMainThread();
}

/// Receiver used by the blocking wait functions in this module. It records
/// the sender's completion (value, error or cancellation) in a `State` owned
/// by the waiting function and stops that state's worker.
package struct SyncWaitReceiver2(Value, bool isNoThrow) {
  /// Completion slot shared between the waiting function and the receiver.
  static struct State {
    LocalThreadWorker worker;
    /// set by `setDone`
    bool canceled;
    static if (!is(Value == void))
      Value result;
    static if (!isNoThrow)
      Exception exception;
    StopSource stopSource;

    this(StopSource stopSource) {
      this.stopSource = stopSource;
      worker = LocalThreadWorker(getLocalThreadExecutor());
    }

    /// Signal callback: requests stop on the stop source.
    void handleSignal(int signal) {
      stopSource.stop();
    }
  }

  /// Points at the `State` owned by the waiting function.
  State* state;

  /// Records cancellation and stops the worker.
  void setDone() nothrow @safe {
    state.canceled = true;
    state.worker.stop();
  }

  static if (!isNoThrow)
    /// Records the error and stops the worker.
    void setError(Exception e) nothrow @safe {
      state.exception = e;
      state.worker.stop();
    }

  static if (is(Value == void))
    /// Stops the worker; there is no value to store.
    void setValue() nothrow @safe {
      state.worker.stop();
    }
  else
    /// Stores the produced value and stops the worker.
    void setValue(Value value) nothrow @safe {
      state.result = value;
      state.worker.stop();
    }

  /// Returns a stop token built from the state's stop source.
  auto getStopToken() nothrow @safe @nogc {
    return StopToken(state.stopSource);
  }

  /// Returns a scheduler adapter wrapping a pointer to the state's worker.
  auto getScheduler() nothrow @safe {
    import concurrency.scheduler : SchedulerAdapter;
    return SchedulerAdapter!(LocalThreadWorker*)(&state.worker);
  }
}

deprecated("Use syncWait instead")
auto sync_wait(Sender, StopSource)(auto ref Sender sender, StopSource stopSource) {
  // cast() strips the qualifiers from the passed stop source
  return sync_wait_impl(sender, (()@trusted=>cast()stopSource)());
}

deprecated("Use syncWait instead")
auto sync_wait(Sender)(auto scope ref Sender sender) {
  return sync_wait_impl(sender);
}

/// Connects and starts the sender, then runs the local worker until the
/// receiver stops it.
///
/// When no stop source is supplied and the caller is the main thread, SIGINT
/// and SIGTERM are routed to the internally created stop source for the
/// duration of the call.
///
/// Returns: for a void sender, true unless it was cancelled; otherwise the
/// produced value.
/// Throws: the sender's exception if it completed with one, or
/// `Exception("Canceled")` when a value-producing sender was cancelled.
auto sync_wait_impl(Sender)(auto scope ref Sender sender, StopSource stopSource = null) @safe {
  static assert(models!(Sender, isSender));
  import concurrency.signal;
  import core.sys.posix.signal : SIGTERM, SIGINT;

  alias Value = Sender.Value;
  enum NoThrow = !canSenderThrow!(Sender);

  alias Receiver = SyncWaitReceiver2!(Value, NoThrow);

  auto state = Receiver.State(stopSource is null ? new StopSource() : stopSource);
  Receiver receiver = (()@trusted => Receiver(&state))();
  SignalHandler signalHandler;

  /// TODO: not so sure about this
  const bool installSignals = stopSource is null && isMainThread();
  if (installSignals) {
    (()@trusted => signalHandler.setup(&state.handleSignal))();
    signalHandler.on(SIGINT);
    signalHandler.on(SIGTERM);
  }
  // The handler holds a delegate into the stack-allocated `state`; it must be
  // removed on every exit path (including exceptions), and only if it was
  // actually installed.
  scope (exit) {
    if (installSignals)
      signalHandler.teardown();
  }

  auto op = sender.connect(receiver);
  op.start();

  state.worker.start();

  /// if exception, rethrow
  static if (!NoThrow)
    if (state.exception !is null)
      throw state.exception;

  /// if no value, return true if not canceled
  static if (is(Value == void))
    return !state.canceled;
  else {
    if (state.canceled)
      throw new Exception("Canceled");

    return state.result;
  }
}

/// Marker stored in a `Result` when the sender was cancelled.
struct Cancelled {}

/// Outcome of `syncWait`: a value (nothing for void senders), `Cancelled`,
/// or an `Exception`.
struct Result(T) {
  import mir.algebraic : Algebraic, Nullable;

  /// Wrapper around the produced value inside the algebraic.
  static struct Value(T) {
    static if (!is(T == void))
      T value;
  }

  // For void senders the null state of the Nullable represents success.
  static if (is(T == void))
    Nullable!(Cancelled, Exception) result;
  else
    Algebraic!(Value!T, Cancelled, Exception) result;

  static if (!is(T == void))
    this(T v) {
      result = Value!T(v);
    }

  this(Cancelled c) {
    result = c;
  }

  this(Exception e) {
    result = e;
  }

  /// True when the result holds `Cancelled`.
  bool isCancelled() {
    return result._is!Cancelled;
  }

  /// True when the result holds an `Exception`.
  bool isError() {
    return result._is!Exception;
  }

  /// True when the result holds neither `Cancelled` nor an `Exception`.
  bool isOk() {
    static if (is(T == void))
      return result.isNull;
    else
      return result._is!(Value!T);
  }

  static if (!is(T == void))
    /// Returns the stored value.
    T value() {
      return result.get!(Value!T).value;
    }

  /// Returns the stored exception.
  Exception error() {
    return result.get!(Exception);
  }

  /// Throws the stored exception if there is one; does nothing otherwise
  /// (a cancelled result does not throw).
  void assumeOk() {
    import mir.algebraic : match;
    result.match!((Exception e){throw e;},(ref t){});
  }
}

/// matches over the result of syncWait
template match(Handlers...) {
  // has to be separate because of dual-context limitation
  auto match(T)(Result!T r) {
    import mir.algebraic : match;
    static if (is(T == void))
      return r.result.match!(Handlers);
    else
      // unwrap Value!T first so handlers receive the plain T
      return r.result.match!((Result!(T).Value!(T) v) => v.value, (ref t) => t).match!(Handlers);
  }
}

/// Start the Sender and waits until it completes, cancels, or has an error.
auto syncWait(Sender, StopSource)(auto ref Sender sender, StopSource stopSource) {
  // cast() strips the qualifiers from the passed stop source
  return syncWaitImpl(sender, (()@trusted=>cast()stopSource)());
}

/// ditto
auto syncWait(Sender)(auto scope ref Sender sender) {
  return syncWaitImpl(sender);
}

/// Connects and starts the sender, then runs the local worker until the
/// receiver stops it, and reports the outcome as a `Result`.
///
/// When no stop source is supplied and the caller is the main thread, SIGINT
/// and SIGTERM are routed to the internally created stop source for the
/// duration of the call.
///
/// Returns: `Cancelled` if the sender was cancelled, otherwise the sender's
/// exception if it completed with one, otherwise the produced value (an empty
/// result for void senders).
Result!(Sender.Value) syncWaitImpl(Sender)(auto scope ref Sender sender, StopSource stopSource = null) @safe {
  static assert(models!(Sender, isSender));
  import concurrency.signal;
  import core.sys.posix.signal : SIGTERM, SIGINT;

  alias Value = Sender.Value;
  enum NoThrow = !canSenderThrow!(Sender);

  alias Receiver = SyncWaitReceiver2!(Value, NoThrow);

  auto state = Receiver.State(stopSource is null ? new StopSource() : stopSource);
  Receiver receiver = (()@trusted => Receiver(&state))();
  SignalHandler signalHandler;

  /// TODO: not so sure about this
  const bool installSignals = stopSource is null && isMainThread();
  if (installSignals) {
    (()@trusted => signalHandler.setup(&state.handleSignal))();
    signalHandler.on(SIGINT);
    signalHandler.on(SIGTERM);
  }
  // The handler holds a delegate into the stack-allocated `state`; it must be
  // removed on every exit path (including exceptions), and only if it was
  // actually installed.
  scope (exit) {
    if (installSignals)
      signalHandler.teardown();
  }

  auto op = sender.connect(receiver);
  op.start();

  state.worker.start();

  if (state.canceled)
    return Result!Value(Cancelled());

  static if (!NoThrow)
    if (state.exception !is null)
      return Result!Value(state.exception);

  static if (is(Value == void))
    return Result!Value();
  else
    return Result!Value(state.result);
}