1 /**
2  * Transport interfaces.
3  *
4  * Copyright: © 2015-2016 Dragos Carp
5  * License: Boost Software License - Version 1.0
6  * Authors: Dragos Carp
7  */
8 module asynchronous.transports;
9 
10 import std.process : Pid, Pipe;
11 import std.socket : Address, Socket;
12 import std.typecons;
13 
/**
 * Interface for transports.
 *
 * Common base of all transport interfaces in this module: it exposes
 * optional, named transport information and a way to close the transport.
 */
interface BaseTransport
{
    /**
     * Get optional transport information.
     *
     * Params:
     *  name = compile-time selector of the requested information; one of
     *         "peername", "socket" or "sockname".
     *
     * Returns: the value produced by the matching protected accessor:
     *          a $(D_KEYWORD string) for "peername" and "sockname", a
     *          $(D_PSYMBOL Socket) for "socket".
     *
     * Any other $(D_PSYMBOL name) is rejected at compile time.
     */
    final auto getExtraInfo(string name)()
    {
        static if (name == "peername")
            return getExtraInfoPeername;
        else static if (name == "socket")
            return getExtraInfoSocket;
        else static if (name == "sockname")
            return getExtraInfoSockname;
        else
            // `name` is a template argument, so an unsupported selector is
            // already known while compiling; fail the build here instead of
            // deferring the error to a runtime assert(0).
            static assert(0, "Unknown extra info name: " ~ name);
    }

    /// Accessor backing $(D_PSYMBOL getExtraInfo!"peername").
    protected string getExtraInfoPeername();

    /// Accessor backing $(D_PSYMBOL getExtraInfo!"socket").
    protected Socket getExtraInfoSocket();

    /// Accessor backing $(D_PSYMBOL getExtraInfo!"sockname").
    protected string getExtraInfoSockname();

    /**
     * Close the transport.
     *
     * If the transport has a buffer for outgoing data, buffered data will be
     * flushed asynchronously. No more data will be received. After all buffered
     * data is flushed, the protocol's $(D_PSYMBOL connectionLost()) method will
     * be called with $(D_KEYWORD null) as its argument.
     */
    void close();
}
49 
/**
 * Interface for read-only transports.
 *
 * Extends $(D_PSYMBOL BaseTransport) with flow control for the receiving
 * end.
 */
interface ReadTransport : BaseTransport
{
    /**
     * Pause the receiving end.
     *
     * No data will be passed to the protocol's $(D_PSYMBOL dataReceived())
     * method until $(D_PSYMBOL resumeReading()) is called.
     */
    void pauseReading();

    /**
     * Resume the receiving end.
     *
     * The protocol's $(D_PSYMBOL dataReceived()) method will be called once
     * again if some data is available for reading.
     */
    void resumeReading();
}
71 
/**
 * Pair of write flow-control limits, as a named tuple.
 *
 * The first field is $(D_PSYMBOL high) and the second is $(D_PSYMBOL low);
 * both are of type $(D_KEYWORD size_t).
 */
alias BufferLimits = Tuple!(size_t, "high", size_t, "low");
73 
/**
 * Interface for write-only transports.
 *
 * Extends $(D_PSYMBOL BaseTransport) with buffered, non-blocking writing and
 * write flow control.
 */
interface WriteTransport : BaseTransport
{
    /**
     * Close the transport immediately, without waiting for pending operations
     * to complete.
     *
     * Buffered data will be lost. No more data will be received. The protocol's
     * $(D_PSYMBOL connectionLost()) method will eventually be called with
     * $(D_KEYWORD null) as its argument.
     */
    void abort();

    /**
     * Returns: $(D_KEYWORD true) if this transport supports $(D_PSYMBOL
     * writeEof()), $(D_KEYWORD false) if not.
     */
    bool canWriteEof();

    /**
     * Returns: the current size of the output buffer used by the transport.
     */
    size_t getWriteBufferSize();

    /**
     * Get the high- and low-water limits for write flow control.
     *
     * Returns: a $(D_PSYMBOL BufferLimits) tuple whose fields, in declaration
     *          order, are $(D_PSYMBOL high) and $(D_PSYMBOL low), each a
     *          number of bytes.
     */
    BufferLimits getWriteBufferLimits();

    /**
     * Set the high- and low-water limits for write flow control.
     *
     * These two values control when the protocol's $(D_PSYMBOL
     * pauseWriting()) and $(D_PSYMBOL resumeWriting()) methods are called. If
     * specified, the low-water limit must be less than or equal to the
     * high-water limit.
     *
     * The defaults are implementation-specific. If only the high-water limit
     * is given, the low-water limit defaults to an implementation-specific
     * value less than or equal to the high-water limit. Setting $(D_PSYMBOL
     * high) to zero forces low to zero as well, and causes $(D_PSYMBOL
     * pauseWriting()) to be called whenever the buffer becomes
     * non-empty. Setting $(D_PSYMBOL low) to zero causes $(D_PSYMBOL
     * resumeWriting()) to be called only once the buffer is empty. Use of
     * zero for either limit is generally sub-optimal as it reduces
     * opportunities for doing I/O and computation concurrently.
     *
     * Params:
     *  high = high-water limit in bytes; a null value (the default argument)
     *         means the limit is not specified.
     *  low  = low-water limit in bytes; a null value (the default argument)
     *         means the limit is not specified.
     */
    void setWriteBufferLimits(Nullable!size_t high = Nullable!size_t(),
        Nullable!size_t low = Nullable!size_t());

    /**
     * Write some data bytes to the transport.
     *
     * This does not block; it buffers the data and arranges for it to be sent
     * out asynchronously.
     *
     * Params:
     *  data = the bytes to write.
     */
    void write(const(void)[] data);

    /**
     * Close the write end of the transport after flushing buffered data.
     *
     * Data may still be received.
     *
     * This method can throw $(D_PSYMBOL NotImplementedException) if the transport
     * (e.g. SSL) doesn't support half-closes.
     */
    void writeEof();
}
147 
/**
 * Interface representing a bidirectional transport.
 *
 * It combines $(D_PSYMBOL ReadTransport) and $(D_PSYMBOL WriteTransport) and
 * declares no members of its own.
 *
 * There may be several implementations, but typically, the user does not
 * implement new transports; rather, the platform provides some useful
 * transports that are implemented using the platform's best practices.
 *
 * The user never instantiates a transport directly; they call a utility
 * function, passing it a protocol factory and other information necessary to
 * create the transport and protocol. (E.g. $(D_PSYMBOL
 * EventLoop.createConnection()) or $(D_PSYMBOL EventLoop.createServer()).)
 *
 * The utility function will asynchronously create a transport and a protocol
 * and hook them up by calling the protocol's $(D_PSYMBOL connectionMade())
 * method, passing it the transport.
 */
interface Transport : ReadTransport, WriteTransport
{
}
167 
/**
 * Interface for datagram (UDP) transports.
 *
 * Extends $(D_PSYMBOL BaseTransport) with addressed sending and an immediate
 * abort.
 */
interface DatagramTransport : BaseTransport
{
    /**
     * Send the data bytes to the remote peer given by address (a
     * transport-dependent target address).
     *
     * If $(D_PSYMBOL address) is $(D_KEYWORD null), the data is sent to the
     * target address given on transport creation.
     *
     * This method does not block; it buffers the data and arranges for it to be
     * sent out asynchronously.
     *
     * Params:
     *  data    = the bytes to send.
     *  address = the target address; defaults to $(D_KEYWORD null).
     */
    void sendTo(const(void)[] data, Address address = null);

    /**
     * Close the transport immediately, without waiting for pending operations
     * to complete.
     *
     * Buffered data will be lost. No more data will be received. The protocol's
     * $(D_PSYMBOL connectionLost()) method will eventually be called with
     * $(D_KEYWORD null) as its argument.
     */
    void abort();
}
195 
/**
 * Subprocess exit information, as a named tuple.
 *
 * The first field is $(D_PSYMBOL terminated) ($(D_KEYWORD bool)) and the
 * second is $(D_PSYMBOL status) ($(D_KEYWORD int)). It is the return type of
 * $(D_PSYMBOL SubprocessTransport.getStatus()).
 */
alias SubprocessStatus = Tuple!(bool, "terminated", int, "status");
197 
/**
 * Interface for subprocess transports.
 *
 * Extends $(D_PSYMBOL BaseTransport) with access to the subprocess id, its
 * pipes and exit status, and with methods to signal or stop the subprocess.
 */
interface SubprocessTransport : BaseTransport
{
    /**
     * Returns: the subprocess process id as a $(D_PSYMBOL std.process.Pid).
     */
    Pid getPid();

    /**
     * Returns: the $(D_PSYMBOL std.process.Pipe) for the communication pipe
     * corresponding to the integer file descriptor fd:
     *
     * 0: readable streaming transport of the standard input (stdin), or throws
     *      $(D_PSYMBOL Exception) if the subprocess was not created with
     *      stdin = PIPE
     * 1: writable streaming transport of the standard output (stdout), or
     *      throws $(D_PSYMBOL Exception) if the subprocess was not created with
     *      stdout = PIPE
     * 2: writable streaming transport of the standard error (stderr), or throws
     *      $(D_PSYMBOL Exception) if the subprocess was not created with
     *      stderr = PIPE
     * other fd: throws $(D_PSYMBOL Exception)
     *
     * NOTE(review): the readable/writable wording above looks swapped from the
     * parent's point of view (a parent normally writes to the child's stdin
     * and reads from its stdout/stderr) - confirm against the implementation.
     *
     * Params:
     *  fd = the file descriptor number selecting the pipe (0, 1 or 2).
     */
    Pipe getPipeTransport(int fd);

    /**
     * Returns: the subprocess exit status as a $(D_PSYMBOL SubprocessStatus),
     *          similarly to $(D_PSYMBOL std.process.tryWait).
     */
    SubprocessStatus getStatus();

    /**
     * Kill the subprocess.
     *
     * On POSIX systems, this method sends SIGKILL to the subprocess. On
     * Windows, this method is an alias for $(D_PSYMBOL terminate()).
     */
    void kill();

    /**
     * Send the signal number to the subprocess, as in $(D_PSYMBOL
     * std.process.kill).
     *
     * Params:
     *  signal = the signal number to send.
     */
    void sendSignal(int signal);

    /**
     * Ask the subprocess to stop. This method is an alias for the close()
     * method.
     *
     * On POSIX systems, this method sends SIGTERM to the subprocess. On
     * Windows, the Windows API function TerminateProcess() is called to stop
     * the subprocess.
     */
    void terminate();

    /**
     * Ask the subprocess to stop by calling the terminate() method if the
     * subprocess hasn't returned yet, and close transports of all pipes (stdin,
     * stdout and stderr).
     */
    void close();
}
259 
/**
 * Package-level base class supplying default implementations of the
 * extra-info accessors declared by $(D_PSYMBOL BaseTransport).
 *
 * The peer and socket names are derived from whatever
 * $(D_PSYMBOL getExtraInfoSocket()) returns; the default implementation of
 * that accessor returns $(D_KEYWORD null), so subclasses that own a socket
 * are expected to override it.
 */
package abstract class AbstractBaseTransport : BaseTransport
{
    /**
     * Returns: the textual form of the socket's remote address, or
     *          $(D_KEYWORD null) when no socket is available.
     */
    protected override string getExtraInfoPeername()
    {
        auto sock = getExtraInfoSocket();

        if (sock is null)
            return null;

        return sock.remoteAddress.toString();
    }

    /**
     * Returns: $(D_KEYWORD null); this base class has no socket of its own.
     */
    protected override Socket getExtraInfoSocket()
    {
        return null;
    }

    /**
     * Returns: the textual form of the socket's local address, or
     *          $(D_KEYWORD null) when no socket is available.
     */
    protected override string getExtraInfoSockname()
    {
        auto sock = getExtraInfoSocket();

        if (sock is null)
            return null;

        return sock.localAddress.toString();
    }
}