WPILibC++  unspecified
CallbackManager.h
1 /*----------------------------------------------------------------------------*/
2 /* Copyright (c) 2017-2018 FIRST. All Rights Reserved. */
3 /* Open Source Software - may be modified and shared by FRC teams. The code */
4 /* must be accompanied by the FIRST BSD license file in the root directory of */
5 /* the project. */
6 /*----------------------------------------------------------------------------*/
7 
8 #ifndef NTCORE_CALLBACKMANAGER_H_
9 #define NTCORE_CALLBACKMANAGER_H_
10 
11 #include <atomic>
12 #include <climits>
13 #include <functional>
14 #include <memory>
15 #include <queue>
16 #include <utility>
17 #include <vector>
18 
19 #include <wpi/SafeThread.h>
20 #include <wpi/UidVector.h>
21 #include <wpi/condition_variable.h>
22 #include <wpi/mutex.h>
23 #include <wpi/raw_ostream.h>
24 
25 namespace nt {
26 
27 namespace impl {
28 
29 template <typename Callback>
30 class ListenerData {
31  public:
32  ListenerData() = default;
33  explicit ListenerData(Callback callback_) : callback(callback_) {}
34  explicit ListenerData(unsigned int poller_uid_) : poller_uid(poller_uid_) {}
35 
36  explicit operator bool() const { return callback || poller_uid != UINT_MAX; }
37 
38  Callback callback;
39  unsigned int poller_uid = UINT_MAX;
40 };
41 
42 // CRTP callback manager thread
43 // @tparam Derived derived class
44 // @tparam NotifierData data buffered for each callback
45 // @tparam ListenerData data stored for each listener
46 // Derived must define the following functions:
47 // bool Matches(const ListenerData& listener, const NotifierData& data);
48 // void SetListener(NotifierData* data, unsigned int listener_uid);
49 // void DoCallback(Callback callback, const NotifierData& data);
50 template <typename Derived, typename TUserInfo,
51  typename TListenerData =
53  typename TNotifierData = TUserInfo>
55  public:
56  typedef TUserInfo UserInfo;
57  typedef TNotifierData NotifierData;
58  typedef TListenerData ListenerData;
59 
60  ~CallbackThread() {
61  // Wake up any blocked pollers
62  for (size_t i = 0; i < m_pollers.size(); ++i) {
63  if (auto poller = m_pollers[i]) poller->Terminate();
64  }
65  }
66 
67  void Main() override;
68 
70 
71  std::queue<std::pair<unsigned int, NotifierData>> m_queue;
72  wpi::condition_variable m_queue_empty;
73 
74  struct Poller {
75  void Terminate() {
76  {
77  std::lock_guard<wpi::mutex> lock(poll_mutex);
78  terminating = true;
79  }
80  poll_cond.notify_all();
81  }
82  std::queue<NotifierData> poll_queue;
83  wpi::mutex poll_mutex;
84  wpi::condition_variable poll_cond;
85  bool terminating = false;
86  bool cancelling = false;
87  };
89 
90  // Must be called with m_mutex held
91  template <typename... Args>
92  void SendPoller(unsigned int poller_uid, Args&&... args) {
93  if (poller_uid > m_pollers.size()) return;
94  auto poller = m_pollers[poller_uid];
95  if (!poller) return;
96  {
97  std::lock_guard<wpi::mutex> lock(poller->poll_mutex);
98  poller->poll_queue.emplace(std::forward<Args>(args)...);
99  }
100  poller->poll_cond.notify_one();
101  }
102 };
103 
104 template <typename Derived, typename TUserInfo, typename TListenerData,
105  typename TNotifierData>
107  std::unique_lock<wpi::mutex> lock(m_mutex);
108  while (m_active) {
109  while (m_queue.empty()) {
110  m_cond.wait(lock);
111  if (!m_active) return;
112  }
113 
114  while (!m_queue.empty()) {
115  if (!m_active) return;
116  auto item = std::move(m_queue.front());
117 
118  if (item.first != UINT_MAX) {
119  if (item.first < m_listeners.size()) {
120  auto& listener = m_listeners[item.first];
121  if (listener &&
122  static_cast<Derived*>(this)->Matches(listener, item.second)) {
123  static_cast<Derived*>(this)->SetListener(&item.second, item.first);
124  if (listener.callback) {
125  lock.unlock();
126  static_cast<Derived*>(this)->DoCallback(listener.callback,
127  item.second);
128  lock.lock();
129  } else if (listener.poller_uid != UINT_MAX) {
130  SendPoller(listener.poller_uid, std::move(item.second));
131  }
132  }
133  }
134  } else {
135  // Use index because iterator might get invalidated.
136  for (size_t i = 0; i < m_listeners.size(); ++i) {
137  auto& listener = m_listeners[i];
138  if (!listener) continue;
139  if (!static_cast<Derived*>(this)->Matches(listener, item.second))
140  continue;
141  static_cast<Derived*>(this)->SetListener(&item.second, i);
142  if (listener.callback) {
143  lock.unlock();
144  static_cast<Derived*>(this)->DoCallback(listener.callback,
145  item.second);
146  lock.lock();
147  } else if (listener.poller_uid != UINT_MAX) {
148  SendPoller(listener.poller_uid, item.second);
149  }
150  }
151  }
152  m_queue.pop();
153  }
154 
155  m_queue_empty.notify_all();
156  }
157 }
158 
159 } // namespace impl
160 
161 // CRTP callback manager
162 // @tparam Derived derived class
163 // @tparam Thread custom thread (must be derived from impl::CallbackThread)
164 //
165 // Derived must define the following functions:
166 // void Start();
167 template <typename Derived, typename Thread>
169  friend class RpcServerTest;
170 
171  public:
172  void Stop() { m_owner.Stop(); }
173 
174  void Remove(unsigned int listener_uid) {
175  auto thr = m_owner.GetThread();
176  if (!thr) return;
177  thr->m_listeners.erase(listener_uid);
178  }
179 
180  unsigned int CreatePoller() {
181  static_cast<Derived*>(this)->Start();
182  auto thr = m_owner.GetThread();
183  return thr->m_pollers.emplace_back(
184  std::make_shared<typename Thread::Poller>());
185  }
186 
187  void RemovePoller(unsigned int poller_uid) {
188  auto thr = m_owner.GetThread();
189  if (!thr) return;
190 
191  // Remove any listeners that are associated with this poller
192  for (size_t i = 0; i < thr->m_listeners.size(); ++i) {
193  if (thr->m_listeners[i].poller_uid == poller_uid)
194  thr->m_listeners.erase(i);
195  }
196 
197  // Wake up any blocked pollers
198  if (poller_uid >= thr->m_pollers.size()) return;
199  auto poller = thr->m_pollers[poller_uid];
200  if (!poller) return;
201  poller->Terminate();
202  return thr->m_pollers.erase(poller_uid);
203  }
204 
205  bool WaitForQueue(double timeout) {
206  auto thr = m_owner.GetThread();
207  if (!thr) return true;
208 
209  auto& lock = thr.GetLock();
210  auto timeout_time = std::chrono::steady_clock::now() +
211  std::chrono::duration<double>(timeout);
212  while (!thr->m_queue.empty()) {
213  if (!thr->m_active) return true;
214  if (timeout == 0) return false;
215  if (timeout < 0) {
216  thr->m_queue_empty.wait(lock);
217  } else {
218  auto cond_timed_out = thr->m_queue_empty.wait_until(lock, timeout_time);
219  if (cond_timed_out == std::cv_status::timeout) return false;
220  }
221  }
222 
223  return true;
224  }
225 
226  std::vector<typename Thread::UserInfo> Poll(unsigned int poller_uid) {
227  bool timed_out = false;
228  return Poll(poller_uid, -1, &timed_out);
229  }
230 
231  std::vector<typename Thread::UserInfo> Poll(unsigned int poller_uid,
232  double timeout, bool* timed_out) {
233  std::vector<typename Thread::UserInfo> infos;
234  std::shared_ptr<typename Thread::Poller> poller;
235  {
236  auto thr = m_owner.GetThread();
237  if (!thr) return infos;
238  if (poller_uid > thr->m_pollers.size()) return infos;
239  poller = thr->m_pollers[poller_uid];
240  if (!poller) return infos;
241  }
242 
243  std::unique_lock<wpi::mutex> lock(poller->poll_mutex);
244  auto timeout_time = std::chrono::steady_clock::now() +
245  std::chrono::duration<double>(timeout);
246  *timed_out = false;
247  while (poller->poll_queue.empty()) {
248  if (poller->terminating) return infos;
249  if (poller->cancelling) {
250  // Note: this only works if there's a single thread calling this
251  // function for any particular poller, but that's the intended use.
252  poller->cancelling = false;
253  return infos;
254  }
255  if (timeout == 0) {
256  *timed_out = true;
257  return infos;
258  }
259  if (timeout < 0) {
260  poller->poll_cond.wait(lock);
261  } else {
262  auto cond_timed_out = poller->poll_cond.wait_until(lock, timeout_time);
263  if (cond_timed_out == std::cv_status::timeout) {
264  *timed_out = true;
265  return infos;
266  }
267  }
268  }
269 
270  while (!poller->poll_queue.empty()) {
271  infos.emplace_back(std::move(poller->poll_queue.front()));
272  poller->poll_queue.pop();
273  }
274  return infos;
275  }
276 
277  void CancelPoll(unsigned int poller_uid) {
278  std::shared_ptr<typename Thread::Poller> poller;
279  {
280  auto thr = m_owner.GetThread();
281  if (!thr) return;
282  if (poller_uid > thr->m_pollers.size()) return;
283  poller = thr->m_pollers[poller_uid];
284  if (!poller) return;
285  }
286 
287  {
288  std::lock_guard<wpi::mutex> lock(poller->poll_mutex);
289  poller->cancelling = true;
290  }
291  poller->poll_cond.notify_one();
292  }
293 
294  protected:
295  template <typename... Args>
296  void DoStart(Args&&... args) {
297  auto thr = m_owner.GetThread();
298  if (!thr) m_owner.Start(new Thread(std::forward<Args>(args)...));
299  }
300 
301  template <typename... Args>
302  unsigned int DoAdd(Args&&... args) {
303  static_cast<Derived*>(this)->Start();
304  auto thr = m_owner.GetThread();
305  return thr->m_listeners.emplace_back(std::forward<Args>(args)...);
306  }
307 
308  template <typename... Args>
309  void Send(unsigned int only_listener, Args&&... args) {
310  auto thr = m_owner.GetThread();
311  if (!thr || thr->m_listeners.empty()) return;
312  thr->m_queue.emplace(std::piecewise_construct,
313  std::make_tuple(only_listener),
314  std::forward_as_tuple(std::forward<Args>(args)...));
315  thr->m_cond.notify_one();
316  }
317 
318  typename wpi::SafeThreadOwner<Thread>::Proxy GetThread() const {
319  return m_owner.GetThread();
320  }
321 
322  private:
324 };
325 
326 } // namespace nt
327 
328 #endif // NTCORE_CALLBACKMANAGER_H_
Definition: SafeThread.h:20
Definition: CallbackManager.h:54
Definition: CallbackManager.h:74
Definition: IStorage.h:21
Definition: CallbackManager.h:168
Definition: CallbackManager.h:30