1 /** 2 * Transport interfaces. 3 */ 4 module asynchronous.transports; 5 6 import std.process : Pid, Pipe; 7 import std.socket : Address, Socket; 8 import std.typecons; 9 10 /** 11 * Interface for transports. 12 */ 13 interface BaseTransport 14 { 15 /** 16 * Get optional transport information. 17 */ 18 19 final auto getExtraInfo(string name)() 20 { 21 static if (name == "peername") 22 return getExtraInfoPeername; 23 else static if (name == "socket") 24 return getExtraInfoSocket; 25 else static if (name == "sockname") 26 return getExtraInfoSockname; 27 else 28 assert(0, "Unknown extra info name"); 29 } 30 31 protected string getExtraInfoPeername(); 32 protected Socket getExtraInfoSocket(); 33 protected string getExtraInfoSockname(); 34 35 /** 36 * Close the transport. 37 * 38 * If the transport has a buffer for outgoing data, buffered data will be 39 * flushed asynchronously. No more data will be received. After all buffered 40 * data is flushed, the protocol’s $(D_PSYMBOL connectionLost()) method will 41 * be called with $(D_KEYWORD null) as its argument. 42 */ 43 void close(); 44 } 45 46 /** 47 * Interface for read-only transports. 48 */ 49 interface ReadTransport : BaseTransport 50 { 51 /** 52 * Pause the receiving end. 53 * 54 * No data will be passed to the protocol's $(D_PSYMBOL dataReceived()) 55 * method until $(D_PSYMBOL resumeReading()) is called. 56 */ 57 void pauseReading(); 58 59 /** 60 * Resume the receiving end. 61 * 62 * The protocol’s $(D_PSYMBOL dataReceived()) method will be called once 63 * again if some data is available for reading. 64 */ 65 void resumeReading(); 66 } 67 68 alias BufferLimits = Tuple!(size_t, "high", size_t, "low"); 69 70 /** 71 * Interface for write-only transports. 72 */ 73 interface WriteTransport : BaseTransport 74 { 75 /** 76 * Close the transport immediately, without waiting for pending operations 77 * to complete. 78 * 79 * Buffered data will be lost. No more data will be received. The protocol's 80 * $(D_PSYMBOL connectionLost()) method will eventually be called with 81 * $(D_KEYWORD null) as its argument. 82 */ 83 void abort(); 84 85 /** 86 * Returns: $(D_KEYWORD true) if this transport supports $(D_PSYMBOL 87 * writeEof()), $(D_KEYWORD false) if not. 88 */ 89 bool canWriteEof(); 90 91 /** 92 * Returns: the current size of the output buffer used by the transport. 93 */ 94 size_t getWriteBufferSize(); 95 96 /** 97 * Get the high- and low-water limits for write flow control. 98 * 99 * Returns: a tuple (low, high) where low and high are positive number of 100 * bytes. 101 */ 102 BufferLimits getWriteBufferLimits(); 103 104 /** 105 * Set the high- and low-water limits for write flow control. 106 * 107 * These two values control when call the protocol's $(D_PSYMBOL 108 * pauseWriting()) and $(D_PSYMBOL resumeWriting()) methods are called. If 109 * specified, the low-water limit must be less than or equal to the 110 * high-water limit. 111 * 112 * The defaults are implementation-specific. If only the high-water limit 113 * is given, the low-water limit defaults to a implementation-specific value 114 * less than or equal to the high-water limit. Setting $(D_PSYMBOL high) to 115 * zero forces low to zero as well, and causes $(D_PSYMBOL pauseWriting()) 116 * to be called whenever the buffer becomes non-empty. Setting $(D_PSYMBOL 117 * low) to zero causes $(D_PSYMBOL resumeWriting()) to be called only once 118 * the buffer is empty. Use of zero for either limit is generally 119 * sub-optimal as it reduces opportunities for doing I/O and computation 120 * concurrently. 121 */ 122 void setWriteBufferLimits(Nullable!size_t high = Nullable!size_t(), 123 Nullable!size_t low = Nullable!size_t()); 124 125 /** 126 * Write some data bytes to the transport. 127 * 128 * This does not block; it buffers the data and arranges for it to be sent 129 * out asynchronously. 130 */ 131 void write(const(void)[] data); 132 133 /** 134 * Close the write end of the transport after flushing buffered data. 135 * 136 * Data may still be received. 137 * 138 * This method can throw $(D_PSYMBOL NotImplementedError) if the transport 139 * (e.g. SSL) doesn’t support half-closes. 140 */ 141 void writeEof(); 142 } 143 144 /** 145 * Interface representing a bidirectional transport. 146 * 147 * There may be several implementations, but typically, the user does not 148 * implement new transports; rather, the platform provides some useful 149 * transports that are implemented using the platform's best practices. 150 * 151 * The user never instantiates a transport directly; they call a utility 152 * function, passing it a protocol factory and other information necessary to 153 * create the transport and protocol. (E.g. $(D_PSYMBOL 154 * EventLoop.createConnection()) or $(D_PSYMBOL EventLoop.createServer()).) 155 * 156 * The utility function will asynchronously create a transport and a protocol 157 * and hook them up by calling the protocol's $(D_PSYMBOL connectionMade()) 158 * method, passing it the transport. 159 */ 160 interface Transport : ReadTransport, WriteTransport 161 { 162 } 163 164 /** 165 * Interface for datagram (UDP) transports. 166 */ 167 interface DatagramTransport : BaseTransport 168 { 169 /** 170 * Send the data bytes to the remote peer given by address (a 171 * transport-dependent target address). 172 * 173 * If $(D_PSYMBOL address) is $(D_KEYWORD null), the data is sent to the 174 * target address given on transport 175 * creation. 176 * 177 * This method does not block; it buffers the data and arranges for it to be 178 * sent out asynchronously. 179 */ 180 void sendTo(const(void)[] data, Address address = null); 181 182 /** 183 * Close the transport immediately, without waiting for pending operations 184 * to complete. 185 * 186 * Buffered data will be lost. No more data will be received. The protocol’s 187 * $(D_PSYMBOL connectionLost()) method will eventually be called with 188 * $(D_KEYWORD null) as its argument. 189 */ 190 void abort(); 191 } 192 193 alias SubprocessStatus = Tuple!(bool, "terminated", int, "status"); 194 195 interface SubprocessTransport : BaseTransport 196 { 197 /** 198 * Returns: the subprocess process id as an integer. 199 */ 200 Pid getPid(); 201 202 203 /** 204 * Returns: the transport for the communication pipe corresponding to the 205 * integer file descriptor fd: 206 * 207 * 0: readable streaming transport of the standard input (stdin), or throws 208 * $(D_PSYMBOL Exception) if the subprocess was not created with 209 * stdin = PIPE 210 * 1: writable streaming transport of the standard output (stdout), or 211 * throws $(D_PSYMBOL Exception) if the subprocess was not created with 212 * stdout = PIPE 213 * 2: writable streaming transport of the standard error (stderr), or throws 214 * $(D_PSYMBOL Exception) if the subprocess was not created with 215 * stderr = PIPE 216 * other fd: throws $(D_PSYMBOL Exception) 217 */ 218 Pipe getPipeTransport(int fd); 219 220 /** 221 * Returns: the subprocess exit status, similarly to the $(D_PSYMBOL 222 * std.process.tryWait). 223 */ 224 SubprocessStatus getStatus(); 225 226 /** 227 * Kill the subprocess. 228 * 229 * On POSIX systems, the function sends SIGKILL to the subprocess. On 230 * Windows, this method is an alias for $(D_PSYMBOL terminate()). 231 */ 232 void kill(); 233 234 /** 235 * Send the signal number to the subprocess, as in $(D_PSYMBOL 236 * std.process.kill). 237 */ 238 void sendSignal(int signal); 239 240 /** 241 * Ask the subprocess to stop. This method is an alias for the close() 242 * method. 243 * 244 * On POSIX systems, this method sends SIGTERM to the subprocess. On 245 * Windows, the Windows API function TerminateProcess() is called to stop 246 * the subprocess. 247 */ 248 void terminate(); 249 250 /** 251 * Ask the subprocess to stop by calling the terminate() method if the 252 * subprocess hasn’t returned yet, and close transports of all pipes (stdin, 253 * stdout and stderr). 254 */ 255 void close(); 256 } 257 258 package abstract class AbstractBaseTransport : BaseTransport 259 { 260 protected override string getExtraInfoPeername() 261 { 262 auto socket = getExtraInfoSocket; 263 264 return socket !is null ? socket.remoteAddress.toString : null; 265 } 266 267 protected override Socket getExtraInfoSocket() 268 { 269 return null; 270 } 271 272 protected override string getExtraInfoSockname() 273 { 274 auto socket = getExtraInfoSocket; 275 276 return socket !is null ? socket.localAddress.toString : null; 277 } 278 }