1 module concurrency;
2 
3 import concurrency.stoptoken;
4 import concurrency.sender;
5 import concurrency.thread;
6 import concepts;
7 
8 bool isMainThread() @trusted {
9   import core.thread : Thread;
10   return Thread.getThis().isMainThread();
11 }
12 
13 package struct SyncWaitReceiver2(Value, bool isNoThrow) {
14   static struct State {
15     LocalThreadWorker worker;
16     bool canceled;
17     static if (!is(Value == void))
18       Value result;
19     static if (!isNoThrow)
20       Exception exception;
21     StopSource stopSource;
22 
23     this(StopSource stopSource) {
24       this.stopSource = stopSource;
25       worker = LocalThreadWorker(getLocalThreadExecutor());
26     }
27 
28     void handleSignal(int signal) {
29       stopSource.stop();
30     }
31   }
32   State* state;
33   void setDone() nothrow @safe {
34     state.canceled = true;
35     state.worker.stop();
36   }
37 
38   static if (!isNoThrow)
39     void setError(Exception e) nothrow @safe {
40       state.exception = e;
41       state.worker.stop();
42     }
43   static if (is(Value == void))
44     void setValue() nothrow @safe {
45       state.worker.stop();
46     }
47   else
48     void setValue(Value value) nothrow @safe {
49       state.result = value;
50       state.worker.stop();
51     }
52   auto getStopToken() nothrow @safe @nogc {
53     return StopToken(state.stopSource);
54   }
55   auto getScheduler() nothrow @safe {
56     import concurrency.scheduler : SchedulerAdapter;
57     return SchedulerAdapter!(LocalThreadWorker*)(&state.worker);
58   }
59 }
60 
61 deprecated("Use syncWait instead")
62 auto sync_wait(Sender, StopSource)(auto ref Sender sender, StopSource stopSource) {
63   return sync_wait_impl(sender, (()@trusted=>cast()stopSource)());
64 }
65 
66 deprecated("Use syncWait instead")
67 auto sync_wait(Sender)(auto scope ref Sender sender) {
68   return sync_wait_impl(sender);
69 }
70 
71 auto sync_wait_impl(Sender)(auto scope ref Sender sender, StopSource stopSource = null) @safe {
72   static assert(models!(Sender, isSender));
73   import concurrency.signal;
74   import core.sys.posix.signal : SIGTERM, SIGINT;
75 
76   alias Value = Sender.Value;
77   enum NoThrow = !canSenderThrow!(Sender);
78 
79   alias Receiver = SyncWaitReceiver2!(Value, NoThrow);
80 
81   auto state = Receiver.State(stopSource is null ? new StopSource() : stopSource);
82   Receiver receiver = (()@trusted => Receiver(&state))();
83   SignalHandler signalHandler;
84 
85   if (stopSource is null) {
86     /// TODO: not so sure about this
87     if (isMainThread) {
88       (()@trusted => signalHandler.setup(&state.handleSignal))();
89       signalHandler.on(SIGINT);
90       signalHandler.on(SIGTERM);
91     }
92   }
93 
94   auto op = sender.connect(receiver);
95   op.start();
96 
97   state.worker.start();
98 
99   if (isMainThread)
100     signalHandler.teardown();
101 
102   /// if exception, rethrow
103   static if (!NoThrow)
104     if (state.exception !is null)
105       throw state.exception;
106 
107   /// if no value, return true if not canceled
108   static if (is(Value == void))
109     return !state.canceled;
110   else {
111     if (state.canceled)
112       throw new Exception("Canceled");
113 
114     return state.result;
115   }
116 }
117 
118 struct Cancelled {}
119 static immutable cancelledException = new Exception("Cancelled");
120 
121 struct Result(T) {
122   import mir.algebraic : Algebraic, Nullable;
123   static struct Value(T) {
124     static if (!is(T == void))
125       T value;
126   }
127   static if (is(T == void))
128     Nullable!(Cancelled, Exception) result;
129   else
130     Algebraic!(Value!T, Cancelled, Exception) result;
131 
132   static if (!is(T == void))
133     this(T v) {
134       result = Value!T(v);
135     }
136   this(Cancelled c) {
137     result = c;
138   }
139   this(Exception e) {
140     result = e;
141   }
142 
143   bool isCancelled() {
144     return result._is!Cancelled;
145   }
146   bool isError() {
147     return result._is!Exception;
148   }
149   bool isOk() {
150     static if (is(T == void))
151       return result.isNull;
152     else
153       return result._is!(Value!T);
154   }
155   static if (!is(T == void))
156     T value() {
157       if (isCancelled)
158         throw cancelledException;
159       if (isError)
160         throw error;
161       return result.get!(Value!T).value;
162     }
163   Exception error() {
164     return result.get!(Exception);
165   }
166   void assumeOk() {
167     import mir.algebraic : match;
168     static if (is(T == void))
169       result.match!((typeof(null)){},(Exception e){throw e;},(Cancelled c){throw cancelledException;});
170     else
171       result.match!((Exception e){throw e;},(Cancelled c){throw cancelledException;},(ref t){});
172   }
173 }
174 
175 /// matches over the result of syncWait
176 template match(Handlers...) {
177   // has to be separate because of dual-context limitation
178   auto match(T)(Result!T r) {
179     import mir.algebraic : match;
180     static if (is(T == void))
181       return r.result.match!(Handlers);
182     else
183       return r.result.match!((Result!(T).Value!(T) v) => v.value, (ref t) => t).match!(Handlers);
184   }
185 }
186 
187 /// Start the Sender and waits until it completes, cancels, or has an error.
188 auto syncWait(Sender, StopSource)(auto ref Sender sender, StopSource stopSource) {
189   return syncWaitImpl(sender, (()@trusted=>cast()stopSource)());
190 }
191 
192 auto syncWait(Sender)(auto scope ref Sender sender) {
193   return syncWaitImpl(sender);
194 }
195 
196 Result!(Sender.Value) syncWaitImpl(Sender)(auto scope ref Sender sender, StopSource stopSource = null) @safe {
197   import mir.algebraic : Algebraic, Nullable;
198   static assert(models!(Sender, isSender));
199   import concurrency.signal;
200   import core.sys.posix.signal : SIGTERM, SIGINT;
201 
202   alias Value = Sender.Value;
203   enum NoThrow = !canSenderThrow!(Sender);
204 
205   alias Receiver = SyncWaitReceiver2!(Value, NoThrow);
206 
207   auto state = Receiver.State(stopSource is null ? new StopSource() : stopSource);
208   // Receiver receiver = (()@trusted => Receiver(&state))();
209   Receiver receiver = (()@trusted => Receiver(&state))();
210   SignalHandler signalHandler;
211 
212   if (stopSource is null) {
213     /// TODO: not so sure about this
214     if (isMainThread) {
215       (()@trusted => signalHandler.setup(&state.handleSignal))();
216       signalHandler.on(SIGINT);
217       signalHandler.on(SIGTERM);
218     }
219   }
220 
221   auto op = sender.connect(receiver);
222   op.start();
223 
224   state.worker.start();
225 
226   if (isMainThread)
227     signalHandler.teardown();
228 
229   if (state.canceled)
230     return Result!Value(Cancelled());
231 
232   static if (!NoThrow)
233     if (state.exception !is null)
234       return Result!Value(state.exception);
235 
236   static if (is(Value == void))
237     return Result!Value();
238   else
239     return Result!Value(state.result);
240 }