1 module concurrency.asyncscope;
2 
3 import concurrency.stoptoken;
4 
/// Values combined into the scope's shared state word.
enum Flag {
  /// Zero value; not referenced by the code in this module.
  locked = 0,
  /// Lowest bit: set once the scope has been asked to stop.
  stopped = 1 << 0,
  /// Unit added for every spawned sender and subtracted again when it
  /// finishes, so the bits above `stopped` act as a running-task counter.
  tick = 1 << 1
}
10 
/// Creates a shared AsyncScope backed by a freshly allocated StopSource.
auto asyncScope() @safe {
  auto source = new shared StopSource();
  // AsyncScope disables copying, so the result is held in a named local
  // and returned directly to ensure NRVO constructs it in place.
  auto result = shared AsyncScope(source);
  return result;
}
16 
/**
 * Tracks a set of spawned senders and exposes a single completion that
 * fires once the scope has been stopped and every spawned sender has
 * finished.
 *
 * State is kept in `flag`: the lowest bit is `Flag.stopped`, and every
 * running sender contributes one `Flag.tick`. When the state becomes
 * exactly `Flag.stopped` (stopped, zero ticks) the completion promise is
 * resolved via `forward`.
 */
struct AsyncScope {
private:
  import concurrency.bitfield : SharedBitField;
  import concurrency.sender : Promise;

  shared SharedBitField!Flag flag;
  shared Promise!void completion;
  shared StopSource stopSource;
  // First throwable reported by any spawned sender (or by spawn itself).
  Throwable throwable;

  /// Resolves `completion`: with the recorded throwable if there is one,
  /// otherwise as a plain fulfilment.
  void forward() @trusted nothrow shared {
    import core.atomic : atomicLoad;
    auto t = throwable.atomicLoad();
    if (t !is null)
      completion.error(cast(Throwable)t);
    else
      completion.fulfill();
  }

  /// Removes one tick from the state; if that leaves only the stopped
  /// bit, the scope is done and the completion is forwarded.
  void complete() @safe nothrow shared {
    auto newState = flag.sub(Flag.tick);
    if (newState == Flag.stopped) {
      forward();
    }
  }

  /// Records `t` unless a throwable was already recorded, then stops the
  /// scope and accounts for the failed sender's tick.
  void setError(Throwable t) @trusted nothrow shared {
    import core.atomic : cas;
    // cas against null keeps only the first reported throwable.
    cas(&throwable, cast(shared Throwable)null, cast(shared)t);
    stop();
    complete();
  }
public:
  @disable this(ref return scope typeof(this) rhs);
  @disable this(this);
  @disable this();

  /// Stops the scope and blocks until the completion has fired, unless a
  /// non-Exception throwable was recorded, in which case it returns
  /// without waiting.
  ~this() @safe shared {
    import concurrency : syncWait;
    import core.atomic : atomicLoad;
    auto t = throwable.atomicLoad();
    if (t !is null && (cast(shared(Exception))t) is null)
      return;
    if (!completion.isCompleted)
      cleanup.syncWait();
  }

  /// Builds a scope whose stop requests are forwarded to `stopSource`.
  this(shared StopSource stopSource) @safe shared {
    completion = new shared Promise!void;
    this.stopSource = stopSource;
  }

  /// Stops the scope and returns the sender of the completion promise.
  auto cleanup() @safe shared {
    stop();
    return completion.sender();
  }

  /// Non-shared convenience overload; forwards to the shared `stop`.
  bool stop() nothrow @trusted {
    return (cast(shared)this).stop();
  }

  /**
   * Marks the scope as stopped and stops the underlying stop source.
   *
   * Returns: `false` when the scope was already stopped, otherwise the
   * result of `stopSource.stop()`.
   */
  bool stop() nothrow @trusted shared {
    import core.atomic : MemoryOrder;
    // Cheap early-out when the stopped bit is already visible.
    if ((flag.load!(MemoryOrder.acq) & Flag.stopped) != 0)
      return false;

    // The stopped bit must be set in one atomic read-modify-write.
    // A separate check followed by an arithmetic add lets two racing
    // callers both add Flag.stopped, which clears the bit again and
    // leaves a phantom tick behind (1 + 1 == Flag.tick).
    // NOTE(review): assumes the first argument of SharedBitField.update
    // is OR-ed into the state (spawn passes 0 there and Flag.tick as the
    // amount to add) - confirm against concurrency.bitfield.
    with (flag.update(Flag.stopped, 0)) {
      // Another caller won the race between the load above and here.
      if ((oldState & Flag.stopped) != 0)
        return false;
      // No ticks outstanding: stopping is what completes the scope.
      if (oldState == 0)
        forward();
    }
    return stopSource.stop();
  }

  /**
   * Connects `s` to a receiver bound to this scope and starts it.
   *
   * Returns: `false` without starting `s` when the scope is already
   * stopped, `true` once the sender has been started.
   * Throws: rethrows any Throwable raised while connecting or starting
   * the sender, after recording it on the scope.
   */
  bool spawn(Sender)(Sender s) shared @trusted {
    import concurrency.sender : connectHeap;
    // Reserve a tick first; oldState tells us whether we were too late.
    with (flag.update(0, Flag.tick)) {
      if ((oldState & Flag.stopped) != 0) {
        // Give the tick back; this may be what completes the scope.
        complete();
        return false;
      }
      try {
        s.connectHeap(AsyncScopeReceiver(&this)).start();
      } catch (Throwable t) {
        // The throwable has to be caught here: the reserved tick would
        // otherwise never be released and the destructor would wait
        // forever for something that no longer runs. setError brings
        // the internal state back in order before rethrowing.
        setError(t);
        throw t;
      }
      return true;
    }
  }
}
111 
/// Receiver handed to every sender spawned on an AsyncScope; it reports
/// the sender's outcome back to the scope it points at.
struct AsyncScopeReceiver {
  private shared AsyncScope* s;

nothrow @safe:

  /// The sender produced a value: release its slot on the scope.
  void setValue() {
    s.complete();
  }

  /// The sender was cancelled: handled the same way as a value.
  void setDone() {
    s.complete();
  }

  /// The sender failed: pass the throwable on to the scope.
  void setError(Throwable t) {
    s.setError(t);
  }

  /// Returns a stop token built from the scope's stop source.
  auto getStopToken() {
    import concurrency.stoptoken : StopToken;
    auto token = StopToken(s.stopSource);
    return token;
  }

  /// Returns a default-constructed NullScheduler.
  auto getScheduler() {
    return NullScheduler();
  }
}
131 
/// Empty scheduler type returned by AsyncScopeReceiver.getScheduler.
struct NullScheduler {}