/**
 * libasync event loop integration.
 *
 * Copyright: © 2015-2016 Dragos Carp
 * License: Boost Software License - Version 1.0
 * Authors: Dragos Carp
 */
module asynchronous.libasync.events;

import std.algorithm;
import std.array;
import std.datetime;
import std.exception : enforce;
import std.process;
import std.socket;
import std.string;
import std.traits;
import std.typecons;
import libasync.events : EventLoop_ = EventLoop, NetworkAddress;
import libasync.signal : AsyncSignal;
import libasync.timer : AsyncTimer;
import libasync.tcp : AsyncTCPConnection, AsyncTCPListener, TCPEvent;
import libasync.threads : destroyAsyncThreads;
import libasync.udp : AsyncUDPSocket, UDPEvent;
import asynchronous.events;
import asynchronous.futures;
import asynchronous.protocols;
import asynchronous.tasks;
import asynchronous.transports;
import asynchronous.types;

// Disambiguates `Protocol` in favour of the asynchronous.protocols definition.
alias Protocol = asynchronous.protocols.Protocol;

/// Calls libasync's `destroyAsyncThreads` once, at program shutdown.
shared static ~this()
{
    destroyAsyncThreads;
}

/**
 * `EventLoop` implementation that delegates I/O, timers and cross-thread
 * wake-ups to a libasync event loop.
 *
 * Ready callbacks are kept in two appenders that are swapped on every
 * iteration of `runForever`, so callbacks scheduled while a batch is being
 * executed land in the other appender and run in the next iteration.
 */
package class LibasyncEventLoop : EventLoop
{
    /// The wrapped libasync event loop.
    private EventLoop_ eventLoop;

    alias Timers = ResourcePool!(AsyncTimer, EventLoop_);
    /// Pool of timers used for delayed callbacks.
    private Timers timers;

    // Double buffer of ready callbacks; `currentAppender` points to the one
    // that currently receives newly scheduled callbacks.
    private Appender!(CallbackHandle[]) nextCallbacks1;
    private Appender!(CallbackHandle[]) nextCallbacks2;
    private Appender!(CallbackHandle[])* currentAppender;

    // Callbacks queued through `scheduleCallbackThreadSafe`; accessed only
    // inside `synchronized (this)` blocks.
    private Appender!(CallbackHandle[]) nextThreadSafeCallbacks;
    // Signal triggered after a callback was queued in
    // `nextThreadSafeCallbacks`; its handler is `scheduleThreadSafeCallbacks`.
    private shared AsyncSignal newThreadSafeCallbacks;

    /// Transports created by `socketConnect` that have not yet been claimed
    /// by `makeSocketTransport`.
    private LibasyncTransport[] pendingConnections;

    alias Listener = Tuple!(ServerImpl, "server", AsyncTCPListener, "listener");

    /// Listeners registered by `startServing` and not yet stopped.
    private Listener[] activeListeners;

    /// Creates the libasync loop, the timer pool and the thread-safe
    /// callback signal.
    this()
    {
        this.eventLoop = new EventLoop_;
        this.timers = new Timers(this.eventLoop);
        this.currentAppender = &nextCallbacks1;

        this.newThreadSafeCallbacks = new shared AsyncSignal(this.eventLoop);
        this.newThreadSafeCallbacks.run(&scheduleThreadSafeCallbacks);
    }

    /**
     * Runs the loop until the state leaves `State.RUNNING`.
     *
     * Throws: `Exception` if the loop is not in `State.STOPPED` on entry, or
     *         if it is found in `State.CLOSED` while running.
     */
    override void runForever()
    {
        enforce(this.state == State.STOPPED,
            "Unexpected event loop state %s".format(this.state));

        this.state = State.RUNNING;

        while (true)
        {
            final switch (this.state)
            {
            case State.STOPPED:
            case State.STOPPING:
                this.state = State.STOPPED;
                return;
            case State.RUNNING:
                // Only poll libasync when there is nothing ready to run.
                // NOTE(review): the negative timeout presumably means "wait
                // without limit" - confirm against libasync.
                if (this.currentAppender.data.empty)
                    this.eventLoop.loop(-1.msecs);

                auto callbacks = this.currentAppender;

                // Swap buffers so that callbacks scheduled by the batch below
                // are collected separately and run in the next iteration.
                if (this.currentAppender == &this.nextCallbacks1)
                    this.currentAppender = &this.nextCallbacks2;
                else
                    this.currentAppender = &this.nextCallbacks1;

                assert(this.currentAppender.data.empty);

                foreach (callback; callbacks.data.filter!(a => !a.cancelled))
                    callback();

                callbacks.clear;
                break;
            case State.CLOSED:
                throw new Exception("Event loop closed while running.");
            }
        }
    }

    /// Queues `callback` for the next batch; cancelled callbacks are dropped.
    override void scheduleCallback(CallbackHandle callback)
    {
        if (!callback.cancelled)
            currentAppender.put(callback);
    }

    /// Signal handler: moves all callbacks queued by
    /// `scheduleCallbackThreadSafe` into the regular callback queue.
    private void scheduleThreadSafeCallbacks()
    {
        synchronized (this)
        {
            foreach (callback; nextThreadSafeCallbacks.data)
                scheduleCallback(callback);

            nextThreadSafeCallbacks.clear;
        }
    }

    /// Queues `callback` under the object's monitor and then triggers the
    /// signal whose handler transfers it to the regular callback queue.
    override void scheduleCallbackThreadSafe(CallbackHandle callback)
    {
        synchronized (this)
        {
            nextThreadSafeCallbacks ~= callback;
        }

        newThreadSafeCallbacks.trigger;
    }

    /**
     * Schedules `callback` to run after `delay`.
     *
     * A non-positive delay queues the callback immediately. Otherwise a timer
     * is taken from the pool; when it fires, the callback is queued and the
     * timer is returned to the pool.
     */
    override void scheduleCallback(Duration delay, CallbackHandle callback)
    {
        if (delay <= Duration.zero)
        {
            scheduleCallback(callback);
            return;
        }

        auto timer = this.timers.acquire();
        timer.duration(delay).run({
            scheduleCallback(callback);
            this.timers.release(timer);
        });
    }

    /// Schedules `callback` for the time point `when`, expressed as a delay
    /// relative to the loop's current `time`.
    override void scheduleCallback(SysTime when, CallbackHandle callback)
    {
        Duration duration = when - time;

        scheduleCallback(duration, callback);
    }

    /**
     * Starts connecting `socket` to `address` through libasync.
     *
     * The created transport is stored in `pendingConnections` until
     * `makeSocketTransport` is called with the same socket.
     */
    @Coroutine
    override void socketConnect(Socket socket, Address address)
    {
        auto asyncTCPConnection = new AsyncTCPConnection(this.eventLoop,
            socket.handle);
        NetworkAddress peerAddress;

        // Raw byte copy of the socket address into libasync's address type.
        (cast(byte*) peerAddress.sockAddr)[0 .. address.nameLen] =
            (cast(byte*) address.name)[0 .. address.nameLen];
        asyncTCPConnection.peer = peerAddress;

        auto connection = new LibasyncTransport(this, socket,
            asyncTCPConnection);

        asyncTCPConnection.run(&connection.handleTCPEvent);

        this.pendingConnections ~= connection;
    }

    /**
     * Returns the pending transport created by `socketConnect` for `socket`,
     * after attaching `protocol` and `waiter` to it.
     *
     * Throws: `Exception` ("Internal error") if no pending transport uses
     *         `socket`.
     */
    override Transport makeSocketTransport(Socket socket, Protocol protocol,
        Waiter waiter)
    {
        auto index = this.pendingConnections.countUntil!(
            a => a.socket == socket);

        enforce(index >= 0, "Internal error");

        auto transport = this.pendingConnections[index];

        this.pendingConnections = this.pendingConnections.remove(index);
        transport.setProtocol(protocol, waiter);

        return transport;
    }

    /**
     * Wraps `socket` in a datagram transport driven by libasync and attaches
     * `datagramProtocol` to it.
     *
     * If `waiter` is not null its result is set on a later loop iteration.
     * `remoteAddress` is not used by this implementation.
     */
    override DatagramTransport makeDatagramTransport(Socket socket,
        DatagramProtocol datagramProtocol, Address remoteAddress, Waiter waiter)
    {
        auto asyncUDPSocket = new AsyncUDPSocket(this.eventLoop, socket.handle);

        auto datagramTransport = new LibasyncDatagramTransport(this, socket,
            asyncUDPSocket);

        asyncUDPSocket.run(&datagramTransport.handleUDPEvent);

        datagramTransport.setProtocol(datagramProtocol, waiter);

        if (waiter !is null)
            this.callSoon(&waiter.setResultUnlessCancelled);

        return datagramTransport;
    }

    /// Resolves `host`/`service` by forwarding all arguments to
    /// `std.socket.getAddressInfo`.
    @Coroutine
    override AddressInfo[] getAddressInfo(in char[] host, in char[] service,
        AddressFamily addressFamily = UNSPECIFIED!AddressFamily,
        SocketType socketType = UNSPECIFIED!SocketType,
        ProtocolType protocolType = UNSPECIFIED!ProtocolType,
        AddressInfoFlags addressInfoFlags = UNSPECIFIED!AddressInfoFlags)
    {
        // no async implementation in libasync yet, use the
        // std.socket.getAddressInfo implementation.
        return std.socket.getAddressInfo(host, service, addressFamily,
            socketType, protocolType, addressInfoFlags);
    }

    /**
     * Starts accepting connections on `socket`.
     *
     * Every accepted connection gets its own transport and a protocol created
     * by `protocolFactory`. `sslContext` is not used by this implementation.
     */
    override void startServing(ProtocolFactory protocolFactory, Socket socket,
        SslContext sslContext, ServerImpl server)
    {
        auto asyncTCPListener = new AsyncTCPListener(this.eventLoop,
            socket.handle);
        NetworkAddress localAddress;
        Address address = socket.localAddress;

        // Raw byte copy of the socket address into libasync's address type.
        (cast(byte*) localAddress.sockAddr)[0 .. address.nameLen] =
            (cast(byte*) address.name)[0 .. address.nameLen];
        asyncTCPListener.local(localAddress);

        this.activeListeners ~= Listener(server, asyncTCPListener);

        asyncTCPListener.run((AsyncTCPConnection connection) {
            auto socket1 = new Socket(cast(socket_t) connection.socket,
                socket.addressFamily);

            auto transport = new LibasyncTransport(this, socket1, connection);

            transport.setProtocol(protocolFactory());
            return &transport.handleTCPEvent;
        });
        server.attach;
    }

    /// Stops the listener registered for `socket`, removes it from
    /// `activeListeners` and schedules the detach of its server.
    override void stopServing(Socket socket)
    {
        auto found = this.activeListeners.find!(
            l => l.listener.socket == socket.handle);

        assert(!found.empty);

        auto server = found[0].server;
        auto listener = found[0].listener;

        // `found` is the tail of `activeListeners` starting at the match:
        // overwrite the match with the last entry, then drop the last slot.
        found[0] = found[$ - 1];
        found[$ - 1] = Listener(null, null);
        --this.activeListeners.length;

        listener.kill;

        // normally detach should be called on closing event of the listening
        // socket, but libasync does not have such an event. so just call it
        // 10 milliseconds later.
        scheduleCallback(10.msecs, callback(this, &server.detach));
    }

    version (Posix)
    {
        /// Not implemented; always fails with an assertion.
        override void addSignalHandler(int sig, void delegate() handler)
        {
            assert(0, "addSignalHandler not implemented yet");
        }

        /// Not implemented; always fails with an assertion.
        override void removeSignalHandler(int sig)
        {
            assert(0, "removeSignalHandler not implemented yet");
        }
    }
}

/// Event loop policy whose loops are `LibasyncEventLoop` instances.
class LibasyncEventLoopPolicy : EventLoopPolicy
{
    /// Returns a new `LibasyncEventLoop`.
    override EventLoop newEventLoop()
    {
        return new LibasyncEventLoop;
    }

    //version (Posix)
    //{
    //    override ChildWatcher getChildWatcher()
    //    {
    //        assert(0, "Not implemented");
    //    }

    //    override void setChildWatcher(ChildWatcher watcher)
    //    {
    //        assert(0, "Not implemented");
    //    }
    //}
}

/**
 * Stream transport on top of a libasync `AsyncTCPConnection`.
 *
 * libasync events arrive through `handleTCPEvent`; protocol callbacks are
 * invoked through the event loop's `callSoon`, except for `resumeWriting`
 * and `pauseWriting`, which are called directly.
 */
private final class LibasyncTransport : AbstractBaseTransport, Transport
{
    private enum State : ubyte
    {
        CONNECTING,
        CONNECTED,
        EOF,
        DISCONNECTED,
    }

    private EventLoop eventLoop;
    private Socket _socket;
    // Set to null by `onClose` and `onError`.
    private AsyncTCPConnection connection;
    // Completed (or failed) once the connection attempt finishes.
    private Waiter waiter = null;
    private Protocol protocol;
    private State state;
    private bool readingPaused = false;
    private bool writingPaused = false;
    // Data accepted by `write` that the connection has not taken yet.
    private void[] writeBuffer;
    private BufferLimits writeBufferLimits;
    // Current back-off used by `rescheduleOnWrite`.
    private Duration writeRescheduleInterval = 1.msecs;

    /// Creates a transport in the `CONNECTING` state with default write
    /// buffer limits. No argument may be null.
    this(EventLoop eventLoop, Socket socket, AsyncTCPConnection connection)
    in
    {
        assert(eventLoop !is null);
        assert(socket !is null);
        assert(connection !is null);
    }
    body
    {
        this.state = State.CONNECTING;
        this.eventLoop = eventLoop;
        this._socket = socket;
        this.connection = connection;
        setWriteBufferLimits;
    }

    /// The socket this transport was created for.
    @property
    Socket socket()
    {
        return this._socket;
    }

    /// Attaches the protocol and an optional waiter; allowed only once and
    /// only while the transport is still connecting.
    void setProtocol(Protocol protocol, Waiter waiter = null)
    in
    {
        assert(this.state == State.CONNECTING);
        assert(protocol !is null);
        assert(this.protocol is null);
        assert(this.waiter is null);
    }
    body
    {
        this.protocol = protocol;
        this.waiter = waiter;
    }

    /// CONNECT event: marks the transport connected, schedules the waiter's
    /// result (if any) and the protocol's `connectionMade`.
    private void onConnect()
    in
    {
        assert(this.state == State.CONNECTING);
        assert(this.connection.isConnected);
        assert(this._socket.handle == this.connection.socket);
        assert(this.protocol !is null);
    }
    body
    {
        state = State.CONNECTED;

        if (this.waiter !is null)
        {
            this.eventLoop.callSoon(&this.waiter.setResultUnlessCancelled);
            this.waiter = null;
        }

        this.eventLoop.callSoon(&this.protocol.connectionMade, this);
    }

    /// READ event: receives until a `recv` call does not fill the buffer and
    /// schedules `dataReceived` with everything collected. Does nothing
    /// while reading is paused.
    private void onRead()
    in
    {
        assert(this.state == State.CONNECTED);
        assert(this.protocol !is null);
    }
    body
    {
        if (this.readingPaused)
            return;

        // Scratch buffer declared `static`, so it is allocated once rather
        // than per call; its content is copied into `receivedData`.
        static ubyte[] readBuffer = new ubyte[64 * 1024];
        Appender!(ubyte[]) receivedData;

        while (true)
        {
            auto length = this.connection.recv(readBuffer);

            receivedData ~= readBuffer[0 .. length];

            // A short read ends the loop.
            if (length < readBuffer.length)
                break;
        }

        if (!receivedData.data.empty)
            this.eventLoop.callSoon(&this.protocol.dataReceived,
                receivedData.data);
    }

    /// WRITE event: tries to send the buffered data and resumes the
    /// protocol's writing once the buffer is at or below the low limit.
    private void onWrite()
    in
    {
        assert(this.state == State.CONNECTED || this.state == State.EOF);
        assert(this.protocol !is null);
    }
    body
    {
        if (!this.writeBuffer.empty)
        {
            auto sent = this.connection.send(cast(ubyte[]) this.writeBuffer);
            this.writeBuffer = this.writeBuffer[sent .. $];
        }

        if (this.writingPaused &&
            this.writeBuffer.length <= this.writeBufferLimits.low)
        {
            this.protocol.resumeWriting;
            this.writingPaused = false;
        }
    }

    /// CLOSE event: drops the connection and schedules `connectionLost`
    /// without an exception. Ignored if the connection is already dropped.
    private void onClose()
    in
    {
        assert(this.state != State.CONNECTING);
        assert(this.protocol !is null);
    }
    body
    {
        if (this.connection is null)
            return;

        this.connection = null;
        this.state = State.DISCONNECTED;
        this.eventLoop.callSoon(&this.protocol.connectionLost, null);
    }

    /**
     * ERROR event: reports the connection's error and drops the connection.
     *
     * While connecting, the error is set on the waiter (if any); otherwise
     * `connectionLost` is scheduled with the error. Ignored if the
     * connection is already dropped.
     */
    private void onError()
    in
    {
        assert(this.protocol !is null);
    }
    body
    {
        if (this.connection is null)
            return;

        if (this.state == State.CONNECTING)
        {
            if (this.waiter !is null)
            {
                this.waiter.setException(
                    new SocketOSException(connection.error()));
                // The waiter is completed; drop the reference like onConnect
                // does, so it cannot be completed a second time.
                this.waiter = null;
            }
        }
        else
        {
            this.eventLoop.callSoon(&this.protocol.connectionLost,
                new SocketOSException(connection.error()));
        }

        this.connection = null;
        this.state = State.DISCONNECTED;
    }

    /// Entry point registered with libasync; dispatches `event` to the
    /// matching handler.
    void handleTCPEvent(TCPEvent event)
    {
        final switch (event)
        {
        case TCPEvent.CONNECT:
            onConnect;
            break;
        case TCPEvent.READ:
            onRead;
            break;
        case TCPEvent.WRITE:
            onWrite;
            break;
        case TCPEvent.CLOSE:
            onClose;
            break;
        case TCPEvent.ERROR:
            onError;
            break;
        }
    }

    /**
     * Transport interface
     */
    override Socket getExtraInfoSocket()
    {
        return socket;
    }

    /// Marks the transport disconnected and schedules `kill(false)` on the
    /// connection. No-op if already disconnected.
    void close()
    {
        if (this.state == State.DISCONNECTED)
            return;

        this.state = State.DISCONNECTED;
        this.eventLoop.callSoon(&this.connection.kill, false);
    }

    /// Stops delivering received data to the protocol.
    /// Throws: `Exception` if not connected or already paused.
    void pauseReading()
    {
        if (this.state != State.CONNECTED)
            throw new Exception("Cannot pauseReading() when closing");
        if (this.readingPaused)
            throw new Exception("Reading is already paused");

        this.readingPaused = true;
    }

    /// Re-enables reading.
    /// Throws: `Exception` if reading is not paused.
    void resumeReading()
    {
        if (!this.readingPaused)
            throw new Exception("Reading is not paused");
        this.readingPaused = false;
    }

    /// Like `close`, but schedules `kill(true)` on the connection.
    void abort()
    {
        if (this.state == State.DISCONNECTED)
            return;

        this.state = State.DISCONNECTED;
        this.eventLoop.callSoon(&this.connection.kill, true);
    }

    /// Always true for this transport.
    bool canWriteEof()
    {
        return true;
    }

    /// Number of bytes currently waiting in the write buffer.
    size_t getWriteBufferSize()
    {
        return writeBuffer.length;
    }

    /// The current high/low limits of the write buffer.
    BufferLimits getWriteBufferLimits()
    {
        return writeBufferLimits;
    }

    /**
     * Sets the write buffer limits.
     *
     * Defaults: `high` is 64 KiB when neither value is given, or `4 * low`
     * when only `low` is given; `low` defaults to `high / 4`. `low` is
     * clamped so that it never exceeds `high`.
     */
    void setWriteBufferLimits(Nullable!size_t high = Nullable!size_t(),
        Nullable!size_t low = Nullable!size_t())
    {
        if (high.isNull)
        {
            if (low.isNull)
                high = 64 * 1024;
            else
                high = 4 * low;
        }

        if (low.isNull)
            low = high / 4;

        if (high < low)
            low = high;

        this.writeBufferLimits.high = high;
        this.writeBufferLimits.low = low;
    }

    /**
     * Sends `data`, buffering whatever the connection does not accept.
     *
     * With an empty buffer the data is sent directly and only the unsent
     * remainder is stored. Otherwise the data is appended to the buffer, the
     * protocol's writing is paused once the high limit is reached, and a
     * flush attempt is made through `rescheduleOnWrite`.
     *
     * Throws: `Exception` ("Disconnected transport") if the connection has
     *         been dropped.
     */
    void write(const(void)[] data)
    in
    {
        assert(this.protocol !is null);
        assert(!this.writingPaused);
    }
    body
    {
        enforce(this.connection !is null, "Disconnected transport");

        if (this.writeBuffer.empty)
        {
            auto sent = this.connection.send(cast(const ubyte[]) data);
            if (sent < data.length)
                this.writeBuffer = data[sent .. $].dup;
        }
        else
        {
            this.writeBuffer ~= data;
            if (!this.writingPaused && this.writeBuffer.length >= this.writeBufferLimits.high)
            {
                this.protocol.pauseWriting;
                this.writingPaused = true;
            }
            rescheduleOnWrite;
        }
    }

    /**
     * Tries to flush the write buffer and, while data remains, schedules
     * itself again.
     *
     * If the attempt made progress the retry is scheduled with `callSoon`
     * and the back-off is reset to 1 ms; otherwise the back-off interval is
     * doubled (capped at 2 seconds) and the retry is scheduled after it.
     */
    private void rescheduleOnWrite()
    {
        if (this.state == State.DISCONNECTED)
            return;

        if (this.writeBuffer.empty)
            return;

        auto bufferLength = this.writeBuffer.length;

        onWrite;
        if (this.writeBuffer.empty)
            return;

        if (this.writeBuffer.length < bufferLength)
        {
            this.writeRescheduleInterval = 1.msecs;
            this.eventLoop.callSoon(&this.rescheduleOnWrite);
        }
        else
        {
            this.writeRescheduleInterval = min(this.writeRescheduleInterval * 2, 2.seconds);
            this.eventLoop.callLater(this.writeRescheduleInterval, &this.rescheduleOnWrite);
        }
    }

    /// Implemented as `close`.
    void writeEof()
    {
        close;
    }
}

/**
 * Datagram transport on top of a libasync `AsyncUDPSocket`.
 *
 * libasync events arrive through `handleUDPEvent`; protocol callbacks are
 * invoked through the event loop's `callSoon`.
 */
private final class LibasyncDatagramTransport : AbstractBaseTransport, DatagramTransport
{
    private EventLoop eventLoop;
    private Socket _socket;
    private AsyncUDPSocket udpSocket;
    // Stored by `setProtocol`; not read anywhere else in this class.
    private Waiter waiter = null;
    private DatagramProtocol datagramProtocol = null;

    /// Creates the transport; `socket` and `udpSocket` must refer to the
    /// same socket handle and no argument may be null.
    this(EventLoop eventLoop, Socket socket, AsyncUDPSocket udpSocket)
    in
    {
        assert(eventLoop !is null);
        assert(socket !is null);
        assert(udpSocket !is null);
        assert(socket.handle == udpSocket.socket);
    }
    body
    {
        this.eventLoop = eventLoop;
        this._socket = socket;
        this.udpSocket = udpSocket;
    }

    /// The socket this transport was created for.
    @property
    Socket socket()
    {
        return this._socket;
    }

    /// Attaches the protocol and an optional waiter; allowed only once.
    void setProtocol(DatagramProtocol datagramProtocol, Waiter waiter = null)
    in
    {
        assert(datagramProtocol !is null);
        assert(this.datagramProtocol is null);
        assert(this.waiter is null);
    }
    body
    {
        this.datagramProtocol = datagramProtocol;
        this.waiter = waiter;
    }

    /**
     * READ event: receives one datagram and schedules `datagramReceived`
     * with its payload and sender address.
     *
     * Throws: `Exception` if the received length fills the whole 1501-byte
     *         buffer, i.e. the datagram is larger than 1500 bytes.
     */
    private void onRead()
    in
    {
        assert(this.datagramProtocol !is null);
    }
    body
    {
        // One byte more than the accepted maximum, so that an oversized
        // datagram can be detected by the check below.
        ubyte[] readBuffer = new ubyte[1501];
        NetworkAddress networkAddress;

        auto length = this.udpSocket.recvFrom(readBuffer, networkAddress);

        enforce(length < readBuffer.length,
            "Unexpected UDP package size > 1500 bytes");

        Address address = new UnknownAddressReference(
            cast(typeof(Address.init.name)) networkAddress.sockAddr,
            cast(uint) networkAddress.sockAddrLen);

        this.eventLoop.callSoon(&this.datagramProtocol.datagramReceived,
            readBuffer[0 .. length], address);
    }

    /// WRITE event: nothing to do, `sendTo` does not buffer.
    private void onWrite()
    in
    {
        assert(this.datagramProtocol !is null);
    }
    body
    {
    }

    /// ERROR event: schedules `errorReceived` with the socket's error.
    private void onError()
    in
    {
        assert(this.datagramProtocol !is null);
    }
    body
    {
        this.eventLoop.callSoon(&this.datagramProtocol.errorReceived,
            new SocketOSException(udpSocket.error()));
    }

    /// Entry point registered with libasync; dispatches `event` to the
    /// matching handler.
    void handleUDPEvent(UDPEvent event)
    {
        final switch (event)
        {
        case UDPEvent.READ:
            onRead;
            break;
        case UDPEvent.WRITE:
            onWrite;
            break;
        case UDPEvent.ERROR:
            onError;
            break;
        }
    }

    /**
     * DatagramTransport interface
     */

    override Socket getExtraInfoSocket()
    {
        return socket;
    }

    /// Schedules `kill` on the UDP socket.
    void close()
    {
        this.eventLoop.callSoon(&this.udpSocket.kill);
    }

    /**
     * Sends `data` to `address`.
     *
     * Throws: `Exception` if `address` is null; a default remote address is
     *         not supported.
     */
    void sendTo(const(void)[] data, Address address = null)
    in
    {
        assert(this.datagramProtocol !is null);
    }
    body
    {
        enforce(address !is null, "default remote not supported yet");

        NetworkAddress networkAddress;

        // Raw byte copy of the socket address into libasync's address type.
        (cast(byte*) networkAddress.sockAddr)[0 .. address.nameLen] =
            (cast(byte*) address.name)[0 .. address.nameLen];

        this.udpSocket.sendTo(cast(const ubyte[]) data, networkAddress);
    }

    /// Same as `close`: schedules `kill` on the UDP socket.
    void abort()
    {
        this.eventLoop.callSoon(&this.udpSocket.kill);
    }
}