WPILibC++  2019.1.1-beta-4-29-g6105873
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Modules Pages
WorkerThread.h
1 /*----------------------------------------------------------------------------*/
2 /* Copyright (c) 2018 FIRST. All Rights Reserved. */
3 /* Open Source Software - may be modified and shared by FRC teams. The code */
4 /* must be accompanied by the FIRST BSD license file in the root directory of */
5 /* the project. */
6 /*----------------------------------------------------------------------------*/
7 
8 #ifndef WPIUTIL_WPI_WORKERTHREAD_H_
9 #define WPIUTIL_WPI_WORKERTHREAD_H_
10 
11 #include <functional>
12 #include <memory>
13 #include <tuple>
14 #include <utility>
15 #include <vector>
16 
17 #include "wpi/STLExtras.h"
18 #include "wpi/SafeThread.h"
19 #include "wpi/future.h"
20 #include "wpi/uv/Async.h"
21 
22 namespace wpi {
23 
24 namespace detail {
25 
26 template <typename R>
28  using AfterWorkFunction = std::function<void(R)>;
29 
30  ~WorkerThreadAsync() { UnsetLoop(); }
31 
32  void SetLoop(uv::Loop& loop) {
33  auto async = uv::Async<AfterWorkFunction, R>::Create(loop);
34  async->wakeup.connect(
35  [](AfterWorkFunction func, R result) { func(result); });
36  m_async = async;
37  }
38 
39  void UnsetLoop() {
40  if (auto async = m_async.lock()) {
41  async->Close();
42  m_async.reset();
43  }
44  }
45 
46  std::weak_ptr<uv::Async<AfterWorkFunction, R>> m_async;
47 };
48 
49 template <>
50 struct WorkerThreadAsync<void> {
51  using AfterWorkFunction = std::function<void()>;
52 
53  ~WorkerThreadAsync() { RemoveLoop(); }
54 
55  void SetLoop(uv::Loop& loop) {
56  auto async = uv::Async<AfterWorkFunction>::Create(loop);
57  async->wakeup.connect([](AfterWorkFunction func) { func(); });
58  m_async = async;
59  }
60 
61  void RemoveLoop() {
62  if (auto async = m_async.lock()) {
63  async->Close();
64  m_async.reset();
65  }
66  }
67 
68  std::weak_ptr<uv::Async<AfterWorkFunction>> m_async;
69 };
70 
71 template <typename R, typename... T>
73  using WorkFunction = std::function<R(T...)>;
74  using AfterWorkFunction = typename WorkerThreadAsync<R>::AfterWorkFunction;
75 
76  WorkerThreadRequest() = default;
77  WorkerThreadRequest(uint64_t promiseId_, WorkFunction work_,
78  std::tuple<T...> params_)
79  : promiseId(promiseId_),
80  work(std::move(work_)),
81  params(std::move(params_)) {}
82  WorkerThreadRequest(WorkFunction work_, AfterWorkFunction afterWork_,
83  std::tuple<T...> params_)
84  : promiseId(0),
85  work(std::move(work_)),
86  afterWork(std::move(afterWork_)),
87  params(std::move(params_)) {}
88 
89  uint64_t promiseId;
90  WorkFunction work;
91  AfterWorkFunction afterWork;
92  std::tuple<T...> params;
93 };
94 
95 template <typename R, typename... T>
97  public:
98  using Request = WorkerThreadRequest<R, T...>;
99 
100  void Main() override;
101 
102  std::vector<Request> m_requests;
103  PromiseFactory<R> m_promises;
105 };
106 
107 template <typename R, typename... T>
108 void RunWorkerThreadRequest(WorkerThreadThread<R, T...>& thr,
110  R result = apply_tuple(req.work, std::move(req.params));
111  if (req.afterWork) {
112  if (auto async = thr.m_async.m_async.lock())
113  async->Send(std::move(req.afterWork), std::move(result));
114  } else {
115  thr.m_promises.SetValue(req.promiseId, std::move(result));
116  }
117 }
118 
119 template <typename... T>
120 void RunWorkerThreadRequest(WorkerThreadThread<void, T...>& thr,
121  WorkerThreadRequest<void, T...>& req) {
122  apply_tuple(req.work, req.params);
123  if (req.afterWork) {
124  if (auto async = thr.m_async.m_async.lock())
125  async->Send(std::move(req.afterWork));
126  } else {
127  thr.m_promises.SetValue(req.promiseId);
128  }
129 }
130 
131 template <typename R, typename... T>
132 void WorkerThreadThread<R, T...>::Main() {
133  std::vector<Request> requests;
134  while (m_active) {
135  std::unique_lock<wpi::mutex> lock(m_mutex);
136  m_cond.wait(lock, [&] { return !m_active || !m_requests.empty(); });
137  if (!m_active) break;
138 
139  // don't want to hold the lock while executing the callbacks
140  requests.swap(m_requests);
141  lock.unlock();
142 
143  for (auto&& req : requests) {
144  if (!m_active) break; // requests may be long-running
145  RunWorkerThreadRequest(*this, req);
146  }
147  requests.clear();
148  m_promises.Notify();
149  }
150 }
151 
152 } // namespace detail
153 
154 template <typename T>
156 
157 template <typename R, typename... T>
158 class WorkerThread<R(T...)> final {
159  using Thread = detail::WorkerThreadThread<R, T...>;
160 
161  public:
162  using WorkFunction = std::function<R(T...)>;
163  using AfterWorkFunction =
164  typename detail::WorkerThreadAsync<R>::AfterWorkFunction;
165 
166  WorkerThread() { m_owner.Start(); }
167 
175  void SetLoop(uv::Loop& loop) {
176  if (auto thr = m_owner.GetThread()) thr->m_async.SetLoop(loop);
177  }
178 
186  void SetLoop(std::shared_ptr<uv::Loop> loop) { SetLoop(*loop); }
187 
192  void UnsetLoop() {
193  if (auto thr = m_owner.GetThread()) thr->m_async.UnsetLoop();
194  }
195 
203  std::shared_ptr<uv::Handle> GetHandle() const {
204  if (auto thr = m_owner.GetThread())
205  return thr->m_async.m_async.lock();
206  else
207  return nullptr;
208  }
209 
222  template <typename... U>
223  future<R> QueueWork(WorkFunction work, U&&... u) {
224  if (auto thr = m_owner.GetThread()) {
225  // create the future
226  uint64_t req = thr->m_promises.CreateRequest();
227 
228  // add the parameters to the input queue
229  thr->m_requests.emplace_back(
230  req, std::move(work), std::forward_as_tuple(std::forward<U>(u)...));
231 
232  // signal the thread
233  thr->m_cond.notify_one();
234 
235  // return future
236  return thr->m_promises.CreateFuture(req);
237  }
238 
239  // XXX: is this the right thing to do?
240  return future<R>();
241  }
242 
257  template <typename... U>
258  void QueueWorkThen(WorkFunction work, AfterWorkFunction afterWork, U&&... u) {
259  if (auto thr = m_owner.GetThread()) {
260  // add the parameters to the input queue
261  thr->m_requests.emplace_back(
262  std::move(work), std::move(afterWork),
263  std::forward_as_tuple(std::forward<U>(u)...));
264 
265  // signal the thread
266  thr->m_cond.notify_one();
267  }
268  }
269 
270  private:
271  SafeThreadOwner<Thread> m_owner;
272 };
273 
274 } // namespace wpi
275 
276 #endif // WPIUTIL_WPI_WORKERTHREAD_H_
Definition: SafeThread.h:22
WPILib C++ utilities (wpiutil) namespace.
Definition: SmallString.h:21
void UnsetLoop()
Unset the loop.
Definition: WorkerThread.h:192
void SetLoop(std::shared_ptr< uv::Loop > loop)
Set the loop.
Definition: WorkerThread.h:186
static std::shared_ptr< Async > Create(Loop &loop)
Create an async handle.
Definition: Async.h:58
Definition: WorkerThread.h:96
void SetValue(uint64_t request, const T &value)
Sets a value directly for a future without creating a promise object.
Definition: future.h:711
auto apply_tuple(F &&f, Tuple &&t) -> decltype(detail::apply_tuple_impl(std::forward< F >(f), std::forward< Tuple >(t), build_index_impl< std::tuple_size< typename std::decay< Tuple >::type >::value >
Given an input tuple (a1, a2, ..., an), pass the arguments of the tuple variadically to f as if by ca...
Definition: STLExtras.h:1209
std::shared_ptr< uv::Handle > GetHandle() const
Get the handle used by QueueWorkThen() to run afterWork.
Definition: WorkerThread.h:203
future< R > QueueWork(WorkFunction work, U &&...u)
Wakeup the worker thread, call the work function, and return a future for the result.
Definition: WorkerThread.h:223
Definition: WorkerThread.h:155
Definition: WorkerThread.h:72
Event loop.
Definition: Loop.h:39
void SetLoop(uv::Loop &loop)
Set the loop.
Definition: WorkerThread.h:175
A lightweight version of std::future.
Definition: future.h:30
Definition: WorkerThread.h:27
void QueueWorkThen(WorkFunction work, AfterWorkFunction afterWork, U &&...u)
Wakeup the worker thread, call the work function, and call the afterWork function with the result on ...
Definition: WorkerThread.h:258