1 module concurrency;
2 
3 import concurrency.stoptoken;
4 import concurrency.sender;
5 import concurrency.thread;
6 import concepts;
7 
8 bool isMainThread() @trusted {
9   import core.thread : Thread;
10   return Thread.getThis().isMainThread();
11 }
12 
13 package struct SyncWaitReceiver2(Value, bool isNoThrow) {
14   static struct State {
15     LocalThreadWorker worker;
16     bool canceled;
17     static if (!is(Value == void))
18       Value result;
19     static if (!isNoThrow)
20       Exception exception;
21     StopSource stopSource;
22 
23     this(StopSource stopSource) {
24       this.stopSource = stopSource;
25       worker = LocalThreadWorker(getLocalThreadExecutor());
26     }
27 
28     void handleSignal(int signal) {
29       stopSource.stop();
30     }
31   }
32   State* state;
33   void setDone() nothrow @safe {
34     state.canceled = true;
35     state.worker.stop();
36   }
37 
38   static if (!isNoThrow)
39     void setError(Exception e) nothrow @safe {
40       state.exception = e;
41       state.worker.stop();
42     }
43   static if (is(Value == void))
44     void setValue() nothrow @safe {
45       state.worker.stop();
46     }
47   else
48     void setValue(Value value) nothrow @safe {
49       state.result = value;
50       state.worker.stop();
51     }
52   auto getStopToken() nothrow @safe @nogc {
53     return StopToken(state.stopSource);
54   }
55   auto getScheduler() nothrow @safe {
56     import concurrency.scheduler : SchedulerAdapter;
57     return SchedulerAdapter!(LocalThreadWorker*)(&state.worker);
58   }
59 }
60 
61 deprecated("Use syncWait instead")
62 auto sync_wait(Sender, StopSource)(auto ref Sender sender, StopSource stopSource) {
63   return sync_wait_impl(sender, (()@trusted=>cast()stopSource)());
64 }
65 
66 deprecated("Use syncWait instead")
67 auto sync_wait(Sender)(auto scope ref Sender sender) {
68   return sync_wait_impl(sender);
69 }
70 
71 auto sync_wait_impl(Sender)(auto scope ref Sender sender, StopSource stopSource = null) @safe {
72   static assert(models!(Sender, isSender));
73   import concurrency.signal;
74   import core.sys.posix.signal : SIGTERM, SIGINT;
75 
76   alias Value = Sender.Value;
77   enum NoThrow = !canSenderThrow!(Sender);
78 
79   alias Receiver = SyncWaitReceiver2!(Value, NoThrow);
80 
81   auto state = Receiver.State(stopSource is null ? new StopSource() : stopSource);
82   Receiver receiver = (()@trusted => Receiver(&state))();
83   SignalHandler signalHandler;
84 
85   if (stopSource is null) {
86     /// TODO: not so sure about this
87     if (isMainThread) {
88       (()@trusted => signalHandler.setup(&state.handleSignal))();
89       signalHandler.on(SIGINT);
90       signalHandler.on(SIGTERM);
91     }
92   }
93 
94   auto op = sender.connect(receiver);
95   op.start();
96 
97   state.worker.start();
98 
99   if (isMainThread)
100     signalHandler.teardown();
101 
102   /// if exception, rethrow
103   static if (!NoThrow)
104     if (state.exception !is null)
105       throw state.exception;
106 
107   /// if no value, return true if not canceled
108   static if (is(Value == void))
109     return !state.canceled;
110   else {
111     if (state.canceled)
112       throw new Exception("Canceled");
113 
114     return state.result;
115   }
116 }
117 
118 struct Cancelled {}
119 
120 struct Result(T) {
121   import mir.algebraic : Algebraic, Nullable;
122   static struct Value(T) {
123     static if (!is(T == void))
124       T value;
125   }
126   static if (is(T == void))
127     Nullable!(Cancelled, Exception) result;
128   else
129     Algebraic!(Value!T, Cancelled, Exception) result;
130 
131   static if (!is(T == void))
132     this(T v) {
133       result = Value!T(v);
134     }
135   this(Cancelled c) {
136     result = c;
137   }
138   this(Exception e) {
139     result = e;
140   }
141 
142   bool isCancelled() {
143     return result._is!Cancelled;
144   }
145   bool isError() {
146     return result._is!Exception;
147   }
148   bool isOk() {
149     static if (is(T == void))
150       return result.isNull;
151     else
152       return result._is!(Value!T);
153   }
154   static if (!is(T == void))
155     T value() {
156       return result.get!(Value!T).value;
157     }
158   Exception error() {
159     return result.get!(Exception);
160   }
161   void assumeOk() {
162     import mir.algebraic : match;
163     result.match!((Exception e){throw e;},(ref t){});
164   }
165 }
166 
167 /// matches over the result of syncWait
168 template match(Handlers...) {
169   // has to be separate because of dual-context limitation
170   auto match(T)(Result!T r) {
171     import mir.algebraic : match;
172     static if (is(T == void))
173       return r.result.match!(Handlers);
174     else
175       return r.result.match!((Result!(T).Value!(T) v) => v.value, (ref t) => t).match!(Handlers);
176   }
177 }
178 
179 /// Start the Sender and waits until it completes, cancels, or has an error.
180 auto syncWait(Sender, StopSource)(auto ref Sender sender, StopSource stopSource) {
181   return syncWaitImpl(sender, (()@trusted=>cast()stopSource)());
182 }
183 
184 auto syncWait(Sender)(auto scope ref Sender sender) {
185   return syncWaitImpl(sender);
186 }
187 
188 Result!(Sender.Value) syncWaitImpl(Sender)(auto scope ref Sender sender, StopSource stopSource = null) @safe {
189   import mir.algebraic : Algebraic, Nullable;
190   static assert(models!(Sender, isSender));
191   import concurrency.signal;
192   import core.sys.posix.signal : SIGTERM, SIGINT;
193 
194   alias Value = Sender.Value;
195   enum NoThrow = !canSenderThrow!(Sender);
196 
197   alias Receiver = SyncWaitReceiver2!(Value, NoThrow);
198 
199   auto state = Receiver.State(stopSource is null ? new StopSource() : stopSource);
200   // Receiver receiver = (()@trusted => Receiver(&state))();
201   Receiver receiver = (()@trusted => Receiver(&state))();
202   SignalHandler signalHandler;
203 
204   if (stopSource is null) {
205     /// TODO: not so sure about this
206     if (isMainThread) {
207       (()@trusted => signalHandler.setup(&state.handleSignal))();
208       signalHandler.on(SIGINT);
209       signalHandler.on(SIGTERM);
210     }
211   }
212 
213   auto op = sender.connect(receiver);
214   op.start();
215 
216   state.worker.start();
217 
218   if (isMainThread)
219     signalHandler.teardown();
220 
221   if (state.canceled)
222     return Result!Value(Cancelled());
223 
224   static if (!NoThrow)
225     if (state.exception !is null)
226       return Result!Value(state.exception);
227 
228   static if (is(Value == void))
229     return Result!Value();
230   else
231     return Result!Value(state.result);
232 }