module concurrency.asyncscope;

import concurrency.stoptoken;
import concurrency.scheduler : NullScheduler;

/// Layout of the state word kept in `AsyncScope.flag`.
///
/// The lowest bit records that the scope has been stopped; every sender
/// that was spawned and has not yet finished contributes one `tick` (2).
/// A state value of exactly `stopped` therefore means "stopped, with no
/// outstanding work".
private enum Flag {
  // Not referenced in this module; presumably required by SharedBitField
  // (TODO confirm against concurrency.bitfield).
  locked = 0,
  stopped = 1,
  tick = 2
}

/// Creates a shared `AsyncScope` backed by a freshly allocated `StopSource`.
auto asyncScope() @safe {
  // Return a named local so that NRVO applies (AsyncScope is not copyable).
  auto result = shared AsyncScope(new shared StopSource());
  return result;
}

/// A scope that senders can be spawned into. It keeps a count of running
/// senders in `flag` and completes `completion` once the scope has been
/// stopped and the count has dropped to zero.
struct AsyncScope {
private:
  import concurrency.bitfield : SharedBitField;
  import concurrency.sender : Promise;

  shared SharedBitField!Flag flag;
  shared Promise!void completion;
  shared StopSource stopSource;
  Throwable throwable;

  /// Resolves `completion`: with the recorded throwable when there is one,
  /// otherwise as fulfilled.
  void forward() @trusted nothrow shared {
    import core.atomic : atomicLoad;

    auto failure = atomicLoad(throwable);
    if (failure is null) {
      completion.fulfill();
      return;
    }
    completion.error(cast(Throwable) failure);
  }

  /// Removes one tick from the state word. When the resulting state is
  /// exactly `Flag.stopped` (stopped, no ticks left), the completion
  /// promise is resolved via `forward`.
  void complete() @safe nothrow shared {
    if (flag.sub(Flag.tick) == Flag.stopped)
      forward();
  }

  /// Records `t` as the scope's throwable if none has been recorded yet
  /// (the compare-and-swap only succeeds against `null`), then stops the
  /// scope and removes the caller's tick.
  void setError(Throwable t) @trusted nothrow shared {
    import core.atomic : cas;

    cas(&throwable, cast(shared Throwable) null, cast(shared) t);
    stop();
    complete();
  }

public:
  @disable this(ref return scope typeof(this) rhs);
  @disable this(this);
  @disable this();

  /// On destruction, stops the scope and blocks until `completion` is
  /// resolved, unless it already is. If a throwable was recorded that is
  /// not an `Exception`, the destructor returns without waiting.
  ~this() @safe shared {
    import concurrency : syncWait;
    import core.atomic : atomicLoad;

    auto failure = throwable.atomicLoad();
    const recordedNonException =
      failure !is null && (cast(shared(Exception)) failure) is null;
    if (recordedNonException)
      return;

    if (completion.isCompleted)
      return;
    cleanup.syncWait();
  }

  /// Builds a scope around the given stop source and allocates the promise
  /// used to signal completion.
  this(shared StopSource stopSource) @safe shared {
    completion = new shared Promise!void;
    this.stopSource = stopSource;
  }

  /// Stops the scope and returns the sender of the completion promise.
  auto cleanup() @safe shared {
    stop();
    return completion.sender();
  }

  /// Non-shared convenience overload; forwards to the shared `stop`.
  bool stop() nothrow @trusted {
    return (cast(shared) this).stop();
  }

  /// Marks the scope as stopped and stops `stopSource`.
  ///
  /// Returns `false` without doing anything when the stopped bit was
  /// already observed; otherwise returns the result of `stopSource.stop()`.
  /// If no ticks are outstanding at the moment the bit is added, the
  /// completion promise is resolved immediately.
  bool stop() nothrow @trusted shared {
    import core.atomic : MemoryOrder;

    const alreadyStopped =
      (flag.load!(MemoryOrder.acq) & Flag.stopped) != 0;
    if (alreadyStopped)
      return false;

    // NOTE(review): the check above and the add below are separate
    // operations, so two concurrent callers could both reach this line --
    // confirm whether that can add Flag.stopped twice.
    if (flag.add(Flag.stopped) == Flag.stopped)
      forward();

    return stopSource.stop();
  }

  /// Connects `s` to a receiver bound to this scope and starts it.
  ///
  /// A tick is added up front. If the scope turns out to be stopped
  /// already, the tick is removed again and `false` is returned; otherwise
  /// the sender is started and `true` is returned. Anything thrown while
  /// connecting or starting is recorded through `setError` and rethrown.
  bool spawn(Sender)(Sender s) shared @trusted {
    import concurrency.sender : connectHeap;

    with (flag.update(0, Flag.tick)) {
      const wasStopped = (oldState & Flag.stopped) == 1;
      if (wasStopped) {
        // Give back the tick that was just added.
        complete();
        return false;
      }

      try {
        s.connectHeap(AsyncScopeReceiver(&this)).start();
      } catch (Throwable t) {
        // The throwable has to be caught here: otherwise the destructor
        // would wait forever for work that is no longer running.
        // Calling setError keeps the internal state consistent.
        setError(t);
        throw t;
      }
      return true;
    }
  }
}

/// Receiver handed to every sender spawned into an `AsyncScope`; it reports
/// the sender's outcome back to the scope it points at.
struct AsyncScopeReceiver {
  private shared AsyncScope* s;

  /// Value completion: removes this sender's tick from the scope.
  void setValue() nothrow @safe {
    s.complete();
  }

  /// Done (cancelled) completion: removes this sender's tick from the scope.
  void setDone() nothrow @safe {
    s.complete();
  }

  /// Error completion: hands the throwable to the scope.
  void setError(Throwable t) nothrow @safe {
    s.setError(t);
  }

  /// Returns a stop token built from the scope's stop source.
  auto getStopToken() nothrow @safe {
    import concurrency.stoptoken : StopToken;

    return StopToken(s.stopSource);
  }

  /// Returns a `NullScheduler`.
  auto getScheduler() nothrow @safe {
    return NullScheduler();
  }
}