1 module concurrency.stream; 2 3 import concurrency.stoptoken; 4 import concurrency.receiver; 5 public import concurrency.stream.stream; 6 public import concurrency.stream.filter; 7 public import concurrency.stream.take; 8 public import concurrency.stream.transform; 9 public import concurrency.stream.scan; 10 public import concurrency.stream.sample; 11 public import concurrency.stream.tolist; 12 public import concurrency.stream.slide; 13 public import concurrency.stream.throttling; 14 public import concurrency.stream.cycle; 15 public import concurrency.stream.flatmapconcat; 16 public import concurrency.stream.flatmaplatest; 17 import concurrency.sender : isSender, OpType; 18 import concepts; 19 import std.traits : hasFunctionAttributes; 20 21 /* 22 catch? 23 combineLatest 24 count 25 debounce 26 distinctUntilChanged 27 drop 28 dropWhile 29 filter 30 first 31 firstOrNull 32 flatMapConcat 33 flatMapLatest 34 flatMapMerge 35 fold 36 map 37 mapLatest 38 merge 39 onEach 40 onEmpty 41 onStart 42 onSubscription 43 reduce (fold with no seed) 44 retry 45 retryWhen 46 runningReduce 47 sample 48 scan (like runningReduce but with initial value) 49 take 50 takeWhile 51 toList 52 transform 53 transformLatest 54 zip 55 */ 56 57 /// Stream that emit the same value until cancelled 58 auto infiniteStream(T)(T t) { 59 alias DG = CollectDelegate!(T); 60 struct Loop { 61 T val; 62 void loop(StopToken)(DG emit, StopToken stopToken) { 63 while(!stopToken.isStopRequested) 64 emit(val); 65 } 66 } 67 return Loop(t).loopStream!T; 68 } 69 70 /// Stream that emits from start..end or until cancelled 71 auto iotaStream(T)(T start, T end) { 72 alias DG = CollectDelegate!(T); 73 struct Loop { 74 T b,e; 75 void loop(StopToken)(DG emit, StopToken stopToken) { 76 foreach(i; b..e) { 77 emit(i); 78 if (stopToken.isStopRequested) 79 break; 80 } 81 } 82 } 83 return Loop(start, end).loopStream!T; 84 } 85 86 /// Stream that emits each value from the array or until cancelled 87 auto arrayStream(T)(T[] arr) { 88 alias DG = CollectDelegate!(T); 89 struct Loop { 90 T[] arr; 91 void loop(StopToken)(DG emit, StopToken stopToken) @safe { 92 foreach(item; arr) { 93 emit(item); 94 if (stopToken.isStopRequested) 95 break; 96 } 97 } 98 } 99 return Loop(arr).loopStream!T; 100 } 101 102 import core.time : Duration; 103 104 auto intervalStream(Duration duration, bool emitAtStart = false) { 105 alias DG = CollectDelegate!(void); 106 static struct ItemReceiver(Op) { 107 Op* op; 108 void setValue() @safe { 109 if (op.receiver.getStopToken.isStopRequested) { 110 op.receiver.setDone(); 111 return; 112 } 113 try { 114 op.dg(); 115 if (op.receiver.getStopToken.isStopRequested) { 116 op.receiver.setDone(); 117 return; 118 } 119 op.load(); 120 } catch (Exception e) { 121 op.receiver.setError(e); 122 } 123 } 124 void setDone() @safe nothrow { 125 op.receiver.setDone(); 126 } 127 void setError(Throwable t) @safe nothrow { 128 op.receiver.setError(t); 129 } 130 auto getStopToken() @safe { 131 return op.receiver.getStopToken(); 132 } 133 auto getScheduler() @safe { 134 return op.receiver.getScheduler(); 135 } 136 } 137 static struct Op(Receiver) { 138 import std.traits : ReturnType; 139 Duration duration; 140 DG dg; 141 Receiver receiver; 142 alias SchedulerAfterSender = ReturnType!(SchedulerType!(Receiver).scheduleAfter); 143 alias Op = OpType!(SchedulerAfterSender, ItemReceiver!(typeof(this))); 144 Op op; 145 bool emitAtStart; 146 @disable this(this); 147 @disable this(ref return scope typeof(this) rhs); 148 this(Duration duration, DG dg, Receiver receiver, bool emitAtStart) { 149 this.duration = duration; 150 this.dg = dg; 151 this.receiver = receiver; 152 this.emitAtStart = emitAtStart; 153 } 154 void start() @trusted nothrow { 155 try { 156 if (emitAtStart) { 157 emitAtStart = false; 158 dg(); 159 if (receiver.getStopToken.isStopRequested) { 160 receiver.setDone(); 161 return; 162 } 163 } 164 load(); 165 } catch (Exception e) { 166 receiver.setError(e); 167 } 168 } 169 private void load() @trusted nothrow { 170 try { 171 op = receiver.getScheduler().scheduleAfter(duration).connect(ItemReceiver!(typeof(this))(&this)); 172 op.start(); 173 } catch (Exception e) { 174 receiver.setError(e); 175 } 176 } 177 } 178 static struct Sender { 179 alias Value = void; 180 Duration duration; 181 DG dg; 182 bool emitAtStart; 183 auto connect(Receiver)(return Receiver receiver) @safe scope return { 184 // ensure NRVO 185 auto op = Op!(Receiver)(duration, dg, receiver, emitAtStart); 186 return op; 187 } 188 } 189 static struct IntervalStream { 190 alias ElementType = void; 191 Duration duration; 192 bool emitAtStart = false; 193 auto collect(DG dg) @safe { 194 return Sender(duration, dg, emitAtStart); 195 } 196 } 197 return IntervalStream(duration, emitAtStart); 198 } 199 200 auto via(Stream, Sender)(Stream stream, Sender sender) if (models!(Sender, isSender) && models!(Stream, isStream)) { 201 alias Properties = StreamProperties!Stream; 202 alias DG = Properties.DG; 203 static struct ViaStreamOp(Receiver) { 204 import std.traits : ReturnType; 205 import concurrency.operations.via : senderVia = via; 206 alias Op = OpType!(ReturnType!(senderVia!(Properties.Sender, Sender)), Receiver); 207 Op op; 208 @disable this(ref return scope typeof(this) rhs); 209 @disable this(this); 210 this(Stream stream, Sender sender, DG dg, Receiver receiver) { 211 op = stream.collect(dg).senderVia(sender).connect(receiver); 212 } 213 void start() nothrow @safe { 214 op.start(); 215 } 216 } 217 return fromStreamOp!(Properties.ElementType, Properties.Value, ViaStreamOp)(stream, sender); 218 } 219 220 auto doneStream() { 221 alias DG = CollectDelegate!void; 222 static struct DoneStreamOp(Receiver) { 223 Receiver receiver; 224 @disable this(ref return scope typeof(this) rhs); 225 @disable this(this); 226 this(DG dg, Receiver receiver) { 227 this.receiver = receiver; 228 } 229 void start() nothrow @safe { 230 receiver.setDone(); 231 } 232 } 233 return fromStreamOp!(void, void, DoneStreamOp)(); 234 } 235 236 auto errorStream(Exception e) { 237 alias DG = CollectDelegate!void; 238 static struct ErrorStreamOp(Receiver) { 239 Exception e; 240 Receiver receiver; 241 @disable this(ref return scope typeof(this) rhs); 242 @disable this(this); 243 this(Exception e, DG dg, Receiver receiver) { 244 this.e = e; 245 this.receiver = receiver; 246 } 247 void start() nothrow @safe { 248 receiver.setError(e); 249 } 250 } 251 return fromStreamOp!(void, void, ErrorStreamOp)(e); 252 } 253 254 /// A SharedStream is used for broadcasting values to zero or more receivers. Receivers can be added and removed at any time. The stream itself never completes, so receivers should themselves terminate their connection. 255 auto sharedStream(T)() { 256 import concurrency.slist; 257 alias DG = CollectDelegate!(T); 258 return SharedStream!(T)(new shared SList!(SharedStream!(T).SubscriberDG)); 259 } 260 261 shared struct SharedStream(T) { 262 alias ElementType = T; 263 alias SubscriberDG = void delegate(T) nothrow @safe shared; 264 import concurrency.slist; 265 private { 266 alias DG = CollectDelegate!T; 267 static struct Op(Receiver) { 268 shared SharedStream!T source; 269 DG dg; 270 Receiver receiver; 271 StopCallback cb; 272 @disable this(ref return scope typeof(this) rhs); 273 @disable this(this); 274 void start() nothrow @trusted { 275 auto stopToken = receiver.getStopToken(); 276 cb = stopToken.onStop(&(cast(shared)this).onStop); 277 if (stopToken.isStopRequested) { 278 cb.dispose(); 279 receiver.setDone(); 280 } else { 281 source.add(&(cast(shared)this).onItem); 282 } 283 } 284 void onStop() nothrow @safe shared { 285 with(unshared) { 286 source.remove(&this.onItem); 287 receiver.setDone(); 288 } 289 } 290 void onItem(T element) nothrow @safe shared { 291 with(unshared) { 292 try { 293 dg(element); 294 } catch (Exception e) { 295 source.remove(&this.onItem); 296 cb.dispose(); 297 receiver.setError(e); 298 } 299 } 300 } 301 private auto ref unshared() nothrow @trusted shared { 302 return cast()this; 303 } 304 } 305 static struct SharedStreamSender { 306 alias Value = void; 307 shared SharedStream!T source; 308 DG dg; 309 auto connect(Receiver)(return Receiver receiver) @safe scope return { 310 // ensure NRVO 311 auto op = Op!(Receiver)(source, dg, receiver); 312 return op; 313 } 314 } 315 shared SList!SubscriberDG dgs; 316 } 317 this(shared SList!SubscriberDG dgs) { 318 this.dgs = dgs; 319 } 320 void emit(T t) nothrow @trusted { 321 foreach(dg; dgs[]) 322 dg(t); 323 } 324 private void remove(SubscriberDG dg) nothrow @trusted { 325 dgs.remove(dg); 326 } 327 private void add(SubscriberDG dg) nothrow @trusted { 328 dgs.pushBack(dg); 329 } 330 auto collect(DG dg) @safe { 331 return SharedStreamSender(this, dg); 332 } 333 }