1 module concurrency.signal;
2 
3 import concurrency.stoptoken;
4 
/// Returns the process-wide `StopSource`, creating and installing it on first use.
///
/// The result is cached in a thread-local variable so that later calls from
/// the same thread skip the atomic load. When this call is the one that
/// installs the global source it also wires that source to the interrupt
/// handler through `setupCtrlCHandler`.
///
/// Returns: the shared global stop source.
shared(StopSource) globalStopSource() @trusted {
  import core.atomic : atomicLoad;
  static StopSource localSource; // thread-local cache; can't be shared else it is global

  if (localSource is null) {
    if (globalSource.atomicLoad is null) {
      auto tmp = new shared StopSource();
      // Only the caller that actually installed the source hooks up the signal handler.
      if (setGlobalStopSource(tmp)) {
        setupCtrlCHandler(tmp);
      }
    }
    // Other threads publish `globalSource` with atomic operations, so it is
    // read back atomically here as well instead of through a plain load.
    localSource = cast() atomicLoad(globalSource);
  }
  return cast(shared) localSource;
}
20 
/// Installs `stopSource` as the global stop source if none has been set yet.
///
/// The update is a compare-and-swap against `null`: when a global source is
/// already installed the call leaves it untouched, so a losing caller can
/// never replace the source that earlier callers already obtained.
///
/// Params:
///   stopSource = the source to install.
/// Returns: true if first to set (otherwise the argument is ignored).
bool setGlobalStopSource(shared StopSource stopSource) @safe {
  import core.atomic : cas;
  return cas(&globalSource, cast(shared StopSource) null, stopSource);
}
26 
/// Registers `stopSource` as the source to be stopped when an interrupt arrives.
///
/// A null `stopSource` is ignored. Otherwise it is swapped into
/// `SignalHandler.signalStopSource`; only when no source was registered before
/// does this call go on to set up the notification primitive, launch the
/// handler thread and install the OS-level handlers (the console control
/// handler on Windows, SIGINT and SIGTERM elsewhere).
void setupCtrlCHandler(shared StopSource stopSource) @trusted {
  import core.sys.posix.signal;
  import core.atomic;

  // Short-circuit keeps the exchange from running for a null argument. A
  // non-null previous value means the handlers were installed by an earlier call.
  if (stopSource is null
      || atomicExchange(&SignalHandler.signalStopSource, stopSource) !is null)
    return;

  SignalHandler.setup();
  SignalHandler.launchHandlerThread();

  version (Windows) {
    import core.sys.windows.windows;
    SetConsoleCtrlHandler(&signalHandler, true);
  } else {
    // Routes one signal number to the module's `signalHandler`.
    static void install(int signum) @trusted {
      import core.sys.posix.signal;
      sigaction_t action;
      sigemptyset(&action.sa_mask);
      action.sa_handler = &signalHandler;
      action.sa_flags = SA_RESTART;
      sigaction_t previous;
      sigaction(signum, &action, &previous);
      // TODO: what to do with the previous disposition?
    }

    install(SIGINT);
    install(SIGTERM);
  }
}
62 
// Process-wide stop source. Written by setGlobalStopSource and read by
// globalStopSource; a class reference, so it is null until one is installed.
private shared StopSource globalSource;
64 
/// Namespace of static helpers that hand a received signal over to a
/// dedicated thread, which then stops the registered `StopSource`.
///
/// Each platform branch provides the same three primitives:
/// `setup` creates the wake-up primitive, `notify` records a signal number
/// and wakes the handler thread, and `await` blocks until woken and returns
/// the recorded number.
struct SignalHandler {
  import core.atomic : atomicStore, atomicLoad, MemoryOrder, atomicExchange, cas;
  import core.thread : Thread;
  static shared int lastSignal; // last signal received
  enum int ABORT = -1; // pseudo signal number that tells the handler thread to exit
  version (Windows) {
    import core.sync.event : Event;
    private static shared Event event; // used to notify the dedicated thread to shutdown
    /// Records `num` and signals the event the handler thread waits on.
    static void notify(int num) nothrow @nogc @trusted {
      lastSignal.atomicStore!(MemoryOrder.rel)(num);
      (cast()event).set();
    }
    /// Blocks on the event, then returns the last recorded signal number.
    static int await() nothrow @nogc @trusted {
      (cast()event).wait();
      return lastSignal.atomicLoad!(MemoryOrder.acq)();
    }
    /// Initializes the event (manualReset = false, initialState = false).
    static void setup() @trusted {
      (cast()event).initialize(false, false);
    }
  } else version (linux) {
    import core.sys.posix.unistd : write, read;
    private static shared int event; // eventfd to notify dedicated thread
    /// Records `num` and writes to the eventfd to wake the handler thread.
    static void notify(int num) nothrow @nogc {
      lastSignal.atomicStore!(MemoryOrder.rel)(num);
      ulong b = 1;
      write(event, &b, typeof(b).sizeof);
    }
    /// Reads from the eventfd until a full counter value arrives, then
    /// returns the last recorded signal number.
    static int await() nothrow @nogc {
      ulong b;
      while(read(event, &b, typeof(b).sizeof) != typeof(b).sizeof) {}
      return lastSignal.atomicLoad!(MemoryOrder.acq)();
    }
    /// Creates the eventfd.
    /// Throws: ErrnoException when the eventfd cannot be created. Without
    /// this check `await` would retry a failing `read` on fd -1 forever.
    static void setup() {
      import core.sys.linux.sys.eventfd;
      import std.exception : ErrnoException;
      const fd = eventfd(0, EFD_CLOEXEC);
      if (fd == -1)
        throw new ErrnoException("Failed to create eventfd");
      event = fd;
    }
  } else version (Posix) {
    import core.sys.posix.unistd : write, read, pipe;
    private static shared int[2] selfPipe; // self pipe to notify dedicated thread
    /// Records `num` and writes to the pipe's write end to wake the handler thread.
    static void notify(int num) nothrow @nogc {
      lastSignal.atomicStore!(MemoryOrder.rel)(num);
      ulong b = 1;
      write(selfPipe[1], &b, typeof(b).sizeof);
    }
    /// Reads from the pipe's read end until a full value arrives, then
    /// returns the last recorded signal number.
    static int await() nothrow @nogc {
      ulong b;
      while(read(cast()selfPipe[0], &b, typeof(b).sizeof) != typeof(b).sizeof) {}
      return lastSignal.atomicLoad!(MemoryOrder.acq)();
    }
    /// Creates the self-pipe.
    /// Throws: ErrnoException when the pipe cannot be created.
    static void setup() {
      import std.exception : ErrnoException;
      if (pipe(cast(int[2])selfPipe) == -1)
        throw new ErrnoException("Failed to create self-pipe");
    }
  }
  /// Wakes the handler thread with `ABORT`, but only if a stop source was
  /// ever registered.
  private static void shutdown() {
    if (atomicLoad!(MemoryOrder.acq)(signalStopSource) !is null)
      SignalHandler.notify(ABORT);
  }
  private static shared StopSource signalStopSource; // source stopped when a signal arrives
  private static shared Thread handlerThread; // the dedicated thread, null until launched
  /// Starts the dedicated handler thread unless one has already been published.
  ///
  /// The thread loops on `await`: `ABORT` ends it, any other value stops
  /// `signalStopSource`.
  static void launchHandlerThread() {
    if (handlerThread.atomicLoad !is null)
      return;

    auto thread = new Thread((){
        for(;;) {
          if (SignalHandler.await() == ABORT) {
            return;
          }
          signalStopSource.stop();
        }
      });
    // This has to be a daemon thread otherwise the runtime will wait on it before calling the shared module destructor that stops it.
    thread.isDaemon = true;

    // Publish with compare-and-swap so that losing the race leaves the
    // already-started thread in place instead of overwriting it with this
    // unstarted one.
    if (!cas(&handlerThread, cast(shared Thread) null, cast(shared) thread))
      return; // someone beat us to it

    thread.start();
  }
}
147 
/// Module destructor: asks the handler thread to exit and joins it.
/// This is required to properly shutdown in the presence of sanitizers.
shared static ~this() {
  import core.atomic : atomicExchange;
  import core.thread : Thread;

  // Ask the handler thread (if any) to leave its loop.
  SignalHandler.shutdown();

  // Take ownership of the thread reference so that it is joined at most once.
  auto pending = atomicExchange(&SignalHandler.handlerThread, null);
  if (pending is null)
    return;
  (cast() pending).join();
}
156 
version (Windows) {
  import core.sys.windows.windows;
  /// Console control handler: reports Ctrl-C, Ctrl-Break, close and shutdown
  /// events to `SignalHandler` as SIGINT and marks them handled (TRUE);
  /// every other control type is left unhandled (FALSE).
  extern (Windows) static BOOL signalHandler(DWORD dwCtrlType) nothrow @system {
    import core.sys.posix.signal;
    const bool wantsStop = dwCtrlType == CTRL_C_EVENT
      || dwCtrlType == CTRL_BREAK_EVENT
      || dwCtrlType == CTRL_CLOSE_EVENT
      || dwCtrlType == CTRL_SHUTDOWN_EVENT;
    if (!wantsStop)
      return FALSE;
    SignalHandler.notify(SIGINT);
    return TRUE;
  }
} else {
  /// C signal handler: passes the received signal number on to `SignalHandler`.
  extern (C) static void signalHandler(int signum) nothrow @nogc {
    SignalHandler.notify(signum);
  }
}