1 module concurrency.fork;
2 
3 import concurrency.sender;
4 import concurrency.executor;
5 import concepts;
6 
7 // TODO: fork is an scheduler :)
8 // therefor the Function fun could be a `then` continuation
9 
10 version (Posix)
11 struct ForkSender {
12   alias Value = void;
13   static assert(models!(typeof(this), isSender));
14   alias Fun = void delegate() shared;
15   alias AfterFork = void delegate(int);
16   static struct Operation(Receiver) {
17     private {
18       Executor executor;
19       void delegate() shared fun;
20       Receiver receiver;
21       void delegate(int) afterFork;
22       void run() {
23         import concurrency.thread : executeInNewThread, executeAndWait;
24         import concurrency.stoptoken;
25         import core.sys.posix.sys.wait;
26         import core.sys.posix.sys.select;
27         import core.sys.posix.unistd;
28         import core.stdc.errno;
29         import core.stdc.stdlib;
30         import core.stdc.string : strerror;
31         import std.string : fromStringz;
32         import std.format : format;
33 
34         auto token = receiver.getStopToken();
35         shared pid = executor.executeAndWait((void delegate() shared fun) {
36             auto r = fork();
37             if (r != 0)
38               return r;
39 
40             reinitThreadLocks();
41             detachOtherThreads();
42             drainMessageBox();
43             setMainThread();
44             try {
45               (cast()fun)();
46             } catch (Throwable t) {
47               exit(1);
48             }
49             exit(0);
50             assert(0);
51           }, fun);
52 
53         if (pid == -1) {
54           receiver.setError(new Exception("Failed to fork, %s".format(strerror(errno).fromStringz)));
55           return;
56         }
57         import core.sys.posix.signal : kill, SIGINT;
58 
59         if (afterFork)
60           afterFork(pid);
61         auto cb = token.onStop(() @trusted shared nothrow => cast(void)kill(pid, SIGINT));
62         int status;
63         auto ret = waitpid(pid, &status, 0);
64         cb.dispose();
65         if (ret == -1) {
66           receiver.setError(new Exception("Failed to wait for child, %s".format(strerror(errno).fromStringz)));
67         }
68         else if (WIFSIGNALED(status)) {
69           auto exitsignal = WTERMSIG(status);
70           receiver.setError(new Exception("Child exited by signal %d".format(exitsignal)));
71         }
72         else if (WIFEXITED(status)) {
73           auto exitstatus = WEXITSTATUS(status);
74           if (exitstatus == 0)
75             receiver.setValue();
76           else
77             receiver.setError(new Exception("Child exited with %d".format(exitstatus)));
78         }
79         else {
80           receiver.setError(new Exception("Child unknown exit"));
81         }
82       }
83     }
84     this(Executor executor, Fun fun, Receiver receiver, AfterFork afterFork) {
85       this.executor = executor;
86       this.fun = fun;
87       this.receiver = receiver;
88       this.afterFork = afterFork;
89     }
90     void start() @trusted nothrow {
91       import concurrency.thread : executeInNewThread, executeAndWait;
92       import concurrency.utils : closure;
93 
94       executeInNewThread(cast(void delegate() shared @safe)&this.run);
95     }
96   }
97   private Executor executor;
98   private void delegate() shared fun;
99   private void delegate(int) afterFork;
100   this(Executor executor, void delegate() shared fun, void delegate(int) afterFork = null) @system { // forking is dangerous so this is @system
101     this.executor = executor;
102     this.fun = fun;
103     this.afterFork = afterFork;
104   }
105   auto connect(Receiver)(return Receiver receiver) @safe return scope {
106     return new Operation!Receiver(executor, fun, receiver, afterFork);
107   }
108   static void reinitThreadLocks() {
109     import core.thread : Thread;
110 
111     __traits(getMember, Thread, "initLocks")();
112   }
113   // After forking there is only one thread left, but others (if any) are still registered, we need to detach them else the GC will fail suspending them during a GC cycle
114   static private void detachOtherThreads() {
115     import core.thread : Thread, thread_detachInstance;
116     import core.memory : GC;
117 
118     GC.disable();
119     auto threads = Thread.getAll();
120     auto thisThread = Thread.getThis();
121     foreach (t; threads) {
122       if (t != thisThread)
123         thread_detachInstance(t);
124     }
125     GC.enable();
126   }
127   static private void drainMessageBox() {
128     import std.concurrency : receiveTimeout;
129     import std.variant : Variant;
130     import core.time : seconds;
131     import std.concurrency : thisTid;
132     thisTid(); // need to call otherwise the messagebox might be empty and receiveTimeout will assert
133 
134     while(receiveTimeout(seconds(-1),(Variant v){})) {}
135   }
136   // after fork there is only one thread left, it is possible
137   // that wasn't the main thread in the 'old' program, so
138   // we overwrite the global to point to the only thread left
139   // in the (forked) process
140   static private void setMainThread() {
141     import core.thread : Thread;
142 
143     __traits(getMember, Thread, "sm_main") = Thread.getThis();
144   }
145 }