WPILibC++ 2023.4.3-108-ge5452e3
AsyncFunction.h
Go to the documentation of this file.
1// Copyright (c) FIRST and other WPILib contributors.
2// Open Source Software; you can modify and/or share it under the terms of
3// the WPILib BSD license file in the root directory of this project.
4
5#ifndef WPINET_UV_ASYNCFUNCTION_H_
6#define WPINET_UV_ASYNCFUNCTION_H_
7
8#include <stdint.h>
9#include <uv.h>
10
11#include <concepts>
12#include <functional>
13#include <memory>
14#include <thread>
15#include <tuple>
16#include <utility>
17#include <vector>
18
19#include <wpi/future.h>
20#include <wpi/mutex.h>
21
22#include "wpinet/uv/Handle.h"
23#include "wpinet/uv/Loop.h"
24
25namespace wpi::uv {
26
/**
 * Primary template for AsyncFunction.
 * It is only declared here; the usable implementation is the partial
 * specialization for function signatures of the form R(T...).
 */
template <typename T>
class AsyncFunction;
30/**
31 * Function async handle.
32 * Async handles allow the user to "wakeup" the event loop and have a function
33 * called from another thread that returns a result to the calling thread.
34 */
35template <typename R, typename... T>
36class AsyncFunction<R(T...)> final
37 : public HandleImpl<AsyncFunction<R(T...)>, uv_async_t> {
38 struct private_init {};
39
40 public:
41 AsyncFunction(const std::shared_ptr<Loop>& loop,
42 std::function<void(promise<R>, T...)> func, const private_init&)
43 : wakeup{std::move(func)}, m_loop{loop} {}
44 ~AsyncFunction() noexcept override {
45 if (auto loop = m_loop.lock()) {
46 this->Close();
47 } else {
48 this->ForceClosed();
49 }
50 }
51
52 /**
53 * Create an async handle.
54 *
55 * @param loop Loop object where this handle runs.
56 * @param func wakeup function to be called (sets wakeup value); the function
57 * needs to return void, and its first parameter is the promise
58 * for the result. If no value is set on the promise by the
59 * wakeup function, a default-constructed value is "returned".
60 */
61 static std::shared_ptr<AsyncFunction> Create(
62 Loop& loop, std::function<void(promise<R>, T...)> func = nullptr) {
63 return Create(loop.shared_from_this(), std::move(func));
64 }
65
66 /**
67 * Create an async handle.
68 *
69 * @param loop Loop object where this handle runs.
70 * @param func wakeup function to be called (sets wakeup value); the function
71 * needs to return void, and its first parameter is the promise
72 * for the result. If no value is set on the promise by the
73 * wakeup function, a default-constructed value is "returned".
74 */
75 static std::shared_ptr<AsyncFunction> Create(
76 const std::shared_ptr<Loop>& loop,
77 std::function<void(promise<R>, T...)> func = nullptr) {
78 if (loop->IsClosing()) {
79 return nullptr;
80 }
81 auto h =
82 std::make_shared<AsyncFunction>(loop, std::move(func), private_init{});
83 int err =
84 uv_async_init(loop->GetRaw(), h->GetRaw(), [](uv_async_t* handle) {
85 auto& h = *static_cast<AsyncFunction*>(handle->data);
86 std::unique_lock lock(h.m_mutex);
87
88 if (!h.m_params.empty()) {
89 // for each set of parameters in the input queue, call the wakeup
90 // function and put the result in the output queue if the caller is
91 // waiting for it
92 for (auto&& v : h.m_params) {
93 auto p = h.m_promises.CreatePromise(v.first);
94 if (h.wakeup) {
95 std::apply(h.wakeup,
96 std::tuple_cat(std::make_tuple(std::move(p)),
97 std::move(v.second)));
98 }
99 }
100 h.m_params.clear();
101 // wake up any threads that might be waiting for the result
102 lock.unlock();
103 h.m_promises.Notify();
104 }
105 });
106 if (err < 0) {
107 loop->ReportError(err);
108 return nullptr;
109 }
110 h->Keep();
111 return h;
112 }
113
114 /**
115 * Wakeup the event loop, call the async function, and return a future for
116 * the result.
117 *
118 * It’s safe to call this function from any thread including the loop thread.
119 * The async function will be called on the loop thread.
120 *
121 * The future will return a default-constructed result if this handle is
122 * destroyed while waiting for a result.
123 */
124 template <typename... U>
125 future<R> Call(U&&... u) {
126 // create the future
127 uint64_t req = m_promises.CreateRequest();
128
129 auto loop = m_loop.lock();
130 if (loop->IsClosing()) {
131 if constexpr (std::same_as<R, void>) {
132 return m_promises.MakeReadyFuture();
133 } else {
134 return m_promises.MakeReadyFuture({});
135 }
136 }
137 if (loop && loop->GetThreadId() == std::this_thread::get_id()) {
138 // called from within the loop, just call the function directly
139 wakeup(m_promises.CreatePromise(req), std::forward<U>(u)...);
140 return m_promises.CreateFuture(req);
141 }
142
143 // add the parameters to the input queue
144 {
145 std::scoped_lock lock(m_mutex);
146 m_params.emplace_back(std::piecewise_construct,
147 std::forward_as_tuple(req),
148 std::forward_as_tuple(std::forward<U>(u)...));
149 }
150
151 // signal the loop
152 if (loop) {
153 this->Invoke(&uv_async_send, this->GetRaw());
154 }
155
156 // return future
157 return m_promises.CreateFuture(req);
158 }
159
160 template <typename... U>
162 return Call(std::forward<U>(u)...);
163 }
164
165 /**
166 * Function called (on event loop thread) when the async is called.
167 */
168 std::function<void(promise<R>, T...)> wakeup;
169
170 private:
171 wpi::mutex m_mutex;
172 std::vector<std::pair<uint64_t, std::tuple<T...>>> m_params;
173 PromiseFactory<R> m_promises;
174 std::weak_ptr<Loop> m_loop;
175};
176
177} // namespace wpi::uv
178
179#endif // WPINET_UV_ASYNCFUNCTION_H_
A lightweight version of std::future.
Definition: future.h:271
A lightweight version of std::promise.
Definition: future.h:528
future< R > operator()(U &&... u)
Definition: AsyncFunction.h:161
static std::shared_ptr< AsyncFunction > Create(const std::shared_ptr< Loop > &loop, std::function< void(promise< R >, T...)> func=nullptr)
Create an async handle.
Definition: AsyncFunction.h:75
static std::shared_ptr< AsyncFunction > Create(Loop &loop, std::function< void(promise< R >, T...)> func=nullptr)
Create an async handle.
Definition: AsyncFunction.h:61
std::function< void(promise< R >, T...)> wakeup
Function called (on event loop thread) when the async is called.
Definition: AsyncFunction.h:168
AsyncFunction(const std::shared_ptr< Loop > &loop, std::function< void(promise< R >, T...)> func, const private_init &)
Definition: AsyncFunction.h:41
~AsyncFunction() noexcept override
Definition: AsyncFunction.h:44
future< R > Call(U &&... u)
Wakeup the event loop, call the async function, and return a future for the result.
Definition: AsyncFunction.h:125
Definition: AsyncFunction.h:28
Handle.
Definition: Handle.h:273
Event loop.
Definition: Loop.h:37
std::vector< uint8_t > GetRaw(NT_Handle subentry, std::span< const uint8_t > defaultValue)
Get the last published value.
::uint64_t uint64_t
Definition: Meta.h:58
Definition: BFloat16.h:88
static constexpr const unit_t< compound_unit< energy::joules, inverse< temperature::kelvin >, inverse< substance::moles > > > R(8.3144598)
Gas constant.
static constexpr const unit_t< compound_unit< energy::joule, time::seconds > > h(6.626070040e-34)
Planck constant.
Definition: Buffer.h:18
::std::mutex mutex
Definition: mutex.h:17
Definition: uv.h:852
UV_EXTERN int uv_async_init(uv_loop_t *, uv_async_t *async, uv_async_cb async_cb)
UV_EXTERN int uv_async_send(uv_async_t *async)