WPILibC++ 2023.4.3
AsyncFunction.h
Go to the documentation of this file.
1// Copyright (c) FIRST and other WPILib contributors.
2// Open Source Software; you can modify and/or share it under the terms of
3// the WPILib BSD license file in the root directory of this project.
4
5#ifndef WPINET_UV_ASYNCFUNCTION_H_
6#define WPINET_UV_ASYNCFUNCTION_H_
7
8#include <stdint.h>
9#include <uv.h>
10
11#include <functional>
12#include <memory>
13#include <thread>
14#include <tuple>
15#include <utility>
16#include <vector>
17
18#include <wpi/future.h>
19#include <wpi/mutex.h>
20
21#include "wpinet/uv/Handle.h"
22#include "wpinet/uv/Loop.h"
23
24namespace wpi::uv {
25
26template <typename T>
28
29/**
30 * Function async handle.
31 * Async handles allow the user to "wakeup" the event loop and have a function
32 * called from another thread that returns a result to the calling thread.
33 */
34template <typename R, typename... T>
35class AsyncFunction<R(T...)> final
36 : public HandleImpl<AsyncFunction<R(T...)>, uv_async_t> {
37 struct private_init {};
38
39 public:
40 AsyncFunction(const std::shared_ptr<Loop>& loop,
41 std::function<void(promise<R>, T...)> func, const private_init&)
42 : wakeup{std::move(func)}, m_loop{loop} {}
43 ~AsyncFunction() noexcept override {
44 if (auto loop = m_loop.lock()) {
45 this->Close();
46 } else {
47 this->ForceClosed();
48 }
49 }
50
51 /**
52 * Create an async handle.
53 *
54 * @param loop Loop object where this handle runs.
55 * @param func wakeup function to be called (sets wakeup value); the function
56 * needs to return void, and its first parameter is the promise
57 * for the result. If no value is set on the promise by the
58 * wakeup function, a default-constructed value is "returned".
59 */
60 static std::shared_ptr<AsyncFunction> Create(
61 Loop& loop, std::function<void(promise<R>, T...)> func = nullptr) {
62 return Create(loop.shared_from_this(), std::move(func));
63 }
64
65 /**
66 * Create an async handle.
67 *
68 * @param loop Loop object where this handle runs.
69 * @param func wakeup function to be called (sets wakeup value); the function
70 * needs to return void, and its first parameter is the promise
71 * for the result. If no value is set on the promise by the
72 * wakeup function, a default-constructed value is "returned".
73 */
74 static std::shared_ptr<AsyncFunction> Create(
75 const std::shared_ptr<Loop>& loop,
76 std::function<void(promise<R>, T...)> func = nullptr) {
77 if (loop->IsClosing()) {
78 return nullptr;
79 }
80 auto h =
81 std::make_shared<AsyncFunction>(loop, std::move(func), private_init{});
82 int err =
83 uv_async_init(loop->GetRaw(), h->GetRaw(), [](uv_async_t* handle) {
84 auto& h = *static_cast<AsyncFunction*>(handle->data);
85 std::unique_lock lock(h.m_mutex);
86
87 if (!h.m_params.empty()) {
88 // for each set of parameters in the input queue, call the wakeup
89 // function and put the result in the output queue if the caller is
90 // waiting for it
91 for (auto&& v : h.m_params) {
92 auto p = h.m_promises.CreatePromise(v.first);
93 if (h.wakeup) {
94 std::apply(h.wakeup,
95 std::tuple_cat(std::make_tuple(std::move(p)),
96 std::move(v.second)));
97 }
98 }
99 h.m_params.clear();
100 // wake up any threads that might be waiting for the result
101 lock.unlock();
102 h.m_promises.Notify();
103 }
104 });
105 if (err < 0) {
106 loop->ReportError(err);
107 return nullptr;
108 }
109 h->Keep();
110 return h;
111 }
112
113 /**
114 * Wakeup the event loop, call the async function, and return a future for
115 * the result.
116 *
117 * It’s safe to call this function from any thread including the loop thread.
118 * The async function will be called on the loop thread.
119 *
120 * The future will return a default-constructed result if this handle is
121 * destroyed while waiting for a result.
122 */
123 template <typename... U>
124 future<R> Call(U&&... u) {
125 // create the future
126 uint64_t req = m_promises.CreateRequest();
127
128 auto loop = m_loop.lock();
129 if (loop->IsClosing()) {
130 if constexpr (std::is_same_v<R, void>) {
131 return m_promises.MakeReadyFuture();
132 } else {
133 return m_promises.MakeReadyFuture({});
134 }
135 }
136 if (loop && loop->GetThreadId() == std::this_thread::get_id()) {
137 // called from within the loop, just call the function directly
138 wakeup(m_promises.CreatePromise(req), std::forward<U>(u)...);
139 return m_promises.CreateFuture(req);
140 }
141
142 // add the parameters to the input queue
143 {
144 std::scoped_lock lock(m_mutex);
145 m_params.emplace_back(std::piecewise_construct,
146 std::forward_as_tuple(req),
147 std::forward_as_tuple(std::forward<U>(u)...));
148 }
149
150 // signal the loop
151 if (loop) {
152 this->Invoke(&uv_async_send, this->GetRaw());
153 }
154
155 // return future
156 return m_promises.CreateFuture(req);
157 }
158
159 template <typename... U>
161 return Call(std::forward<U>(u)...);
162 }
163
164 /**
165 * Function called (on event loop thread) when the async is called.
166 */
167 std::function<void(promise<R>, T...)> wakeup;
168
169 private:
170 wpi::mutex m_mutex;
171 std::vector<std::pair<uint64_t, std::tuple<T...>>> m_params;
172 PromiseFactory<R> m_promises;
173 std::weak_ptr<Loop> m_loop;
174};
175
176} // namespace wpi::uv
177
178#endif // WPINET_UV_ASYNCFUNCTION_H_
A lightweight version of std::future.
Definition: future.h:271
A lightweight version of std::promise.
Definition: future.h:528
future< R > operator()(U &&... u)
Definition: AsyncFunction.h:160
static std::shared_ptr< AsyncFunction > Create(const std::shared_ptr< Loop > &loop, std::function< void(promise< R >, T...)> func=nullptr)
Create an async handle.
Definition: AsyncFunction.h:74
static std::shared_ptr< AsyncFunction > Create(Loop &loop, std::function< void(promise< R >, T...)> func=nullptr)
Create an async handle.
Definition: AsyncFunction.h:60
std::function< void(promise< R >, T...)> wakeup
Function called (on event loop thread) when the async is called.
Definition: AsyncFunction.h:167
AsyncFunction(const std::shared_ptr< Loop > &loop, std::function< void(promise< R >, T...)> func, const private_init &)
Definition: AsyncFunction.h:40
~AsyncFunction() noexcept override
Definition: AsyncFunction.h:43
future< R > Call(U &&... u)
Wakeup the event loop, call the async function, and return a future for the result.
Definition: AsyncFunction.h:124
Definition: AsyncFunction.h:27
Handle.
Definition: Handle.h:273
Event loop.
Definition: Loop.h:37
std::vector< uint8_t > GetRaw(NT_Handle subentry, std::span< const uint8_t > defaultValue)
Get the last published value.
::uint64_t uint64_t
Definition: Meta.h:58
Definition: StdDeque.h:50
static constexpr const unit_t< compound_unit< energy::joules, inverse< temperature::kelvin >, inverse< substance::moles > > > R(8.3144598)
Gas constant.
static constexpr const unit_t< compound_unit< energy::joule, time::seconds > > h(6.626070040e-34)
Planck constant.
Definition: ParallelTcpConnector.h:22
::std::mutex mutex
Definition: mutex.h:17
Definition: uv.h:852
UV_EXTERN int uv_async_init(uv_loop_t *, uv_async_t *async, uv_async_cb async_cb)
UV_EXTERN int uv_async_send(uv_async_t *async)