/**
 * Queues.
 *
 * Copyright: © 2015-2016 Dragos Carp
 * License: Boost Software License - Version 1.0
 * Authors: Dragos Carp
 */
module asynchronous.queues;

import std.algorithm;
import std.exception;
import std.range;
import std.typecons;
import asynchronous.events : EventLoop, getEventLoop;
import asynchronous.futures : Waiter;
import asynchronous.locks : Event;
import asynchronous.tasks : waitFor;
import asynchronous.types : Coroutine;

/**
 * Exception thrown when $(D_PSYMBOL Queue.getNowait()) is called on a
 * $(D_PSYMBOL Queue) object which is empty.
 */
class QueueEmptyException : Exception
{
    this(string message = null, string file = __FILE__, size_t line = __LINE__,
        Throwable next = null) @safe pure nothrow
    {
        super(message, file, line, next);
    }
}

/**
 * Exception thrown when $(D_PSYMBOL Queue.putNowait()) is called on a
 * $(D_PSYMBOL Queue) object which is full.
 */
class QueueFullException : Exception
{
    this(string message = null, string file = __FILE__, size_t line = __LINE__,
        Throwable next = null) @safe pure nothrow
    {
        super(message, file, line, next);
    }
}

/**
 * A queue, useful for coordinating producer and consumer coroutines.
 *
 * If $(D_PSYMBOL maxSize) is equal to zero, the queue size is infinite.
 * Otherwise $(D_PSYMBOL put()) will block when the queue reaches
 * $(D_PSYMBOL maxSize), until an item is removed by $(D_PSYMBOL get()).
 *
 * You can reliably know this $(D_PSYMBOL Queue)'s size with $(D_PSYMBOL
 * qsize()), since your single-threaded asynchronous application won't be
 * interrupted between calling $(D_PSYMBOL qsize()) and doing an operation on
 * the Queue.
 *
 * This class is not thread safe.
 */
class Queue(T, size_t maxSize = 0)
{
    private EventLoop eventLoop;
    // Waiters created by get() while the queue was empty.
    private Waiter[] getters;
    // Waiters created by put() while the queue was full.
    private Waiter[] putters;
    // Items put into the queue for which taskDone() was not yet called.
    private size_t unfinishedTasks = 0;
    // Set while unfinishedTasks is zero; join() waits on it.
    private Event finished;
    // Backing store; this base class uses it as a ring buffer.
    private T[] queue;
    // Index of the oldest stored item inside the backing store.
    private size_t start = 0;
    // Number of items currently stored.
    private size_t length = 0;

    /**
     * Create a queue bound to $(D_PSYMBOL eventLoop), or to the loop returned
     * by $(D_PSYMBOL getEventLoop) if none is given.
     */
    this(EventLoop eventLoop = null)
    {
        if (eventLoop is null)
            this.eventLoop = getEventLoop;
        else
            this.eventLoop = eventLoop;

        // A new queue has no unfinished tasks, so join() must not block.
        this.finished = new Event(this.eventLoop);
        this.finished.set;
    }

    /**
     * Textual representation listing the stored items and the bookkeeping
     * state of the queue.
     */
    override string toString() const
    {
        import std.format : format;

        // Chaining the store with itself lets a single slice cover a ring
        // buffer content that wraps around the end of the array.
        auto data = chain(cast(T[]) queue, cast(T[]) queue)[start .. start + length];

        return "%s(maxsize %s, queue %s, getters %s, putters %s, unfinishedTasks %s)"
            .format(typeid(this), maxSize, data, getters, putters, unfinishedTasks);
    }

    /**
     * Remove and return the next item from the backing store.
     *
     * Subclasses override this together with $(D_PSYMBOL put_()) to change
     * the retrieval order. The caller ensures the queue is not empty.
     */
    protected T get_()
    {
        auto result = queue[start];
        // Reset the vacated slot so the store no longer holds the item.
        queue[start] = T.init;
        ++start;
        if (start == queue.length)
            start = 0;
        --length;
        return result;
    }

    /**
     * Store $(D_PSYMBOL item) in the backing store.
     *
     * The caller ensures a free slot exists (see $(D_PSYMBOL
     * ensureCapacity())).
     */
    protected void put_(T item)
    {
        queue[(start + length) % $] = item;
        ++length;
    }

    /**
     * Grow the backing store if every slot is occupied.
     */
    private void ensureCapacity()
    {
        if (length < queue.length)
            return;

        static if (maxSize > 0)
            assert(length < maxSize);
        assert(length == queue.length);

        size_t newLength = max(8, length * 2);

        static if (maxSize > 0)
            newLength = min(newLength, maxSize);

        // Rotate the content so that the oldest item sits at index 0 before
        // the array is extended at its end.
        bringToFront(queue[0 .. start], queue[start .. $]);
        start = 0;

        queue.length = newLength;

        // For unbounded queues use every slot the array reports as capacity.
        static if (maxSize == 0)
            queue.length = queue.capacity;
    }

    /**
     * Drop the waiters that are already done and complete the first
     * remaining one, if any.
     */
    private void wakeupNext(ref Waiter[] waiters)
    {
        waiters = waiters.remove!(a => a.done);

        if (!waiters.empty)
            waiters[0].setResult;
    }

    private void consumeDonePutters()
    {
        // Delete waiters at the head of the put() queue who've timed out.
        putters = putters.find!(g => !g.done);
    }

    /**
     * Return $(D_KEYWORD true) if the queue is empty, $(D_KEYWORD false)
     * otherwise.
     */
    @property bool empty()
    {
        return length == 0;
    }

    /**
     * Return $(D_KEYWORD true) if there are $(D_PSYMBOL maxSize) items in the
     * queue.
     *
     * Note: if the Queue was initialized with $(D_PSYMBOL maxSize) = 0 (the
     * default), then $(D_PSYMBOL full()) is never $(D_KEYWORD true).
     */
    @property bool full()
    {
        static if (maxSize == 0)
            return false;
        else
            return qsize >= maxSize;
    }

    /**
     * Remove and return an item from the queue.
     *
     * If queue is empty, wait until an item is available.
     */
    @Coroutine
    T get()
    {
        // Re-check after every wakeup: the queue may be empty again by the
        // time this coroutine resumes.
        while (empty)
        {
            auto waiter = new Waiter(eventLoop);

            getters ~= waiter;
            eventLoop.waitFor(waiter);
        }

        return getNowait;
    }

    /**
     * Remove and return an item from the queue.
     *
     * Return an item if one is immediately available, else throw $(D_PSYMBOL
     * QueueEmptyException).
     */
    // NOTE(review): this method carries @Coroutine although it does not wait,
    // while putNowait() has no such attribute - confirm whether intended.
    @Coroutine
    T getNowait()
    {
        enforce!QueueEmptyException(length > 0, "Queue is empty");

        T item = get_;

        // A slot became free: let the next pending put() proceed.
        wakeupNext(putters);

        return item;
    }

    /**
     * Block until all items in the queue have been gotten and processed.
     *
     * The count of unfinished tasks goes up whenever an item is added to the
     * queue. The count goes down whenever a consumer calls $(D_PSYMBOL
     * taskDone()) to indicate that the item was retrieved and all work on it is
     * complete.
     * When the count of unfinished tasks drops to zero, $(D_PSYMBOL join())
     * unblocks.
     */
    @Coroutine
    void join()
    {
        if (unfinishedTasks > 0)
            finished.wait;
    }

    /**
     * Put an item into the queue.
     *
     * If the queue is full, wait until a free slot is available before adding
     * item.
     */
    @Coroutine
    void put(T item)
    {
        static if (maxSize > 0)
        {
            // Re-check after every wakeup: the free slot may have been taken
            // by the time this coroutine resumes.
            while (full)
            {
                auto waiter = new Waiter(eventLoop);

                putters ~= waiter;
                eventLoop.waitFor(waiter);
            }
        }

        putNowait(item);
    }

    /**
     * Put an item into the queue without blocking.
     *
     * If no free slot is immediately available, throw $(D_PSYMBOL
     * QueueFullException).
     */
    void putNowait(T item)
    {
        static if (maxSize > 0)
            enforce!QueueFullException(qsize < maxSize, "Queue is full");

        ensureCapacity;
        put_(item);
        ++unfinishedTasks;
        finished.clear;

        // An item became available: let the next pending get() proceed.
        wakeupNext(getters);
    }

    /**
     * Number of items in the queue.
     */
    @property size_t qsize()
    {
        return length;
    }

    /**
     * Indicate that a formerly enqueued task is complete.
     *
     * Used by queue consumers. For each $(D_PSYMBOL get()) used to fetch a
     * task, a subsequent call to $(D_PSYMBOL taskDone()) tells the queue that
     * the processing on the task is complete.
     *
     * If a $(D_PSYMBOL join()) is currently blocking, it will resume when all
     * items have been processed (meaning that a $(D_PSYMBOL taskDone()) call
     * was received for every item that had been $(D_PSYMBOL put()) into the
     * queue).
     *
     * Throws $(D_PSYMBOL Exception) if called more times than there were items
     * placed in the queue.
     */
    void taskDone()
    {
        enforce(unfinishedTasks > 0, "taskDone() called too many times");

        --unfinishedTasks;
        if (unfinishedTasks == 0)
            finished.set;
    }

    /**
     * Number of items allowed in the queue.
     */
    @property size_t maxsize()
    {
        return maxSize;
    }
}

unittest
{
    auto queue = new Queue!int;

    foreach (i; iota(200))
        queue.putNowait(i);

    foreach (i; iota(200))
        assert(queue.getNowait == i);
}

unittest
{
    auto queue = new Queue!(int, 10);

    foreach (i; iota(10))
        queue.putNowait(i);

    assertThrown!QueueFullException(queue.putNowait(11));
}

/**
 * A subclass of $(D_PSYMBOL Queue); retrieves entries in priority order
 * (largest first).
 *
 * Entries are typically tuples of the form: (priority number, data).
 */
class PriorityQueue(T, size_t maxSize = 0, alias less = "a < b") :
    Queue!(T, maxSize)
{
    import std.container : BinaryHeap;

    // Heap view over the inherited backing store.
    private BinaryHeap!(T[], less) binaryHeap;

    this(EventLoop eventLoop = null)
    {
        super(eventLoop);
        binaryHeap.acquire(queue, length);
    }

    /// Remove and return the front item of the heap.
    protected override T get_()
    {
        --length;
        auto result = binaryHeap.front;
        binaryHeap.removeFront;
        return result;
    }

    /// Insert $(D_PSYMBOL item) into the heap.
    protected override void put_(T item)
    {
        // underlying store is reallocated
        if (binaryHeap.capacity != queue.length)
            binaryHeap.assume(queue, length);

        assert(binaryHeap.capacity == queue.length);

        binaryHeap.insert(item);
        ++length;
    }
}

unittest
{
    auto queue = new PriorityQueue!int;

    foreach (i; iota(200))
        queue.putNowait(i);

    foreach (i; iota(199, -1, -1))
        assert(queue.getNowait == i);
}

/**
 * A subclass of $(D_PSYMBOL Queue) that retrieves most recently added entries
 * first.
 */
class LifoQueue(T, size_t maxSize = 0) : Queue!(T, maxSize)
{
    this(EventLoop eventLoop = null)
    {
        super(eventLoop);
    }

    /// Remove and return the most recently stored item.
    protected override T get_()
    {
        auto result = queue[--length];
        // Reset the vacated slot so the store no longer holds the item.
        queue[length] = T.init;
        return result;
    }

    /// Store $(D_PSYMBOL item) after the last stored item.
    protected override void put_(T item)
    {
        queue[length++] = item;
    }
}

unittest
{
    auto queue = new LifoQueue!int;

    foreach (i; iota(200))
        queue.putNowait(i);

    foreach (i; iota(199, -1, -1))
        assert(queue.getNowait == i);
}