1 module asynchronous.libasync.events; 2 3 import std.algorithm; 4 import std.array; 5 import std.datetime; 6 import std.process; 7 import std.socket; 8 import std.string; 9 import std.traits; 10 import std.typecons; 11 import libasync.events : EventLoop_ = EventLoop, NetworkAddress; 12 import libasync.timer : AsyncTimer; 13 import libasync.tcp : AsyncTCPConnection, AsyncTCPListener, TCPEvent; 14 import libasync.threads : destroyAsyncThreads, gs_threads; 15 import libasync.udp : AsyncUDPSocket, UDPEvent; 16 import asynchronous.events; 17 import asynchronous.futures; 18 import asynchronous.protocols; 19 import asynchronous.tasks; 20 import asynchronous.transports; 21 import asynchronous.types; 22 23 alias Protocol = asynchronous.protocols.Protocol; 24 25 shared static ~this() 26 { 27 destroyAsyncThreads; 28 } 29 30 package class LibasyncEventLoop : EventLoop 31 { 32 private EventLoop_ eventLoop; 33 34 alias Timers = ResourcePool!(AsyncTimer, EventLoop_); 35 private Timers timers; 36 37 private Appender!(CallbackHandle[]) nextCallbacks1; 38 private Appender!(CallbackHandle[]) nextCallbacks2; 39 private Appender!(CallbackHandle[])* currentAppender; 40 41 private LibasyncTransport[] pendingConnections; 42 43 alias Listener = Tuple!(ServerImpl, "server", AsyncTCPListener, "listener"); 44 45 private Listener[] activeListeners; 46 47 this() 48 { 49 this.eventLoop = new EventLoop_; 50 this.timers = new Timers(this.eventLoop); 51 this.currentAppender = &nextCallbacks1; 52 } 53 54 override void runForever() 55 { 56 enforce(this.state == State.STOPPED, 57 "Unexpected event loop state %s".format(this.state)); 58 59 this.state = State.RUNNING; 60 61 while (true) 62 { 63 final switch (this.state) 64 { 65 case State.STOPPED: 66 case State.STOPPING: 67 this.state = State.STOPPED; 68 return; 69 case State.RUNNING: 70 auto callbacks = this.currentAppender; 71 72 if (this.currentAppender == &this.nextCallbacks1) 73 this.currentAppender = &this.nextCallbacks2; 74 else 75 this.currentAppender = &this.nextCallbacks1; 76 77 assert(this.currentAppender.data.empty); 78 79 if (callbacks.data.empty) 80 { 81 this.eventLoop.loop(-1.msecs); 82 } 83 else 84 { 85 foreach (callback; callbacks.data.filter!(a => !a.cancelled)) 86 callback(); 87 88 callbacks.clear; 89 } 90 break; 91 case State.CLOSED: 92 throw new Exception("Event loop closed while running."); 93 } 94 } 95 } 96 97 override void scheduleCallback(CallbackHandle callback) 98 { 99 if (!callback.cancelled) 100 currentAppender.put(callback); 101 } 102 103 override void scheduleCallback(Duration delay, CallbackHandle callback) 104 { 105 if (delay <= Duration.zero) 106 { 107 scheduleCallback(callback); 108 return; 109 } 110 111 auto timer = this.timers.acquire(); 112 timer.duration(delay).run({ 113 scheduleCallback(callback); 114 this.timers.release(timer); 115 }); 116 } 117 118 override void scheduleCallback(SysTime when, CallbackHandle callback) 119 { 120 Duration duration = when - time; 121 122 scheduleCallback(duration, callback); 123 } 124 125 @Coroutine 126 override void socketConnect(Socket socket, Address address) 127 { 128 auto asyncTCPConnection = new AsyncTCPConnection(this.eventLoop, 129 socket.handle); 130 NetworkAddress peerAddress; 131 132 (cast(byte*) peerAddress.sockAddr)[0 .. address.nameLen] = 133 (cast(byte*) address.name)[0 .. address.nameLen]; 134 asyncTCPConnection.peer = peerAddress; 135 136 auto connection = new LibasyncTransport(this, socket, 137 asyncTCPConnection); 138 139 asyncTCPConnection.run(&connection.handleTCPEvent); 140 141 this.pendingConnections ~= connection; 142 } 143 144 override Transport makeSocketTransport(Socket socket, Protocol protocol, 145 Waiter waiter) 146 { 147 auto index = this.pendingConnections.countUntil!( 148 a => a.socket == socket); 149 150 enforce(index >= 0, "Internal error"); 151 152 auto transport = this.pendingConnections[index]; 153 154 if (this.pendingConnections.length > 1) 155 { 156 this.pendingConnections[index] = this.pendingConnections[$ - 1]; 157 } 158 this.pendingConnections.length -= 1; 159 160 transport.setProtocol(protocol, waiter); 161 162 return transport; 163 } 164 165 override DatagramTransport makeDatagramTransport(Socket socket, 166 DatagramProtocol datagramProtocol, Address remoteAddress, Waiter waiter) 167 { 168 auto asyncUDPSocket = new AsyncUDPSocket(this.eventLoop, socket.handle); 169 170 auto datagramTransport = new LibasyncDatagramTransport(this, socket, 171 asyncUDPSocket); 172 173 asyncUDPSocket.run(&datagramTransport.handleUDPEvent); 174 175 datagramTransport.setProtocol(datagramProtocol, waiter); 176 177 if (waiter !is null) 178 this.callSoon(&waiter.setResultUnlessCancelled); 179 180 return datagramTransport; 181 } 182 183 @Coroutine 184 override AddressInfo[] getAddressInfo(in char[] host, in char[] service, 185 AddressFamily addressFamily = UNSPECIFIED!AddressFamily, 186 SocketType socketType = UNSPECIFIED!SocketType, 187 ProtocolType protocolType = UNSPECIFIED!ProtocolType, 188 AddressInfoFlags addressInfoFlags = UNSPECIFIED!AddressInfoFlags) 189 { 190 // no async implementation in libasync yet, use the std.socket.getAddresInfo implementation; 191 return std.socket.getAddressInfo(host, service, addressFamily, 192 socketType, protocolType, addressInfoFlags); 193 } 194 195 override void startServing(ProtocolFactory protocolFactory, Socket socket, 196 SslContext sslContext, ServerImpl server) 197 { 198 auto asyncTCPListener = new AsyncTCPListener(this.eventLoop, 199 socket.handle); 200 NetworkAddress localAddress; 201 Address address = socket.localAddress; 202 203 (cast(byte*) localAddress.sockAddr)[0 .. address.nameLen] = 204 (cast(byte*) address.name)[0 .. address.nameLen]; 205 asyncTCPListener.local(localAddress); 206 207 this.activeListeners ~= Listener(server, asyncTCPListener); 208 209 asyncTCPListener.run((AsyncTCPConnection connection) { 210 auto socket1 = new Socket(cast(socket_t) connection.socket, 211 socket.addressFamily); 212 213 auto transport = new LibasyncTransport(this, socket1, connection); 214 215 transport.setProtocol(protocolFactory()); 216 return &transport.handleTCPEvent; 217 }); 218 server.attach; 219 } 220 221 override void stopServing(Socket socket) 222 { 223 auto found = this.activeListeners.find!( 224 l => l.listener.socket == socket.handle); 225 226 assert(!found.empty); 227 228 auto server = found[0].server; 229 auto listener = found[0].listener; 230 231 found[0] = found[$ - 1]; 232 found[$ - 1] = Listener(null, null); 233 --this.activeListeners.length; 234 235 listener.kill; 236 237 // normally detach should be called on closing event of the listening 238 // socket, but libasync does not have such an event. so just call it 239 // 10 milliseconds later. 240 scheduleCallback(10.msecs, callback(this, &server.detach)); 241 } 242 243 244 version (Posix) 245 { 246 override void addSignalHandler(int sig, void delegate() handler) 247 { 248 assert(0, "addSignalHandler not implemented yet"); 249 } 250 251 override void removeSignalHandler(int sig) 252 { 253 assert(0, "removeSignalHandler not implemented yet"); 254 } 255 } 256 257 //void setExceptionHandler(void function(EventLoop, ExceptionContext) handler) 258 //{ 259 260 //} 261 262 //void defaultExceptionHandler(ExceptionContext context) 263 //{ 264 265 //} 266 267 //void callExceptionHandler(ExceptionContext context) 268 //{ 269 270 //} 271 } 272 273 class LibasyncEventLoopPolicy : EventLoopPolicy 274 { 275 override EventLoop newEventLoop() 276 { 277 return new LibasyncEventLoop; 278 } 279 280 //version (Posix) 281 //{ 282 // override ChildWatcher getChildWatcher() 283 // { 284 // assert(0, "Not implemented"); 285 // } 286 287 // override void setChildWatcher(ChildWatcher watcher) 288 // { 289 // assert(0, "Not implemented"); 290 // } 291 //} 292 } 293 294 private final class LibasyncTransport : AbstractBaseTransport, Transport 295 { 296 private EventLoop eventLoop; 297 private Socket _socket; 298 private AsyncTCPConnection connection; 299 private Waiter waiter = null; 300 private Protocol protocol; 301 private uint connectionLost = 0; 302 private bool closing = false; 303 private bool readingPaused = false; 304 private bool writingPaused = false; 305 private void[] writeBuffer; 306 private BufferLimits writeBufferLimits; 307 308 this(EventLoop eventLoop, Socket socket, AsyncTCPConnection connection) 309 in 310 { 311 assert(eventLoop !is null); 312 assert(socket !is null); 313 assert(connection !is null); 314 } 315 body 316 { 317 this.eventLoop = eventLoop; 318 this._socket = socket; 319 this.connection = connection; 320 setWriteBufferLimits; 321 } 322 323 @property 324 Socket socket() 325 { 326 return this._socket; 327 } 328 329 void setProtocol(Protocol protocol, Waiter waiter = null) 330 in 331 { 332 assert(protocol !is null); 333 assert(this.protocol is null); 334 assert(this.waiter is null); 335 } 336 body 337 { 338 this.protocol = protocol; 339 this.waiter = waiter; 340 } 341 342 private void onConnect() 343 in 344 { 345 assert(this.connection.isConnected); 346 assert(this._socket.handle == this.connection.socket); 347 assert(this.protocol !is null); 348 } 349 body 350 { 351 this.eventLoop.callSoon(&this.protocol.connectionMade, this); 352 353 if (this.waiter !is null) 354 this.eventLoop.callSoon(&this.waiter.setResultUnlessCancelled); 355 } 356 357 private void onRead() 358 in 359 { 360 assert(this.protocol !is null); 361 } 362 body 363 { 364 if (this.readingPaused) 365 return; 366 367 static ubyte[] readBuffer = new ubyte[64 * 1024]; 368 Appender!(ubyte[]) receivedData; 369 370 while (true) 371 { 372 auto length = this.connection.recv(readBuffer); 373 374 receivedData ~= readBuffer[0 .. length]; 375 376 if (length < readBuffer.length) 377 break; 378 } 379 380 if (!receivedData.data.empty) 381 { 382 this.eventLoop.callSoon(&this.protocol.dataReceived, 383 receivedData.data); 384 } 385 } 386 387 private void onWrite() 388 in 389 { 390 assert(this.protocol !is null); 391 } 392 body 393 { 394 if (!this.writeBuffer.empty) 395 { 396 auto sent = this.connection.send(cast(ubyte[]) this.writeBuffer); 397 this.writeBuffer = this.writeBuffer[sent .. $]; 398 } 399 400 if (this.writingPaused && 401 this.writeBuffer.length <= this.writeBufferLimits.low) 402 { 403 this.protocol.resumeWriting; 404 this.writingPaused = false; 405 } 406 } 407 408 private void onClose(SocketOSException socketOSException) 409 in 410 { 411 assert(this.protocol !is null); 412 } 413 body 414 { 415 this.connection = null; 416 this.eventLoop.callSoon(&this.protocol.connectionLost, 417 socketOSException); 418 } 419 420 void handleTCPEvent(TCPEvent event) 421 { 422 final switch (event) 423 { 424 case TCPEvent.CONNECT: 425 onConnect(); 426 break; 427 case TCPEvent.READ: 428 onRead(); 429 break; 430 case TCPEvent.WRITE: 431 onWrite(); 432 break; 433 case TCPEvent.CLOSE: 434 onClose(null); 435 break; 436 case TCPEvent.ERROR: 437 onClose(new SocketOSException(connection.error())); 438 } 439 } 440 441 /** 442 * Transport interface 443 */ 444 override Socket getExtraInfoSocket() 445 { 446 return socket; 447 } 448 449 void close() 450 { 451 if (this.closing) 452 return; 453 454 this.closing = true; 455 this.eventLoop.callSoon(&this.connection.kill, false); 456 } 457 458 void pauseReading() 459 { 460 if (this.closing || this.connectionLost) 461 throw new Exception("Cannot pauseReading() when closing"); 462 if (this.readingPaused) 463 throw new Exception("Reading is already paused"); 464 465 this.readingPaused = true; 466 } 467 468 void resumeReading() 469 { 470 if (!this.readingPaused) 471 throw new Exception("Reading is not paused"); 472 this.readingPaused = false; 473 } 474 475 void abort() 476 { 477 if (this.connectionLost) 478 return; 479 480 this.eventLoop.callSoon(&this.connection.kill, true); 481 ++this.connectionLost; 482 } 483 484 bool canWriteEof() 485 { 486 return true; 487 } 488 489 size_t getWriteBufferSize() 490 { 491 return writeBuffer.length; 492 } 493 494 BufferLimits getWriteBufferLimits() 495 { 496 return writeBufferLimits; 497 } 498 499 void setWriteBufferLimits(Nullable!size_t high = Nullable!size_t(), 500 Nullable!size_t low = Nullable!size_t()) 501 { 502 if (high.isNull) 503 { 504 if (low.isNull) 505 high = 64 * 1024; 506 else 507 high = 4 * low; 508 } 509 510 if (low.isNull) 511 low = high / 4; 512 513 if (high < low) 514 low = high; 515 516 this.writeBufferLimits.high = high; 517 this.writeBufferLimits.low = low; 518 } 519 520 void write(const(void)[] data) 521 in 522 { 523 assert(this.protocol !is null); 524 assert(!this.writingPaused); 525 } 526 body 527 { 528 enforce(this.connection !is null, "Disconnected transport"); 529 530 if (this.writeBuffer.empty) 531 { 532 auto sent = this.connection.send(cast(const ubyte[]) data); 533 if (sent < data.length) 534 this.writeBuffer = data[sent .. $].dup; 535 } 536 else 537 { 538 this.writeBuffer ~= data; 539 } 540 541 if (this.writeBuffer.length >= this.writeBufferLimits.high) 542 { 543 this.protocol.pauseWriting; 544 this.writingPaused = true; 545 } 546 } 547 548 void writeEof() 549 { 550 close; 551 } 552 } 553 554 private final class LibasyncDatagramTransport : AbstractBaseTransport, DatagramTransport 555 { 556 private EventLoop eventLoop; 557 private Socket _socket; 558 private AsyncUDPSocket udpSocket; 559 private Waiter waiter = null; 560 private DatagramProtocol datagramProtocol = null; 561 562 this(EventLoop eventLoop, Socket socket, AsyncUDPSocket udpSocket) 563 in 564 { 565 assert(eventLoop !is null); 566 assert(socket !is null); 567 assert(udpSocket !is null); 568 assert(socket.handle == udpSocket.socket); 569 } 570 body 571 { 572 this.eventLoop = eventLoop; 573 this._socket = socket; 574 this.udpSocket = udpSocket; 575 } 576 577 @property 578 Socket socket() 579 { 580 return this._socket; 581 } 582 583 void setProtocol(DatagramProtocol datagramProtocol, Waiter waiter = null) 584 in 585 { 586 assert(datagramProtocol !is null); 587 assert(this.datagramProtocol is null); 588 assert(this.waiter is null); 589 } 590 body 591 { 592 this.datagramProtocol = datagramProtocol; 593 this.waiter = waiter; 594 } 595 596 private void onRead() 597 in 598 { 599 assert(this.datagramProtocol !is null); 600 } 601 body 602 { 603 ubyte[] readBuffer = new ubyte[1501]; 604 NetworkAddress networkAddress; 605 606 auto length = this.udpSocket.recvFrom(readBuffer, networkAddress); 607 608 enforce(length < readBuffer.length, 609 "Unexpected UDP package size > 1500 bytes"); 610 611 Address tmp; 612 Address address = new UnknownAddressReference( 613 cast(typeof(tmp.name)) networkAddress.sockAddr, 614 cast(uint) networkAddress.sockAddrLen); 615 616 this.eventLoop.callSoon(&this.datagramProtocol.datagramReceived, 617 readBuffer[0 .. length], address); 618 } 619 620 private void onWrite() 621 in 622 { 623 assert(this.datagramProtocol !is null); 624 } 625 body 626 { 627 } 628 629 void handleUDPEvent(UDPEvent event) 630 { 631 final switch (event) 632 { 633 case UDPEvent.READ: 634 onRead(); 635 break; 636 case UDPEvent.WRITE: 637 onWrite(); 638 break; 639 case UDPEvent.ERROR: 640 assert(0, udpSocket.error()); 641 } 642 } 643 644 /** 645 * DatagramTransport interface 646 */ 647 648 override Socket getExtraInfoSocket() 649 { 650 return socket; 651 } 652 653 void close() 654 { 655 this.eventLoop.callSoon(&this.udpSocket.kill); 656 } 657 658 void sendTo(const(void)[] data, Address address = null) 659 in 660 { 661 assert(this.datagramProtocol !is null); 662 } 663 body 664 { 665 enforce(address !is null, "default remote not supported yet"); 666 667 NetworkAddress networkAddress; 668 669 (cast(byte*) networkAddress.sockAddr)[0 .. address.nameLen] = 670 (cast(byte*) address.name)[0 .. address.nameLen]; 671 672 this.udpSocket.sendTo(cast(const ubyte[]) data, networkAddress); 673 } 674 675 void abort() 676 { 677 this.eventLoop.callSoon(&this.udpSocket.kill); 678 } 679 }