WPILibC++  2020.3.2-60-g3011ebe
WorkerThread.h
1 /*----------------------------------------------------------------------------*/
2 /* Copyright (c) 2018-2019 FIRST. All Rights Reserved. */
3 /* Open Source Software - may be modified and shared by FRC teams. The code */
4 /* must be accompanied by the FIRST BSD license file in the root directory of */
5 /* the project. */
6 /*----------------------------------------------------------------------------*/
7 
8 #ifndef WPIUTIL_WPI_WORKERTHREAD_H_
9 #define WPIUTIL_WPI_WORKERTHREAD_H_
10 
11 #include <functional>
12 #include <memory>
13 #include <tuple>
14 #include <utility>
15 #include <vector>
16 
17 #include "wpi/SafeThread.h"
18 #include "wpi/future.h"
19 #include "wpi/uv/Async.h"
20 
21 namespace wpi {
22 
23 namespace detail {
24 
25 template <typename R>
27  using AfterWorkFunction = std::function<void(R)>;
28 
29  ~WorkerThreadAsync() { UnsetLoop(); }
30 
31  void SetLoop(uv::Loop& loop) {
32  auto async = uv::Async<AfterWorkFunction, R>::Create(loop);
33  async->wakeup.connect(
34  [](AfterWorkFunction func, R result) { func(result); });
35  m_async = async;
36  }
37 
38  void UnsetLoop() {
39  if (auto async = m_async.lock()) {
40  async->Close();
41  m_async.reset();
42  }
43  }
44 
45  std::weak_ptr<uv::Async<AfterWorkFunction, R>> m_async;
46 };
47 
48 template <>
49 struct WorkerThreadAsync<void> {
50  using AfterWorkFunction = std::function<void()>;
51 
52  ~WorkerThreadAsync() { RemoveLoop(); }
53 
54  void SetLoop(uv::Loop& loop) {
55  auto async = uv::Async<AfterWorkFunction>::Create(loop);
56  async->wakeup.connect([](AfterWorkFunction func) { func(); });
57  m_async = async;
58  }
59 
60  void RemoveLoop() {
61  if (auto async = m_async.lock()) {
62  async->Close();
63  m_async.reset();
64  }
65  }
66 
67  std::weak_ptr<uv::Async<AfterWorkFunction>> m_async;
68 };
69 
70 template <typename R, typename... T>
72  using WorkFunction = std::function<R(T...)>;
73  using AfterWorkFunction = typename WorkerThreadAsync<R>::AfterWorkFunction;
74 
75  WorkerThreadRequest() = default;
76  WorkerThreadRequest(uint64_t promiseId_, WorkFunction work_,
77  std::tuple<T...> params_)
78  : promiseId(promiseId_),
79  work(std::move(work_)),
80  params(std::move(params_)) {}
81  WorkerThreadRequest(WorkFunction work_, AfterWorkFunction afterWork_,
82  std::tuple<T...> params_)
83  : promiseId(0),
84  work(std::move(work_)),
85  afterWork(std::move(afterWork_)),
86  params(std::move(params_)) {}
87 
88  uint64_t promiseId;
89  WorkFunction work;
90  AfterWorkFunction afterWork;
91  std::tuple<T...> params;
92 };
93 
94 template <typename R, typename... T>
96  public:
97  using Request = WorkerThreadRequest<R, T...>;
98 
99  void Main() override;
100 
101  std::vector<Request> m_requests;
102  PromiseFactory<R> m_promises;
104 };
105 
106 template <typename R, typename... T>
107 void RunWorkerThreadRequest(WorkerThreadThread<R, T...>& thr,
109  R result = std::apply(req.work, std::move(req.params));
110  if (req.afterWork) {
111  if (auto async = thr.m_async.m_async.lock())
112  async->Send(std::move(req.afterWork), std::move(result));
113  } else {
114  thr.m_promises.SetValue(req.promiseId, std::move(result));
115  }
116 }
117 
118 template <typename... T>
119 void RunWorkerThreadRequest(WorkerThreadThread<void, T...>& thr,
120  WorkerThreadRequest<void, T...>& req) {
121  std::apply(req.work, req.params);
122  if (req.afterWork) {
123  if (auto async = thr.m_async.m_async.lock())
124  async->Send(std::move(req.afterWork));
125  } else {
126  thr.m_promises.SetValue(req.promiseId);
127  }
128 }
129 
130 template <typename R, typename... T>
131 void WorkerThreadThread<R, T...>::Main() {
132  std::vector<Request> requests;
133  while (m_active) {
134  std::unique_lock lock(m_mutex);
135  m_cond.wait(lock, [&] { return !m_active || !m_requests.empty(); });
136  if (!m_active) break;
137 
138  // don't want to hold the lock while executing the callbacks
139  requests.swap(m_requests);
140  lock.unlock();
141 
142  for (auto&& req : requests) {
143  if (!m_active) break; // requests may be long-running
144  RunWorkerThreadRequest(*this, req);
145  }
146  requests.clear();
147  m_promises.Notify();
148  }
149 }
150 
151 } // namespace detail
152 
153 template <typename T>
155 
156 template <typename R, typename... T>
157 class WorkerThread<R(T...)> final {
158  using Thread = detail::WorkerThreadThread<R, T...>;
159 
160  public:
161  using WorkFunction = std::function<R(T...)>;
162  using AfterWorkFunction =
163  typename detail::WorkerThreadAsync<R>::AfterWorkFunction;
164 
165  WorkerThread() { m_owner.Start(); }
166 
174  void SetLoop(uv::Loop& loop) {
175  if (auto thr = m_owner.GetThread()) thr->m_async.SetLoop(loop);
176  }
177 
185  void SetLoop(std::shared_ptr<uv::Loop> loop) { SetLoop(*loop); }
186 
191  void UnsetLoop() {
192  if (auto thr = m_owner.GetThread()) thr->m_async.UnsetLoop();
193  }
194 
202  std::shared_ptr<uv::Handle> GetHandle() const {
203  if (auto thr = m_owner.GetThread())
204  return thr->m_async.m_async.lock();
205  else
206  return nullptr;
207  }
208 
221  template <typename... U>
222  future<R> QueueWork(WorkFunction work, U&&... u) {
223  if (auto thr = m_owner.GetThread()) {
224  // create the future
225  uint64_t req = thr->m_promises.CreateRequest();
226 
227  // add the parameters to the input queue
228  thr->m_requests.emplace_back(
229  req, std::move(work), std::forward_as_tuple(std::forward<U>(u)...));
230 
231  // signal the thread
232  thr->m_cond.notify_one();
233 
234  // return future
235  return thr->m_promises.CreateFuture(req);
236  }
237 
238  // XXX: is this the right thing to do?
239  return future<R>();
240  }
241 
256  template <typename... U>
257  void QueueWorkThen(WorkFunction work, AfterWorkFunction afterWork, U&&... u) {
258  if (auto thr = m_owner.GetThread()) {
259  // add the parameters to the input queue
260  thr->m_requests.emplace_back(
261  std::move(work), std::move(afterWork),
262  std::forward_as_tuple(std::forward<U>(u)...));
263 
264  // signal the thread
265  thr->m_cond.notify_one();
266  }
267  }
268 
269  private:
270  SafeThreadOwner<Thread> m_owner;
271 };
272 
273 } // namespace wpi
274 
275 #endif // WPIUTIL_WPI_WORKERTHREAD_H_
wpi::WorkerThread
Definition: WorkerThread.h:154
wpi::detail::WorkerThreadRequest
Definition: WorkerThread.h:71
wpi::SafeThreadOwner< Thread >
wpi::uv::Async::Create
static std::shared_ptr< Async > Create(Loop &loop)
Create an async handle.
Definition: Async.h:57
wpi::SafeThread
Definition: SafeThread.h:22
wpi::WorkerThread< R(T...)>::QueueWork
future< R > QueueWork(WorkFunction work, U &&... u)
Wakeup the worker thread, call the work function, and return a future for the result.
Definition: WorkerThread.h:222
wpi::PromiseFactory< R >
wpi::WorkerThread< R(T...)>::SetLoop
void SetLoop(uv::Loop &loop)
Set the loop.
Definition: WorkerThread.h:174
wpi::PromiseFactory::SetValue
void SetValue(uint64_t request, const T &value)
Sets a value directly for a future without creating a promise object.
Definition: future.h:711
wpi
WPILib C++ utilities (wpiutil) namespace.
Definition: Endian.h:31
wpi::future
A lightweight version of std::future.
Definition: future.h:30
wpi::detail::WorkerThreadThread
Definition: WorkerThread.h:95
wpi::detail::WorkerThreadAsync
Definition: WorkerThread.h:26
wpi::WorkerThread< R(T...)>::QueueWorkThen
void QueueWorkThen(WorkFunction work, AfterWorkFunction afterWork, U &&... u)
Wakeup the worker thread, call the work function, and call the afterWork function with the result on ...
Definition: WorkerThread.h:257
wpi::WorkerThread< R(T...)>::UnsetLoop
void UnsetLoop()
Unset the loop.
Definition: WorkerThread.h:191
wpi::WorkerThread< R(T...)>::SetLoop
void SetLoop(std::shared_ptr< uv::Loop > loop)
Set the loop.
Definition: WorkerThread.h:185
wpi::uv::Loop
Event loop.
Definition: Loop.h:39
wpi::WorkerThread< R(T...)>::GetHandle
std::shared_ptr< uv::Handle > GetHandle() const
Get the handle used by QueueWorkThen() to run afterWork.
Definition: WorkerThread.h:202