// Unit tests exercising the basic senders together with syncWait, then, via,
// toSenderObject, toShared, whenAll, race, justFrom and delay.
module ut.concurrency.sender;

import concurrency;
import concurrency.sender;
import concurrency.thread;
import concurrency.operations;
import concurrency.receiver;
import unit_threaded;
import core.atomic : atomicOp;

// syncWait on a value sender exposes the value it was constructed with.
@("syncWait.value")
@safe unittest {
  ValueSender!int(5).syncWait.value.should == 5;
}

// match dispatches to the handler that accepts the produced int.
@("syncWait.match")
@safe unittest {
  ValueSender!int(5)
    .syncWait
    .match!((int produced) => true, (ref other) => false)
    .should == true;
}

// For a void sender the typeof(null) handler is the one expected to fire.
@("syncWait.match.void")
@safe unittest {
  VoidSender()
    .syncWait
    .match!((typeof(null)) => true, (ref other) => false)
    .should == true;
}

// connect + start must be callable from @safe nothrow @nogc code.
@("value.start.attributes.1")
@safe nothrow @nogc unittest {
  ValueSender!int(5)
    .connect(NullReceiver!int())
    .start();
}

// connect + start must stay nothrow when paired with ThrowingNullReceiver.
@("value.start.attributes.2")
@safe nothrow unittest {
  ValueSender!int(5)
    .connect(ThrowingNullReceiver!int())
    .start();
}

// A void-valued ValueSender completes successfully.
@("value.void")
@safe unittest {
  ValueSender!(void)().syncWait.isOk.should == true;
}

// Waiting on a bare ThreadSender completes successfully.
@("syncWait.thread")
@safe unittest {
  ThreadSender().syncWait.isOk.should == true;
}

// The continuation's return value is what syncWait reports.
@("syncWait.thread.then.value")
@safe unittest {
  ThreadSender()
    .then(() shared { return 2 * 3; })
    .syncWait
    .value
    .should == 6;
}

// A continuation that throws makes the result report an error.
@("syncWait.thread.then.exception")
@safe unittest {
  bool delegate() @safe shared thrower = () shared {
    throw new Exception("Exceptions are rethrown");
  };
  ThreadSender().then(thrower).syncWait.isError.should == true;
}

// Converting to a sender object keeps the produced value.
@("toSenderObject.value")
@safe unittest {
  ValueSender!int(4).toSenderObject.syncWait.value.should == 4;
}

// Same as above, for a ThreadSender followed by a continuation.
@("toSenderObject.thread")
@safe unittest {
  ThreadSender()
    .then(() shared { return 2 * 3 + 1; })
    .toSenderObject
    .syncWait
    .value
    .should == 7;
}

// An error from ThrowingSender is still reported when routed via ThreadSender.
@("via.threadsender.error")
@safe unittest {
  ThrowingSender().via(ThreadSender()).syncWait.isError.should == true;
}

// Every wait on the shared sender is expected to observe the same value until
// reset() is called; afterwards the counter is incremented once more.
@("toShared.basic")
@safe unittest {
  import std.typecons : tuple;

  shared int counter;

  auto sharedSender = just(1)
    .then((int unused) @trusted shared => atomicOp!"+="(counter, 1))
    .toShared();

  whenAll(sharedSender, sharedSender).syncWait.value.should == tuple(1, 1);
  race(sharedSender, sharedSender).syncWait.value.should == 1;
  sharedSender.syncWait.value.should == 1;
  sharedSender.syncWait.value.should == 1;

  sharedSender.reset();
  sharedSender.syncWait.value.should == 2;
  sharedSender.syncWait.value.should == 2;
  whenAll(sharedSender, sharedSender).syncWait.value.should == tuple(2, 2);
  race(sharedSender, sharedSender).syncWait.value.should == 2;
}

// Same expectation as toShared.basic, with the work routed via ThreadSender.
@("toShared.via.thread")
@safe unittest {
  import concurrency.operations.toshared;

  shared int counter;

  auto sharedSender = just(1)
    .then((int unused) @trusted shared => atomicOp!"+="(counter, 1))
    .via(ThreadSender())
    .toShared();

  race(sharedSender, sharedSender).syncWait.value.should == 1;
  sharedSender.reset();
  race(sharedSender, sharedSender).syncWait.value.should == 2;
}

// The error outcome is expected to be replayed: the counter stays at 1 across
// repeated waits and only reaches 2 after reset().
@("toShared.error")
@safe unittest {
  shared int counter;

  auto sharedSender = VoidSender()
    .then(() @trusted shared {
      atomicOp!"+="(counter, 1);
      throw new Exception("Error");
    })
    .toShared();

  sharedSender.syncWait.assumeOk.shouldThrowWithMessage("Error");
  counter.should == 1;
  sharedSender.syncWait.assumeOk.shouldThrowWithMessage("Error");
  counter.should == 1;

  race(sharedSender, sharedSender).syncWait.assumeOk.shouldThrowWithMessage("Error");
  counter.should == 1;

  sharedSender.reset();
  sharedSender.syncWait.assumeOk.shouldThrowWithMessage("Error");
  counter.should == 2;
}

// The cancelled outcome is expected to be replayed in the same way as errors.
@("toShared.done")
@safe unittest {
  shared int counter;

  auto sharedSender = DoneSender()
    .via(VoidSender().then(() @trusted shared { atomicOp!"+="(counter, 1); }))
    .toShared();

  sharedSender.syncWait.isCancelled.should == true;
  counter.should == 1;
  sharedSender.syncWait.isCancelled.should == true;
  counter.should == 1;

  race(sharedSender, sharedSender).syncWait.isCancelled.should == true;
  counter.should == 1;

  sharedSender.reset();
  sharedSender.syncWait.isCancelled.should == true;
  counter.should == 2;
}

// The spinning task only sets the flag once its stop token reports a stop
// request, which the second sender issues through the same StopSource.
@("toShared.stop")
@safe unittest {
  import concurrency.stoptoken;
  import core.atomic : atomicStore, atomicLoad;

  shared bool sawStop;
  auto stopSource = new StopSource();

  auto spinner = ThreadSender().withStopToken((StopToken token) @trusted {
    // busy-wait until a stop is requested
    while (!token.isStopRequested) {}
    atomicStore(sawStop, true);
  });
  auto trigger = just(stopSource).then((StopSource src) shared { src.stop(); });

  whenAll(spinner.toShared().withStopSource(stopSource), trigger)
    .syncWait
    .isCancelled
    .should == true;

  atomicLoad(sawStop).should == true;
}

@("toShared.scheduler")
@safe unittest {
  import core.time : msecs;
  // without an explicit scheduler, toShared on a DelaySender must not compile
  static assert(!__traits(compiles, { DelaySender(1.msecs).toShared().syncWait().isOk.should == true; }));
  // passing the scheduler explicitly makes it work
  import concurrency.scheduler : localThreadScheduler;
  DelaySender(1.msecs)
    .toShared(localThreadScheduler)
    .syncWait
    .isOk
    .should == true;
}

// NOTE(review): the test name reads "nvro" while the code below refers to
// NRVO; looks like a transposition, left untouched in case the name is used
// as a test filter - confirm before renaming.
@("nvro")
@safe unittest {
  // Operation state that remembers its own address at construction and, on
  // start, sends whether that address is still the same.
  static struct Op(Receiver) {
    Receiver receiver;
    void* addressAtConstruction;
    // copying is disabled
    @disable this(ref return scope typeof(this) other);
    this(Receiver r) @trusted {
      addressAtConstruction = cast(void*)&this;
      receiver = r;
    }
    void start() @trusted nothrow {
      receiver.setValue(addressAtConstruction == cast(void*)&this);
    }
  }
  static struct NRVOSender {
    alias Value = bool;
    auto connect(Receiver)(return Receiver receiver) @safe scope return {
      // named local returned directly, to ensure NRVO
      auto operation = Op!Receiver(receiver);
      return operation;
    }
  }
  // NOTE(review): only isOk is asserted below; the bool handed to setValue is
  // not inspected by these checks - confirm that this is intended.
  NRVOSender().syncWait.isOk.should == true;
  NRVOSender().via(ThreadSender()).syncWait.isOk.should == true;
  whenAll(NRVOSender(), VoidSender()).syncWait.isOk.should == true;
  whenAll(VoidSender(), NRVOSender()).syncWait.isOk.should == true;
  race(NRVOSender(), NRVOSender()).syncWait.isOk.should == true;
}

// justFrom produces the callable's return value.
@("justFrom")
@safe unittest {
  justFrom(() shared { return 42; }).syncWait.value.should == 42;
}

// The branch behind the 1 ms delay is expected to win the race.
@("delay")
@safe unittest {
  import core.time : msecs;

  race(
    delay(2.msecs).then(() shared { return 2; }),
    delay(1.msecs).then(() shared { return 1; })
  ).syncWait.value.should == 1;
}