/**
 * Transport interfaces.
 *
 * Copyright: © 2015-2016 Dragos Carp
 * License: Boost Software License - Version 1.0
 * Authors: Dragos Carp
 */
module asynchronous.transports;

import std.process : Pid, Pipe;
import std.socket : Address, Socket;
import std.typecons;

/**
 * Interface for transports.
 */
interface BaseTransport
{
    /**
     * Get optional transport information.
     *
     * Params:
     *  name = compile-time selector of the requested information. Supported
     *         values are:
     *         $(UL
     *          $(LI "peername": result of $(D_PSYMBOL getExtraInfoPeername())
     *               (a $(D_KEYWORD string)))
     *          $(LI "socket": result of $(D_PSYMBOL getExtraInfoSocket())
     *               (a $(D_PSYMBOL Socket)))
     *          $(LI "sockname": result of $(D_PSYMBOL getExtraInfoSockname())
     *               (a $(D_KEYWORD string)))
     *         )
     *
     * Any other value of $(D_PSYMBOL name) is rejected at compile time.
     *
     * Returns: the requested information; the static return type depends on
     *          $(D_PSYMBOL name).
     */
    final auto getExtraInfo(string name)()
    {
        static if (name == "peername")
            return getExtraInfoPeername();
        else static if (name == "socket")
            return getExtraInfoSocket();
        else static if (name == "sockname")
            return getExtraInfoSockname();
        else
            // The name is a template argument, so a wrong one is a programming
            // error that can (and should) be diagnosed at compile time rather
            // than by aborting at run time.
            static assert(0, "Unknown extra info name");
    }

    /// Hook backing $(D_PSYMBOL getExtraInfo!"peername").
    protected string getExtraInfoPeername();

    /// Hook backing $(D_PSYMBOL getExtraInfo!"socket").
    protected Socket getExtraInfoSocket();

    /// Hook backing $(D_PSYMBOL getExtraInfo!"sockname").
    protected string getExtraInfoSockname();

    /**
     * Close the transport.
     *
     * If the transport has a buffer for outgoing data, buffered data will be
     * flushed asynchronously. No more data will be received. After all buffered
     * data is flushed, the protocol's $(D_PSYMBOL connectionLost()) method will
     * be called with $(D_KEYWORD null) as its argument.
     */
    void close();
}

/**
 * Interface for read-only transports.
 */
interface ReadTransport : BaseTransport
{
    /**
     * Pause the receiving end.
     *
     * No data will be passed to the protocol's $(D_PSYMBOL dataReceived())
     * method until $(D_PSYMBOL resumeReading()) is called.
     */
    void pauseReading();

    /**
     * Resume the receiving end.
     *
     * The protocol's $(D_PSYMBOL dataReceived()) method will be called once
     * again if some data is available for reading.
     */
    void resumeReading();
}

/**
 * High- and low-water limits (in that field order) used for write flow
 * control.
 */
alias BufferLimits = Tuple!(size_t, "high", size_t, "low");

/**
 * Interface for write-only transports.
 */
interface WriteTransport : BaseTransport
{
    /**
     * Close the transport immediately, without waiting for pending operations
     * to complete.
     *
     * Buffered data will be lost. No more data will be received. The protocol's
     * $(D_PSYMBOL connectionLost()) method will eventually be called with
     * $(D_KEYWORD null) as its argument.
     */
    void abort();

    /**
     * Returns: $(D_KEYWORD true) if this transport supports $(D_PSYMBOL
     *          writeEof()), $(D_KEYWORD false) if not.
     */
    bool canWriteEof();

    /**
     * Returns: the current size of the output buffer used by the transport.
     */
    size_t getWriteBufferSize();

    /**
     * Get the high- and low-water limits for write flow control.
     *
     * Returns: a $(D_PSYMBOL BufferLimits) tuple (high, low) where high and
     *          low are non-negative numbers of bytes.
     */
    BufferLimits getWriteBufferLimits();

    /**
     * Set the high- and low-water limits for write flow control.
     *
     * These two values control when the protocol's $(D_PSYMBOL
     * pauseWriting()) and $(D_PSYMBOL resumeWriting()) methods are called. If
     * specified, the low-water limit must be less than or equal to the
     * high-water limit.
     *
     * The defaults are implementation-specific. If only the high-water limit
     * is given, the low-water limit defaults to an implementation-specific
     * value less than or equal to the high-water limit. Setting $(D_PSYMBOL
     * high) to zero forces low to zero as well, and causes $(D_PSYMBOL
     * pauseWriting()) to be called whenever the buffer becomes
     * non-empty. Setting $(D_PSYMBOL low) to zero causes $(D_PSYMBOL
     * resumeWriting()) to be called only once the buffer is empty. Use of
     * zero for either limit is generally sub-optimal as it reduces
     * opportunities for doing I/O and computation concurrently.
     *
     * Params:
     *  high = high-water limit in bytes; a null value means "not specified".
     *  low  = low-water limit in bytes; a null value means "not specified".
     */
    void setWriteBufferLimits(Nullable!size_t high = Nullable!size_t(),
        Nullable!size_t low = Nullable!size_t());

    /**
     * Write some data bytes to the transport.
     *
     * This does not block; it buffers the data and arranges for it to be sent
     * out asynchronously.
     */
    void write(const(void)[] data);

    /**
     * Close the write end of the transport after flushing buffered data.
     *
     * Data may still be received.
     *
     * This method can throw $(D_PSYMBOL NotImplementedException) if the
     * transport (e.g. SSL) doesn't support half-closes.
     */
    void writeEof();
}

/**
 * Interface representing a bidirectional transport.
 *
 * There may be several implementations, but typically, the user does not
 * implement new transports; rather, the platform provides some useful
 * transports that are implemented using the platform's best practices.
 *
 * The user never instantiates a transport directly; they call a utility
 * function, passing it a protocol factory and other information necessary to
 * create the transport and protocol. (E.g. $(D_PSYMBOL
 * EventLoop.createConnection()) or $(D_PSYMBOL EventLoop.createServer()).)
 *
 * The utility function will asynchronously create a transport and a protocol
 * and hook them up by calling the protocol's $(D_PSYMBOL connectionMade())
 * method, passing it the transport.
 */
interface Transport : ReadTransport, WriteTransport
{
}

/**
 * Interface for datagram (UDP) transports.
 */
interface DatagramTransport : BaseTransport
{
    /**
     * Send the data bytes to the remote peer given by address (a
     * transport-dependent target address).
     *
     * If $(D_PSYMBOL address) is $(D_KEYWORD null), the data is sent to the
     * target address given on transport creation.
     *
     * This method does not block; it buffers the data and arranges for it to be
     * sent out asynchronously.
     */
    void sendTo(const(void)[] data, Address address = null);

    /**
     * Close the transport immediately, without waiting for pending operations
     * to complete.
     *
     * Buffered data will be lost. No more data will be received. The protocol's
     * $(D_PSYMBOL connectionLost()) method will eventually be called with
     * $(D_KEYWORD null) as its argument.
     */
    void abort();
}

/**
 * Result of $(D_PSYMBOL SubprocessTransport.getStatus()): a
 * $(D_PSYMBOL terminated) flag together with the exit $(D_PSYMBOL status).
 */
alias SubprocessStatus = Tuple!(bool, "terminated", int, "status");

/**
 * Interface for subprocess transports.
 */
interface SubprocessTransport : BaseTransport
{
    /**
     * Returns: the subprocess process id as a $(D_PSYMBOL std.process.Pid).
     */
    Pid getPid();

    /**
     * Returns: the transport for the communication pipe corresponding to the
     * integer file descriptor fd:
     *
     * 0: readable streaming transport of the standard input (stdin), or throws
     *    $(D_PSYMBOL Exception) if the subprocess was not created with
     *    stdin = PIPE
     * 1: writable streaming transport of the standard output (stdout), or
     *    throws $(D_PSYMBOL Exception) if the subprocess was not created with
     *    stdout = PIPE
     * 2: writable streaming transport of the standard error (stderr), or throws
     *    $(D_PSYMBOL Exception) if the subprocess was not created with
     *    stderr = PIPE
     * other fd: throws $(D_PSYMBOL Exception)
     */
    Pipe getPipeTransport(int fd);

    /**
     * Returns: the subprocess exit status, similarly to the $(D_PSYMBOL
     *          std.process.tryWait).
     */
    SubprocessStatus getStatus();

    /**
     * Kill the subprocess.
     *
     * On POSIX systems, the function sends SIGKILL to the subprocess. On
     * Windows, this method is an alias for $(D_PSYMBOL terminate()).
     */
    void kill();

    /**
     * Send the signal number to the subprocess, as in $(D_PSYMBOL
     * std.process.kill).
     */
    void sendSignal(int signal);

    /**
     * Ask the subprocess to stop. This method is an alias for the close()
     * method.
     *
     * On POSIX systems, this method sends SIGTERM to the subprocess. On
     * Windows, the Windows API function TerminateProcess() is called to stop
     * the subprocess.
     */
    void terminate();

    /**
     * Ask the subprocess to stop by calling the terminate() method if the
     * subprocess hasn't returned yet, and close transports of all pipes (stdin,
     * stdout and stderr).
     */
    void close();
}

/**
 * Package-internal base class supplying default implementations of the
 * extra-info hooks declared by $(D_PSYMBOL BaseTransport).
 *
 * By default no socket is available; subclasses that own a socket override
 * $(D_PSYMBOL getExtraInfoSocket()), and the peer/local names are then derived
 * from that socket.
 */
package abstract class AbstractBaseTransport : BaseTransport
{
    /**
     * Returns: the textual remote address of the socket reported by
     *          $(D_PSYMBOL getExtraInfoSocket()), or $(D_KEYWORD null) if
     *          there is no socket.
     */
    protected override string getExtraInfoPeername()
    {
        auto sock = getExtraInfoSocket();

        if (sock is null)
            return null;

        return sock.remoteAddress.toString;
    }

    /**
     * Returns: $(D_KEYWORD null); transports backed by a socket override this.
     */
    protected override Socket getExtraInfoSocket()
    {
        return null;
    }

    /**
     * Returns: the textual local address of the socket reported by
     *          $(D_PSYMBOL getExtraInfoSocket()), or $(D_KEYWORD null) if
     *          there is no socket.
     */
    protected override string getExtraInfoSockname()
    {
        auto sock = getExtraInfoSocket();

        if (sock is null)
            return null;

        return sock.localAddress.toString;
    }
}