1 module concurrency.fork;
2 
3 import concurrency.sender;
4 import concurrency.executor;
5 import concepts;
6 
// TODO: fork is a scheduler :)
// therefore the function `fun` could be a `then` continuation
9 
// ForkSender: a sender (Value = void) that forks the process, runs `fun` in
// the child and completes the receiver in the parent according to how the
// child terminated:
//   - child exit status 0            -> receiver.setValue()
//   - child non-zero exit / signal   -> receiver.setError(Exception)
//   - fork() or waitpid() failure    -> receiver.setError(Exception)
// When the receiver's stop token fires while waiting, SIGINT is sent to the
// child.
version (Posix)
struct ForkSender {
  alias Value = void;
  static assert(models!(typeof(this), isSender));

  // Operation state produced by `connect`. `start` launches `run` on a new
  // thread; `run` performs the fork and blocks until the child terminates.
  static struct Operation(Receiver) {
    private {
      Executor executor;
      void delegate() shared fun;
      Receiver receiver;
      void delegate(int) afterFork;

      // Forks (through `executor.executeAndWait`), optionally reports the
      // child's pid to `afterFork`, waits for the child and completes the
      // receiver.
      void run() {
        import concurrency.thread : executeAndWait;
        import concurrency.stoptoken;
        import core.sys.posix.sys.wait;
        import core.sys.posix.unistd;
        import core.sys.posix.signal : kill, SIGINT;
        import core.stdc.errno;
        import core.stdc.stdlib;
        import core.stdc.string : strerror;
        import std.string : fromStringz;
        import std.format : format;

        auto token = receiver.getStopToken();
        shared pid = executor.executeAndWait((void delegate() shared fun) {
            auto r = fork();
            // Parent (r > 0) or failure (r == -1): hand the result back.
            if (r != 0)
              return r;

            // Child process from here on: only the forking thread survived,
            // so repair the runtime's view of the world before running `fun`.
            reinitThreadLocks();
            detachOtherThreads();
            drainMessageBox();
            setMainThread();
            try {
              (cast()fun)();
            } catch (Throwable t) {
              // Any failure in the child is reported to the parent through a
              // non-zero exit status.
              exit(1);
            }
            exit(0);
            assert(0);
          }, fun);

        if (pid == -1) {
          // NOTE(review): errno is thread-local; if executeAndWait ran the
          // fork on a different thread than this one, the errno read here may
          // not be the one set by fork() -- confirm against executeAndWait.
          receiver.setError(new Exception("Failed to fork, %s".format(strerror(errno).fromStringz)));
          return;
        }

        if (afterFork)
          afterFork(pid);

        // Forward a stop request to the child as SIGINT.
        auto cb = token.onStop(() @trusted shared nothrow => cast(void)kill(pid, SIGINT));

        // Wait for the child. A signal delivered to this thread makes waitpid
        // fail with EINTR without the child having terminated; retry in that
        // case. errno is captured right after the call so that later calls
        // (e.g. cb.dispose()) cannot overwrite it.
        int status;
        int waitErrno = 0;
        auto ret = waitpid(pid, &status, 0);
        while (ret == -1) {
          waitErrno = errno;
          if (waitErrno != EINTR)
            break;
          ret = waitpid(pid, &status, 0);
        }
        cb.dispose();

        if (ret == -1) {
          receiver.setError(new Exception("Failed to wait for child, %s".format(strerror(waitErrno).fromStringz)));
        }
        else if (WIFSIGNALED(status)) {
          auto exitsignal = WTERMSIG(status);
          receiver.setError(new Exception("Child exited by signal %d".format(exitsignal)));
        }
        else if (WIFEXITED(status)) {
          auto exitstatus = WEXITSTATUS(status);
          if (exitstatus == 0)
            receiver.setValue();
          else
            receiver.setError(new Exception("Child exited with %d".format(exitstatus)));
        }
        else {
          receiver.setError(new Exception("Child unknown exit"));
        }
      }
    }

    // Starts the operation by executing `run` in a new thread, so the caller
    // is not blocked while the child process runs.
    void start() @trusted nothrow {
      import concurrency.thread : executeInNewThread;

      executeInNewThread(cast(void delegate() shared @safe)&this.run);
    }
  }

  private Executor executor;
  private void delegate() shared fun;
  private void delegate(int) afterFork;

  // Params:
  //   executor  = executor used to run the code that calls fork()
  //   fun       = work to execute inside the child process
  //   afterFork = optional callback invoked in the parent with the child's
  //               pid, right after a successful fork
  this(Executor executor, void delegate() shared fun, void delegate(int) afterFork = null) @system { // forking is dangerous so this is @system
    this.executor = executor;
    this.fun = fun;
    this.afterFork = afterFork;
  }

  // Binds `receiver` to this sender and returns a heap-allocated operation
  // state; nothing is forked until `start` is called on it.
  auto connect(Receiver)(Receiver receiver) @safe {
    return new Operation!Receiver(executor, fun, receiver, afterFork);
  }

  // Calls druntime's `Thread.initLocks` (reached via __traits(getMember)) to
  // re-create the thread locks in the child process.
  static void reinitThreadLocks() {
    import core.thread : Thread;

    __traits(getMember, Thread, "initLocks")();
  }

  // After forking there is only one thread left, but others (if any) are still registered, we need to detach them else the GC will fail suspending them during a GC cycle
  static private void detachOtherThreads() {
    import core.thread : Thread, thread_detachInstance;
    import core.memory : GC;

    // Keep the GC from running while stale threads are still registered;
    // scope(exit) guarantees it is re-enabled even if detaching throws.
    GC.disable();
    scope (exit) GC.enable();

    auto thisThread = Thread.getThis();
    foreach (t; Thread.getAll()) {
      if (t != thisThread)
        thread_detachInstance(t);
    }
  }

  // Discards every std.concurrency message pending in this thread's mailbox
  // (the child gets a copy of whatever the forking thread had queued).
  static private void drainMessageBox() {
    import std.concurrency : receiveTimeout;
    import std.variant : Variant;
    import core.time : seconds;
    import std.concurrency : thisTid;
    thisTid(); // need to call otherwise the messagebox might be empty and receiveTimeout will assert

    // Negative timeout: poll without blocking until no message is left.
    while(receiveTimeout(seconds(-1),(Variant v){})) {}
  }

  // after fork there is only one thread left, it is possible
  // that wasn't the main thread in the 'old' program, so
  // we overwrite the global to point to the only thread left
  // in the (forked) process
  static private void setMainThread() {
    import core.thread : Thread;

    __traits(getMember, Thread, "sm_main") = Thread.getThis();
  }
}