1 module concurrency.asyncscope;
2 
3 import concurrency.stoptoken;
4 import concurrency.scheduler : NullScheduler;
5 
/// Bit layout of the state word kept in `AsyncScope.flag`.
private enum Flag {
  /// Not used by this module.
  /// NOTE(review): presumably required by SharedBitField - confirm.
  locked = 0,
  /// Lowest bit: set once the scope has been asked to stop.
  stopped = 1 << 0,
  /// Unit added per spawned sender and subtracted when it finishes, so
  /// the bits above `stopped` act as a counter of running senders.
  tick = 1 << 1
}
11 
/// Creates a shared `AsyncScope` that owns a newly allocated `StopSource`.
auto asyncScope() @safe {
  auto source = new shared StopSource();
  // The scope is returned through a single named local to ensure NRVO.
  auto result = shared AsyncScope(source);
  return result;
}
17 
/// Tracks senders started through `spawn` and exposes a completion sender
/// (`cleanup`) that resolves once the scope is stopped and every spawned
/// sender has finished.
///
/// The state lives in `flag`: the lowest bit is `Flag.stopped`, and each
/// running sender contributes one `Flag.tick`. A state of exactly `1`
/// therefore means "stopped, nothing running", which is when `completion`
/// is resolved.
struct AsyncScope {
private:
  import concurrency.bitfield : SharedBitField;
  import concurrency.sender : Promise;

  shared SharedBitField!Flag flag;
  shared Promise!void completion;
  shared StopSource stopSource;
  // First Throwable reported through setError; null when none was reported.
  Throwable throwable;

  /// Resolves `completion`: with the stored throwable if there is one,
  /// otherwise as fulfilled.
  void forward() @trusted nothrow shared {
    import core.atomic : atomicLoad;
    auto t = throwable.atomicLoad();
    if (t !is null)
      completion.error(cast(Throwable)t);
    else
      completion.fulfill();
  }

  /// Removes one tick from the state and resolves `completion` when the
  /// result is exactly "stopped with no running senders".
  void complete() @safe nothrow shared {
    auto newState = flag.sub(Flag.tick);
    if (newState == 1) {
      forward();
    }
  }

  /// Records `t` (only if no throwable was recorded before), stops the
  /// scope and accounts for the failed sender's tick.
  void setError(Throwable t) @trusted nothrow shared {
    import core.atomic : cas;
    // cas against null keeps the first reported throwable and drops later ones.
    cas(&throwable, cast(shared Throwable)null, cast(shared)t);
    stop();
    complete();
  }
public:
  @disable this(ref return scope typeof(this) rhs);
  @disable this(this);
  @disable this();

  /// Stops the scope and waits for `completion` unless it is already
  /// completed. The wait is skipped when a stored throwable is not an
  /// `Exception` (the class cast below yields null in that case).
  ~this() @safe shared {
    import concurrency : syncWait;
    import core.atomic : atomicLoad;
    auto t = throwable.atomicLoad();
    if (t !is null && (cast(shared(Exception))t) is null)
      return;
    if (!completion.isCompleted)
      cleanup.syncWait();
  }

  /// Constructs a scope that uses `stopSource` for stop requests and
  /// allocates a fresh completion promise.
  this(shared StopSource stopSource) @safe shared {
    completion = new shared Promise!void;
    this.stopSource = stopSource;
  }

  /// Requests a stop and returns the sender of the completion promise.
  auto cleanup() @safe shared {
    stop();
    return completion.sender();
  }

  /// Non-shared convenience overload; forwards to the shared `stop`.
  bool stop() nothrow @trusted {
    return (cast(shared)this).stop();
  }

  /// Marks the scope as stopped and forwards the stop request to
  /// `stopSource`.
  ///
  /// Returns: `false` when the scope was already stopped, otherwise the
  /// result of `stopSource.stop()`.
  bool stop() nothrow @trusted shared {
    import core.atomic : MemoryOrder;
    // Fast path: nothing to do when the stopped bit is already visible.
    if ((flag.load!(MemoryOrder.acq) & Flag.stopped) > 0)
      return false;

    // Set the bit with a single atomic read-modify-write. A separate
    // load followed by an arithmetic add lets two racing callers both add
    // Flag.stopped; the second add carries into the tick counter and
    // clears the stopped bit again, so the scope would never complete.
    // NOTE(review): relies on the first argument of SharedBitField.update
    // being OR-ed into the state - confirm against concurrency.bitfield.
    auto transition = flag.update(Flag.stopped);
    if ((transition.oldState & Flag.stopped) != 0)
      return false; // another caller set the bit first

    // Previous state 0 means no sender was running, so the new state is
    // exactly "stopped" and completion can be resolved right away.
    if (transition.oldState == 0) {
      forward();
    }
    return stopSource.stop();
  }

  /// Connects `s` to a receiver bound to this scope and starts it.
  ///
  /// Returns: `false` (without starting `s`) when the scope was already
  /// stopped, `true` once `s` has been started.
  /// Throws: rethrows any Throwable raised while connecting or starting `s`.
  bool spawn(Sender)(Sender s) shared @trusted {
    import concurrency.sender : connectHeap;
    // Register the sender by adding one tick; oldState is the state
    // before the tick was added.
    with (flag.update(0, Flag.tick)) {
      if ((oldState & Flag.stopped) == 1) {
        // Already stopped: give the tick back (this may resolve completion).
        complete();
        return false;
      }
      try {
        s.connectHeap(AsyncScopeReceiver(&this)).start();
      } catch (Throwable t) {
        // The throwable has to be caught here; otherwise the destructor
        // would wait forever for a sender that is no longer running.
        // Calling setError keeps the internal state consistent.
        setError(t);
        throw t;
      }
      return true;
    }
  }
}
112 
/// Receiver handed to every sender spawned into an `AsyncScope`; it reports
/// the sender's outcome back to that scope.
struct AsyncScopeReceiver {
  // Scope this receiver reports to.
  private shared AsyncScope* s;

  /// The sender produced its (void) value: mark it as finished.
  void setValue() @safe nothrow {
    s.complete();
  }

  /// The sender was cancelled: handled the same way as a value.
  void setDone() @safe nothrow {
    s.complete();
  }

  /// The sender failed: hand the throwable to the scope.
  void setError(Throwable t) @safe nothrow {
    s.setError(t);
  }

  /// Returns a stop token built from the scope's stop source.
  auto getStopToken() @safe nothrow {
    import concurrency.stoptoken : StopToken;
    auto token = StopToken(s.stopSource);
    return token;
  }

  /// Returns a `NullScheduler`.
  auto getScheduler() @safe nothrow {
    return NullScheduler();
  }
}