8 #ifndef NTCORE_CALLBACKMANAGER_H_ 9 #define NTCORE_CALLBACKMANAGER_H_ 19 #include <wpi/SafeThread.h> 20 #include <wpi/UidVector.h> 21 #include <wpi/condition_variable.h> 22 #include <wpi/mutex.h> 23 #include <wpi/raw_ostream.h> 29 template <
typename Callback>
33 explicit ListenerData(Callback callback_) : callback(callback_) {}
34 explicit ListenerData(
unsigned int poller_uid_) : poller_uid(poller_uid_) {}
36 explicit operator bool()
const {
return callback || poller_uid != UINT_MAX; }
39 unsigned int poller_uid = UINT_MAX;
50 template <
typename Derived,
typename TUserInfo,
51 typename TListenerData =
53 typename TNotifierData = TUserInfo>
56 typedef TUserInfo UserInfo;
57 typedef TNotifierData NotifierData;
58 typedef TListenerData ListenerData;
62 for (
size_t i = 0; i < m_pollers.size(); ++i) {
63 if (
auto poller = m_pollers[i]) poller->Terminate();
71 std::queue<std::pair<unsigned int, NotifierData>> m_queue;
72 wpi::condition_variable m_queue_empty;
77 std::lock_guard<wpi::mutex> lock(poll_mutex);
80 poll_cond.notify_all();
82 std::queue<NotifierData> poll_queue;
83 wpi::mutex poll_mutex;
84 wpi::condition_variable poll_cond;
85 bool terminating =
false;
86 bool cancelling =
false;
91 template <
typename... Args>
92 void SendPoller(
unsigned int poller_uid, Args&&... args) {
93 if (poller_uid > m_pollers.size())
return;
94 auto poller = m_pollers[poller_uid];
97 std::lock_guard<wpi::mutex> lock(poller->poll_mutex);
98 poller->poll_queue.emplace(std::forward<Args>(args)...);
100 poller->poll_cond.notify_one();
104 template <
typename Derived,
typename TUserInfo,
typename TListenerData,
105 typename TNotifierData>
107 std::unique_lock<wpi::mutex> lock(m_mutex);
109 while (m_queue.empty()) {
111 if (!m_active)
return;
114 while (!m_queue.empty()) {
115 if (!m_active)
return;
116 auto item = std::move(m_queue.front());
118 if (item.first != UINT_MAX) {
119 if (item.first < m_listeners.size()) {
120 auto& listener = m_listeners[item.first];
122 static_cast<Derived*>(
this)->Matches(listener, item.second)) {
123 static_cast<Derived*
>(
this)->SetListener(&item.second, item.first);
124 if (listener.callback) {
126 static_cast<Derived*
>(
this)->DoCallback(listener.callback,
129 }
else if (listener.poller_uid != UINT_MAX) {
130 SendPoller(listener.poller_uid, std::move(item.second));
136 for (
size_t i = 0; i < m_listeners.size(); ++i) {
137 auto& listener = m_listeners[i];
138 if (!listener)
continue;
139 if (!static_cast<Derived*>(
this)->Matches(listener, item.second))
141 static_cast<Derived*
>(
this)->SetListener(&item.second, i);
142 if (listener.callback) {
144 static_cast<Derived*
>(
this)->DoCallback(listener.callback,
147 }
else if (listener.poller_uid != UINT_MAX) {
148 SendPoller(listener.poller_uid, item.second);
155 m_queue_empty.notify_all();
167 template <
typename Derived,
typename Thread>
169 friend class RpcServerTest;
172 void Stop() { m_owner.Stop(); }
174 void Remove(
unsigned int listener_uid) {
175 auto thr = m_owner.GetThread();
177 thr->m_listeners.erase(listener_uid);
180 unsigned int CreatePoller() {
181 static_cast<Derived*
>(
this)->Start();
182 auto thr = m_owner.GetThread();
183 return thr->m_pollers.emplace_back(
184 std::make_shared<typename Thread::Poller>());
187 void RemovePoller(
unsigned int poller_uid) {
188 auto thr = m_owner.GetThread();
192 for (
size_t i = 0; i < thr->m_listeners.size(); ++i) {
193 if (thr->m_listeners[i].poller_uid == poller_uid)
194 thr->m_listeners.erase(i);
198 if (poller_uid >= thr->m_pollers.size())
return;
199 auto poller = thr->m_pollers[poller_uid];
202 return thr->m_pollers.erase(poller_uid);
205 bool WaitForQueue(
double timeout) {
206 auto thr = m_owner.GetThread();
207 if (!thr)
return true;
209 auto& lock = thr.GetLock();
210 auto timeout_time = std::chrono::steady_clock::now() +
211 std::chrono::duration<double>(timeout);
212 while (!thr->m_queue.empty()) {
213 if (!thr->m_active)
return true;
214 if (timeout == 0)
return false;
216 thr->m_queue_empty.wait(lock);
218 auto cond_timed_out = thr->m_queue_empty.wait_until(lock, timeout_time);
219 if (cond_timed_out == std::cv_status::timeout)
return false;
226 std::vector<typename Thread::UserInfo> Poll(
unsigned int poller_uid) {
227 bool timed_out =
false;
228 return Poll(poller_uid, -1, &timed_out);
231 std::vector<typename Thread::UserInfo> Poll(
unsigned int poller_uid,
232 double timeout,
bool* timed_out) {
233 std::vector<typename Thread::UserInfo> infos;
234 std::shared_ptr<typename Thread::Poller> poller;
236 auto thr = m_owner.GetThread();
237 if (!thr)
return infos;
238 if (poller_uid > thr->m_pollers.size())
return infos;
239 poller = thr->m_pollers[poller_uid];
240 if (!poller)
return infos;
243 std::unique_lock<wpi::mutex> lock(poller->poll_mutex);
244 auto timeout_time = std::chrono::steady_clock::now() +
245 std::chrono::duration<double>(timeout);
247 while (poller->poll_queue.empty()) {
248 if (poller->terminating)
return infos;
249 if (poller->cancelling) {
252 poller->cancelling =
false;
260 poller->poll_cond.wait(lock);
262 auto cond_timed_out = poller->poll_cond.wait_until(lock, timeout_time);
263 if (cond_timed_out == std::cv_status::timeout) {
270 while (!poller->poll_queue.empty()) {
271 infos.emplace_back(std::move(poller->poll_queue.front()));
272 poller->poll_queue.pop();
277 void CancelPoll(
unsigned int poller_uid) {
278 std::shared_ptr<typename Thread::Poller> poller;
280 auto thr = m_owner.GetThread();
282 if (poller_uid > thr->m_pollers.size())
return;
283 poller = thr->m_pollers[poller_uid];
288 std::lock_guard<wpi::mutex> lock(poller->poll_mutex);
289 poller->cancelling =
true;
291 poller->poll_cond.notify_one();
295 template <
typename... Args>
296 void DoStart(Args&&... args) {
297 auto thr = m_owner.GetThread();
298 if (!thr) m_owner.Start(
new Thread(std::forward<Args>(args)...));
301 template <
typename... Args>
302 unsigned int DoAdd(Args&&... args) {
303 static_cast<Derived*
>(
this)->Start();
304 auto thr = m_owner.GetThread();
305 return thr->m_listeners.emplace_back(std::forward<Args>(args)...);
308 template <
typename... Args>
309 void Send(
unsigned int only_listener, Args&&... args) {
310 auto thr = m_owner.GetThread();
311 if (!thr || thr->m_listeners.empty())
return;
312 thr->m_queue.emplace(std::piecewise_construct,
313 std::make_tuple(only_listener),
314 std::forward_as_tuple(std::forward<Args>(args)...));
315 thr->m_cond.notify_one();
318 typename wpi::SafeThreadOwner<Thread>::Proxy GetThread()
const {
319 return m_owner.GetThread();
328 #endif // NTCORE_CALLBACKMANAGER_H_ Definition: SafeThread.h:20
Definition: CallbackManager.h:54
Definition: CallbackManager.h:74
Definition: IStorage.h:21
Definition: CallbackManager.h:168
Definition: CallbackManager.h:30