1 module concurrency.fork; 2 3 import concurrency.sender; 4 import concurrency.executor; 5 import concepts; 6 7 // TODO: fork is an scheduler :) 8 // therefor the Function fun could be a `then` continuation 9 10 version (Posix) 11 struct ForkSender { 12 alias Value = void; 13 static assert(models!(typeof(this), isSender)); 14 alias Fun = void delegate() shared; 15 alias AfterFork = void delegate(int); 16 static struct Operation(Receiver) { 17 private { 18 Executor executor; 19 void delegate() shared fun; 20 Receiver receiver; 21 void delegate(int) afterFork; 22 void run() { 23 import concurrency.thread : executeInNewThread, executeAndWait; 24 import concurrency.stoptoken; 25 import core.sys.posix.sys.wait; 26 import core.sys.posix.sys.select; 27 import core.sys.posix.unistd; 28 import core.stdc.errno; 29 import core.stdc.stdlib; 30 import core.stdc.string : strerror; 31 import std.string : fromStringz; 32 import std.format : format; 33 34 auto token = receiver.getStopToken(); 35 shared pid = executor.executeAndWait((void delegate() shared fun) { 36 auto r = fork(); 37 if (r != 0) 38 return r; 39 40 reinitThreadLocks(); 41 detachOtherThreads(); 42 drainMessageBox(); 43 setMainThread(); 44 try { 45 (cast()fun)(); 46 } catch (Throwable t) { 47 exit(1); 48 } 49 exit(0); 50 assert(0); 51 }, fun); 52 53 if (pid == -1) { 54 receiver.setError(new Exception("Failed to fork, %s".format(strerror(errno).fromStringz))); 55 return; 56 } 57 import core.sys.posix.signal : kill, SIGINT; 58 59 if (afterFork) 60 afterFork(pid); 61 auto cb = token.onStop(() @trusted shared nothrow => cast(void)kill(pid, SIGINT)); 62 int status; 63 auto ret = waitpid(pid, &status, 0); 64 cb.dispose(); 65 if (ret == -1) { 66 receiver.setError(new Exception("Failed to wait for child, %s".format(strerror(errno).fromStringz))); 67 } 68 else if (WIFSIGNALED(status)) { 69 auto exitsignal = WTERMSIG(status); 70 receiver.setError(new Exception("Child exited by signal %d".format(exitsignal))); 71 } 72 else if (WIFEXITED(status)) { 73 auto exitstatus = WEXITSTATUS(status); 74 if (exitstatus == 0) 75 receiver.setValue(); 76 else 77 receiver.setError(new Exception("Child exited with %d".format(exitstatus))); 78 } 79 else { 80 receiver.setError(new Exception("Child unknown exit")); 81 } 82 } 83 } 84 this(Executor executor, Fun fun, Receiver receiver, AfterFork afterFork) { 85 this.executor = executor; 86 this.fun = fun; 87 this.receiver = receiver; 88 this.afterFork = afterFork; 89 } 90 void start() @trusted nothrow { 91 import concurrency.thread : executeInNewThread, executeAndWait; 92 import concurrency.utils : closure; 93 94 executeInNewThread(cast(void delegate() shared @safe)&this.run); 95 } 96 } 97 private Executor executor; 98 private void delegate() shared fun; 99 private void delegate(int) afterFork; 100 this(Executor executor, void delegate() shared fun, void delegate(int) afterFork = null) @system { // forking is dangerous so this is @system 101 this.executor = executor; 102 this.fun = fun; 103 this.afterFork = afterFork; 104 } 105 auto connect(Receiver)(return Receiver receiver) @safe return scope { 106 return new Operation!Receiver(executor, fun, receiver, afterFork); 107 } 108 static void reinitThreadLocks() { 109 import core.thread : Thread; 110 111 __traits(getMember, Thread, "initLocks")(); 112 } 113 // After forking there is only one thread left, but others (if any) are still registered, we need to detach them else the GC will fail suspending them during a GC cycle 114 static private void detachOtherThreads() { 115 import core.thread : Thread, thread_detachInstance; 116 import core.memory : GC; 117 118 GC.disable(); 119 auto threads = Thread.getAll(); 120 auto thisThread = Thread.getThis(); 121 foreach (t; threads) { 122 if (t != thisThread) 123 thread_detachInstance(t); 124 } 125 GC.enable(); 126 } 127 static private void drainMessageBox() { 128 import std.concurrency : receiveTimeout; 129 import std.variant : Variant; 130 import core.time : seconds; 131 import std.concurrency : thisTid; 132 thisTid(); // need to call otherwise the messagebox might be empty and receiveTimeout will assert 133 134 while(receiveTimeout(seconds(-1),(Variant v){})) {} 135 } 136 // after fork there is only one thread left, it is possible 137 // that wasn't the main thread in the 'old' program, so 138 // we overwrite the global to point to the only thread left 139 // in the (forked) process 140 static private void setMainThread() { 141 import core.thread : Thread; 142 143 __traits(getMember, Thread, "sm_main") = Thread.getThis(); 144 } 145 }