8 #ifndef WPIUTIL_WPI_WORKERTHREAD_H_
9 #define WPIUTIL_WPI_WORKERTHREAD_H_
17 #include "wpi/STLExtras.h"
18 #include "wpi/SafeThread.h"
19 #include "wpi/future.h"
20 #include "wpi/uv/Async.h"
28 using AfterWorkFunction = std::function<void(R)>;
34 async->wakeup.connect(
35 [](AfterWorkFunction func, R result) { func(result); });
40 if (
auto async = m_async.lock()) {
46 std::weak_ptr<uv::Async<AfterWorkFunction, R>> m_async;
51 using AfterWorkFunction = std::function<void()>;
57 async->wakeup.connect([](AfterWorkFunction func) { func(); });
62 if (
auto async = m_async.lock()) {
68 std::weak_ptr<uv::Async<AfterWorkFunction>> m_async;
71 template <
typename R,
typename... T>
73 using WorkFunction = std::function<R(T...)>;
74 using AfterWorkFunction =
typename WorkerThreadAsync<R>::AfterWorkFunction;
78 std::tuple<T...> params_)
79 : promiseId(promiseId_),
80 work(std::move(work_)),
81 params(std::move(params_)) {}
83 std::tuple<T...> params_)
85 work(std::move(work_)),
86 afterWork(std::move(afterWork_)),
87 params(std::move(params_)) {}
91 AfterWorkFunction afterWork;
92 std::tuple<T...> params;
95 template <
typename R,
typename... T>
100 void Main()
override;
102 std::vector<Request> m_requests;
107 template <
typename R,
typename... T>
110 R result =
apply_tuple(req.work, std::move(req.params));
112 if (
auto async = thr.m_async.m_async.lock())
113 async->Send(std::move(req.afterWork), std::move(result));
115 thr.m_promises.
SetValue(req.promiseId, std::move(result));
119 template <
typename... T>
120 void RunWorkerThreadRequest(WorkerThreadThread<void, T...>& thr,
121 WorkerThreadRequest<void, T...>& req) {
124 if (
auto async = thr.m_async.m_async.lock())
125 async->Send(std::move(req.afterWork));
127 thr.m_promises.SetValue(req.promiseId);
131 template <
typename R,
typename... T>
132 void WorkerThreadThread<R, T...>::Main() {
133 std::vector<Request> requests;
135 std::unique_lock<wpi::mutex> lock(m_mutex);
136 m_cond.wait(lock, [&] {
return !m_active || !m_requests.empty(); });
137 if (!m_active)
break;
140 requests.swap(m_requests);
143 for (
auto&& req : requests) {
144 if (!m_active)
break;
145 RunWorkerThreadRequest(*
this, req);
154 template <
typename T>
157 template <
typename R,
typename... T>
162 using WorkFunction = std::function<R(T...)>;
163 using AfterWorkFunction =
164 typename detail::WorkerThreadAsync<R>::AfterWorkFunction;
176 if (
auto thr = m_owner.GetThread()) thr->m_async.SetLoop(loop);
193 if (
auto thr = m_owner.GetThread()) thr->m_async.UnsetLoop();
204 if (
auto thr = m_owner.GetThread())
205 return thr->m_async.m_async.lock();
222 template <
typename... U>
224 if (
auto thr = m_owner.GetThread()) {
226 uint64_t req = thr->m_promises.CreateRequest();
229 thr->m_requests.emplace_back(
230 req, std::move(work), std::forward_as_tuple(std::forward<U>(u)...));
233 thr->m_cond.notify_one();
236 return thr->m_promises.CreateFuture(req);
257 template <
typename... U>
258 void QueueWorkThen(WorkFunction work, AfterWorkFunction afterWork, U&&... u) {
259 if (
auto thr = m_owner.GetThread()) {
261 thr->m_requests.emplace_back(
262 std::move(work), std::move(afterWork),
263 std::forward_as_tuple(std::forward<U>(u)...));
266 thr->m_cond.notify_one();
276 #endif // WPIUTIL_WPI_WORKERTHREAD_H_
Definition: SafeThread.h:22
WPILib C++ utilities (wpiutil) namespace.
Definition: SmallString.h:21
void UnsetLoop()
Unset the loop.
Definition: WorkerThread.h:192
void SetLoop(std::shared_ptr< uv::Loop > loop)
Set the loop.
Definition: WorkerThread.h:186
static std::shared_ptr< Async > Create(Loop &loop)
Create an async handle.
Definition: Async.h:58
Definition: WorkerThread.h:96
void SetValue(uint64_t request, const T &value)
Sets a value directly for a future without creating a promise object.
Definition: future.h:711
auto apply_tuple(F &&f, Tuple &&t) -> decltype(detail::apply_tuple_impl(std::forward< F >(f), std::forward< Tuple >(t), build_index_impl< std::tuple_size< typename std::decay< Tuple >::type >::value >
Given an input tuple (a1, a2, ..., an), pass the arguments of the tuple variadically to f as if by calling f(a1, a2, ..., an), and return the result.
Definition: STLExtras.h:1209
std::shared_ptr< uv::Handle > GetHandle() const
Get the handle used by QueueWorkThen() to run afterWork.
Definition: WorkerThread.h:203
future< R > QueueWork(WorkFunction work, U &&...u)
Wake up the worker thread, call the work function, and return a future for the result.
Definition: WorkerThread.h:223
Definition: WorkerThread.h:155
Definition: WorkerThread.h:72
Event loop.
Definition: Loop.h:39
void SetLoop(uv::Loop &loop)
Set the loop.
Definition: WorkerThread.h:175
A lightweight version of std::future.
Definition: future.h:30
Definition: WorkerThread.h:27
void QueueWorkThen(WorkFunction work, AfterWorkFunction afterWork, U &&...u)
Wake up the worker thread, call the work function, and call the afterWork function with the result on the loop set by SetLoop().
Definition: WorkerThread.h:258