module ut.concurrency.sender;

import concurrency;
import concurrency.sender;
import concurrency.thread;
import concurrency.operations;
import concurrency.receiver;
import unit_threaded;
import core.atomic : atomicOp;

// `.value` on a sync-waited result: equals the produced value for a plain
// ValueSender, and is expected to throw when the whenAll also contains a
// ThrowingSender or a DoneSender.
@("syncWait.value")
@safe unittest {
  ValueSender!int(5)
    .syncWait()
    .value
    .shouldEqual(5);

  whenAll(just(5), ThrowingSender())
    .syncWait()
    .value
    .shouldThrow();

  whenAll(just(5), DoneSender())
    .syncWait()
    .value
    .shouldThrow();
}

// `.assumeOk` is expected to throw for each of ThrowingSender, DoneSender and
// an ErrorSender carrying an exception.
@("syncWait.assumeOk")
@safe unittest {
  ThrowingSender()
    .syncWait()
    .assumeOk
    .shouldThrow();

  DoneSender()
    .syncWait()
    .assumeOk
    .shouldThrow();

  ErrorSender(new Exception("Failure"))
    .syncWait()
    .assumeOk
    .shouldThrow();
}

// `match` on the result of an int sender: the `int` handler is the one
// expected to fire (it returns true, the catch-all returns false).
@("syncWait.match")
@safe unittest {
  ValueSender!int(5)
    .syncWait()
    .match!((int produced) => true, (ref other) => false)
    .should == true;
}

// `match` on the result of a VoidSender: the `typeof(null)` handler is the
// one expected to fire.
@("syncWait.match.void")
@safe unittest {
  VoidSender()
    .syncWait()
    .match!((typeof(null)) => true, (ref other) => false)
    .should == true;
}

// connect + start with a NullReceiver has to compile inside a
// @safe nothrow @nogc unittest.
@("value.start.attributes.1")
@safe nothrow @nogc unittest {
  ValueSender!int(5)
    .connect(NullReceiver!int())
    .start();
}

// connect + start with a ThrowingNullReceiver has to compile inside a
// @safe nothrow unittest (note: no @nogc here).
@("value.start.attributes.2")
@safe nothrow unittest {
  ValueSender!int(5)
    .connect(ThrowingNullReceiver!int())
    .start();
}

// A ValueSender!void sync-waits to an ok result.
@("value.void")
@safe unittest {
  ValueSender!void()
    .syncWait()
    .isOk
    .should == true;
}

// A bare ThreadSender sync-waits to an ok result.
@("syncWait.thread")
@safe unittest {
  ThreadSender()
    .syncWait()
    .isOk
    .should == true;
}

// The value returned by a `then` continuation chained on a ThreadSender is
// what `.value` reports.
@("syncWait.thread.then.value")
@safe unittest {
  ThreadSender()
    .then(() shared => 2*3)
    .syncWait()
    .value
    .shouldEqual(6);
}

// A `then` continuation that throws makes the sync-waited result an error.
@("syncWait.thread.then.exception")
@safe unittest {
  bool delegate() @safe shared thrower = () shared {
    throw new Exception("Exceptions are rethrown");
  };

  ThreadSender()
    .then(thrower)
    .syncWait()
    .isError
    .should == true;
}

// A ValueSender passed through toSenderObject still produces its value.
@("toSenderObject.value")
@safe unittest {
  ValueSender!int(4)
    .toSenderObject()
    .syncWait()
    .value
    .shouldEqual(4);
}

// A ThreadSender + `then` chain passed through toSenderObject still produces
// the continuation's value.
@("toSenderObject.thread")
@safe unittest {
  ThreadSender()
    .then(() shared => 2*3+1)
    .toSenderObject()
    .syncWait()
    .value
    .shouldEqual(7);
}

// A ThrowingSender routed via a ThreadSender yields an error result.
@("via.threadsender.error")
@safe unittest {
  ThrowingSender()
    .via(ThreadSender())
    .syncWait()
    .isError
    .should == true;
}

// toShared on a counting sender: every await reports the same counter value
// (alone, in whenAll, or in race) until reset() is called, after which the
// reported value is one higher.
@("toShared.basic")
@safe unittest {
  import std.typecons : tuple;

  shared int counter;

  auto sharedSender = just(1)
    .then((int ignored) @trusted shared { return atomicOp!"+="(counter, 1); })
    .toShared();

  // before reset: everything observes 1
  whenAll(sharedSender, sharedSender).syncWait().value.should == tuple(1,1);
  race(sharedSender, sharedSender).syncWait().value.should == 1;
  sharedSender.syncWait().value.should == 1;
  sharedSender.syncWait().value.should == 1;

  sharedSender.reset();

  // after reset: everything observes 2
  sharedSender.syncWait().value.should == 2;
  sharedSender.syncWait().value.should == 2;
  whenAll(sharedSender, sharedSender).syncWait().value.should == tuple(2,2);
  race(sharedSender, sharedSender).syncWait().value.should == 2;
}

// Same counting scenario as above, with the work routed via a ThreadSender
// before being shared.
@("toShared.via.thread")
@safe unittest {
  import concurrency.operations.toshared;

  shared int counter;

  auto sharedSender = just(1)
    .then((int ignored) @trusted shared { return atomicOp!"+="(counter, 1); })
    .via(ThreadSender())
    .toShared();

  sharedSender.syncWait().value.should == 1;
  sharedSender.syncWait().value.should == 1;

  sharedSender.reset();

  sharedSender.syncWait().value.should == 2;
  sharedSender.syncWait().value.should == 2;
}

// toShared on a sender whose continuation increments a counter and then
// throws: each await rethrows "Error", while the counter only moves after
// reset().
@("toShared.error")
@safe unittest {
  shared int counter;

  auto sharedSender = VoidSender()
    .then(() @trusted shared {
      atomicOp!"+="(counter, 1);
      throw new Exception("Error");
    })
    .toShared();

  sharedSender.syncWait().assumeOk.shouldThrowWithMessage("Error");
  counter.should == 1;
  sharedSender.syncWait().assumeOk.shouldThrowWithMessage("Error");
  counter.should == 1;

  race(sharedSender, sharedSender).syncWait().assumeOk.shouldThrowWithMessage("Error");
  counter.should == 1;

  sharedSender.reset();
  sharedSender.syncWait().assumeOk.shouldThrowWithMessage("Error");
  counter.should == 2;
}

// toShared on a DoneSender routed via a counting VoidSender chain: each await
// reports cancellation, while the counter only moves after reset().
@("toShared.done")
@safe unittest {
  shared int counter;

  auto sharedSender = DoneSender()
    .via(VoidSender()
         .then(() @trusted shared { atomicOp!"+="(counter, 1); }))
    .toShared();

  sharedSender.syncWait().isCancelled.should == true;
  counter.should == 1;
  sharedSender.syncWait().isCancelled.should == true;
  counter.should == 1;

  race(sharedSender, sharedSender).syncWait().isCancelled.should == true;
  counter.should == 1;

  sharedSender.reset();
  sharedSender.syncWait().isCancelled.should == true;
  counter.should == 2;
}

// A shared sender that spins until its stop token fires is combined with a
// sender that triggers the StopSource; the whole is expected to end up
// cancelled, with the spinning body having run to its final store.
@("toShared.stop")
@safe unittest {
  import concurrency.stoptoken;
  import core.atomic : atomicStore, atomicLoad;

  shared bool finished;

  auto waiting = ThreadSender().withStopToken((StopToken token) @trusted {
    // spin until a stop is requested
    while (!token.isStopRequested) {}
    atomicStore(finished, true);
  });

  auto stopSource = new StopSource();
  auto stopper = just(stopSource)
    .then((StopSource src) shared { src.stop(); });

  whenAll(waiting.toShared().withStopSource(stopSource), stopper)
    .syncWait()
    .isCancelled
    .should == true;

  atomicLoad(finished).should == true;
}

// toShared and schedulers.
@("toShared.scheduler")
@safe unittest {
  import core.time : msecs;

  // without an explicit scheduler, a DelaySender behind toShared() must not compile
  static assert(!__traits(compiles, { DelaySender(1.msecs).toShared().syncWait().isOk.should == true; }));

  // handing the scheduler to toShared makes it work
  import concurrency.scheduler : localThreadScheduler;
  DelaySender(msecs(1))
    .toShared(localThreadScheduler)
    .syncWait()
    .isOk
    .should == true;
}

// NRVO check (the test id is spelled "nvro"): the operation state records its
// own address in the constructor and, on start(), sends the receiver whether
// that address is still the same. The copy constructor is disabled.
@("nvro")
@safe unittest {
  static struct Op(Receiver) {
    Receiver receiver;
    void* atConstructor;

    @disable this(ref return scope typeof(this) rhs);

    this(Receiver receiver) @trusted {
      this.receiver = receiver;
      atConstructor = cast(void*)&this;
    }

    void start() @trusted nothrow {
      void* current = cast(void*)&this;
      receiver.setValue(atConstructor == current);
    }
  }

  static struct NRVOSender {
    alias Value = bool;

    auto connect(Receiver)(return Receiver receiver) @safe scope return {
      // ensure NRVO: construct into a named local, then return that local
      auto operation = Op!Receiver(receiver);
      return operation;
    }
  }

  NRVOSender().syncWait().isOk.should == true;
  NRVOSender().via(ThreadSender()).syncWait().isOk.should == true;
  whenAll(NRVOSender(), VoidSender()).syncWait().isOk.should == true;
  whenAll(VoidSender(), NRVOSender()).syncWait().isOk.should == true;
  race(NRVOSender(), NRVOSender()).syncWait().isOk.should == true;
}

// justFrom produces the value returned by the supplied callable.
@("justFrom")
@safe unittest {
  justFrom(() shared => 42)
    .syncWait()
    .value
    .should == 42;
}

// Racing a 2 ms delay against a 1 ms delay: the 1 ms branch's value is the
// expected outcome.
@("delay")
@safe unittest {
  import core.time : msecs;

  race(
    delay(msecs(2)).then(() shared => 2),
    delay(msecs(1)).then(() shared => 1)
  ).syncWait().value.should == 1;
}

// A continuation on a promise sees the fulfilled value (72 -> 144) when a
// second sender fulfills the promise.
@("promise.basic")
@safe unittest {
  auto pending = promise!int;
  auto doubled = pending.then((int fulfilled) => fulfilled * 2);
  auto fulfiller = justFrom(() shared => pending.fulfill(72));

  whenAll(doubled, fulfiller).syncWait().value.should == 144;
}

// The same promise continuation used twice in one whenAll: both instances
// report 144.
@("promise.double")
@safe unittest {
  import std.typecons : tuple;

  auto pending = promise!int;
  auto doubled = pending.then((int fulfilled) => fulfilled * 2);
  auto fulfiller = justFrom(() shared => pending.fulfill(72));

  whenAll(doubled, doubled, fulfiller).syncWait().value.should == tuple(144, 144);
}

// As above, but the continuation is forwarded onto a task-pool scheduler and
// the fulfilling sender is routed via a ThreadSender.
@("promise.scheduler")
@safe unittest {
  import std.typecons : tuple;

  auto pending = promise!int;
  auto pool = stdTaskPool(2);

  auto doubled = pending
    .forwardOn(pool.getScheduler)
    .then((int fulfilled) => fulfilled * 2);
  auto fulfiller = justFrom(() shared => pending.fulfill(72))
    .via(ThreadSender());

  whenAll(doubled, doubled, fulfiller).syncWait().value.should == tuple(144, 144);
}

// `just` with two arguments: the value is a tuple, and the two elements are
// passed as separate arguments to `then` and (after the token) to
// `withStopToken` callbacks.
@("just.tuple")
@safe unittest {
  import std.typecons : tuple;
  import concurrency.stoptoken;

  just(14, 52)
    .syncWait()
    .value
    .should == tuple(14, 52);

  just(14, 53)
    .then((int lhs, int rhs) => lhs*rhs)
    .syncWait()
    .value
    .should == 742;

  just(14, 54)
    .withStopToken((StopToken token, int lhs, int rhs) => lhs*rhs)
    .syncWait()
    .value
    .should == 756;
}