WPILibC++  unspecified
CallbackManager.h
1 /*----------------------------------------------------------------------------*/
2 /* Copyright (c) FIRST 2017. All Rights Reserved. */
3 /* Open Source Software - may be modified and shared by FRC teams. The code */
4 /* must be accompanied by the FIRST BSD license file in the root directory of */
5 /* the project. */
6 /*----------------------------------------------------------------------------*/
7 
8 #ifndef NT_CALLBACKMANAGER_H_
9 #define NT_CALLBACKMANAGER_H_
10 
11 #include <atomic>
12 #include <climits>
13 #include <condition_variable>
14 #include <functional>
15 #include <mutex>
16 #include <queue>
17 #include <utility>
18 
19 #include "llvm/raw_ostream.h"
20 #include "support/SafeThread.h"
21 #include "support/UidVector.h"
22 
23 namespace nt {
24 
25 namespace impl {
26 
// Per-listener registration record.  A listener is either callback-based
// (callback is set) or attached to a poller (poller_uid is set).
//
// @tparam Callback callable type; must be default-constructible and
//                  contextually convertible to bool
template <typename Callback>
class ListenerData {
 public:
  // Creates an empty record: no callback and no poller.
  ListenerData() = default;

  // Creates a callback-based listener.  The by-value argument is moved into
  // the member rather than being copied a second time.
  ListenerData(Callback callback_) : callback(std::move(callback_)) {}

  // Creates a poller-based listener attached to poller_uid_.
  ListenerData(unsigned int poller_uid_) : poller_uid(poller_uid_) {}

  // True when the record holds a callback or refers to a poller.
  explicit operator bool() const { return callback || poller_uid != UINT_MAX; }

  Callback callback;
  // UINT_MAX means "not attached to any poller".
  unsigned int poller_uid = UINT_MAX;
};
39 
// CRTP callback manager thread
// @tparam Derived derived class
// @tparam TUserInfo data handed back to users when polling (UserInfo)
// @tparam TListenerData data stored for each listener (ListenerData)
// @tparam TNotifierData data buffered for each callback (NotifierData)
// Derived must define the following functions:
// bool Matches(const ListenerData& listener, const NotifierData& data);
// void SetListener(NotifierData* data, unsigned int listener_uid);
// void DoCallback(Callback callback, const NotifierData& data);
48 template <typename Derived, typename TUserInfo,
49  typename TListenerData =
51  typename TNotifierData = TUserInfo>
53  public:
54  typedef TUserInfo UserInfo;
55  typedef TNotifierData NotifierData;
56  typedef TListenerData ListenerData;
57 
58  ~CallbackThread() {
59  // Wake up any blocked pollers
60  for (std::size_t i = 0; i < m_pollers.size(); ++i) {
61  if (auto poller = m_pollers[i]) poller->Terminate();
62  }
63  }
64 
65  void Main() override;
66 
68 
69  std::queue<std::pair<unsigned int, NotifierData>> m_queue;
70  std::condition_variable m_queue_empty;
71 
72  struct Poller {
73  void Terminate() {
74  {
75  std::lock_guard<std::mutex> lock(poll_mutex);
76  terminating = true;
77  }
78  poll_cond.notify_all();
79  }
80  std::queue<NotifierData> poll_queue;
81  std::mutex poll_mutex;
82  std::condition_variable poll_cond;
83  bool terminating = false;
84  bool cancelling = false;
85  };
87 
88  // Must be called with m_mutex held
89  template <typename... Args>
90  void SendPoller(unsigned int poller_uid, Args&&... args) {
91  if (poller_uid > m_pollers.size()) return;
92  auto poller = m_pollers[poller_uid];
93  if (!poller) return;
94  {
95  std::lock_guard<std::mutex> lock(poller->poll_mutex);
96  poller->poll_queue.emplace(std::forward<Args>(args)...);
97  }
98  poller->poll_cond.notify_one();
99  }
100 };
101 
102 template <typename Derived, typename TUserInfo, typename TListenerData,
103  typename TNotifierData>
105  std::unique_lock<std::mutex> lock(m_mutex);
106  while (m_active) {
107  while (m_queue.empty()) {
108  m_cond.wait(lock);
109  if (!m_active) return;
110  }
111 
112  while (!m_queue.empty()) {
113  if (!m_active) return;
114  auto item = std::move(m_queue.front());
115 
116  if (item.first != UINT_MAX) {
117  if (item.first < m_listeners.size()) {
118  auto& listener = m_listeners[item.first];
119  if (listener &&
120  static_cast<Derived*>(this)->Matches(listener, item.second)) {
121  static_cast<Derived*>(this)->SetListener(&item.second, item.first);
122  if (listener.callback) {
123  lock.unlock();
124  static_cast<Derived*>(this)->DoCallback(listener.callback,
125  item.second);
126  lock.lock();
127  } else if (listener.poller_uid != UINT_MAX) {
128  SendPoller(listener.poller_uid, std::move(item.second));
129  }
130  }
131  }
132  } else {
133  // Use index because iterator might get invalidated.
134  for (size_t i = 0; i < m_listeners.size(); ++i) {
135  auto& listener = m_listeners[i];
136  if (!listener) continue;
137  if (!static_cast<Derived*>(this)->Matches(listener, item.second))
138  continue;
139  static_cast<Derived*>(this)->SetListener(&item.second, i);
140  if (listener.callback) {
141  lock.unlock();
142  static_cast<Derived*>(this)->DoCallback(listener.callback,
143  item.second);
144  lock.lock();
145  } else if (listener.poller_uid != UINT_MAX) {
146  SendPoller(listener.poller_uid, item.second);
147  }
148  }
149  }
150  m_queue.pop();
151  }
152 
153  m_queue_empty.notify_all();
154  }
155 }
156 
157 } // namespace impl
158 
159 // CRTP callback manager
160 // @tparam Derived derived class
161 // @tparam Thread custom thread (must be derived from impl::CallbackThread)
162 //
163 // Derived must define the following functions:
164 // void Start();
165 template <typename Derived, typename Thread>
167  friend class RpcServerTest;
168 
169  public:
170  void Stop() { m_owner.Stop(); }
171 
172  void Remove(unsigned int listener_uid) {
173  auto thr = m_owner.GetThread();
174  if (!thr) return;
175  thr->m_listeners.erase(listener_uid);
176  }
177 
178  unsigned int CreatePoller() {
179  static_cast<Derived*>(this)->Start();
180  auto thr = m_owner.GetThread();
181  return thr->m_pollers.emplace_back(
182  std::make_shared<typename Thread::Poller>());
183  }
184 
185  void RemovePoller(unsigned int poller_uid) {
186  auto thr = m_owner.GetThread();
187  if (!thr) return;
188 
189  // Remove any listeners that are associated with this poller
190  for (std::size_t i = 0; i < thr->m_listeners.size(); ++i) {
191  if (thr->m_listeners[i].poller_uid == poller_uid)
192  thr->m_listeners.erase(i);
193  }
194 
195  // Wake up any blocked pollers
196  if (poller_uid >= thr->m_pollers.size()) return;
197  auto poller = thr->m_pollers[poller_uid];
198  if (!poller) return;
199  poller->Terminate();
200  return thr->m_pollers.erase(poller_uid);
201  }
202 
203  bool WaitForQueue(double timeout) {
204  auto thr = m_owner.GetThread();
205  if (!thr) return true;
206 
207  auto& lock = thr.GetLock();
208 #if defined(_MSC_VER) && _MSC_VER < 1900
209  auto timeout_time = std::chrono::steady_clock::now() +
210  std::chrono::duration<int64_t, std::nano>(
211  static_cast<int64_t>(timeout * 1e9));
212 #else
213  auto timeout_time = std::chrono::steady_clock::now() +
214  std::chrono::duration<double>(timeout);
215 #endif
216  while (!thr->m_queue.empty()) {
217  if (!thr->m_active) return true;
218  if (timeout == 0) return false;
219  if (timeout < 0) {
220  thr->m_queue_empty.wait(lock);
221  } else {
222  auto cond_timed_out = thr->m_queue_empty.wait_until(lock, timeout_time);
223  if (cond_timed_out == std::cv_status::timeout) return false;
224  }
225  }
226 
227  return true;
228  }
229 
230  std::vector<typename Thread::UserInfo> Poll(unsigned int poller_uid) {
231  bool timed_out = false;
232  return Poll(poller_uid, -1, &timed_out);
233  }
234 
235  std::vector<typename Thread::UserInfo> Poll(unsigned int poller_uid,
236  double timeout, bool* timed_out) {
237  std::vector<typename Thread::UserInfo> infos;
238  std::shared_ptr<typename Thread::Poller> poller;
239  {
240  auto thr = m_owner.GetThread();
241  if (!thr) return infos;
242  if (poller_uid > thr->m_pollers.size()) return infos;
243  poller = thr->m_pollers[poller_uid];
244  if (!poller) return infos;
245  }
246 
247  std::unique_lock<std::mutex> lock(poller->poll_mutex);
248 #if defined(_MSC_VER) && _MSC_VER < 1900
249  auto timeout_time = std::chrono::steady_clock::now() +
250  std::chrono::duration<int64_t, std::nano>(
251  static_cast<int64_t>(timeout * 1e9));
252 #else
253  auto timeout_time = std::chrono::steady_clock::now() +
254  std::chrono::duration<double>(timeout);
255 #endif
256  *timed_out = false;
257  while (poller->poll_queue.empty()) {
258  if (poller->terminating) return infos;
259  if (poller->cancelling) {
260  // Note: this only works if there's a single thread calling this
261  // function for any particular poller, but that's the intended use.
262  poller->cancelling = false;
263  return infos;
264  }
265  if (timeout == 0) {
266  *timed_out = true;
267  return infos;
268  }
269  if (timeout < 0) {
270  poller->poll_cond.wait(lock);
271  } else {
272  auto cond_timed_out = poller->poll_cond.wait_until(lock, timeout_time);
273  if (cond_timed_out == std::cv_status::timeout) {
274  *timed_out = true;
275  return infos;
276  }
277  }
278  }
279 
280  while (!poller->poll_queue.empty()) {
281  infos.emplace_back(std::move(poller->poll_queue.front()));
282  poller->poll_queue.pop();
283  }
284  return infos;
285  }
286 
287  void CancelPoll(unsigned int poller_uid) {
288  std::shared_ptr<typename Thread::Poller> poller;
289  {
290  auto thr = m_owner.GetThread();
291  if (!thr) return;
292  if (poller_uid > thr->m_pollers.size()) return;
293  poller = thr->m_pollers[poller_uid];
294  if (!poller) return;
295  }
296 
297  {
298  std::lock_guard<std::mutex> lock(poller->poll_mutex);
299  poller->cancelling = true;
300  }
301  poller->poll_cond.notify_one();
302  }
303 
304  protected:
305  template <typename... Args>
306  void DoStart(Args&&... args) {
307  auto thr = m_owner.GetThread();
308  if (!thr) m_owner.Start(new Thread(std::forward<Args>(args)...));
309  }
310 
311  template <typename... Args>
312  unsigned int DoAdd(Args&&... args) {
313  static_cast<Derived*>(this)->Start();
314  auto thr = m_owner.GetThread();
315  return thr->m_listeners.emplace_back(std::forward<Args>(args)...);
316  }
317 
318  template <typename... Args>
319  void Send(unsigned int only_listener, Args&&... args) {
320  auto thr = m_owner.GetThread();
321  if (!thr || thr->m_listeners.empty()) return;
322  thr->m_queue.emplace(std::piecewise_construct,
323  std::make_tuple(only_listener),
324  std::forward_as_tuple(std::forward<Args>(args)...));
325  thr->m_cond.notify_one();
326  }
327 
328  typename wpi::SafeThreadOwner<Thread>::Proxy GetThread() const {
329  return m_owner.GetThread();
330  }
331 
332  private:
334 };
335 
336 } // namespace nt
337 
338 #endif // NT_CALLBACKMANAGER_H_
Definition: SafeThread.h:19
Definition: CallbackManager.h:52
Definition: CallbackManager.h:72
Definition: IEntryNotifier.h:15
Definition: CallbackManager.h:166
Definition: CallbackManager.h:28