WPILibC++ 2023.4.3
Stream.h
Go to the documentation of this file.
1// Copyright (c) FIRST and other WPILib contributors.
2// Open Source Software; you can modify and/or share it under the terms of
3// the WPILib BSD license file in the root directory of this project.
4
5#ifndef WPINET_UV_STREAM_H_
6#define WPINET_UV_STREAM_H_
7
8#include <uv.h>
9
10#include <cstdlib>
11#include <functional>
12#include <initializer_list>
13#include <memory>
14#include <span>
15#include <utility>
16
17#include <wpi/Signal.h>
18
19#include "wpinet/uv/Buffer.h"
20#include "wpinet/uv/Handle.h"
21#include "wpinet/uv/Request.h"
22
23namespace wpi::uv {
24
25class Stream;
26
27/**
28 * Shutdown request.
 *
 * Request type accepted by Stream::Shutdown(). Built on RequestImpl with
 * uv_shutdown_t as the raw request structure.
29 */
30class ShutdownReq : public RequestImpl<ShutdownReq, uv_shutdown_t> {
31 public:
33
  /**
   * Get the stream associated with this request.
   *
   * Reads the `data` pointer of the raw request's `handle` field and casts it
   * to Stream.
   * NOTE(review): assumes `handle` is non-null (request already submitted)
   * and that `handle->data` points at the owning Stream object -- confirm
   * against Handle/Request.
   *
   * @return Reference to the associated stream.
   */
34 Stream& GetStream() const {
35 return *static_cast<Stream*>(GetRaw()->handle->data);
36 }
37
38 /**
39 * Shutdown completed signal.
 *
 * NOTE(review): the member this comment documents (listing line 41) is
 * missing from this copy; the generated index at the end of the page names
 * it `complete`. Restore the declaration from the upstream header.
40 */
42};
43
44/**
45 * Write request.
 *
 * Request type accepted by the Stream::Write() overloads that take a request
 * object. Built on RequestImpl with uv_write_t as the raw request structure.
46 */
47class WriteReq : public RequestImpl<WriteReq, uv_write_t> {
48 public:
50
  /**
   * Get the stream associated with this request.
   *
   * Reads the `data` pointer of the raw request's `handle` field and casts it
   * to Stream.
   * NOTE(review): assumes `handle` is non-null (request already submitted)
   * and that `handle->data` points at the owning Stream object -- confirm
   * against Handle/Request.
   *
   * @return Reference to the associated stream.
   */
51 Stream& GetStream() const {
52 return *static_cast<Stream*>(GetRaw()->handle->data);
53 }
54
55 /**
56 * Write completed signal. This is called even if an error occurred.
57 * @param err error value
 *
 * NOTE(review): the member this comment documents (listing line 59) is
 * missing from this copy; the generated index at the end of the page names
 * it `finish`. Restore the declaration from the upstream header.
58 */
60};
61
62/**
63 * Stream handle.
64 * Stream handles provide an abstraction of a duplex communication channel.
65 * This is an abstract type; there are three stream implementations (Tcp,
66 * Pipe, and Tty).
67 */
68class Stream : public Handle {
69 public:
  /**
   * Get a shared pointer to this object, typed as Stream.
   *
   * @return Result of Handle::shared_from_this() cast to Stream.
   */
70 std::shared_ptr<Stream> shared_from_this() {
71 return std::static_pointer_cast<Stream>(Handle::shared_from_this());
72 }
73
  /**
   * Get a shared pointer to this object, typed as const Stream.
   *
   * @return Result of Handle::shared_from_this() cast to const Stream.
   */
74 std::shared_ptr<const Stream> shared_from_this() const {
75 return std::static_pointer_cast<const Stream>(Handle::shared_from_this());
76 }
77
78 /**
79 * Shutdown the outgoing (write) side of a duplex stream. It waits for pending
80 * write requests to complete. HandleShutdownComplete() is called on the
81 * request after shutdown is complete.
82 *
83 * @param req shutdown request
84 */
85 void Shutdown(const std::shared_ptr<ShutdownReq>& req);
86
87 /**
88 * Shutdown the outgoing (write) side of a duplex stream. It waits for pending
89 * write requests to complete. The callback is called after shutdown is
90 * complete. Errors will be reported to the stream error handler.
91 *
92 * @param callback Callback function to call when shutdown completes; may be
 *                 omitted (defaults to nullptr)
94 */
95 void Shutdown(std::function<void()> callback = nullptr);
96
97 /**
98 * Start reading data from an incoming stream.
99 *
100 * This will only succeed after a connection has been established.
101 *
102 * A data signal will be emitted several times until there is no more
103 * data to read or `StopRead()` is called.
104 * An end signal will be emitted when there is no more data to read.
105 */
106 void StartRead();
107
108 /**
109 * Stop reading data from the stream.
110 *
111 * This function is idempotent and may be safely called on a stopped stream.
 *
 * NOTE(review): the declaration this comment documents (listing line 113)
 * is missing from this copy; the generated index at the end of the page
 * lists it as `void StopRead()`. Restore it from the upstream header.
112 */
114
115 /**
116 * Write data to the stream.
117 *
118 * Data are written in order. The lifetime of the data pointers passed in
119 * the `bufs` parameter must exceed the lifetime of the write request.
120 * An easy way to ensure this is to have the write request keep track of
121 * the data and use either its Complete() function or destructor to free the
122 * data.
123 *
124 * The finish signal will be emitted on the request object when the data
125 * has been written (or if an error occurs).
126 * The error signal will be emitted on the request object in case of errors.
127 *
128 * @param bufs The buffers to be written to the stream.
129 * @param req write request
130 */
131 void Write(std::span<const Buffer> bufs,
132 const std::shared_ptr<WriteReq>& req);
133
134 /**
135 * Write data to the stream.
136 *
 * Convenience overload for a braced list of buffers: passes the list's
 * begin()/end() range and `req` on to Write().
 *
137 * Data are written in order. The lifetime of the data pointers passed in
138 * the `bufs` parameter must exceed the lifetime of the write request.
139 * An easy way to ensure this is to have the write request keep track of
140 * the data and use either its Complete() function or destructor to free the
141 * data.
142 *
143 * The finish signal will be emitted on the request object when the data
144 * has been written (or if an error occurs).
145 * The error signal will be emitted on the request object in case of errors.
146 *
147 * @param bufs The buffers to be written to the stream.
148 * @param req write request
149 */
150 void Write(std::initializer_list<Buffer> bufs,
151 const std::shared_ptr<WriteReq>& req) {
152 Write({bufs.begin(), bufs.end()}, req);
153 }
154
155 /**
156 * Write data to the stream.
157 *
158 * Data are written in order. The lifetime of the data pointers passed in
159 * the `bufs` parameter must exceed the lifetime of the write request.
160 * The callback can be used to free data after the request completes.
161 *
162 * The callback will be called when the data has been written (even if an
163 * error occurred). Errors will be reported to the stream error handler.
164 *
165 * @param bufs The buffers to be written to the stream.
166 * @param callback Callback function to call when the write completes; it
 *                 receives a span of buffers and an Error value
167 */
168 void Write(std::span<const Buffer> bufs,
169 std::function<void(std::span<Buffer>, Error)> callback);
170
171 /**
172 * Write data to the stream.
173 *
 * Convenience overload for a braced list of buffers: passes the list's
 * begin()/end() range on to Write() and moves `callback` into that call.
 *
174 * Data are written in order. The lifetime of the data pointers passed in
175 * the `bufs` parameter must exceed the lifetime of the write request.
176 * The callback can be used to free data after the request completes.
177 *
178 * The callback will be called when the data has been written (even if an
179 * error occurred). Errors will be reported to the stream error handler.
180 *
181 * @param bufs The buffers to be written to the stream.
182 * @param callback Callback function to call when the write completes; it
 *                 receives a span of buffers and an Error value
183 */
184 void Write(std::initializer_list<Buffer> bufs,
185 std::function<void(std::span<Buffer>, Error)> callback) {
186 Write({bufs.begin(), bufs.end()}, std::move(callback));
187 }
188
189 /**
190 * Queue a write request if it can be completed immediately.
191 *
192 * Same as `Write()`, but won't queue a write request if it can't be
193 * completed immediately.
194 * An error signal will be emitted in case of errors.
195 *
196 * @param bufs The buffers to be written to the stream.
197 * @return Number of bytes written.
198 */
199 int TryWrite(std::span<const Buffer> bufs);
200
201 /**
202 * Queue a write request if it can be completed immediately.
203 *
 * Convenience overload for a braced list of buffers: passes the list's
 * begin()/end() range on to TryWrite() and returns its result.
 *
204 * Same as `Write()`, but won't queue a write request if it can't be
205 * completed immediately.
206 * An error signal will be emitted in case of errors.
207 *
208 * @param bufs The buffers to be written to the stream.
209 * @return Number of bytes written.
210 */
211 int TryWrite(std::initializer_list<Buffer> bufs) {
212 return TryWrite({bufs.begin(), bufs.end()});
213 }
214
215 /**
216 * Same as TryWrite(), extended to allow sending a handle over a
217 * pipe.
218 *
219 * Trying to send a handle is not supported on Windows, where it returns
220 * UV_EAGAIN.
221 *
222 * @param bufs The buffers to be written to the stream.
223 * @param send The stream handle to send.
224 * @return Number of bytes written.
225 */
226 int TryWrite2(std::span<const Buffer> bufs, Stream& send);
227
228 /**
229 * Same as TryWrite(), extended to allow sending a handle over a
230 * pipe.
231 *
 * Convenience overload for a braced list of buffers: passes the list's
 * begin()/end() range and `send` on to TryWrite2() and returns its result.
 *
232 * Trying to send a handle is not supported on Windows, where it returns
233 * UV_EAGAIN.
234 *
235 * @param bufs The buffers to be written to the stream.
236 * @param send The stream handle to send.
237 * @return Number of bytes written.
238 */
239 int TryWrite2(std::initializer_list<Buffer> bufs, Stream& send) {
240 return TryWrite2({bufs.begin(), bufs.end()}, send);
241 }
242
243 /**
244 * Check if the stream is readable.
245 * @return True if the stream is readable, false otherwise.
246 */
247 bool IsReadable() const noexcept {
248 return uv_is_readable(GetRawStream()) == 1;
249 }
250
251 /**
252 * Check if the stream is writable.
253 * @return True if the stream is writable, false otherwise.
254 */
255 bool IsWritable() const noexcept {
256 return uv_is_writable(GetRawStream()) == 1;
257 }
258
259 /**
260 * Enable or disable blocking mode for a stream.
261 *
262 * When blocking mode is enabled all writes complete synchronously. The
263 * interface remains unchanged otherwise, e.g. completion or failure of the
264 * operation will still be reported through events which are emitted
265 * asynchronously.
266 *
267 * @param enable True to enable blocking mode, false otherwise.
268 * @return True in case of success (uv_stream_set_blocking() returned 0),
 *         false otherwise.
269 */
270 bool SetBlocking(bool enable) noexcept {
271 return uv_stream_set_blocking(GetRawStream(), enable) == 0;
272 }
273
274 /**
275 * Gets the amount of queued bytes waiting to be sent.
276 * @return Amount of queued bytes waiting to be sent (the raw stream's
 *         `write_queue_size` field).
277 */
278 size_t GetWriteQueueSize() const noexcept {
279 return GetRawStream()->write_queue_size;
280 }
281
282 /**
283 * Get the underlying stream data structure.
284 *
 * This is the pointer returned by GetRawHandle(), reinterpreted as a
 * uv_stream_t pointer.
 *
285 * @return The underlying stream data structure.
286 */
287 uv_stream_t* GetRawStream() const noexcept {
288 return reinterpret_cast<uv_stream_t*>(GetRawHandle());
289 }
290
291 /**
292 * Signal generated when data was read on a stream.
 *
 * NOTE(review): the member this comment documents (listing line 294) is
 * missing from this copy; the generated index at the end of the page names
 * it `data`, with Buffer& and size_t arguments. Restore it from upstream.
293 */
295
296 /**
297 * Signal generated when no more read data is available.
 *
 * NOTE(review): the member this comment documents (listing line 299) is
 * missing from this copy; the generated index at the end of the page names
 * it `end`. Restore it from upstream.
298 */
300
301 protected:
  /**
   * Construct from a raw stream structure.
   *
   * The pointer is reinterpreted as a uv_handle_t pointer and passed to the
   * Handle base class.
   *
   * @param uv_stream Raw stream structure to wrap.
   */
302 explicit Stream(uv_stream_t* uv_stream)
303 : Handle{reinterpret_cast<uv_handle_t*>(uv_stream)} {}
304};
305
/**
 * Typed stream base template.
 *
 * T is the type that shared_from_this() casts to; U is the raw handle
 * structure type returned by GetRaw() and used to size the allocation made in
 * the constructor.
 * NOTE(review): T is presumably the concrete class deriving from
 * StreamImpl<T, U>, and U the matching libuv structure -- confirm against the
 * Tcp/Pipe/Tty implementations.
 */
306template <typename T, typename U>
307class StreamImpl : public Stream {
308 public:
  /**
   * Get a shared pointer to this object, typed as T.
   *
   * @return Result of Handle::shared_from_this() cast to T.
   */
309 std::shared_ptr<T> shared_from_this() {
310 return std::static_pointer_cast<T>(Handle::shared_from_this());
311 }
312
  /**
   * Get a shared pointer to this object, typed as const T.
   *
   * @return Result of Handle::shared_from_this() cast to const T.
   */
313 std::shared_ptr<const T> shared_from_this() const {
314 return std::static_pointer_cast<const T>(Handle::shared_from_this());
315 }
316
317 /**
318 * Get the underlying handle data structure.
319 *
 * This is the pointer returned by GetRawHandle(), reinterpreted as a U
 * pointer.
 *
320 * @return The underlying handle data structure.
321 */
322 U* GetRaw() const noexcept {
323 return reinterpret_cast<U*>(this->GetRawHandle());
324 }
325
326 protected:
  /**
   * Allocate storage for the raw handle structure and pass it to Stream.
   *
   * sizeof(U) bytes are obtained with std::malloc; the result is not checked
   * for null here and the memory is not initialized in this constructor.
   * NOTE(review): initialization and release of this memory are not visible
   * in this class -- presumably handled by the derived class and by Handle;
   * confirm before relying on it.
   */
327 StreamImpl() : Stream{static_cast<uv_stream_t*>(std::malloc(sizeof(U)))} {}
328};
329
330} // namespace wpi::uv
331
332#endif // WPINET_UV_STREAM_H_
SignalBase is an implementation of the observer pattern, through the use of an emitting object and sl...
Definition: Signal.h:495
Error code.
Definition: Error.h:15
Handle.
Definition: Handle.h:30
uv_handle_t * GetRawHandle() const noexcept
Get the underlying handle data structure.
Definition: Handle.h:176
bool Invoke(F &&f, Args &&... args) const
Definition: Handle.h:251
Request.
Definition: Request.h:130
uv_shutdown_t * GetRaw() noexcept
Get the underlying request data structure.
Definition: Request.h:145
Shutdown request.
Definition: Stream.h:30
sig::Signal complete
Shutdown completed signal.
Definition: Stream.h:41
Stream & GetStream() const
Definition: Stream.h:34
Stream handle.
Definition: Stream.h:68
std::shared_ptr< Stream > shared_from_this()
Definition: Stream.h:70
void Write(std::span< const Buffer > bufs, std::function< void(std::span< Buffer >, Error)> callback)
Write data to the stream.
sig::Signal end
Signal generated when no more read data is available.
Definition: Stream.h:299
bool IsReadable() const noexcept
Check if the stream is readable.
Definition: Stream.h:247
void StopRead()
Stop reading data from the stream.
Definition: Stream.h:113
void Write(std::span< const Buffer > bufs, const std::shared_ptr< WriteReq > &req)
Write data to the stream.
void Shutdown(std::function< void()> callback=nullptr)
Shutdown the outgoing (write) side of a duplex stream.
bool SetBlocking(bool enable) noexcept
Enable or disable blocking mode for a stream.
Definition: Stream.h:270
Stream(uv_stream_t *uv_stream)
Definition: Stream.h:302
void Write(std::initializer_list< Buffer > bufs, const std::shared_ptr< WriteReq > &req)
Write data to the stream.
Definition: Stream.h:150
int TryWrite(std::initializer_list< Buffer > bufs)
Queue a write request if it can be completed immediately.
Definition: Stream.h:211
size_t GetWriteQueueSize() const noexcept
Gets the amount of queued bytes waiting to be sent.
Definition: Stream.h:278
int TryWrite2(std::initializer_list< Buffer > bufs, Stream &send)
Same as TryWrite() and extended write function for sending handles over a pipe.
Definition: Stream.h:239
void StartRead()
Start reading data from an incoming stream.
int TryWrite2(std::span< const Buffer > bufs, Stream &send)
Same as TryWrite() and extended write function for sending handles over a pipe.
int TryWrite(std::span< const Buffer > bufs)
Queue a write request if it can be completed immediately.
void Write(std::initializer_list< Buffer > bufs, std::function< void(std::span< Buffer >, Error)> callback)
Write data to the stream.
Definition: Stream.h:184
bool IsWritable() const noexcept
Checks if the stream is writable.
Definition: Stream.h:255
uv_stream_t * GetRawStream() const noexcept
Get the underlying stream data structure.
Definition: Stream.h:287
std::shared_ptr< const Stream > shared_from_this() const
Definition: Stream.h:74
void Shutdown(const std::shared_ptr< ShutdownReq > &req)
Shutdown the outgoing (write) side of a duplex stream.
sig::Signal< Buffer &, size_t > data
Signal generated when data was read on a stream.
Definition: Stream.h:294
Definition: Stream.h:307
std::shared_ptr< const T > shared_from_this() const
Definition: Stream.h:313
std::shared_ptr< T > shared_from_this()
Definition: Stream.h:309
StreamImpl()
Definition: Stream.h:327
U * GetRaw() const noexcept
Get the underlying handle data structure.
Definition: Stream.h:322
Write request.
Definition: Stream.h:47
Stream & GetStream() const
Definition: Stream.h:51
sig::Signal< Error > finish
Write completed signal.
Definition: Stream.h:59
Definition: StdDeque.h:50
Definition: ParallelTcpConnector.h:22
Definition: uv.h:441
UV_REQ_FIELDS uv_stream_t * handle
Definition: uv.h:419
Definition: uv.h:497
uv_stream_t * handle
Definition: uv.h:536
UV_EXTERN int uv_read_stop(uv_stream_t *)
UV_EXTERN int uv_stream_set_blocking(uv_stream_t *handle, int blocking)
UV_EXTERN int uv_is_writable(const uv_stream_t *handle)
UV_EXTERN int uv_is_readable(const uv_stream_t *handle)