module concurrency.asyncscope;

import concurrency.stoptoken;

/// Values combined into the `SharedBitField` that tracks an AsyncScope's state.
///
/// As used in this file: `stopped` (value 1) is set once by `stop()`, and
/// `tick` (value 2) is passed to `flag.update` for every `spawn` and to
/// `flag.sub` for every completion, so a state of exactly 1 is treated as
/// "stopped, nothing outstanding".
/// `locked` is not referenced in this file; presumably it is required by
/// SharedBitField itself -- TODO confirm against concurrency.bitfield.
enum Flag {
  locked = 0,
  stopped = 1,
  tick = 2
}

/// Creates a shared AsyncScope backed by a freshly allocated shared StopSource.
///
/// AsyncScope disables both its copy constructor and its postblit, so the
/// value is returned through a named local.
auto asyncScope() @safe {
  // ensure NRVO
  auto as = shared AsyncScope(new shared StopSource());
  return as;
}

/// A scope onto which senders can be spawned.
///
/// The scope owns a `Promise!void` named `completion`. It is resolved by
/// `forward()`, which is invoked when a state transition yields exactly
/// `Flag.stopped` (see `complete()` and `stop()`). The destructor waits for
/// that promise unless a non-Exception Throwable was recorded.
struct AsyncScope {
private:
  import concurrency.bitfield : SharedBitField;
  import concurrency.sender : Promise;

  // Combined stopped bit / tick counter (see Flag).
  shared SharedBitField!Flag flag;
  // Resolved by forward(); its sender is what cleanup() hands out.
  shared Promise!void completion;
  // Stopped from stop(); also handed to spawned work through
  // AsyncScopeReceiver.getStopToken.
  shared StopSource stopSource;
  // First Throwable recorded through setError(); null when none was recorded.
  Throwable throwable;

  /// Resolves `completion`: with the recorded throwable if there is one,
  /// otherwise as fulfilled.
  void forward() @trusted nothrow shared {
    import core.atomic : atomicLoad;
    auto t = throwable.atomicLoad();
    if (t !is null)
      completion.error(cast(Throwable)t);
    else
      completion.fulfill();
  }

  /// Subtracts one `Flag.tick` from the state and calls `forward()` when the
  /// result is exactly 1 (== Flag.stopped).
  void complete() @safe nothrow shared {
    // presumably `sub` returns the state after the subtraction -- TODO confirm
    auto newState = flag.sub(Flag.tick);
    if (newState == 1) {
      forward();
    }
  }

  /// Records `t`, stops the scope, and then counts the failed work as
  /// completed.
  ///
  /// The compare-and-swap only succeeds while `throwable` is still null, so
  /// an already recorded throwable is not overwritten.
  void setError(Throwable t) @trusted nothrow shared {
    import core.atomic : cas;
    cas(&throwable, cast(shared Throwable)null, cast(shared)t);
    stop();
    complete();
  }
public:
  // Copy construction, postblit and default construction are all disabled.
  @disable this(ref return scope typeof(this) rhs);
  @disable this(this);
  @disable this();

  /// Waits for the scope to finish before it is destroyed.
  ///
  /// If a throwable was recorded and it is not an Exception, the destructor
  /// returns immediately without waiting. Otherwise, if `completion` has not
  /// completed yet, it calls `cleanup()` (which stops the scope) and
  /// `syncWait`s on the returned sender.
  ~this() @safe shared {
    import concurrency : syncWait;
    import core.atomic : atomicLoad;
    auto t = throwable.atomicLoad();
    // non-null but not castable to Exception: skip the wait
    if (t !is null && (cast(shared(Exception))t) is null)
      return;
    if (!completion.isCompleted)
      cleanup.syncWait();
  }

  /// Constructs the scope with a new completion promise and the given
  /// stop source.
  this(shared StopSource stopSource) @safe shared {
    completion = new shared Promise!void;
    this.stopSource = stopSource;
  }

  /// Stops the scope and returns the sender of the completion promise.
  auto cleanup() @safe shared {
    stop();
    return completion.sender();
  }

  /// Non-shared overload; forwards to the shared `stop()`.
  bool stop() nothrow @trusted {
    return (cast(shared)this).stop();
  }

  /// Marks the scope as stopped and stops the underlying StopSource.
  ///
  /// Returns: `false` when the stopped bit was already observed as set;
  /// otherwise the result of `stopSource.stop()`.
  bool stop() nothrow @trusted shared {
    import core.atomic : MemoryOrder;
    // NOTE(review): this check and the `add` below are two separate
    // operations. Two concurrent callers could both pass the check and both
    // add Flag.stopped -- confirm whether callers are serialized, or whether
    // SharedBitField offers a single atomic operation for this.
    if ((flag.load!(MemoryOrder.acq) & Flag.stopped) > 0)
      return false;

    auto newState = flag.add(Flag.stopped);
    // A state of exactly 1 (== Flag.stopped) is forwarded right away.
    if (newState == 1) {
      forward();
    }
    return stopSource.stop();
  }

  /// Connects `s` to an AsyncScopeReceiver that points at this scope and
  /// starts it.
  ///
  /// Returns: `false` (without starting `s`) when the scope was already
  /// stopped, `true` once `s` has been started.
  /// Throws: any Throwable raised while connecting or starting `s`; it is
  /// recorded through `setError` before being rethrown.
  bool spawn(Sender)(Sender s) shared @trusted {
    import concurrency.sender : connectHeap;
    // presumably the second argument is added to the state -- TODO confirm
    // against SharedBitField.update
    with (flag.update(0, Flag.tick)) {
      if ((oldState & Flag.stopped) == 1) {
        // already stopped: subtract the tick again and refuse the sender
        complete();
        return false;
      }
      try {
        // the receiver keeps a raw pointer to this scope
        s.connectHeap(AsyncScopeReceiver(&this)).start();
      } catch (Throwable t) {
        // we are required to catch the throwable here, otherwise
        // the destructor will wait infinitely for something that
        // no longer runs
        // by calling setError we ensure the internal state is correct
        setError(t);
        throw t;
      }
      return true;
    }
  }
}

/// Receiver connected to every sender passed to `AsyncScope.spawn`; it reports
/// the sender's outcome back to the scope it points at.
struct AsyncScopeReceiver {
  // The scope that spawned the work.
  private shared AsyncScope* s;
  /// Value outcome: counts the work as completed.
  void setValue() nothrow @safe {
    s.complete();
  }
  /// Done outcome: handled exactly like `setValue`.
  void setDone() nothrow @safe {
    s.complete();
  }
  /// Error outcome: forwards `t` to the scope's `setError`.
  void setError(Throwable t) nothrow @safe {
    s.setError(t);
  }
  /// Returns a StopToken constructed from the scope's StopSource.
  auto getStopToken() nothrow @safe {
    import concurrency.stoptoken : StopToken;
    return StopToken(s.stopSource);
  }
  /// Returns a default-constructed NullScheduler.
  auto getScheduler() nothrow @safe {
    return NullScheduler();
  }
}

/// Empty struct returned by `AsyncScopeReceiver.getScheduler`.
struct NullScheduler {}