1 module ut.concurrency.sender; 2 3 import concurrency; 4 import concurrency.sender; 5 import concurrency.thread; 6 import concurrency.operations; 7 import concurrency.receiver; 8 import unit_threaded; 9 import core.atomic : atomicOp; 10 11 @("syncWait.value") 12 @safe unittest { 13 ValueSender!(int)(5).syncWait.value.shouldEqual(5); 14 whenAll(just(5), ThrowingSender()).syncWait.value.shouldThrow(); 15 whenAll(just(5), DoneSender()).syncWait.value.shouldThrow(); 16 } 17 18 @("syncWait.assumeOk") 19 @safe unittest { 20 ThrowingSender().syncWait.assumeOk.shouldThrow(); 21 DoneSender().syncWait.assumeOk.shouldThrow(); 22 ErrorSender(new Exception("Failure")).syncWait.assumeOk.shouldThrow(); 23 } 24 25 @("syncWait.match") 26 @safe unittest { 27 ValueSender!(int)(5).syncWait.match!((int i) => true, (ref t) => false).should == true; 28 } 29 30 @("syncWait.match.void") 31 @safe unittest { 32 VoidSender().syncWait.match!((typeof(null)) => true, (ref t) => false).should == true; 33 } 34 35 @("syncWait.nested.basic") 36 @safe unittest { 37 import concurrency.stoptoken; 38 auto source = new shared StopSource(); 39 40 justFrom(() shared { 41 VoidSender().withStopToken((StopToken token){ 42 source.stop(); 43 token.isStopRequested.should == true; 44 }).syncWait().isCancelled.should == true; 45 }).syncWait(source).isCancelled.should == true; 46 } 47 48 @("syncWait.nested.thread") 49 @safe unittest { 50 import concurrency.stoptoken; 51 auto source = new shared StopSource(); 52 53 justFrom(() shared { 54 VoidSender().withStopToken((StopToken token){ 55 source.stop(); 56 token.isStopRequested.should == true; 57 }).syncWait().isCancelled.should == true; 58 }).via(ThreadSender()).syncWait(source).isCancelled.should == true; 59 } 60 61 @("syncWait.nested.threadpool") 62 @safe unittest { 63 import concurrency.stoptoken; 64 auto source = new shared StopSource(); 65 66 auto pool = stdTaskPool(2); 67 68 justFrom(() shared { 69 VoidSender().withStopToken((StopToken token){ 70 source.stop(); 71 token.isStopRequested.should == true; 72 }).syncWait().isCancelled.should == true; 73 }).via(pool.getScheduler().schedule()).syncWait(source).isCancelled.should == true; 74 } 75 76 @("value.start.attributes.1") 77 @safe nothrow @nogc unittest { 78 ValueSender!(int)(5).connect(NullReceiver!int()).start(); 79 } 80 81 @("value.start.attributes.2") 82 @safe nothrow unittest { 83 ValueSender!(int)(5).connect(ThrowingNullReceiver!int()).start(); 84 } 85 86 @("value.void") 87 @safe unittest { 88 ValueSender!void().syncWait().assumeOk; 89 } 90 91 @("syncWait.thread") 92 @safe unittest { 93 ThreadSender().syncWait.assumeOk; 94 } 95 96 @("syncWait.thread.then.value") 97 @safe unittest { 98 ThreadSender().then(() shared => 2*3).syncWait.value.shouldEqual(6); 99 } 100 101 @("syncWait.thread.then.exception") 102 @safe unittest { 103 bool delegate() @safe shared dg = () shared { throw new Exception("Exceptions are forwarded"); }; 104 ThreadSender() 105 .then(dg) 106 .syncWait() 107 .isError.should == true; 108 } 109 110 @("toSenderObject.value") 111 @safe unittest { 112 ValueSender!(int)(4).toSenderObject.syncWait.value.shouldEqual(4); 113 } 114 115 @("toSenderObject.thread") 116 @safe unittest { 117 ThreadSender().then(() shared => 2*3+1).toSenderObject.syncWait.value.shouldEqual(7); 118 } 119 120 @("via.threadsender.error") 121 @safe unittest { 122 ThrowingSender().via(ThreadSender()).syncWait().isError.should == true; 123 } 124 125 @("toShared.basic") 126 @safe unittest { 127 import std.typecons : tuple; 128 129 shared int g; 130 131 auto s = just(1) 132 .then((int i) @trusted shared { return g.atomicOp!"+="(1); }) 133 .toShared(); 134 135 whenAll(s, s).syncWait.value.should == tuple(1,1); 136 race(s, s).syncWait.value.should == 1; 137 s.syncWait.value.should == 1; 138 s.syncWait.value.should == 1; 139 140 s.reset(); 141 s.syncWait.value.should == 2; 142 s.syncWait.value.should == 2; 143 whenAll(s, s).syncWait.value.should == tuple(2,2); 144 race(s, s).syncWait.value.should == 2; 145 } 146 147 @("toShared.via.thread") 148 @safe unittest { 149 import concurrency.operations.toshared; 150 151 shared int g; 152 153 auto s = just(1) 154 .then((int i) @trusted shared { return g.atomicOp!"+="(1); }) 155 .via(ThreadSender()) 156 .toShared(); 157 158 s.syncWait.value.should == 1; 159 s.syncWait.value.should == 1; 160 161 s.reset(); 162 s.syncWait.value.should == 2; 163 s.syncWait.value.should == 2; 164 } 165 166 @("toShared.error") 167 @safe unittest { 168 shared int g; 169 170 auto s = VoidSender() 171 .then(() @trusted shared { g.atomicOp!"+="(1); throw new Exception("Error"); }) 172 .toShared(); 173 174 s.syncWait.assumeOk.shouldThrowWithMessage("Error"); 175 g.should == 1; 176 s.syncWait.assumeOk.shouldThrowWithMessage("Error"); 177 g.should == 1; 178 179 race(s, s).syncWait.assumeOk.shouldThrowWithMessage("Error"); 180 g.should == 1; 181 182 s.reset(); 183 s.syncWait.assumeOk.shouldThrowWithMessage("Error"); 184 g.should == 2; 185 } 186 187 @("toShared.done") 188 @safe unittest { 189 shared int g; 190 191 auto s = DoneSender() 192 .via(VoidSender() 193 .then(() @trusted shared { g.atomicOp!"+="(1); })) 194 .toShared(); 195 196 s.syncWait.isCancelled.should == true; 197 g.should == 1; 198 s.syncWait.isCancelled.should == true; 199 g.should == 1; 200 201 race(s, s).syncWait.isCancelled.should == true; 202 g.should == 1; 203 204 s.reset(); 205 s.syncWait.isCancelled.should == true; 206 g.should == 2; 207 } 208 209 @("toShared.stop") 210 @safe unittest { 211 import concurrency.stoptoken; 212 import core.atomic : atomicStore, atomicLoad; 213 shared bool g; 214 215 auto waiting = ThreadSender().withStopToken((StopToken token) @trusted { 216 while (!token.isStopRequested) { } 217 g.atomicStore(true); 218 }); 219 auto source = new StopSource(); 220 auto stopper = just(source).then((StopSource source) shared { source.stop(); }); 221 222 whenAll(waiting.toShared().withStopSource(source), stopper).syncWait.isCancelled.should == true; 223 224 g.atomicLoad.should == true; 225 } 226 227 @("toShared.scheduler") 228 @safe unittest { 229 import core.time : msecs; 230 // by default toShared doesn't support scheduling 231 static assert(!__traits(compiles, { DelaySender(1.msecs).toShared().syncWait().assumeOk; })); 232 // have to pass scheduler explicitly 233 import concurrency.scheduler : localThreadScheduler; 234 DelaySender(1.msecs).toShared(localThreadScheduler).syncWait().assumeOk; 235 } 236 237 @("toShared.nursery") 238 @safe unittest { 239 /// just see if we can instantiate 240 import concurrency.nursery; 241 import concurrency.scheduler; 242 auto n = new shared Nursery(); 243 auto s = n.toShared(localThreadScheduler()); 244 } 245 246 @("nvro") 247 @safe unittest { 248 static struct Op(Receiver) { 249 Receiver receiver; 250 void* atConstructor; 251 @disable this(ref return scope typeof(this) rhs); 252 this(Receiver receiver) @trusted { 253 this.receiver = receiver; 254 atConstructor = cast(void*)&this; 255 } 256 void start() @trusted nothrow { 257 void* atStart = cast(void*)&this; 258 receiver.setValue(atConstructor == atStart); 259 } 260 } 261 static struct NRVOSender { 262 alias Value = bool; 263 auto connect(Receiver)(return Receiver receiver) @safe scope return { 264 // ensure NRVO 265 auto op = Op!Receiver(receiver); 266 return op; 267 } 268 } 269 NRVOSender().syncWait().assumeOk; 270 NRVOSender().via(ThreadSender()).syncWait().assumeOk; 271 whenAll(NRVOSender(),VoidSender()).syncWait.assumeOk; 272 whenAll(VoidSender(),NRVOSender()).syncWait.assumeOk; 273 race(NRVOSender(),NRVOSender()).syncWait.assumeOk; 274 } 275 276 @("justFrom") 277 @safe unittest { 278 justFrom(() shared =>42).syncWait.value.should == 42; 279 } 280 281 @("justFrom.exception") 282 @safe unittest { 283 justFrom(() shared { throw new Exception("failure"); }).syncWait.isError.should == true; 284 } 285 286 @("delay") 287 @safe unittest { 288 import core.time : msecs; 289 290 race(delay(20.msecs).then(() shared => 2), 291 delay(1.msecs).then(() shared => 1)).syncWait.value.should == 1; 292 } 293 294 @("promise.basic") 295 @safe unittest { 296 auto prom = promise!int; 297 auto cont = prom.then((int i) => i * 2); 298 auto runner = justFrom(() shared => prom.fulfill(72)); 299 300 whenAll(cont, runner).syncWait.value.should == 144; 301 } 302 303 @("promise.double") 304 @safe unittest { 305 import std.typecons : tuple; 306 auto prom = promise!int; 307 auto cont = prom.then((int i) => i * 2); 308 auto runner = justFrom(() shared => prom.fulfill(72)); 309 310 whenAll(cont, cont, runner).syncWait.value.should == tuple(144, 144); 311 } 312 313 @("promise.scheduler") 314 @safe unittest { 315 import std.typecons : tuple; 316 auto prom = promise!int; 317 auto pool = stdTaskPool(2); 318 319 auto cont = prom.forwardOn(pool.getScheduler).then((int i) => i * 2); 320 auto runner = justFrom(() shared => prom.fulfill(72)).via(ThreadSender()); 321 322 whenAll(cont, cont, runner).syncWait.value.should == tuple(144, 144); 323 } 324 325 @("just.tuple") 326 @safe unittest { 327 import std.typecons : tuple; 328 import concurrency.stoptoken; 329 just(14, 52).syncWait.value.should == tuple(14, 52); 330 just(14, 53).then((int a, int b) => a*b).syncWait.value.should == 742; 331 just(14, 54).withStopToken((StopToken s, int a, int b) => a*b).syncWait.value.should == 756; 332 }