module concurrency;

import concurrency.stoptoken;
import concurrency.sender;
import concurrency.thread;
import concepts;

/// Returns true when the calling thread is the main thread according to core.thread.
bool isMainThread() @trusted {
  import core.thread : Thread;
  return Thread.getThis().isMainThread();
}

/// Receiver used by the sync-wait functions in this module. It records how the
/// sender completed (value, error or done) in a `State` owned by the caller
/// and then stops the state's `LocalThreadWorker`.
///
/// Params:
///   Value = the value type produced by the sender (`void` when there is none)
///   isNoThrow = when true, no `setError` and no exception slot are generated
package struct SyncWaitReceiver2(Value, bool isNoThrow) {
  /// Completion record shared between the receiver and the waiting function.
  static struct State {
    LocalThreadWorker worker;
    /// set by `setDone`
    bool canceled;
    static if (!is(Value == void))
      Value result;
    static if (!isNoThrow)
      Exception exception;
    StopSource stopSource;

    this(StopSource stopSource) {
      this.stopSource = stopSource;
      worker = LocalThreadWorker(getLocalThreadExecutor());
    }

    /// Signal callback: requests a stop on the stop source, whatever the signal number.
    void handleSignal(int signal) {
      stopSource.stop();
    }
  }

  /// Points at the State owned by the waiting function.
  State* state;

  /// Marks the operation as canceled and stops the worker.
  void setDone() nothrow @safe {
    state.canceled = true;
    state.worker.stop();
  }

  static if (!isNoThrow)
    /// Stores the exception for the waiting function and stops the worker.
    void setError(Exception e) nothrow @safe {
      state.exception = e;
      state.worker.stop();
    }

  static if (is(Value == void))
    /// Signals successful completion (no value) by stopping the worker.
    void setValue() nothrow @safe {
      state.worker.stop();
    }
  else
    /// Stores the produced value and stops the worker.
    void setValue(Value value) nothrow @safe {
      state.result = value;
      state.worker.stop();
    }

  /// Returns a StopToken built from the state's stop source.
  auto getStopToken() nothrow @safe @nogc {
    return StopToken(state.stopSource);
  }

  /// Returns a scheduler adapter wrapping a pointer to the state's worker.
  auto getScheduler() nothrow @safe {
    import concurrency.scheduler : SchedulerAdapter;
    return SchedulerAdapter!(LocalThreadWorker*)(&state.worker);
  }
}

/// Deprecated entry point: forwards to `sync_wait_impl` with the given stop
/// source (type qualifiers on `stopSource` are cast away before forwarding).
deprecated("Use syncWait instead")
auto sync_wait(Sender, StopSource)(auto ref Sender sender, StopSource stopSource) {
  return sync_wait_impl(sender, (()@trusted=>cast()stopSource)());
}

/// Deprecated entry point: forwards to `sync_wait_impl` without a stop source.
deprecated("Use syncWait instead")
auto sync_wait(Sender)(auto scope ref Sender sender) {
  return sync_wait_impl(sender);
}

/// Connects and starts `sender`, runs the local worker, and reports the outcome.
///
/// Params:
///   sender = the sender to run; must model `isSender`
///   stopSource = optional stop source; when null a fresh one is created and,
///                on the main thread, SIGINT/SIGTERM are wired to stop it
///
/// Returns: for `void` senders, `true` unless the sender completed via
///          `setDone`; otherwise the produced value.
/// Throws: the exception handed to `setError` (checked first), or
///         `Exception("Canceled")` when a value-producing sender completed via
///         `setDone`.
auto sync_wait_impl(Sender)(auto scope ref Sender sender, StopSource stopSource = null) @safe {
  static assert(models!(Sender, isSender));
  import concurrency.signal;
  // core.sys.posix.signal merely re-exports these two constants from core.stdc.signal
  import core.stdc.signal : SIGTERM, SIGINT;

  alias Value = Sender.Value;
  enum NoThrow = !canSenderThrow!(Sender);

  alias Receiver = SyncWaitReceiver2!(Value, NoThrow);

  auto state = Receiver.State(stopSource is null ? new StopSource() : stopSource);
  Receiver receiver = (()@trusted => Receiver(&state))();
  SignalHandler signalHandler;

  /// TODO: not so sure about this
  // Signals are only hooked up when we own the stop source and run on the
  // main thread. The decision is remembered so that teardown happens exactly
  // when setup happened; previously teardown also ran for main-thread calls
  // that supplied their own stop source and never installed a handler.
  const bool installSignalHandler = stopSource is null && isMainThread();
  if (installSignalHandler) {
    (()@trusted => signalHandler.setup(&state.handleSignal))();
    signalHandler.on(SIGINT);
    signalHandler.on(SIGTERM);
  }

  auto op = sender.connect(receiver);
  op.start();

  // NOTE(review): presumably returns once a receiver callback calls worker.stop() - confirm
  state.worker.start();

  if (installSignalHandler)
    signalHandler.teardown();

  /// if exception, rethrow
  static if (!NoThrow)
    if (state.exception !is null)
      throw state.exception;

  /// if no value, return true if not canceled
  static if (is(Value == void))
    return !state.canceled;
  else {
    if (state.canceled)
      throw new Exception("Canceled");

    return state.result;
  }
}

/// Tag type stored in a `Result` when the sender completed via `setDone`.
struct Cancelled {}
/// Shared exception instance thrown by `Result.value` and `Result.assumeOk` on cancellation.
static immutable cancelledException = new Exception("Cancelled");

/// Outcome of `syncWait`: a value (or nothing, for `void`), `Cancelled`, or an `Exception`.
struct Result(T) {
  import mir.algebraic : Algebraic, Nullable;

  /// Wrapper around the produced value; empty when `T` is `void`.
  static struct Value(T) {
    static if (!is(T == void))
      T value;
  }

  // For void results the null state stands for success (see isOk).
  static if (is(T == void))
    Nullable!(Cancelled, Exception) result;
  else
    Algebraic!(Value!T, Cancelled, Exception) result;

  static if (!is(T == void))
    /// Constructs a successful result holding `v`.
    this(T v) {
      result = Value!T(v);
    }

  /// Constructs a cancelled result.
  this(Cancelled c) {
    result = c;
  }

  /// Constructs a failed result holding `e`.
  this(Exception e) {
    result = e;
  }

  /// True when the result holds `Cancelled`.
  bool isCancelled() {
    return result._is!Cancelled;
  }

  /// True when the result holds an `Exception`.
  bool isError() {
    return result._is!Exception;
  }

  /// True when the result holds a value (or, for `void`, is in the null state).
  bool isOk() {
    static if (is(T == void))
      return result.isNull;
    else
      return result._is!(Value!T);
  }

  static if (!is(T == void))
    /// Returns the held value.
    /// Throws: `cancelledException` when cancelled, the held exception on error.
    T value() {
      if (isCancelled)
        throw cancelledException;
      if (isError)
        throw error;
      return result.get!(Value!T).value;
    }

  /// Returns the held exception via `result.get!(Exception)`.
  Exception error() {
    return result.get!(Exception);
  }

  /// Does nothing on success; throws the held exception on error and
  /// `cancelledException` on cancellation.
  void assumeOk() {
    import mir.algebraic : match;
    static if (is(T == void))
      result.match!((typeof(null)){},(Exception e){throw e;},(Cancelled c){throw cancelledException;});
    else
      result.match!((Exception e){throw e;},(Cancelled c){throw cancelledException;},(ref t){});
  }
}

/// matches over the result of syncWait
template match(Handlers...) {
  // has to be separate because of dual-context limitation
  auto match(T)(Result!T r) {
    import mir.algebraic : match;
    static if (is(T == void))
      return r.result.match!(Handlers);
    else
      // unwrap Value!T to the plain value first so Handlers can match on T directly
      return r.result.match!((Result!(T).Value!(T) v) => v.value, (ref t) => t).match!(Handlers);
  }
}

/// Start the Sender and waits until it completes, cancels, or has an error.
auto syncWait(Sender, StopSource)(auto ref Sender sender, StopSource stopSource) {
  return syncWaitImpl(sender, (()@trusted=>cast()stopSource)());
}

/// Same as above, without a caller-supplied stop source.
auto syncWait(Sender)(auto scope ref Sender sender) {
  return syncWaitImpl(sender);
}

/// Connects and starts `sender`, runs the local worker, and packs the outcome
/// into a `Result`.
///
/// Params:
///   sender = the sender to run; must model `isSender`
///   stopSource = optional stop source; when null a fresh one is created and,
///                on the main thread, SIGINT/SIGTERM are wired to stop it
///
/// Returns: a `Result` holding `Cancelled` if the sender completed via
///          `setDone`, else the exception passed to `setError` if any, else
///          the value (or an empty success for `void` senders).
Result!(Sender.Value) syncWaitImpl(Sender)(auto scope ref Sender sender, StopSource stopSource = null) @safe {
  import mir.algebraic : Algebraic, Nullable;
  static assert(models!(Sender, isSender));
  import concurrency.signal;
  // core.sys.posix.signal merely re-exports these two constants from core.stdc.signal
  import core.stdc.signal : SIGTERM, SIGINT;

  alias Value = Sender.Value;
  enum NoThrow = !canSenderThrow!(Sender);

  alias Receiver = SyncWaitReceiver2!(Value, NoThrow);

  auto state = Receiver.State(stopSource is null ? new StopSource() : stopSource);
  Receiver receiver = (()@trusted => Receiver(&state))();
  SignalHandler signalHandler;

  /// TODO: not so sure about this
  // Signals are only hooked up when we own the stop source and run on the
  // main thread. The decision is remembered so that teardown happens exactly
  // when setup happened; previously teardown also ran for main-thread calls
  // that supplied their own stop source and never installed a handler.
  const bool installSignalHandler = stopSource is null && isMainThread();
  if (installSignalHandler) {
    (()@trusted => signalHandler.setup(&state.handleSignal))();
    signalHandler.on(SIGINT);
    signalHandler.on(SIGTERM);
  }

  auto op = sender.connect(receiver);
  op.start();

  // NOTE(review): presumably returns once a receiver callback calls worker.stop() - confirm
  state.worker.start();

  if (installSignalHandler)
    signalHandler.teardown();

  if (state.canceled)
    return Result!Value(Cancelled());

  static if (!NoThrow)
    if (state.exception !is null)
      return Result!Value(state.exception);

  static if (is(Value == void))
    return Result!Value();
  else
    return Result!Value(state.result);
}