module concurrency.fork;

import concurrency.sender;
import concurrency.executor;
import concepts;

// TODO: fork is a scheduler :)
// therefore the function `fun` could be a `then` continuation

// Sender that forks the current process (the fork() call itself is handed to
// `executor`), runs `fun` in the child, and completes the connected receiver
// according to how the child terminated:
//   - child exits with status 0            -> receiver.setValue()
//   - child exits with a non-zero status   -> receiver.setError(...)
//   - child is terminated by a signal      -> receiver.setError(...)
//   - fork() or waitpid() fails            -> receiver.setError(...)
// If the receiver's stop token is triggered while waiting, SIGINT is sent to
// the child.
version (Posix)
struct ForkSender {
  alias Value = void;
  static assert(models!(typeof(this), isSender));

  // Operation state produced by `connect`. The field order below is relied
  // upon by the `new Operation!Receiver(...)` call in `connect`.
  static struct Operation(Receiver) {
    private {
      Executor executor;
      void delegate() shared fun;
      Receiver receiver;
      void delegate(int) afterFork;

      // Forks, optionally notifies `afterFork` with the child's pid, waits for
      // the child and reports the outcome to `receiver`. Invoked from `start`
      // via executeInNewThread.
      void run() {
        import concurrency.thread : executeAndWait;
        import concurrency.stoptoken;
        import core.sys.posix.sys.wait;
        import core.sys.posix.unistd;
        import core.stdc.errno;
        import core.stdc.stdlib;
        import core.stdc.string : strerror;
        import std.string : fromStringz;
        import std.format : format;

        auto token = receiver.getStopToken();
        // Result convention of the function below:
        //   > 0 : pid of the child (we are the parent)
        //   < 0 : fork() failed, the value is the negated errno
        // In the child the function never returns.
        shared pid = executor.executeAndWait((void delegate() shared fun) {
          auto r = fork();
          if (r == -1) {
            // errno is thread-local and this function is handed to the
            // executor, so it may not run on the thread that later reports
            // the failure. Capture it here, right after the failing call.
            auto forkError = errno;
            // fork() sets errno on failure; the -1 fallback only guarantees
            // that the failure is still reported should it ever be 0.
            return forkError > 0 ? -forkError : -1;
          }
          if (r != 0)
            return r;

          // Child process: only the forking thread exists here, so repair the
          // runtime's view of the world before running user code.
          reinitThreadLocks();
          detachOtherThreads();
          drainMessageBox();
          setMainThread();
          try {
            (cast()fun)();
          } catch (Throwable t) {
            // Any failure in the child is reported to the parent solely
            // through the non-zero exit status.
            exit(1);
          }
          exit(0);
          assert(0);
        }, fun);

        if (pid < 0) {
          const int forkErrno = -cast(int) pid;
          receiver.setError(new Exception("Failed to fork, %s".format(strerror(forkErrno).fromStringz)));
          return;
        }
        import core.sys.posix.signal : kill, SIGINT;

        if (afterFork)
          afterFork(pid);
        // Forward a stop request to the child as SIGINT.
        auto cb = token.onStop(() @trusted shared nothrow => cast(void)kill(pid, SIGINT));
        int status;
        int ret;
        // waitpid can be interrupted by a signal delivered to this thread;
        // that is not a failure of the child, so simply wait again.
        do {
          ret = waitpid(pid, &status, 0);
        } while (ret == -1 && errno == EINTR);
        // Save errno before calling anything else that might overwrite it.
        const int waitErrno = errno;
        cb.dispose();
        if (ret == -1) {
          receiver.setError(new Exception("Failed to wait for child, %s".format(strerror(waitErrno).fromStringz)));
        }
        else if (WIFSIGNALED(status)) {
          auto exitsignal = WTERMSIG(status);
          receiver.setError(new Exception("Child exited by signal %d".format(exitsignal)));
        }
        else if (WIFEXITED(status)) {
          auto exitstatus = WEXITSTATUS(status);
          if (exitstatus == 0)
            receiver.setValue();
          else
            receiver.setError(new Exception("Child exited with %d".format(exitstatus)));
        }
        else {
          receiver.setError(new Exception("Child unknown exit"));
        }
      }
    }

    // Starts the operation by executing `run` through executeInNewThread.
    void start() @trusted nothrow {
      import concurrency.thread : executeInNewThread;

      executeInNewThread(cast(void delegate() shared @safe)&this.run);
    }
  }

  private Executor executor;
  private void delegate() shared fun;
  private void delegate(int) afterFork;

  // Params:
  //   executor  = executor that performs the fork() call
  //   fun       = function executed in the child process
  //   afterFork = optional callback invoked in the parent with the child's pid
  this(Executor executor, void delegate() shared fun, void delegate(int) afterFork = null) @system { // forking is dangerous so this is @system
    this.executor = executor;
    this.fun = fun;
    this.afterFork = afterFork;
  }

  // Connects `receiver` and returns a heap-allocated operation state.
  auto connect(Receiver)(Receiver receiver) @safe {
    return new Operation!Receiver(executor, fun, receiver, afterFork);
  }

  // Calls Thread.initLocks (reached through __traits(getMember)) in the child
  // right after forking.
  static void reinitThreadLocks() {
    import core.thread : Thread;

    __traits(getMember, Thread, "initLocks")();
  }

  // After forking there is only one thread left, but others (if any) are still registered, we need to detach them else the GC will fail suspending them during a GC cycle
  static private void detachOtherThreads() {
    import core.thread : Thread, thread_detachInstance;
    import core.memory : GC;

    GC.disable();
    // Re-enable the GC on every exit path, not just the normal one.
    scope (exit) GC.enable();
    auto threads = Thread.getAll();
    auto thisThread = Thread.getThis();
    foreach (t; threads) {
      if (t != thisThread)
        thread_detachInstance(t);
    }
  }

  // Discards every message currently queued in this thread's std.concurrency
  // mailbox.
  static private void drainMessageBox() {
    import std.concurrency : receiveTimeout, thisTid;
    import std.variant : Variant;
    import core.time : seconds;

    thisTid(); // need to call otherwise the messagebox might be empty and receiveTimeout will assert

    // A negative timeout makes receiveTimeout return immediately when nothing
    // is queued, so this loops only while messages are pending.
    while (receiveTimeout(seconds(-1), (Variant v) {})) {}
  }

  // after fork there is only one thread left, it is possible
  // that wasn't the main thread in the 'old' program, so
  // we overwrite the global to point to the only thread left
  // in the (forked) process
  static private void setMainThread() {
    import core.thread : Thread;

    __traits(getMember, Thread, "sm_main") = Thread.getThis();
  }
}