8 #ifndef NT_CALLBACKMANAGER_H_ 9 #define NT_CALLBACKMANAGER_H_ 13 #include <condition_variable> 19 #include "llvm/raw_ostream.h" 20 #include "support/SafeThread.h" 21 #include "support/UidVector.h" 27 template <
typename Callback>
// Builds listener data that dispatches through a direct callback.
31 ListenerData(Callback callback_) : callback(callback_) {}
// Builds listener data bound to a poller; only poller_uid is initialized here.
32 ListenerData(
unsigned int poller_uid_) : poller_uid(poller_uid_) {}
// True when a callback is set or a poller uid other than UINT_MAX is stored.
34 explicit operator bool()
const {
return callback || poller_uid != UINT_MAX; }
// Poller uid; UINT_MAX is the "no poller" sentinel tested by operator bool().
37 unsigned int poller_uid = UINT_MAX;
48 template <
typename Derived,
typename TUserInfo,
49 typename TListenerData =
51 typename TNotifierData = TUserInfo>
// Member aliases for the template parameters; CallbackManager refers to
// Thread::UserInfo through the first of these.
54 typedef TUserInfo UserInfo;
55 typedef TNotifierData NotifierData;
56 typedef TListenerData ListenerData;
// Walk every poller slot and call Terminate() on each non-empty one.
60 for (std::size_t i = 0; i < m_pollers.size(); ++i) {
61 if (
auto poller = m_pollers[i]) poller->Terminate();
// Pending notifications as (listener uid, notification) pairs; Main treats a
// uid of UINT_MAX differently from a concrete listener index.
69 std::queue<std::pair<unsigned int, NotifierData>> m_queue;
// Notified by Main after it drains the queue; WaitForQueue waits on it.
70 std::condition_variable m_queue_empty;
// Takes poll_mutex, then wakes every thread waiting on poll_cond.
// NOTE(review): the statement executed under the lock is not visible here —
// presumably it sets `terminating`; confirm.
75 std::lock_guard<std::mutex> lock(poll_mutex);
78 poll_cond.notify_all();
// Notifications delivered to this poller and not yet collected by Poll().
80 std::queue<NotifierData> poll_queue;
// Guards poll_queue and the two flags below (locked by SendPoller, Poll and CancelPoll).
81 std::mutex poll_mutex;
// Signalled when an item is queued or a cancel is requested.
82 std::condition_variable poll_cond;
// When true, a Poll() that finds the queue empty returns instead of waiting.
83 bool terminating =
false;
// One-shot cancel request: Poll() clears it when it observes it.
84 bool cancelling =
false;
89 template <
typename... Args>
90 void SendPoller(
unsigned int poller_uid, Args&&... args) {
91 if (poller_uid > m_pollers.size())
return;
92 auto poller = m_pollers[poller_uid];
95 std::lock_guard<std::mutex> lock(poller->poll_mutex);
96 poller->poll_queue.emplace(std::forward<Args>(args)...);
98 poller->poll_cond.notify_one();
102 template <
typename Derived,
typename TUserInfo,
typename TListenerData,
103 typename TNotifierData>
// Acquire m_mutex for the queue handling that follows.
105 std::unique_lock<std::mutex> lock(m_mutex);
// While nothing is queued, leave as soon as m_active has been cleared.
107 while (m_queue.empty()) {
109 if (!m_active)
return;
// Drain the queue one (listener uid, notification) pair at a time.
112 while (!m_queue.empty()) {
113 if (!m_active)
return;
114 auto item = std::move(m_queue.front());
// A uid other than UINT_MAX addresses one listener by index; indices past
// the end of m_listeners are ignored.
116 if (item.first != UINT_MAX) {
117 if (item.first < m_listeners.size()) {
118 auto& listener = m_listeners[item.first];
120 static_cast<Derived*>(
this)->Matches(listener, item.second)) {
// Stamp the notification with the listener uid before delivering it.
121 static_cast<Derived*
>(
this)->SetListener(&item.second, item.first);
// Deliver through the direct callback if one is set, otherwise hand the
// notification to the listener's poller.
122 if (listener.callback) {
124 static_cast<Derived*
>(
this)->DoCallback(listener.callback,
127 }
else if (listener.poller_uid != UINT_MAX) {
// Single recipient, so the notification can be moved into the poller queue.
128 SendPoller(listener.poller_uid, std::move(item.second));
// Visit every listener slot, skipping empty ones (presumably the path taken
// for a uid of UINT_MAX — the joining branch is not visible here; confirm).
134 for (
size_t i = 0; i < m_listeners.size(); ++i) {
135 auto& listener = m_listeners[i];
136 if (!listener)
continue;
137 if (!static_cast<Derived*>(
this)->Matches(listener, item.second))
139 static_cast<Derived*
>(
this)->SetListener(&item.second, i);
140 if (listener.callback) {
142 static_cast<Derived*
>(
this)->DoCallback(listener.callback,
145 }
else if (listener.poller_uid != UINT_MAX) {
// Passed without std::move: later iterations still read item.second.
146 SendPoller(listener.poller_uid, item.second);
// Wake everything blocked on m_queue_empty (WaitForQueue waits on it).
153 m_queue_empty.notify_all();
165 template <
typename Derived,
typename Thread>
// Grants RpcServerTest access to non-public members.
167 friend class RpcServerTest;
// Forwards to m_owner.Stop().
170 void Stop() { m_owner.Stop(); }
// Erases the listener stored under listener_uid from the thread's listener
// table.
172 void Remove(
unsigned int listener_uid) {
173 auto thr = m_owner.GetThread();
175 thr->m_listeners.erase(listener_uid);
// Creates a new poller and returns the value produced by
// m_pollers.emplace_back() (used elsewhere in this class as the poller uid).
178 unsigned int CreatePoller() {
// Call the derived class's Start() before asking the owner for the thread.
179 static_cast<Derived*
>(
this)->Start();
180 auto thr = m_owner.GetThread();
181 return thr->m_pollers.emplace_back(
182 std::make_shared<typename Thread::Poller>());
// Removes a poller together with every listener that delivers to it.
185 void RemovePoller(
unsigned int poller_uid) {
186 auto thr = m_owner.GetThread();
// Erase every listener whose poller_uid matches the poller being removed.
190 for (std::size_t i = 0; i < thr->m_listeners.size(); ++i) {
191 if (thr->m_listeners[i].poller_uid == poller_uid)
192 thr->m_listeners.erase(i);
// Out-of-range uid: no poller slot to erase.
196 if (poller_uid >= thr->m_pollers.size())
return;
197 auto poller = thr->m_pollers[poller_uid];
200 return thr->m_pollers.erase(poller_uid);
// Waits for the thread's notification queue to drain.
// `timeout` is in seconds (it is converted through std::chrono::duration<double>).
// Returns true when no thread exists or m_active is found cleared; returns
// false when the queue is non-empty and timeout is 0, or when the timed wait
// reports a timeout. The return taken once the queue is empty is not visible
// here (presumably true — confirm).
203 bool WaitForQueue(
double timeout) {
204 auto thr = m_owner.GetThread();
205 if (!thr)
return true;
// The waits below use the lock exposed by the thread proxy.
207 auto& lock = thr.GetLock();
// Two spellings of the same deadline: an integer-nanosecond form for MSVC
// older than 1900 and a double-seconds form otherwise.
208 #if defined(_MSC_VER) && _MSC_VER < 1900 209 auto timeout_time = std::chrono::steady_clock::now() +
210 std::chrono::duration<int64_t, std::nano>(
211 static_cast<int64_t
>(timeout * 1e9));
213 auto timeout_time = std::chrono::steady_clock::now() +
214 std::chrono::duration<double>(timeout);
216 while (!thr->m_queue.empty()) {
217 if (!thr->m_active)
return true;
// A zero timeout means "check only, never block".
218 if (timeout == 0)
return false;
// Both an untimed and a deadline-bound wait appear below; the condition
// choosing between them is not visible here (presumably the sign of
// timeout — confirm).
220 thr->m_queue_empty.wait(lock);
222 auto cond_timed_out = thr->m_queue_empty.wait_until(lock, timeout_time);
223 if (cond_timed_out == std::cv_status::timeout)
return false;
// Convenience overload: calls the three-argument Poll() with a timeout of -1
// and discards the timed-out flag.
230 std::vector<typename Thread::UserInfo> Poll(
unsigned int poller_uid) {
231 bool timed_out =
false;
232 return Poll(poller_uid, -1, &timed_out);
235 std::vector<typename Thread::UserInfo> Poll(
unsigned int poller_uid,
236 double timeout,
bool* timed_out) {
237 std::vector<typename Thread::UserInfo> infos;
238 std::shared_ptr<typename Thread::Poller> poller;
240 auto thr = m_owner.GetThread();
241 if (!thr)
return infos;
242 if (poller_uid > thr->m_pollers.size())
return infos;
243 poller = thr->m_pollers[poller_uid];
244 if (!poller)
return infos;
247 std::unique_lock<std::mutex> lock(poller->poll_mutex);
248 #if defined(_MSC_VER) && _MSC_VER < 1900 249 auto timeout_time = std::chrono::steady_clock::now() +
250 std::chrono::duration<int64_t, std::nano>(
251 static_cast<int64_t
>(timeout * 1e9));
253 auto timeout_time = std::chrono::steady_clock::now() +
254 std::chrono::duration<double>(timeout);
257 while (poller->poll_queue.empty()) {
258 if (poller->terminating)
return infos;
259 if (poller->cancelling) {
262 poller->cancelling =
false;
270 poller->poll_cond.wait(lock);
272 auto cond_timed_out = poller->poll_cond.wait_until(lock, timeout_time);
273 if (cond_timed_out == std::cv_status::timeout) {
280 while (!poller->poll_queue.empty()) {
281 infos.emplace_back(std::move(poller->poll_queue.front()));
282 poller->poll_queue.pop();
287 void CancelPoll(
unsigned int poller_uid) {
288 std::shared_ptr<typename Thread::Poller> poller;
290 auto thr = m_owner.GetThread();
292 if (poller_uid > thr->m_pollers.size())
return;
293 poller = thr->m_pollers[poller_uid];
298 std::lock_guard<std::mutex> lock(poller->poll_mutex);
299 poller->cancelling =
true;
301 poller->poll_cond.notify_one();
// Starts the owned thread if none is running, forwarding args to the Thread
// constructor. Does nothing when GetThread() already yields a thread.
305 template <
typename... Args>
306 void DoStart(Args&&... args) {
307 auto thr = m_owner.GetThread();
// The newly allocated Thread is handed straight to m_owner.Start().
308 if (!thr) m_owner.Start(
new Thread(std::forward<Args>(args)...));
// Adds a listener built from args and returns the value produced by
// m_listeners.emplace_back() (used elsewhere in this class as the listener uid).
311 template <
typename... Args>
312 unsigned int DoAdd(Args&&... args) {
// Call the derived class's Start() before asking the owner for the thread.
313 static_cast<Derived*
>(
this)->Start();
314 auto thr = m_owner.GetThread();
315 return thr->m_listeners.emplace_back(std::forward<Args>(args)...);
// Queues a notification for the callback thread and wakes it.
// only_listener is stored as the first element of the queued pair; Main
// compares it against UINT_MAX when dispatching. args construct the
// notification in place.
318 template <
typename... Args>
319 void Send(
unsigned int only_listener, Args&&... args) {
320 auto thr = m_owner.GetThread();
// Nothing to do without a running thread or without any listener.
321 if (!thr || thr->m_listeners.empty())
return;
// piecewise_construct builds the pair's second element directly from args.
322 thr->m_queue.emplace(std::piecewise_construct,
323 std::make_tuple(only_listener),
324 std::forward_as_tuple(std::forward<Args>(args)...));
325 thr->m_cond.notify_one();
// Returns the owner's thread proxy; callers in this class test it with
// `!thr` before dereferencing.
328 typename wpi::SafeThreadOwner<Thread>::Proxy GetThread()
const {
329 return m_owner.GetThread();
338 #endif // NT_CALLBACKMANAGER_H_ Definition: SafeThread.h:19
Definition: CallbackManager.h:52
Definition: CallbackManager.h:72
Definition: IEntryNotifier.h:15
Definition: CallbackManager.h:166
Definition: CallbackManager.h:28