module concurrency.signal;

import concurrency.stoptoken;

/// Returns the process-wide StopSource, creating it on first use.
///
/// The first thread that manages to install a freshly created StopSource also
/// installs the interrupt handlers for it. The result is cached in a
/// thread-local variable, so later calls from the same thread return early.
shared(StopSource) globalStopSource() @trusted {
    import core.atomic : atomicLoad;

    static StopSource localSource; // can't be shared else it is global
    if (localSource !is null)
        return cast(shared) localSource;

    if (globalSource.atomicLoad is null) {
        auto tmp = new shared StopSource();
        // Only the winner of the race wires up the handlers; a loser's `tmp`
        // is dropped and the already-installed source is used instead.
        if (setGlobalStopSource(tmp)) {
            setupCtrlCHandler(tmp);
        }
    }

    // Read the shared reference atomically, like the null check above.
    localSource = cast() atomicLoad(globalSource);
    return cast(shared) localSource;
}

/// Returns true if first to set (otherwise it is ignored)
///
/// Uses compare-and-swap against null so that a later (or racing) call can
/// never replace a StopSource that has already been installed.
bool setGlobalStopSource(shared StopSource stopSource) @safe {
    import core.atomic : cas;

    shared StopSource expected = null;
    return cas(&globalSource, expected, stopSource);
}

/// Sets the stopSource to be called when receiving an interrupt
///
/// A null `stopSource` is ignored. The first non-null call creates the
/// notification primitive, starts the handler thread and installs the OS-level
/// handlers (console control handler on Windows, SIGINT/SIGTERM otherwise).
/// Later calls only swap in the new stopSource; handlers are not reinstalled.
void setupCtrlCHandler(shared StopSource stopSource) @trusted {
    import core.sys.posix.signal;
    import core.atomic;

    if (stopSource is null)
        return;

    auto old = atomicExchange(&SignalHandler.signalStopSource, stopSource);
    if (old !is null)
        return; // handlers were already installed by an earlier call

    SignalHandler.setup();
    SignalHandler.launchHandlerThread();
    version (Windows) {
        import core.sys.windows.windows;

        SetConsoleCtrlHandler(&signalHandler, true);
    } else {
        // Routes signal `s` to `signalHandler`.
        static void handleSignal(int s) @trusted {
            import core.sys.posix.signal;

            sigaction_t old;
            sigset_t sigset;
            sigemptyset(&sigset);
            sigaction_t siginfo;
            siginfo.sa_handler = &signalHandler;
            siginfo.sa_mask = sigset;
            siginfo.sa_flags = SA_RESTART;
            sigaction(s, &siginfo, &old);
            // TODO: what to do with old?
        }

        handleSignal(SIGINT);
        handleSignal(SIGTERM);
    }
}

private static shared StopSource globalSource;

/// Bridges the OS signal / console handler to a dedicated thread.
///
/// The OS-level handler only calls `notify`; the dedicated thread blocks in
/// `await` and calls `stop()` on the registered StopSource for every
/// notification that is not `ABORT`.
struct SignalHandler {
    import core.atomic : atomicStore, atomicLoad, MemoryOrder, atomicExchange, cas;
    import core.thread : Thread;

    static shared int lastSignal; // last signal received
    enum int ABORT = -1; // sentinel telling the handler thread to exit

    version (Windows) {
        import core.sync.event : Event;

        private static shared Event event; // used to notify the dedicated thread to shutdown

        /// Records `num` and wakes the handler thread.
        static void notify(int num) nothrow @nogc @trusted {
            lastSignal.atomicStore!(MemoryOrder.rel)(num);
            (cast() event).set();
        }

        /// Blocks until notified, then returns the last recorded signal.
        static int await() nothrow @nogc @trusted {
            (cast() event).wait();
            return lastSignal.atomicLoad!(MemoryOrder.acq)();
        }

        /// Initializes the event used for notification.
        static void setup() @trusted {
            (cast() event).initialize(false, false);
        }
    } else version (linux) {
        import core.sys.posix.unistd : write, read;

        private static shared int event; // eventfd to notify dedicated thread

        /// Records `num` and wakes the handler thread by writing to the eventfd.
        static void notify(int num) nothrow @nogc {
            lastSignal.atomicStore!(MemoryOrder.rel)(num);
            ulong b = 1;
            write(event, &b, typeof(b).sizeof);
        }

        /// Blocks until the eventfd is readable, then returns the last recorded signal.
        static int await() nothrow @nogc {
            ulong b;
            // Retry until a full counter value has been read.
            while (read(event, &b, typeof(b).sizeof) != typeof(b).sizeof) {
            }
            return lastSignal.atomicLoad!(MemoryOrder.acq)();
        }

        /// Creates the eventfd used for notification.
        /// Throws: ErrnoException when the eventfd cannot be created.
        static void setup() {
            import core.sys.linux.sys.eventfd;
            import std.exception : ErrnoException;

            immutable fd = eventfd(0, EFD_CLOEXEC);
            // Without this check `await` would spin forever on a failing read.
            if (fd == -1)
                throw new ErrnoException("Failed to create eventfd");
            event = fd;
        }
    } else version (Posix) {
        import core.sys.posix.unistd : write, read, pipe;

        private static shared int[2] selfPipe; // self pipe to notify dedicated thread

        /// Records `num` and wakes the handler thread by writing to the pipe.
        static void notify(int num) nothrow @nogc {
            lastSignal.atomicStore!(MemoryOrder.rel)(num);
            ulong b = 1;
            write(selfPipe[1], &b, typeof(b).sizeof);
        }

        /// Blocks until the pipe delivers a token, then returns the last recorded signal.
        static int await() nothrow @nogc {
            ulong b;
            // Retry until a full token has been read.
            while (read(cast() selfPipe[0], &b, typeof(b).sizeof) != typeof(b).sizeof) {
            }
            return lastSignal.atomicLoad!(MemoryOrder.acq)();
        }

        /// Creates the self-pipe used for notification.
        /// Throws: ErrnoException when the pipe cannot be created.
        static void setup() {
            import std.exception : ErrnoException;

            if (pipe(cast(int[2]) selfPipe) == -1)
                throw new ErrnoException("Failed to create self-pipe");
        }
    }

    /// Asks the handler thread to exit; does nothing when no StopSource was
    /// ever registered.
    private static void shutdown() {
        if (atomicLoad!(MemoryOrder.acq)(signalStopSource) !is null)
            SignalHandler.notify(ABORT);
    }

    private static shared StopSource signalStopSource;
    private static shared Thread handlerThread;

    /// Starts the dedicated handler thread unless one is already registered.
    static void launchHandlerThread() {
        if (handlerThread.atomicLoad !is null)
            return;

        auto thread = new Thread(() {
            for (;;) {
                if (SignalHandler.await() == ABORT) {
                    return;
                }
                signalStopSource.stop();
            }
        });
        // This has to be a daemon thread otherwise the runtime will wait on it before calling the shared module destructor that stops it.
        thread.isDaemon = true;

        // Compare-and-swap so a concurrent winner's running thread is never
        // replaced by this unstarted one.
        shared Thread expected = null;
        if (!cas(&handlerThread, expected, cast(shared) thread))
            return; // someone beat us to it

        thread.start();
    }
}

/// This is required to properly shutdown in the presence of sanitizers
shared static ~this() {
    import core.atomic : atomicExchange;
    import core.thread : Thread;

    SignalHandler.shutdown();
    // Take ownership of the handler thread (if any) and wait for it to exit.
    if (auto thread = atomicExchange(&SignalHandler.handlerThread, null))
        (cast() thread).join();
}

version (Windows) {
    import core.sys.windows.windows;

    /// Console control handler: forwards Ctrl-C, Ctrl-Break, close and
    /// shutdown events to the handler thread as SIGINT. Returns TRUE for those
    /// events and FALSE for any other control type.
    extern (Windows) static BOOL signalHandler(DWORD dwCtrlType) nothrow @system {
        import core.sys.posix.signal;

        if (dwCtrlType == CTRL_C_EVENT ||
            dwCtrlType == CTRL_BREAK_EVENT ||
            dwCtrlType == CTRL_CLOSE_EVENT ||
            dwCtrlType == CTRL_SHUTDOWN_EVENT) {
            SignalHandler.notify(SIGINT);
            return TRUE;
        }
        return FALSE;
    }
} else {
    /// Signal handler: forwards the received signal number to the handler thread.
    extern (C) static void signalHandler(int i) nothrow @nogc {
        SignalHandler.notify(i);
    }
}