1 module concurrency.signal;
2 
3 import concurrency.stoptoken;
4 
/// Returns the global stop source, creating and registering it on first use.
///
/// The module-level `globalSource` acts as a cache. The authoritative slot is
/// the one returned by `getGlobalStopSourcePointer`. Whichever thread manages
/// to install a freshly created `StopSource` into that slot also installs the
/// interrupt handlers via `setupCtrlCHandler`.
///
/// Returns: the stop source stored in the global slot.
shared(StopSource) globalStopSource() @trusted {
  import core.atomic : atomicLoad, atomicStore, cas;

  // Fast path: the cache has already been populated.
  if (auto cached = globalSource.atomicLoad)
    return cached;

  auto ptr = getGlobalStopSourcePointer();

  shared StopSource source = (*ptr).atomicLoad;
  if (source is null) {
    auto tmp = new shared StopSource();
    if (ptr.cas(cast(shared StopSource)null, tmp)) {
      // We won the race: only the winner installs the interrupt handlers.
      setupCtrlCHandler(tmp);
      source = tmp;
    } else {
      // Another thread installed a source first; adopt theirs and let `tmp`
      // be collected.
      source = (*ptr).atomicLoad;
    }
  }

  // The cache is read with atomicLoad above, so it must be written atomically
  // as well; a plain store would race with concurrent callers.
  atomicStore(globalSource, source);
  return source;
}
26 
/// Installs `stopSource` as the global stop source if none has been set yet.
///
/// Params:
///   stopSource = the stop source to install into the global slot
///
/// Returns: true if this call was the first to set the global stop source;
/// false if one was already present (in which case `stopSource` is ignored).
bool setGlobalStopSource(shared StopSource stopSource) @safe {
  import core.atomic : atomicStore, cas;
  auto ptr = getGlobalStopSourcePointer();
  if (!ptr.cas(cast(shared StopSource)null, stopSource))
    return false;
  // `globalSource` is read with atomicLoad by `globalStopSource`, so the cache
  // update has to be an atomic store rather than a plain assignment.
  atomicStore(globalSource, stopSource);
  return true;
}
36 
/// Registers `stopSource` as the stop source associated with interrupts.
///
/// A null `stopSource` is ignored. The given source always replaces the one
/// currently stored in `SignalHandler.signalStopSource`; the notification
/// machinery (wake-up primitive, handler thread and OS-level handlers) is only
/// installed when no source was stored before.
void setupCtrlCHandler(shared StopSource stopSource) @trusted {
  import core.sys.posix.signal;
  import core.atomic;

  if (stopSource is null)
    return;

  // Swap in the new source; a non-null previous value means the handlers
  // were already installed by an earlier call.
  if (atomicExchange(&SignalHandler.signalStopSource, stopSource) !is null)
    return;

  SignalHandler.setup();
  SignalHandler.launchHandlerThread();

  version (Windows) {
    import core.sys.windows.windows;
    SetConsoleCtrlHandler(&signalHandler, true);
  } else {
    // Routes signal `signum` to `signalHandler`, with an empty signal mask
    // and SA_RESTART set.
    static void install(int signum) @trusted {
      import core.sys.posix.signal;
      sigaction_t action;
      sigset_t emptyMask;
      sigemptyset(&emptyMask);
      action.sa_flags = SA_RESTART;
      action.sa_mask = emptyMask;
      action.sa_handler = &signalHandler;

      sigaction_t previous;
      sigaction(signum, &action, &previous);
      // TODO: what to do with the previous action?
    }

    install(SIGINT);
    install(SIGTERM);
  }
}
72 
// This module's slot for the global stop source. It is written by
// `globalStopSource` and `setGlobalStopSource`, and its address is handed out
// by `concurrency_globalStopSourcePointer`.
private static shared StopSource globalSource;
74 
/// Returns the address of this module's `globalSource` slot.
///
/// This function is exported so that dynamic libraries can load it and access
/// the host's global stop source pointer; otherwise they would access their
/// own local instance.
///
/// It should not be called directly by user code; use `globalStopSource`
/// instead.
export extern(C) shared(StopSource*) concurrency_globalStopSourcePointer() @safe {
  return &globalSource;
}
82 
/// Resolves `concurrency_globalStopSourcePointer` through `dynamicLoad` and
/// returns the slot address it reports.
// NOTE(review): presumably `dynamicLoad` picks the host's exported symbol when
// running inside a dynamic library - confirm against concurrency.utils.
private shared(StopSource*) getGlobalStopSourcePointer() @safe {
  import concurrency.utils : dynamicLoad;
  auto accessor = dynamicLoad!concurrency_globalStopSourcePointer();
  return accessor();
}
87 
/// Forwards interrupt notifications to a dedicated thread, which in turn calls
/// `stop()` on the registered stop source.
///
/// `notify` records the signal number and wakes the thread; `await` blocks the
/// thread until woken. The wake-up primitive is chosen per platform: an
/// `Event` on Windows, an eventfd on Linux and a self-pipe on other Posix
/// systems.
struct SignalHandler {
  import core.atomic : atomicStore, atomicLoad, MemoryOrder, atomicExchange, cas;
  import core.thread : Thread;
  static shared int lastSignal; // last signal received
  enum int ABORT = -1; // pseudo signal that makes the handler thread exit
  version (Windows) {
    import core.sync.event : Event;
    private static shared Event event; // used to notify the dedicated thread to shutdown
    /// Records `num` as the last signal and wakes the handler thread.
    static void notify(int num) nothrow @nogc @trusted {
      lastSignal.atomicStore!(MemoryOrder.rel)(num);
      (cast()event).set();
    }
    /// Blocks until `notify` is called, then returns the recorded signal.
    private static int await() nothrow @nogc @trusted {
      (cast()event).wait();
      return lastSignal.atomicLoad!(MemoryOrder.acq)();
    }
    /// Initializes the event used to wake the handler thread.
    private static void setup() @trusted {
      (cast()event).initialize(false, false);
    }
  } else version (linux) {
    import core.sys.posix.unistd : write, read;
    private static shared int event; // eventfd to notify dedicated thread
    /// Records `num` as the last signal and wakes the handler thread by
    /// writing to the eventfd.
    static void notify(int num) nothrow @nogc {
      lastSignal.atomicStore!(MemoryOrder.rel)(num);
      ulong b = 1;
      write(event, &b, typeof(b).sizeof);
    }
    /// Blocks until the eventfd yields a full counter value, then returns the
    /// recorded signal. Short or failed reads are retried.
    private static int await() nothrow @nogc {
      ulong b;
      while(read(event, &b, typeof(b).sizeof) != typeof(b).sizeof) {}
      return lastSignal.atomicLoad!(MemoryOrder.acq)();
    }
    /// Creates the eventfd used to wake the handler thread.
    ///
    /// Throws: ErrnoException if the eventfd cannot be created.
    private static void setup() {
      import core.sys.linux.sys.eventfd;
      import std.exception : ErrnoException;
      const fd = eventfd(0, EFD_CLOEXEC);
      // Store the result even on failure so later writes target an invalid
      // descriptor instead of the default-initialized value 0.
      event = fd;
      // Without this check `await` would retry forever on a bad descriptor.
      if (fd == -1)
        throw new ErrnoException("Failed to create eventfd");
    }
  } else version (Posix) {
    import core.sys.posix.unistd : write, read, pipe;
    private static shared int[2] selfPipe; // self pipe to notify dedicated thread
    /// Records `num` as the last signal and wakes the handler thread by
    /// writing to the write end of the self-pipe.
    static void notify(int num) nothrow @nogc {
      lastSignal.atomicStore!(MemoryOrder.rel)(num);
      ulong b = 1;
      write(selfPipe[1], &b, typeof(b).sizeof);
    }
    /// Blocks until a full token is read from the self-pipe, then returns the
    /// recorded signal. Short or failed reads are retried.
    private static int await() nothrow @nogc {
      ulong b;
      while(read(cast()selfPipe[0], &b, typeof(b).sizeof) != typeof(b).sizeof) {}
      return lastSignal.atomicLoad!(MemoryOrder.acq)();
    }
    /// Creates the self-pipe used to wake the handler thread.
    ///
    /// Throws: ErrnoException if the pipe cannot be created.
    private static void setup() {
      import std.exception : ErrnoException;
      if (pipe(cast(int[2])selfPipe) == -1)
        throw new ErrnoException("Failed to create self-pipe");
    }
  }
  /// Asks the handler thread to exit by notifying it with `ABORT`. Does
  /// nothing when no stop source has been registered.
  private static void shutdown() {
    if (atomicLoad!(MemoryOrder.acq)(signalStopSource) !is null)
      SignalHandler.notify(ABORT);
  }
  private static shared StopSource signalStopSource; // source stopped on each notification
  private static shared Thread handlerThread; // the dedicated thread, null until launched
  /// Starts the dedicated handler thread unless one is already registered.
  ///
  /// The thread loops on `await`: every notification other than `ABORT`
  /// triggers `signalStopSource.stop()`; `ABORT` ends the thread.
  private static void launchHandlerThread() {
    if (handlerThread.atomicLoad !is null)
      return;

    auto thread = new Thread((){
        for(;;) {
          if (SignalHandler.await() == ABORT) {
            return;
          }
          signalStopSource.stop();
        }
      });
    // This has to be a daemon thread otherwise the runtime will wait on it before calling the shared module destructor that stops it.
    thread.isDaemon = true;

    // Publish with compare-and-swap so that losing the race leaves the
    // winner's thread registered. An unconditional exchange would overwrite
    // the running thread's reference with this never-started one.
    if (!cas(&handlerThread, cast(shared Thread)null, cast(shared)thread))
      return; // someone beat us to it

    thread.start();
  }
}
170 
/// Module teardown: asks the signal handler thread to exit and waits for it.
/// This is required to properly shutdown in the presence of sanitizers.
shared static ~this() {
  import core.atomic : atomicExchange;
  import core.thread : Thread;

  // Request shutdown of the handler thread, if one is active.
  SignalHandler.shutdown();

  // Take ownership of the thread reference, clearing the slot.
  auto pending = atomicExchange(&SignalHandler.handlerThread, null);
  if (pending is null)
    return;

  (cast()pending).join();
}
179 
version (Windows) {
  import core.sys.windows.windows;
  /// Console control handler: forwards Ctrl-C, Ctrl-Break, close and shutdown
  /// events to `SignalHandler.notify` as `SIGINT`.
  ///
  /// Returns: TRUE when the event was forwarded, FALSE for any other event.
  extern (Windows) static BOOL signalHandler(DWORD dwCtrlType) nothrow @system {
    import core.sys.posix.signal;
    if (dwCtrlType == CTRL_C_EVENT ||
        dwCtrlType == CTRL_BREAK_EVENT ||
        dwCtrlType == CTRL_CLOSE_EVENT ||
        dwCtrlType == CTRL_SHUTDOWN_EVENT) {
      SignalHandler.notify(SIGINT);
      return TRUE;
    }
    return FALSE;
  }
} else {
  /// Signal handler: forwards the received signal number to
  /// `SignalHandler.notify`.
  extern (C) static void signalHandler(int i) nothrow @nogc {
    import core.stdc.errno : errno;
    // `notify` performs a `write`, which may modify errno. Save and restore
    // it so the code interrupted by the signal does not observe a changed
    // value.
    const savedErrno = errno;
    SignalHandler.notify(i);
    errno = savedErrno;
  }
}