1 module concurrency.stream; 2 3 import concurrency.stoptoken; 4 import concurrency.receiver; 5 public import concurrency.stream.stream; 6 public import concurrency.stream.filter; 7 public import concurrency.stream.take; 8 public import concurrency.stream.transform; 9 public import concurrency.stream.scan; 10 public import concurrency.stream.sample; 11 public import concurrency.stream.tolist; 12 public import concurrency.stream.slide; 13 public import concurrency.stream.throttling; 14 public import concurrency.stream.cycle; 15 import concurrency.sender : isSender, OpType; 16 import concepts; 17 import std.traits : hasFunctionAttributes; 18 19 /* 20 catch? 21 combineLatest 22 count 23 debounce 24 distinctUntilChanged 25 drop 26 dropWhile 27 filter 28 first 29 firstOrNull 30 flatMapConcat 31 flatMapLatest 32 flatMapMerge 33 fold 34 map 35 mapLatest 36 merge 37 onEach 38 onEmpty 39 onStart 40 onSubscription 41 reduce (fold with no seed) 42 retry 43 retryWhen 44 runningReduce 45 sample 46 scan (like runningReduce but with initial value) 47 take 48 takeWhile 49 toList 50 transform 51 transformLatest 52 zip 53 */ 54 55 /// Stream that emit the same value until cancelled 56 auto infiniteStream(T)(T t) { 57 alias DG = CollectDelegate!(T); 58 struct Loop { 59 T val; 60 void loop(StopToken)(DG emit, StopToken stopToken) { 61 while(!stopToken.isStopRequested) 62 emit(val); 63 } 64 } 65 return Loop(t).loopStream!T; 66 } 67 68 /// Stream that emits from start..end or until cancelled 69 auto iotaStream(T)(T start, T end) { 70 alias DG = CollectDelegate!(T); 71 struct Loop { 72 T b,e; 73 void loop(StopToken)(DG emit, StopToken stopToken) { 74 foreach(i; b..e) { 75 emit(i); 76 if (stopToken.isStopRequested) 77 break; 78 } 79 } 80 } 81 return Loop(start, end).loopStream!T; 82 } 83 84 /// Stream that emits each value from the array or until cancelled 85 auto arrayStream(T)(T[] arr) { 86 alias DG = CollectDelegate!(T); 87 struct Loop { 88 T[] arr; 89 void loop(StopToken)(DG emit, StopToken stopToken) @safe { 90 foreach(item; arr) { 91 emit(item); 92 if (stopToken.isStopRequested) 93 break; 94 } 95 } 96 } 97 return Loop(arr).loopStream!T; 98 } 99 100 import core.time : Duration; 101 102 auto intervalStream(Duration duration, bool emitAtStart = false) { 103 alias DG = CollectDelegate!(void); 104 static struct ItemReceiver(Op) { 105 Op* op; 106 void setValue() @safe { 107 if (op.receiver.getStopToken.isStopRequested) { 108 op.receiver.setDone(); 109 return; 110 } 111 try { 112 op.dg(); 113 if (op.receiver.getStopToken.isStopRequested) { 114 op.receiver.setDone(); 115 return; 116 } 117 op.load(); 118 } catch (Exception e) { 119 op.receiver.setError(e); 120 } 121 } 122 void setDone() @safe nothrow { 123 op.receiver.setDone(); 124 } 125 void setError(Throwable t) @safe nothrow { 126 op.receiver.setError(t); 127 } 128 auto getStopToken() @safe { 129 return op.receiver.getStopToken(); 130 } 131 auto getScheduler() @safe { 132 return op.receiver.getScheduler(); 133 } 134 } 135 static struct Op(Receiver) { 136 import std.traits : ReturnType; 137 Duration duration; 138 DG dg; 139 Receiver receiver; 140 alias SchedulerAfterSender = ReturnType!(SchedulerType!(Receiver).scheduleAfter); 141 alias Op = OpType!(SchedulerAfterSender, ItemReceiver!(typeof(this))); 142 Op op; 143 bool emitAtStart; 144 @disable this(this); 145 @disable this(ref return scope typeof(this) rhs); 146 this(Duration duration, DG dg, Receiver receiver, bool emitAtStart) { 147 this.duration = duration; 148 this.dg = dg; 149 this.receiver = receiver; 150 this.emitAtStart = emitAtStart; 151 } 152 void start() @trusted nothrow { 153 try { 154 if (emitAtStart) { 155 emitAtStart = false; 156 dg(); 157 if (receiver.getStopToken.isStopRequested) { 158 receiver.setDone(); 159 return; 160 } 161 } 162 load(); 163 } catch (Exception e) { 164 receiver.setError(e); 165 } 166 } 167 private void load() @trusted nothrow { 168 try { 169 op = receiver.getScheduler().scheduleAfter(duration).connect(ItemReceiver!(typeof(this))(&this)); 170 op.start(); 171 } catch (Exception e) { 172 receiver.setError(e); 173 } 174 } 175 } 176 static struct Sender { 177 alias Value = void; 178 Duration duration; 179 DG dg; 180 bool emitAtStart; 181 auto connect(Receiver)(return Receiver receiver) @safe scope return { 182 // ensure NRVO 183 auto op = Op!(Receiver)(duration, dg, receiver, emitAtStart); 184 return op; 185 } 186 } 187 static struct IntervalStream { 188 alias ElementType = void; 189 Duration duration; 190 bool emitAtStart = false; 191 auto collect(DG dg) @safe { 192 return Sender(duration, dg, emitAtStart); 193 } 194 } 195 return IntervalStream(duration, emitAtStart); 196 } 197 198 auto via(Stream, Sender)(Stream stream, Sender sender) if (models!(Sender, isSender) && models!(Stream, isStream)) { 199 alias Properties = StreamProperties!Stream; 200 alias DG = Properties.DG; 201 static struct ViaStreamOp(Receiver) { 202 import std.traits : ReturnType; 203 import concurrency.operations.via : senderVia = via; 204 alias Op = OpType!(ReturnType!(senderVia!(Properties.Sender, Sender)), Receiver); 205 Op op; 206 @disable this(ref return scope typeof(this) rhs); 207 @disable this(this); 208 this(Stream stream, Sender sender, DG dg, Receiver receiver) { 209 op = stream.collect(dg).senderVia(sender).connect(receiver); 210 } 211 void start() nothrow @safe { 212 op.start(); 213 } 214 } 215 return fromStreamOp!(Properties.ElementType, Properties.Value, ViaStreamOp)(stream, sender); 216 } 217 218 auto doneStream() { 219 alias DG = CollectDelegate!void; 220 static struct DoneStreamOp(Receiver) { 221 Receiver receiver; 222 @disable this(ref return scope typeof(this) rhs); 223 @disable this(this); 224 this(DG dg, Receiver receiver) { 225 this.receiver = receiver; 226 } 227 void start() nothrow @safe { 228 receiver.setDone(); 229 } 230 } 231 return fromStreamOp!(void, void, DoneStreamOp)(); 232 } 233 234 auto errorStream(Exception e) { 235 alias DG = CollectDelegate!void; 236 static struct ErrorStreamOp(Receiver) { 237 Exception e; 238 Receiver receiver; 239 @disable this(ref return scope typeof(this) rhs); 240 @disable this(this); 241 this(Exception e, DG dg, Receiver receiver) { 242 this.e = e; 243 this.receiver = receiver; 244 } 245 void start() nothrow @safe { 246 receiver.setError(e); 247 } 248 } 249 return fromStreamOp!(void, void, ErrorStreamOp)(e); 250 } 251 252 /// A SharedStream is used for broadcasting values to zero or more receivers. Receivers can be added and removed at any time. The stream itself never completes, so receivers should themselves terminate their connection. 253 auto sharedStream(T)() { 254 import concurrency.slist; 255 alias DG = CollectDelegate!(T); 256 return SharedStream!(T)(new shared SList!(SharedStream!(T).SubscriberDG)); 257 } 258 259 shared struct SharedStream(T) { 260 alias ElementType = T; 261 alias SubscriberDG = void delegate(T) nothrow @safe shared; 262 import concurrency.slist; 263 private { 264 alias DG = CollectDelegate!T; 265 static struct Op(Receiver) { 266 shared SharedStream!T source; 267 DG dg; 268 Receiver receiver; 269 StopCallback cb; 270 @disable this(ref return scope typeof(this) rhs); 271 @disable this(this); 272 void start() nothrow @trusted { 273 auto stopToken = receiver.getStopToken(); 274 cb = stopToken.onStop(&(cast(shared)this).onStop); 275 if (stopToken.isStopRequested) { 276 cb.dispose(); 277 receiver.setDone(); 278 } else { 279 source.add(&(cast(shared)this).onItem); 280 } 281 } 282 void onStop() nothrow @safe shared { 283 with(unshared) { 284 source.remove(&this.onItem); 285 receiver.setDone(); 286 } 287 } 288 void onItem(T element) nothrow @safe shared { 289 with(unshared) { 290 try { 291 dg(element); 292 } catch (Exception e) { 293 source.remove(&this.onItem); 294 cb.dispose(); 295 receiver.setError(e); 296 } 297 } 298 } 299 private auto ref unshared() nothrow @trusted shared { 300 return cast()this; 301 } 302 } 303 static struct SharedStreamSender { 304 alias Value = void; 305 shared SharedStream!T source; 306 DG dg; 307 auto connect(Receiver)(return Receiver receiver) @safe scope return { 308 // ensure NRVO 309 auto op = Op!(Receiver)(source, dg, receiver); 310 return op; 311 } 312 } 313 shared SList!SubscriberDG dgs; 314 } 315 this(shared SList!SubscriberDG dgs) { 316 this.dgs = dgs; 317 } 318 void emit(T t) nothrow @trusted { 319 foreach(dg; dgs[]) 320 dg(t); 321 } 322 private void remove(SubscriberDG dg) nothrow @trusted { 323 dgs.remove(dg); 324 } 325 private void add(SubscriberDG dg) nothrow @trusted { 326 dgs.pushBack(dg); 327 } 328 auto collect(DG dg) @safe { 329 return SharedStreamSender(this, dg); 330 } 331 }