WPILibC++  2018.4.1-20180925013224-1203-g32ec07e
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Modules Pages
AsyncFunction.h
1 /*----------------------------------------------------------------------------*/
2 /* Copyright (c) 2018 FIRST. All Rights Reserved. */
3 /* Open Source Software - may be modified and shared by FRC teams. The code */
4 /* must be accompanied by the FIRST BSD license file in the root directory of */
5 /* the project. */
6 /*----------------------------------------------------------------------------*/
7 
8 #ifndef WPIUTIL_WPI_UV_ASYNCFUNCTION_H_
9 #define WPIUTIL_WPI_UV_ASYNCFUNCTION_H_
10 
11 #include <uv.h>
12 
13 #include <algorithm>
14 #include <functional>
15 #include <memory>
16 #include <thread>
17 #include <tuple>
18 #include <utility>
19 #include <vector>
20 
21 #include "wpi/STLExtras.h"
22 #include "wpi/condition_variable.h"
23 #include "wpi/mutex.h"
24 #include "wpi/uv/Handle.h"
25 #include "wpi/uv/Loop.h"
26 
27 namespace wpi {
28 namespace uv {
29 
30 namespace detail {
31 
33  public:
34  virtual ~AsyncFunctionBase() {
35  m_active = false;
36  m_resultCv.notify_all(); // wake up any waiters
37  }
38 
39  protected:
40  wpi::mutex m_mutex;
41  std::atomic_bool m_active{true};
42  wpi::condition_variable m_resultCv;
43 };
44 
45 template <typename R, typename... T>
47  inline void RunCall(const std::function<R(T...)>& func,
48  std::pair<std::thread::id, std::tuple<T...>>& v) {
49  m_results.emplace_back(
50  std::piecewise_construct, std::forward_as_tuple(v.first),
51  std::forward_as_tuple(apply_tuple(func, std::move(v.second))));
52  }
53 
54  inline R GetCallResult(std::thread::id from) {
55  // wait for response
56  std::unique_lock<wpi::mutex> lock(m_mutex);
57  while (m_active) {
58  // Did we get a response to *our* request?
59  auto it = std::find_if(m_results.begin(), m_results.end(),
60  [=](const auto& r) { return r.first == from; });
61  if (it != m_results.end()) {
62  // Yes, remove it from the vector and we're done.
63  auto rv = std::move(it->second);
64  m_results.erase(it);
65  return rv;
66  }
67  // No, keep waiting for a response
68  m_resultCv.wait(lock);
69  }
70 
71  return R();
72  }
73 
74  private:
75  std::vector<std::pair<std::thread::id, R>> m_results;
76 };
77 
78 // void return value partial specialization
79 template <typename... T>
80 struct AsyncFunctionHelper<void, T...> : public AsyncFunctionBase {
81  inline void RunCall(const std::function<void(T...)>& func,
82  std::pair<std::thread::id, std::tuple<T...>>& v) {
83  apply_tuple(func, std::move(v.second));
84  m_results.emplace_back(v.first);
85  }
86 
87  inline void GetCallResult(std::thread::id from) {
88  // wait for response
89  std::unique_lock<wpi::mutex> lock(m_mutex);
90  while (m_active) {
91  // Did we get a response to *our* request?
92  auto it = std::find(m_results.begin(), m_results.end(), from);
93  if (it != m_results.end()) {
94  // Yes, remove it from the vector and we're done.
95  m_results.erase(it);
96  return;
97  }
98  // No, keep waiting for a response
99  m_resultCv.wait(lock);
100  }
101  }
102 
103  private:
104  std::vector<std::thread::id> m_results;
105 };
106 
107 } // namespace detail
108 
109 template <typename T>
111 
117 template <typename R, typename... T>
118 class AsyncFunction<R(T...)> final
119  : public HandleImpl<AsyncFunction<R(T...)>, uv_async_t>,
120  private detail::AsyncFunctionHelper<R, T...> {
121  struct private_init {};
122 
123  public:
124  AsyncFunction(std::function<R(T...)> func, const private_init&)
125  : wakeup{func} {}
126  ~AsyncFunction() noexcept override = default;
127 
134  static std::shared_ptr<AsyncFunction> Create(
135  Loop& loop, std::function<R(T...)> func = nullptr) {
136  auto h = std::make_shared<AsyncFunction>(std::move(func), private_init{});
137  int err = uv_async_init(loop.GetRaw(), h->GetRaw(), [](uv_async_t* handle) {
138  auto& h = *static_cast<AsyncFunction*>(handle->data);
139  std::lock_guard<wpi::mutex> lock(h.m_mutex);
140 
141  if (!h.m_params.empty()) {
142  // for each set of parameters in the input queue
143  for (auto&& v : h.m_params) {
144  // call the wakeup function and put the result in the output queue
145  // if the caller is waiting for it
146  if (v.first == std::thread::id{})
147  apply_tuple(h.wakeup, std::move(v.second));
148  else
149  h.RunCall(h.wakeup, v);
150  }
151  h.m_params.clear();
152  // wake up any threads that might be waiting for the result
153  h.m_resultCv.notify_all();
154  }
155  });
156  if (err < 0) {
157  loop.ReportError(err);
158  return nullptr;
159  }
160  h->Keep();
161  return h;
162  }
163 
170  static std::shared_ptr<AsyncFunction> Create(
171  const std::shared_ptr<Loop>& loop,
172  std::function<R(T...)> func = nullptr) {
173  return Create(*loop, std::move(func));
174  }
175 
183  template <typename... U>
184  void Send(U&&... u) {
185  // add the parameters to the input queue
186  {
187  std::lock_guard<wpi::mutex> lock(this->m_mutex);
188  m_params.emplace_back(std::piecewise_construct,
189  std::forward_as_tuple(std::thread::id{}),
190  std::forward_as_tuple(std::forward<U>(u)...));
191  }
192 
193  // signal the loop
194  this->Invoke(&uv_async_send, this->GetRaw());
195  }
196 
207  template <typename... U>
208  R Call(U&&... u) {
209  std::thread::id from = std::this_thread::get_id();
210 
211  // add the parameters to the input queue
212  {
213  std::lock_guard<wpi::mutex> lock(this->m_mutex);
214  m_params.emplace_back(std::piecewise_construct,
215  std::forward_as_tuple(from),
216  std::forward_as_tuple(std::forward<U>(u)...));
217  }
218 
219  // signal the loop
220  this->Invoke(&uv_async_send, this->GetRaw());
221 
222  // wait for response
223  return this->GetCallResult(from);
224  }
225 
229  std::function<R(T...)> wakeup;
230 
231  private:
232  std::vector<std::pair<std::thread::id, std::tuple<T...>>> m_params;
233 };
234 
235 } // namespace uv
236 } // namespace wpi
237 
238 #endif // WPIUTIL_WPI_UV_ASYNCFUNCTION_H_
Definition: AsyncFunction.h:110
static std::shared_ptr< AsyncFunction > Create(Loop &loop, std::function< R(T...)> func=nullptr)
Create an async handle.
Definition: AsyncFunction.h:134
Handle.
Definition: Handle.h:246
void Send(U &&...u)
Wakeup the event loop, call the async function, and ignore any result.
Definition: AsyncFunction.h:184
Definition: AsyncFunction.h:46
void ReportError(int err)
Reports error.
Definition: Loop.h:235
uv_loop_t * GetRaw() const noexcept
Get the underlying event loop data structure.
Definition: Loop.h:209
WPILib C++ utilities (wpiutil) namespace.
Definition: SmallString.h:21
std::function< R(T...)> wakeup
Function called (on event loop thread) when the async is called.
Definition: AsyncFunction.h:229
auto apply_tuple(F &&f, Tuple &&t) -> decltype(detail::apply_tuple_impl(std::forward< F >(f), std::forward< Tuple >(t), build_index_impl< std::tuple_size< typename std::decay< Tuple >::type >::value >
Given an input tuple (a1, a2, ..., an), pass the arguments of the tuple variadically to f as if by ca...
Definition: STLExtras.h:1209
Definition: uv.h:768
Event loop.
Definition: Loop.h:37
R Call(U &&...u)
Wakeup the event loop, call the async function, and return the result.
Definition: AsyncFunction.h:208
static std::shared_ptr< AsyncFunction > Create(const std::shared_ptr< Loop > &loop, std::function< R(T...)> func=nullptr)
Create an async handle.
Definition: AsyncFunction.h:170
Definition: AsyncFunction.h:32