1 module concurrency.signal; 2 3 import concurrency.stoptoken; 4 5 shared(StopSource) globalStopSource() @trusted { 6 import core.atomic : atomicLoad, cas; 7 8 if (globalSource.atomicLoad is null) { 9 import concurrency.utils : dynamicLoad; 10 auto ptr = getGlobalStopSourcePointer(); 11 12 if (auto source = (*ptr).atomicLoad) { 13 globalSource = source; 14 return globalSource; 15 } 16 17 auto tmp = new shared StopSource(); 18 if (ptr.cas(cast(shared StopSource)null, tmp)) { 19 setupCtrlCHandler(tmp); 20 globalSource = tmp; 21 } else 22 globalSource = (*ptr).atomicLoad; 23 } 24 return globalSource; 25 } 26 27 /// Returns true if first to set (otherwise it is ignored) 28 bool setGlobalStopSource(shared StopSource stopSource) @safe { 29 import core.atomic : cas; 30 auto ptr = getGlobalStopSourcePointer(); 31 if (!ptr.cas(cast(shared StopSource)null, stopSource)) 32 return false; 33 globalSource = stopSource; 34 return true; 35 } 36 37 /// Sets the stopSource to be called when receiving an interrupt 38 void setupCtrlCHandler(shared StopSource stopSource) @trusted { 39 import core.atomic; 40 41 if (stopSource is null) 42 return; 43 44 auto old = atomicExchange(&SignalHandler.signalStopSource, stopSource); 45 if (old !is null) 46 return; 47 48 SignalHandler.setup(); 49 SignalHandler.launchHandlerThread(); 50 version (Windows) { 51 import core.sys.windows.windows; 52 SetConsoleCtrlHandler(&signalHandler, true); 53 } else { 54 import core.sys.posix.signal; 55 56 static void handleSignal(int s) @trusted { 57 sigaction_t old; 58 sigset_t sigset; 59 sigemptyset(&sigset); 60 sigaction_t siginfo; 61 siginfo.sa_handler = &signalHandler; 62 siginfo.sa_mask = sigset; 63 siginfo.sa_flags = SA_RESTART; 64 sigaction(s, &siginfo, &old); 65 // TODO: what to do with old? 66 } 67 68 handleSignal(SIGINT); 69 handleSignal(SIGTERM); 70 } 71 } 72 73 private static shared StopSource globalSource; 74 75 // we export this function so that dynamic libraries can load it to access 76 // the host's globalStopSource pointer. 77 // Otherwise they would access their own local instance. 78 // should not be called directly by usercode, instead use `globalStopSource`. 79 export extern(C) shared(StopSource*) concurrency_globalStopSourcePointer() @safe { 80 return &globalSource; 81 } 82 83 private shared(StopSource*) getGlobalStopSourcePointer() @safe { 84 import concurrency.utils : dynamicLoad; 85 return dynamicLoad!concurrency_globalStopSourcePointer()(); 86 } 87 88 struct SignalHandler { 89 import core.atomic : atomicStore, atomicLoad, MemoryOrder, atomicExchange; 90 import core.thread : Thread; 91 static shared int lastSignal; // last signal received 92 enum int ABORT = -1; 93 version (Windows) { 94 import core.sync.event : Event; 95 private static shared Event event; // used to notify the dedicated thread to shutdown 96 static void notify(int num) nothrow @nogc @trusted { 97 lastSignal.atomicStore!(MemoryOrder.rel)(num); 98 (cast()event).set(); 99 } 100 private static int await() nothrow @nogc @trusted { 101 (cast()event).wait(); 102 return lastSignal.atomicLoad!(MemoryOrder.acq)(); 103 } 104 private static void setup() @trusted { 105 (cast()event).initialize(false, false); 106 } 107 } else version (linux) { 108 import core.sys.posix.unistd : write, read; 109 private static shared int event; // eventfd to notify dedicated thread 110 static void notify(int num) nothrow @nogc { 111 lastSignal.atomicStore!(MemoryOrder.rel)(num); 112 ulong b = 1; 113 write(event, &b, typeof(b).sizeof); 114 } 115 private static int await() nothrow @nogc { 116 ulong b; 117 while(read(event, &b, typeof(b).sizeof) != typeof(b).sizeof) {} 118 return lastSignal.atomicLoad!(MemoryOrder.acq)(); 119 } 120 private static void setup() { 121 import core.sys.linux.sys.eventfd; 122 event = eventfd(0, EFD_CLOEXEC); 123 } 124 } else version (Posix) { 125 import core.sys.posix.unistd : write, read, pipe; 126 private static shared int[2] selfPipe; // self pipe to notify dedicated thread 127 static void notify(int num) nothrow @nogc { 128 lastSignal.atomicStore!(MemoryOrder.rel)(num); 129 ulong b = 1; 130 write(selfPipe[1], &b, typeof(b).sizeof); 131 } 132 private static int await() nothrow @nogc { 133 ulong b; 134 while(read(cast()selfPipe[0], &b, typeof(b).sizeof) != typeof(b).sizeof) {} 135 return lastSignal.atomicLoad!(MemoryOrder.acq)(); 136 } 137 private static void setup() { 138 import std.exception : ErrnoException; 139 if (pipe(cast(int[2])selfPipe) == -1) 140 throw new ErrnoException("Failed to create self-pipe"); 141 } 142 } 143 private static void shutdown() { 144 if (atomicLoad!(MemoryOrder.acq)(signalStopSource) !is null) 145 SignalHandler.notify(ABORT); 146 } 147 private static shared StopSource signalStopSource; 148 private static shared Thread handlerThread; 149 private static void launchHandlerThread() { 150 if (handlerThread.atomicLoad !is null) 151 return; 152 153 auto thread = new Thread((){ 154 for(;;) { 155 if (SignalHandler.await() == ABORT) { 156 return; 157 } 158 signalStopSource.stop(); 159 } 160 }); 161 // This has to be a daemon thread otherwise the runtime will wait on it before calling the shared module destructor that stops it. 162 thread.isDaemon = true; 163 164 if (atomicExchange(&handlerThread, cast(shared)thread) !is null) 165 return; // someone beat us to it 166 167 thread.start(); 168 } 169 } 170 171 /// This is required to properly shutdown in the presence of sanitizers 172 shared static ~this() { 173 import core.atomic : atomicExchange; 174 import core.thread : Thread; 175 SignalHandler.shutdown(); 176 if (auto thread = atomicExchange(&SignalHandler.handlerThread, null)) 177 (cast()thread).join(); 178 } 179 180 version (Windows) { 181 import core.sys.windows.windows; 182 extern (Windows) static BOOL signalHandler(DWORD dwCtrlType) nothrow @system { 183 import core.stdc.signal; 184 if (dwCtrlType == CTRL_C_EVENT || 185 dwCtrlType == CTRL_BREAK_EVENT || 186 dwCtrlType == CTRL_CLOSE_EVENT || 187 dwCtrlType == CTRL_SHUTDOWN_EVENT) { 188 SignalHandler.notify(SIGINT); 189 return TRUE; 190 } 191 return FALSE; 192 } 193 } else { 194 extern (C) static void signalHandler(int i) nothrow @nogc { 195 SignalHandler.notify(i); 196 } 197 }