WPILibC++  unspecified
CallbackManager.h
1 /*----------------------------------------------------------------------------*/
2 /* Copyright (c) FIRST 2017-2018. All Rights Reserved. */
3 /* Open Source Software - may be modified and shared by FRC teams. The code */
4 /* must be accompanied by the FIRST BSD license file in the root directory of */
5 /* the project. */
6 /*----------------------------------------------------------------------------*/
7 
8 #ifndef NTCORE_CALLBACKMANAGER_H_
9 #define NTCORE_CALLBACKMANAGER_H_
10 
11 #include <atomic>
12 #include <climits>
13 #include <functional>
14 #include <memory>
15 #include <queue>
16 #include <utility>
17 #include <vector>
18 
19 #include <llvm/raw_ostream.h>
20 #include <support/SafeThread.h>
21 #include <support/UidVector.h>
22 #include <support/condition_variable.h>
23 #include <support/mutex.h>
24 
25 namespace nt {
26 
27 namespace impl {
28 
29 template <typename Callback>
30 class ListenerData {
31  public:
32  ListenerData() = default;
33  explicit ListenerData(Callback callback_) : callback(callback_) {}
34  explicit ListenerData(unsigned int poller_uid_) : poller_uid(poller_uid_) {}
35 
36  explicit operator bool() const { return callback || poller_uid != UINT_MAX; }
37 
38  Callback callback;
39  unsigned int poller_uid = UINT_MAX;
40 };
41 
42 // CRTP callback manager thread
43 // @tparam Derived derived class
44 // @tparam NotifierData data buffered for each callback
45 // @tparam ListenerData data stored for each listener
46 // Derived must define the following functions:
47 // bool Matches(const ListenerData& listener, const NotifierData& data);
48 // void SetListener(NotifierData* data, unsigned int listener_uid);
49 // void DoCallback(Callback callback, const NotifierData& data);
50 template <typename Derived, typename TUserInfo,
51  typename TListenerData =
53  typename TNotifierData = TUserInfo>
55  public:
56  typedef TUserInfo UserInfo;
57  typedef TNotifierData NotifierData;
58  typedef TListenerData ListenerData;
59 
60  ~CallbackThread() {
61  // Wake up any blocked pollers
62  for (size_t i = 0; i < m_pollers.size(); ++i) {
63  if (auto poller = m_pollers[i]) poller->Terminate();
64  }
65  }
66 
67  void Main() override;
68 
70 
71  std::queue<std::pair<unsigned int, NotifierData>> m_queue;
72  wpi::condition_variable m_queue_empty;
73 
74  struct Poller {
75  void Terminate() {
76  {
77  std::lock_guard<wpi::mutex> lock(poll_mutex);
78  terminating = true;
79  }
80  poll_cond.notify_all();
81  }
82  std::queue<NotifierData> poll_queue;
83  wpi::mutex poll_mutex;
84  wpi::condition_variable poll_cond;
85  bool terminating = false;
86  bool cancelling = false;
87  };
89 
90  // Must be called with m_mutex held
91  template <typename... Args>
92  void SendPoller(unsigned int poller_uid, Args&&... args) {
93  if (poller_uid > m_pollers.size()) return;
94  auto poller = m_pollers[poller_uid];
95  if (!poller) return;
96  {
97  std::lock_guard<wpi::mutex> lock(poller->poll_mutex);
98  poller->poll_queue.emplace(std::forward<Args>(args)...);
99  }
100  poller->poll_cond.notify_one();
101  }
102 };
103 
104 template <typename Derived, typename TUserInfo, typename TListenerData,
105  typename TNotifierData>
107  std::unique_lock<wpi::mutex> lock(m_mutex);
108  while (m_active) {
109  while (m_queue.empty()) {
110  m_cond.wait(lock);
111  if (!m_active) return;
112  }
113 
114  while (!m_queue.empty()) {
115  if (!m_active) return;
116  auto item = std::move(m_queue.front());
117 
118  if (item.first != UINT_MAX) {
119  if (item.first < m_listeners.size()) {
120  auto& listener = m_listeners[item.first];
121  if (listener &&
122  static_cast<Derived*>(this)->Matches(listener, item.second)) {
123  static_cast<Derived*>(this)->SetListener(&item.second, item.first);
124  if (listener.callback) {
125  lock.unlock();
126  static_cast<Derived*>(this)->DoCallback(listener.callback,
127  item.second);
128  lock.lock();
129  } else if (listener.poller_uid != UINT_MAX) {
130  SendPoller(listener.poller_uid, std::move(item.second));
131  }
132  }
133  }
134  } else {
135  // Use index because iterator might get invalidated.
136  for (size_t i = 0; i < m_listeners.size(); ++i) {
137  auto& listener = m_listeners[i];
138  if (!listener) continue;
139  if (!static_cast<Derived*>(this)->Matches(listener, item.second))
140  continue;
141  static_cast<Derived*>(this)->SetListener(&item.second, i);
142  if (listener.callback) {
143  lock.unlock();
144  static_cast<Derived*>(this)->DoCallback(listener.callback,
145  item.second);
146  lock.lock();
147  } else if (listener.poller_uid != UINT_MAX) {
148  SendPoller(listener.poller_uid, item.second);
149  }
150  }
151  }
152  m_queue.pop();
153  }
154 
155  m_queue_empty.notify_all();
156  }
157 }
158 
159 } // namespace impl
160 
161 // CRTP callback manager
162 // @tparam Derived derived class
163 // @tparam Thread custom thread (must be derived from impl::CallbackThread)
164 //
165 // Derived must define the following functions:
166 // void Start();
167 template <typename Derived, typename Thread>
169  friend class RpcServerTest;
170 
171  public:
172  void Stop() { m_owner.Stop(); }
173 
174  void Remove(unsigned int listener_uid) {
175  auto thr = m_owner.GetThread();
176  if (!thr) return;
177  thr->m_listeners.erase(listener_uid);
178  }
179 
180  unsigned int CreatePoller() {
181  static_cast<Derived*>(this)->Start();
182  auto thr = m_owner.GetThread();
183  return thr->m_pollers.emplace_back(
184  std::make_shared<typename Thread::Poller>());
185  }
186 
187  void RemovePoller(unsigned int poller_uid) {
188  auto thr = m_owner.GetThread();
189  if (!thr) return;
190 
191  // Remove any listeners that are associated with this poller
192  for (size_t i = 0; i < thr->m_listeners.size(); ++i) {
193  if (thr->m_listeners[i].poller_uid == poller_uid)
194  thr->m_listeners.erase(i);
195  }
196 
197  // Wake up any blocked pollers
198  if (poller_uid >= thr->m_pollers.size()) return;
199  auto poller = thr->m_pollers[poller_uid];
200  if (!poller) return;
201  poller->Terminate();
202  return thr->m_pollers.erase(poller_uid);
203  }
204 
205  bool WaitForQueue(double timeout) {
206  auto thr = m_owner.GetThread();
207  if (!thr) return true;
208 
209  auto& lock = thr.GetLock();
210 #if defined(_MSC_VER) && _MSC_VER < 1900
211  auto timeout_time = std::chrono::steady_clock::now() +
212  std::chrono::duration<int64_t, std::nano>(
213  static_cast<int64_t>(timeout * 1e9));
214 #else
215  auto timeout_time = std::chrono::steady_clock::now() +
216  std::chrono::duration<double>(timeout);
217 #endif
218  while (!thr->m_queue.empty()) {
219  if (!thr->m_active) return true;
220  if (timeout == 0) return false;
221  if (timeout < 0) {
222  thr->m_queue_empty.wait(lock);
223  } else {
224  auto cond_timed_out = thr->m_queue_empty.wait_until(lock, timeout_time);
225  if (cond_timed_out == std::cv_status::timeout) return false;
226  }
227  }
228 
229  return true;
230  }
231 
232  std::vector<typename Thread::UserInfo> Poll(unsigned int poller_uid) {
233  bool timed_out = false;
234  return Poll(poller_uid, -1, &timed_out);
235  }
236 
237  std::vector<typename Thread::UserInfo> Poll(unsigned int poller_uid,
238  double timeout, bool* timed_out) {
239  std::vector<typename Thread::UserInfo> infos;
240  std::shared_ptr<typename Thread::Poller> poller;
241  {
242  auto thr = m_owner.GetThread();
243  if (!thr) return infos;
244  if (poller_uid > thr->m_pollers.size()) return infos;
245  poller = thr->m_pollers[poller_uid];
246  if (!poller) return infos;
247  }
248 
249  std::unique_lock<wpi::mutex> lock(poller->poll_mutex);
250 #if defined(_MSC_VER) && _MSC_VER < 1900
251  auto timeout_time = std::chrono::steady_clock::now() +
252  std::chrono::duration<int64_t, std::nano>(
253  static_cast<int64_t>(timeout * 1e9));
254 #else
255  auto timeout_time = std::chrono::steady_clock::now() +
256  std::chrono::duration<double>(timeout);
257 #endif
258  *timed_out = false;
259  while (poller->poll_queue.empty()) {
260  if (poller->terminating) return infos;
261  if (poller->cancelling) {
262  // Note: this only works if there's a single thread calling this
263  // function for any particular poller, but that's the intended use.
264  poller->cancelling = false;
265  return infos;
266  }
267  if (timeout == 0) {
268  *timed_out = true;
269  return infos;
270  }
271  if (timeout < 0) {
272  poller->poll_cond.wait(lock);
273  } else {
274  auto cond_timed_out = poller->poll_cond.wait_until(lock, timeout_time);
275  if (cond_timed_out == std::cv_status::timeout) {
276  *timed_out = true;
277  return infos;
278  }
279  }
280  }
281 
282  while (!poller->poll_queue.empty()) {
283  infos.emplace_back(std::move(poller->poll_queue.front()));
284  poller->poll_queue.pop();
285  }
286  return infos;
287  }
288 
289  void CancelPoll(unsigned int poller_uid) {
290  std::shared_ptr<typename Thread::Poller> poller;
291  {
292  auto thr = m_owner.GetThread();
293  if (!thr) return;
294  if (poller_uid > thr->m_pollers.size()) return;
295  poller = thr->m_pollers[poller_uid];
296  if (!poller) return;
297  }
298 
299  {
300  std::lock_guard<wpi::mutex> lock(poller->poll_mutex);
301  poller->cancelling = true;
302  }
303  poller->poll_cond.notify_one();
304  }
305 
306  protected:
307  template <typename... Args>
308  void DoStart(Args&&... args) {
309  auto thr = m_owner.GetThread();
310  if (!thr) m_owner.Start(new Thread(std::forward<Args>(args)...));
311  }
312 
313  template <typename... Args>
314  unsigned int DoAdd(Args&&... args) {
315  static_cast<Derived*>(this)->Start();
316  auto thr = m_owner.GetThread();
317  return thr->m_listeners.emplace_back(std::forward<Args>(args)...);
318  }
319 
320  template <typename... Args>
321  void Send(unsigned int only_listener, Args&&... args) {
322  auto thr = m_owner.GetThread();
323  if (!thr || thr->m_listeners.empty()) return;
324  thr->m_queue.emplace(std::piecewise_construct,
325  std::make_tuple(only_listener),
326  std::forward_as_tuple(std::forward<Args>(args)...));
327  thr->m_cond.notify_one();
328  }
329 
330  typename wpi::SafeThreadOwner<Thread>::Proxy GetThread() const {
331  return m_owner.GetThread();
332  }
333 
334  private:
336 };
337 
338 } // namespace nt
339 
340 #endif // NTCORE_CALLBACKMANAGER_H_
Definition: SafeThread.h:20
Definition: CallbackManager.h:54
Definition: CallbackManager.h:74
Definition: IEntryNotifier.h:16
Definition: CallbackManager.h:168
Definition: CallbackManager.h:30