1 module concurrency.stream; 2 3 import concurrency.stoptoken; 4 import concurrency.receiver; 5 public import concurrency.stream.stream; 6 public import concurrency.stream.filter; 7 public import concurrency.stream.take; 8 public import concurrency.stream.transform; 9 public import concurrency.stream.scan; 10 public import concurrency.stream.sample; 11 public import concurrency.stream.tolist; 12 public import concurrency.stream.slide; 13 public import concurrency.stream.throttling; 14 public import concurrency.stream.cycle; 15 public import concurrency.stream.cron; 16 public import concurrency.stream.defer; 17 public import concurrency.stream.flatmapconcat; 18 public import concurrency.stream.flatmaplatest; 19 import concurrency.sender : isSender, OpType; 20 import concepts; 21 import std.traits : hasFunctionAttributes; 22 23 /* 24 catch? 25 combineLatest 26 count 27 debounce 28 distinctUntilChanged 29 drop 30 dropWhile 31 filter 32 first 33 firstOrNull 34 flatMapConcat 35 flatMapLatest 36 flatMapMerge 37 fold 38 map 39 mapLatest 40 merge 41 onEach 42 onEmpty 43 onStart 44 onSubscription 45 reduce (fold with no seed) 46 retry 47 retryWhen 48 runningReduce 49 sample 50 scan (like runningReduce but with initial value) 51 take 52 takeWhile 53 toList 54 transform 55 transformLatest 56 zip 57 */ 58 59 /// Stream that emit the same value until cancelled 60 auto infiniteStream(T)(T t) { 61 alias DG = CollectDelegate!(T); 62 struct Loop { 63 T val; 64 void loop(StopToken)(DG emit, StopToken stopToken) { 65 while(!stopToken.isStopRequested) 66 emit(val); 67 } 68 } 69 return Loop(t).loopStream!T; 70 } 71 72 /// Stream that emits from start..end or until cancelled 73 auto iotaStream(T)(T start, T end) { 74 alias DG = CollectDelegate!(T); 75 struct Loop { 76 T b,e; 77 void loop(StopToken)(DG emit, StopToken stopToken) { 78 foreach(i; b..e) { 79 emit(i); 80 if (stopToken.isStopRequested) 81 break; 82 } 83 } 84 } 85 return Loop(start, end).loopStream!T; 86 } 87 88 /// Stream that emits each value from the array or until cancelled 89 auto arrayStream(T)(T[] arr) { 90 alias DG = CollectDelegate!(T); 91 struct Loop { 92 T[] arr; 93 void loop(StopToken)(DG emit, StopToken stopToken) @safe { 94 foreach(item; arr) { 95 emit(item); 96 if (stopToken.isStopRequested) 97 break; 98 } 99 } 100 } 101 return Loop(arr).loopStream!T; 102 } 103 104 import core.time : Duration; 105 106 auto intervalStream(Duration duration, bool emitAtStart = false) { 107 alias DG = CollectDelegate!(void); 108 static struct ItemReceiver(Op) { 109 Op* op; 110 void setValue() @safe { 111 if (op.receiver.getStopToken.isStopRequested) { 112 op.receiver.setDone(); 113 return; 114 } 115 try { 116 op.dg(); 117 if (op.receiver.getStopToken.isStopRequested) { 118 op.receiver.setDone(); 119 return; 120 } 121 op.load(); 122 } catch (Exception e) { 123 op.receiver.setError(e); 124 } 125 } 126 void setDone() @safe nothrow { 127 op.receiver.setDone(); 128 } 129 void setError(Throwable t) @safe nothrow { 130 op.receiver.setError(t); 131 } 132 auto getStopToken() @safe { 133 return op.receiver.getStopToken(); 134 } 135 auto getScheduler() @safe { 136 return op.receiver.getScheduler(); 137 } 138 } 139 static struct Op(Receiver) { 140 import std.traits : ReturnType; 141 Duration duration; 142 DG dg; 143 Receiver receiver; 144 alias SchedulerAfterSender = ReturnType!(SchedulerType!(Receiver).scheduleAfter); 145 alias Op = OpType!(SchedulerAfterSender, ItemReceiver!(typeof(this))); 146 Op op; 147 bool emitAtStart; 148 @disable this(this); 149 @disable this(ref return scope typeof(this) rhs); 150 this(Duration duration, DG dg, Receiver receiver, bool emitAtStart) { 151 this.duration = duration; 152 this.dg = dg; 153 this.receiver = receiver; 154 this.emitAtStart = emitAtStart; 155 } 156 void start() @trusted nothrow scope { 157 try { 158 if (emitAtStart) { 159 emitAtStart = false; 160 dg(); 161 if (receiver.getStopToken.isStopRequested) { 162 receiver.setDone(); 163 return; 164 } 165 } 166 load(); 167 } catch (Exception e) { 168 receiver.setError(e); 169 } 170 } 171 private void load() @trusted nothrow { 172 try { 173 op = receiver.getScheduler().scheduleAfter(duration).connect(ItemReceiver!(typeof(this))(&this)); 174 op.start(); 175 } catch (Exception e) { 176 receiver.setError(e); 177 } 178 } 179 } 180 static struct Sender { 181 alias Value = void; 182 Duration duration; 183 DG dg; 184 bool emitAtStart; 185 auto connect(Receiver)(return Receiver receiver) @safe return scope { 186 // ensure NRVO 187 auto op = Op!(Receiver)(duration, dg, receiver, emitAtStart); 188 return op; 189 } 190 } 191 static struct IntervalStream { 192 alias ElementType = void; 193 Duration duration; 194 bool emitAtStart = false; 195 auto collect(DG dg) @safe { 196 return Sender(duration, dg, emitAtStart); 197 } 198 } 199 return IntervalStream(duration, emitAtStart); 200 } 201 202 auto via(Stream, Sender)(Stream stream, Sender sender) if (models!(Sender, isSender) && models!(Stream, isStream)) { 203 alias Properties = StreamProperties!Stream; 204 alias DG = Properties.DG; 205 static struct ViaStreamOp(Receiver) { 206 import std.traits : ReturnType; 207 import concurrency.operations.via : senderVia = via; 208 alias Op = OpType!(ReturnType!(senderVia!(Properties.Sender, Sender)), Receiver); 209 Op op; 210 @disable this(ref return scope typeof(this) rhs); 211 @disable this(this); 212 this(Stream stream, Sender sender, DG dg, Receiver receiver) @trusted { 213 op = stream.collect(dg).senderVia(sender).connect(receiver); 214 } 215 void start() nothrow @safe { 216 op.start(); 217 } 218 } 219 return fromStreamOp!(Properties.ElementType, Properties.Value, ViaStreamOp)(stream, sender); 220 } 221 222 auto doneStream() { 223 alias DG = CollectDelegate!void; 224 static struct DoneStreamOp(Receiver) { 225 Receiver receiver; 226 @disable this(ref return scope typeof(this) rhs); 227 @disable this(this); 228 this(DG dg, Receiver receiver) { 229 this.receiver = receiver; 230 } 231 void start() nothrow @safe { 232 receiver.setDone(); 233 } 234 } 235 return fromStreamOp!(void, void, DoneStreamOp)(); 236 } 237 238 auto errorStream(Exception e) { 239 alias DG = CollectDelegate!void; 240 static struct ErrorStreamOp(Receiver) { 241 Exception e; 242 Receiver receiver; 243 @disable this(ref return scope typeof(this) rhs); 244 @disable this(this); 245 this(Exception e, DG dg, Receiver receiver) { 246 this.e = e; 247 this.receiver = receiver; 248 } 249 void start() nothrow @safe { 250 receiver.setError(e); 251 } 252 } 253 return fromStreamOp!(void, void, ErrorStreamOp)(e); 254 } 255 256 /// A SharedStream is used for broadcasting values to zero or more receivers. Receivers can be added and removed at any time. The stream itself never completes, so receivers should themselves terminate their connection. 257 auto sharedStream(T)() { 258 import concurrency.slist; 259 alias DG = CollectDelegate!(T); 260 return SharedStream!(T)(new shared SList!(SharedStream!(T).SubscriberDG)); 261 } 262 263 shared struct SharedStream(T) { 264 alias ElementType = T; 265 alias SubscriberDG = void delegate(T) nothrow @safe shared; 266 import concurrency.slist; 267 private { 268 alias DG = CollectDelegate!T; 269 static struct Op(Receiver) { 270 shared SharedStream!T source; 271 DG dg; 272 Receiver receiver; 273 StopCallback cb; 274 @disable this(ref return scope typeof(this) rhs); 275 @disable this(this); 276 void start() nothrow @trusted { 277 auto stopToken = receiver.getStopToken(); 278 cb = stopToken.onStop(&(cast(shared)this).onStop); 279 if (stopToken.isStopRequested) { 280 cb.dispose(); 281 receiver.setDone(); 282 } else { 283 source.add(&(cast(shared)this).onItem); 284 } 285 } 286 void onStop() nothrow @safe shared { 287 with(unshared) { 288 source.remove(&this.onItem); 289 receiver.setDone(); 290 } 291 } 292 void onItem(T element) nothrow @safe shared { 293 with(unshared) { 294 try { 295 dg(element); 296 } catch (Exception e) { 297 source.remove(&this.onItem); 298 cb.dispose(); 299 receiver.setError(e); 300 } 301 } 302 } 303 private auto ref unshared() nothrow @trusted shared { 304 return cast()this; 305 } 306 } 307 static struct SharedStreamSender { 308 alias Value = void; 309 shared SharedStream!T source; 310 DG dg; 311 auto connect(Receiver)(return Receiver receiver) @safe return scope { 312 // ensure NRVO 313 auto op = Op!(Receiver)(source, dg, receiver); 314 return op; 315 } 316 } 317 shared SList!SubscriberDG dgs; 318 } 319 this(shared SList!SubscriberDG dgs) { 320 this.dgs = dgs; 321 } 322 void emit(T t) nothrow @trusted { 323 foreach(dg; dgs[]) 324 dg(t); 325 } 326 private void remove(SubscriberDG dg) nothrow @trusted { 327 dgs.remove(dg); 328 } 329 private void add(SubscriberDG dg) nothrow @trusted { 330 dgs.pushBack(dg); 331 } 332 auto collect(DG dg) @safe { 333 return SharedStreamSender(this, dg); 334 } 335 }