/**
 * Queues.
 */
module asynchronous.queues;

import std.algorithm;
import std.exception;
import std.range;
import std.typecons;
import asynchronous.events : EventLoop, getEventLoop;
import asynchronous.futures : Waiter;
import asynchronous.locks : Event;
import asynchronous.tasks : waitFor;
import asynchronous.types : Coroutine;

/**
 * Exception thrown when $(D_PSYMBOL Queue.getNowait()) is called on a
 * $(D_PSYMBOL Queue) object which is empty.
 */
class QueueEmptyException : Exception
{
    this(string message = null, string file = __FILE__, size_t line = __LINE__,
        Throwable next = null) @safe pure nothrow
    {
        super(message, file, line, next);
    }
}

/**
 * Exception thrown when $(D_PSYMBOL Queue.putNowait()) is called on a
 * $(D_PSYMBOL Queue) object which is full.
 */
class QueueFullException : Exception
{
    this(string message = null, string file = __FILE__, size_t line = __LINE__,
        Throwable next = null) @safe pure nothrow
    {
        super(message, file, line, next);
    }
}

/**
 * A queue, useful for coordinating producer and consumer coroutines.
 *
 * If $(D_PSYMBOL maxSize) is equal to zero, the queue size is infinite.
 * Otherwise $(D_PSYMBOL put()) will block when the queue reaches maxsize, until
 * an item is removed by $(D_PSYMBOL get()).
 *
 * You can reliably know this $(D_PSYMBOL Queue)'s size with $(D_PSYMBOL
 * qsize()), since your single-threaded asynchronous application won't be
 * interrupted between calling $(D_PSYMBOL qsize()) and doing an operation on
 * the Queue.
 *
 * This class is not thread safe.
 */
class Queue(T, size_t maxSize = 0)
{
    private EventLoop eventLoop;

    // Waiters created by get() while the queue was empty.
    private Waiter[] getters;

    // Waiters created by put() while the queue was full.
    private Waiter[] putters;

    // Items put into the queue for which taskDone() was not called yet.
    private size_t unfinishedTasks = 0;

    // Set while unfinishedTasks is zero; join() waits on it.
    private Event finished;

    // Backing store. The base class uses it as a ring buffer holding `length`
    // items beginning at index `start`; subclasses keep `start` at zero.
    private T[] queue;
    private size_t start = 0;
    private size_t length = 0;

    /**
     * Create a queue bound to $(D_PARAM eventLoop), or to the loop returned by
     * $(D_PSYMBOL getEventLoop) if $(D_PARAM eventLoop) is
     * $(D_KEYWORD null).
     */
    this(EventLoop eventLoop = null)
    {
        if (eventLoop is null)
            this.eventLoop = getEventLoop;
        else
            this.eventLoop = eventLoop;

        // No unfinished tasks yet, so join() must not block.
        this.finished = new Event(this.eventLoop);
        this.finished.set;
    }

    /**
     * Textual representation listing the size limit, the stored items, the
     * pending getters and putters and the unfinished task count.
     */
    override string toString() const
    {
        import std.format : format;

        // Chaining the buffer with itself lets a single slice cover a ring
        // buffer content that wraps around the end of the array.
        auto data = chain(cast(T[]) queue, cast(T[]) queue)[start .. start + length];

        return "%s(maxsize %s, queue %s, getters %s, putters %s, unfinishedTasks %s)"
            .format(typeid(this), maxSize, data, getters, putters, unfinishedTasks);
    }

    /**
     * Remove and return the next item from the backing store.
     *
     * Called by $(D_PSYMBOL getNowait()) only after it checked that the queue
     * is not empty. Subclasses override it to change the retrieval order.
     */
    protected T get_()
    {
        auto result = queue[start];

        // Reset the slot so the buffer does not keep a copy of the item.
        queue[start] = T.init;
        ++start;
        if (start == queue.length)
            start = 0;
        --length;
        return result;
    }

    /**
     * Store $(D_PARAM item) in the backing store.
     *
     * Called by $(D_PSYMBOL putNowait()) right after
     * $(D_PSYMBOL ensureCapacity()), so a free slot is available.
     */
    protected void put_(T item)
    {
        queue[(start + length) % $] = item;
        ++length;
    }

    /**
     * Grow the backing store when it is completely filled.
     *
     * The content is rotated so that it begins at index zero before the array
     * is resized; the new size is at least 8, doubles the current one and is
     * capped at $(D_PSYMBOL maxSize) for bounded queues.
     */
    private void ensureCapacity()
    {
        if (length < queue.length)
            return;

        static if (maxSize > 0)
            assert(length < maxSize);
        assert(length == queue.length);

        size_t newLength = max(8, length * 2);

        static if (maxSize > 0)
            newLength = min(newLength, maxSize);

        // Linearize the ring buffer: move the tail part in front of the
        // wrapped-around head part.
        bringToFront(queue[0 .. start], queue[start .. $]);
        start = 0;

        queue.length = newLength;

        static if (maxSize == 0)
        {
            // Use all the memory reserved for the array. `capacity` is
            // documented to be 0 when the array cannot be extended in place,
            // so never let this assignment shrink the buffer.
            if (queue.capacity > queue.length)
                queue.length = queue.capacity;
        }
    }

    /**
     * Drop the waiters that are already done from $(D_PARAM waiters) and set
     * the result of the first remaining one, if any.
     */
    private void wakeupNext(ref Waiter[] waiters)
    {
        waiters = waiters.remove!(a => a.done);

        if (!waiters.empty)
            waiters[0].setResult;
    }

    private void consumeDonePutters()
    {
        // Delete waiters at the head of the put() queue who've timed out.
        putters = putters.find!(g => !g.done);
    }

    /**
     * Return $(D_KEYWORD true) if the queue is empty, $(D_KEYWORD false)
     * otherwise.
     */
    @property bool empty()
    {
        return length == 0;
    }

    /**
     * Return $(D_KEYWORD true) if there are maxsize items in the queue.
     *
     * Note: if the Queue was initialized with $(D_PSYMBOL maxsize) = 0 (the
     * default), then $(D_PSYMBOL full()) is never $(D_KEYWORD true).
     */
    @property bool full()
    {
        static if (maxSize == 0)
            return false;
        else
            return qsize >= maxSize;
    }

    /**
     * Remove and return an item from the queue.
     *
     * If queue is empty, wait until an item is available.
     */
    @Coroutine
    T get()
    {
        // Re-check after every wake-up: the queue may be empty again by the
        // time this coroutine resumes.
        while (empty)
        {
            auto waiter = new Waiter(eventLoop);

            getters ~= waiter;
            eventLoop.waitFor(waiter);
        }

        return getNowait;
    }

    /**
     * Remove and return an item from the queue.
     *
     * Return an item if one is immediately available, else throw $(D_PSYMBOL
     * QueueEmptyException).
     */
    @Coroutine
    T getNowait()
    {
        if (length == 0)
            throw new QueueEmptyException("Queue is empty");

        T item = get_;

        // A slot was freed: let the next pending put() proceed.
        wakeupNext(putters);

        return item;
    }

    /**
     * Block until all items in the queue have been gotten and processed.
     *
     * The count of unfinished tasks goes up whenever an item is added to the
     * queue. The count goes down whenever a consumer calls $(D_PSYMBOL
     * taskDone()) to indicate that the item was retrieved and all work on it is
     * complete.
     * When the count of unfinished tasks drops to zero, $(D_PSYMBOL join())
     * unblocks.
     */
    @Coroutine
    void join()
    {
        if (unfinishedTasks > 0)
            finished.wait;
    }

    /**
     * Put an item into the queue.
     *
     * If the queue is full, wait until a free slot is available before adding
     * item.
     */
    @Coroutine
    void put(T item)
    {
        static if (maxSize > 0)
        {
            // Re-check after every wake-up: the queue may be full again by the
            // time this coroutine resumes.
            while (full)
            {
                auto waiter = new Waiter(eventLoop);

                putters ~= waiter;
                eventLoop.waitFor(waiter);
            }
        }

        putNowait(item);
    }

    /**
     * Put an item into the queue without blocking.
     *
     * If no free slot is immediately available, throw $(D_PSYMBOL
     * QueueFullException).
     */
    void putNowait(T item)
    {
        static if (maxSize > 0)
        {
            if (qsize >= maxSize)
                throw new QueueFullException("Queue is full");
        }

        ensureCapacity;
        put_(item);
        ++unfinishedTasks;
        finished.clear;

        // An item is available: let the next pending get() proceed.
        wakeupNext(getters);
    }

    /**
     * Number of items in the queue.
     */
    @property size_t qsize()
    {
        return length;
    }

    /**
     * Indicate that a formerly enqueued task is complete.
     *
     * Used by queue consumers. For each $(D_PSYMBOL get()) used to fetch a
     * task, a subsequent call to $(D_PSYMBOL taskDone()) tells the queue that
     * the processing on the task is complete.
     *
     * If a $(D_PSYMBOL join()) is currently blocking, it will resume when all
     * items have been processed (meaning that a $(D_PSYMBOL taskDone()) call
     * was received for every item that had been $(D_PSYMBOL put()) into the
     * queue).
     *
     * Throws $(D_PSYMBOL Exception) if called more times than there were items
     * placed in the queue.
     */
    void taskDone()
    {
        enforce(unfinishedTasks > 0, "taskDone() called too many times");

        --unfinishedTasks;
        if (unfinishedTasks == 0)
            finished.set;
    }

    /**
     * Number of items allowed in the queue.
     */
    @property size_t maxsize()
    {
        return maxSize;
    }
}

unittest
{
    auto queue = new Queue!int;

    foreach (i; iota(200))
        queue.putNowait(i);

    foreach (i; iota(200))
        assert(queue.getNowait == i);
}

unittest
{
    auto queue = new Queue!(int, 10);

    foreach (i; iota(10))
        queue.putNowait(i);

    assertThrown!QueueFullException(queue.putNowait(11));
}

/**
 * A subclass of $(D_PSYMBOL Queue); retrieves entries in priority order
 * (largest first).
 *
 * Entries are typically tuples of the form: (priority number, data).
 */
class PriorityQueue(T, size_t maxSize = 0, alias less = "a < b") :
    Queue!(T, maxSize)
{
    import std.container : BinaryHeap;

    // Heap view over the `queue` array of the base class.
    private BinaryHeap!(T[], less) binaryHeap;

    this(EventLoop eventLoop = null)
    {
        super(eventLoop);
        binaryHeap.acquire(queue, length);
    }

    /**
     * Remove and return the front item of the heap.
     */
    protected override T get_()
    {
        auto result = binaryHeap.front;

        binaryHeap.removeFront;
        --length;

        // The heap now covers queue[0 .. length]; reset the slot just behind
        // it, as the other queue flavours do, so that no copy of the removed
        // item lingers in the buffer.
        queue[length] = T.init;

        return result;
    }

    /**
     * Insert $(D_PARAM item) into the heap.
     */
    protected override void put_(T item)
    {
        // underlying store is reallocated
        if (binaryHeap.capacity != queue.length)
            binaryHeap.assume(queue, length);

        assert(binaryHeap.capacity == queue.length);

        binaryHeap.insert(item);
        ++length;
    }
}

unittest
{
    auto queue = new PriorityQueue!int;

    foreach (i; iota(200))
        queue.putNowait(i);

    foreach (i; iota(199, -1, -1))
        assert(queue.getNowait == i);
}

/**
 * A subclass of $(D_PSYMBOL Queue) that retrieves most recently added entries
 * first.
 */
class LifoQueue(T, size_t maxSize = 0) : Queue!(T, maxSize)
{
    this(EventLoop eventLoop = null)
    {
        super(eventLoop);
    }

    /**
     * Remove and return the most recently stored item.
     */
    protected override T get_()
    {
        auto result = queue[--length];

        // Reset the slot so the buffer does not keep a copy of the item.
        queue[length] = T.init;
        return result;
    }

    /**
     * Store $(D_PARAM item) behind the items already stored.
     */
    protected override void put_(T item)
    {
        queue[length++] = item;
    }
}

unittest
{
    auto queue = new LifoQueue!int;

    foreach (i; iota(200))
        queue.putNowait(i);

    foreach (i; iota(199, -1, -1))
        assert(queue.getNowait == i);
}