8 #ifndef NTCORE_CALLBACKMANAGER_H_ 9 #define NTCORE_CALLBACKMANAGER_H_ 19 #include <llvm/raw_ostream.h> 20 #include <support/SafeThread.h> 21 #include <support/UidVector.h> 22 #include <support/condition_variable.h> 23 #include <support/mutex.h> 29 template <
typename Callback>
33 explicit ListenerData(Callback callback_) : callback(callback_) {}
34 explicit ListenerData(
unsigned int poller_uid_) : poller_uid(poller_uid_) {}
36 explicit operator bool()
const {
return callback || poller_uid != UINT_MAX; }
// Uid of the poller this listener is bound to. UINT_MAX (the default) is the
// "no poller" value: operator bool and the dispatch code both test against it.
39 unsigned int poller_uid = UINT_MAX;
50 template <
typename Derived,
typename TUserInfo,
51 typename TListenerData =
53 typename TNotifierData = TUserInfo>
56 typedef TUserInfo UserInfo;
57 typedef TNotifierData NotifierData;
58 typedef TListenerData ListenerData;
// Walk every poller slot and call Terminate() on each poller that is still
// present; the `if` skips slots whose pointer tests false.
// NOTE(review): the enclosing function header (original line 61) is not part
// of this listing.
62 for (
size_t i = 0; i < m_pollers.size(); ++i) {
63 if (
auto poller = m_pollers[i]) poller->Terminate();
// Pending notifications. Each entry pairs a listener uid with the data to
// deliver; the processing loop treats a uid of UINT_MAX as "offer this to
// every listener".
71 std::queue<std::pair<unsigned int, NotifierData>> m_queue;
// Notified by the processing loop; WaitForQueue blocks on it while m_queue is
// not empty.
72 wpi::condition_variable m_queue_empty;
// NOTE(review): the struct header and the header of the function containing
// the next two statements are not part of this listing.
// Take the poller mutex.
77 std::lock_guard<wpi::mutex> lock(poll_mutex);
// Wake every thread currently waiting on poll_cond.
80 poll_cond.notify_all();
// Data queued for this poller: filled by SendPoller, drained by Poll.
82 std::queue<NotifierData> poll_queue;
// Locked by SendPoller, Poll and CancelPoll around their accesses to
// poll_queue and the flags below.
83 wpi::mutex poll_mutex;
// Poll waits on this; SendPoller and CancelPoll notify it.
84 wpi::condition_variable poll_cond;
// When set, Poll returns instead of waiting on an empty queue.
85 bool terminating =
false;
// Set by CancelPoll; Poll resets it to false when it observes it.
86 bool cancelling =
false;
91 template <
typename... Args>
92 void SendPoller(
unsigned int poller_uid, Args&&... args) {
93 if (poller_uid > m_pollers.size())
return;
94 auto poller = m_pollers[poller_uid];
97 std::lock_guard<wpi::mutex> lock(poller->poll_mutex);
98 poller->poll_queue.emplace(std::forward<Args>(args)...);
100 poller->poll_cond.notify_one();
// Out-of-class member definition of the callback thread's queue-processing
// loop. NOTE(review): the signature line (original line 106) and several
// statements and closing braces are not part of this listing.
104 template <
typename Derived,
typename TUserInfo,
typename TListenerData,
105 typename TNotifierData>
// Acquire m_mutex for the queue handling below.
107 std::unique_lock<wpi::mutex> lock(m_mutex);
// Idle phase: loop while nothing is queued, leaving the function once the
// thread is no longer active.
// NOTE(review): presumably a condition-variable wait sits at original line
// 110 (Send notifies m_cond) - not visible here, confirm.
109 while (m_queue.empty()) {
111 if (!m_active)
return;
// Drain phase: handle queued items until the queue is empty, again leaving
// the function as soon as the thread is deactivated.
114 while (!m_queue.empty()) {
115 if (!m_active)
return;
// Move the front entry out of the queue: .first is the target listener uid,
// .second is the notification data.
116 auto item = std::move(m_queue.front());
// A concrete uid addresses one listener; UINT_MAX is handled by the loop over
// all listeners further down.
118 if (item.first != UINT_MAX) {
// Ignore uids that lie outside the listener table.
119 if (item.first < m_listeners.size()) {
120 auto& listener = m_listeners[item.first];
// Derived::Matches is asked whether this listener wants the notification.
122 static_cast<Derived*>(
this)->Matches(listener, item.second)) {
// Derived::SetListener is handed the data and the listener uid before
// delivery.
123 static_cast<Derived*
>(
this)->SetListener(&item.second, item.first);
// Deliver through the callback when there is one, otherwise through the
// listener's poller.
124 if (listener.callback) {
126 static_cast<Derived*
>(
this)->DoCallback(listener.callback,
129 }
else if (listener.poller_uid != UINT_MAX) {
// Single recipient, so the data is moved into the poller queue.
130 SendPoller(listener.poller_uid, std::move(item.second));
// Broadcast path: offer the item to every listener in the table.
136 for (
size_t i = 0; i < m_listeners.size(); ++i) {
137 auto& listener = m_listeners[i];
// Skip listener entries that test false.
138 if (!listener)
continue;
// NOTE(review): the statement controlled by this `if` (original line 140) is
// not visible in this listing.
139 if (!static_cast<Derived*>(
this)->Matches(listener, item.second))
141 static_cast<Derived*
>(
this)->SetListener(&item.second, i);
142 if (listener.callback) {
144 static_cast<Derived*
>(
this)->DoCallback(listener.callback,
147 }
else if (listener.poller_uid != UINT_MAX) {
// Passed without std::move here: the same item is offered to the remaining
// listeners of this loop.
148 SendPoller(listener.poller_uid, item.second);
// Wake every thread waiting on m_queue_empty (see WaitForQueue).
155 m_queue_empty.notify_all();
167 template <
typename Derived,
typename Thread>
169 friend class RpcServerTest;
172 void Stop() { m_owner.Stop(); }
174 void Remove(
unsigned int listener_uid) {
175 auto thr = m_owner.GetThread();
177 thr->m_listeners.erase(listener_uid);
180 unsigned int CreatePoller() {
181 static_cast<Derived*
>(
this)->Start();
182 auto thr = m_owner.GetThread();
183 return thr->m_pollers.emplace_back(
184 std::make_shared<typename Thread::Poller>());
// Removes a poller: first erases every listener bound to it, then erases the
// poller's own slot in the poller table.
//
// poller_uid: uid of the poller to remove.
187 void RemovePoller(
unsigned int poller_uid) {
188 auto thr = m_owner.GetThread();
// NOTE(review): no null check on `thr` is visible before it is dereferenced
// below (original line 189 is not part of this listing) - confirm.
// Erase every listener whose poller_uid matches this poller.
192 for (
size_t i = 0; i < thr->m_listeners.size(); ++i) {
193 if (thr->m_listeners[i].poller_uid == poller_uid)
194 thr->m_listeners.erase(i);
// Nothing more to do when the uid lies outside the poller table.
198 if (poller_uid >= thr->m_pollers.size())
return;
199 auto poller = thr->m_pollers[poller_uid];
// NOTE(review): whatever is done with `poller` (original lines 200-201) is
// not visible in this listing.
202 return thr->m_pollers.erase(poller_uid);
// Waits for the thread's notification queue to become empty.
//
// timeout: seconds (it is converted with std::chrono::duration<double>); a
//          value of 0 never blocks.
//
// Visible return paths: true when there is no thread or the thread is no
// longer active; false when timeout is 0 while the queue is non-empty, or
// when the timed wait reports a timeout.
// NOTE(review): the final return and the branch choosing between the two
// waits are not part of this listing.
205 bool WaitForQueue(
double timeout) {
206 auto thr = m_owner.GetThread();
207 if (!thr)
return true;
// The waits below use the lock object exposed by the thread proxy.
209 auto& lock = thr.GetLock();
// Absolute deadline for the timed wait. MSVC older than 1900 builds the
// duration from integer nanoseconds; the second form uses duration<double>.
// NOTE(review): the #else / #endif lines are not visible in this listing.
210 #if defined(_MSC_VER) && _MSC_VER < 1900 211 auto timeout_time = std::chrono::steady_clock::now() +
212 std::chrono::duration<int64_t, std::nano>(
213 static_cast<int64_t
>(timeout * 1e9));
215 auto timeout_time = std::chrono::steady_clock::now() +
216 std::chrono::duration<double>(timeout);
// Re-check the queue after every wake-up.
218 while (!thr->m_queue.empty()) {
219 if (!thr->m_active)
return true;
220 if (timeout == 0)
return false;
// NOTE(review): presumably a negative timeout selects the untimed wait and a
// positive one the timed wait - the selecting condition is not visible.
222 thr->m_queue_empty.wait(lock);
224 auto cond_timed_out = thr->m_queue_empty.wait_until(lock, timeout_time);
225 if (cond_timed_out == std::cv_status::timeout)
return false;
232 std::vector<typename Thread::UserInfo> Poll(
unsigned int poller_uid) {
233 bool timed_out =
false;
234 return Poll(poller_uid, -1, &timed_out);
// Collects the notifications queued for one poller.
//
// poller_uid: uid of the poller to read from.
// timeout:    seconds (converted with std::chrono::duration<double>).
// timed_out:  out-parameter. NOTE(review): the statements that write it are
//             not part of this listing.
//
// Returns the queued items moved into a vector; the vector is empty when
// there is no thread, no such poller, or the poller is terminating.
237 std::vector<typename Thread::UserInfo> Poll(
unsigned int poller_uid,
238 double timeout,
bool* timed_out) {
239 std::vector<typename Thread::UserInfo> infos;
// Declared before the thread proxy so the shared_ptr outlives it.
240 std::shared_ptr<typename Thread::Poller> poller;
242 auto thr = m_owner.GetThread();
243 if (!thr)
return infos;
// NOTE(review): `>` lets poller_uid == size() reach the subscript on the next
// line; RemovePoller tests the same condition with `>=`. Likely off by one.
244 if (poller_uid > thr->m_pollers.size())
return infos;
245 poller = thr->m_pollers[poller_uid];
246 if (!poller)
return infos;
// From here on the poller's own mutex is used.
249 std::unique_lock<wpi::mutex> lock(poller->poll_mutex);
// Absolute deadline for the timed wait. MSVC older than 1900 builds the
// duration from integer nanoseconds; the second form uses duration<double>.
// NOTE(review): the #else / #endif lines are not visible in this listing.
250 #if defined(_MSC_VER) && _MSC_VER < 1900 251 auto timeout_time = std::chrono::steady_clock::now() +
252 std::chrono::duration<int64_t, std::nano>(
253 static_cast<int64_t
>(timeout * 1e9));
255 auto timeout_time = std::chrono::steady_clock::now() +
256 std::chrono::duration<double>(timeout);
// Wait for data, giving up on termination or cancellation.
259 while (poller->poll_queue.empty()) {
260 if (poller->terminating)
return infos;
261 if (poller->cancelling) {
// Consume the cancel request so that it only affects this call.
264 poller->cancelling =
false;
// NOTE(review): the condition choosing between the untimed and the timed
// wait, and the handling after a timeout, are not visible in this listing.
272 poller->poll_cond.wait(lock);
274 auto cond_timed_out = poller->poll_cond.wait_until(lock, timeout_time);
275 if (cond_timed_out == std::cv_status::timeout) {
// Move everything that is queued into the result vector.
282 while (!poller->poll_queue.empty()) {
283 infos.emplace_back(std::move(poller->poll_queue.front()));
284 poller->poll_queue.pop();
289 void CancelPoll(
unsigned int poller_uid) {
290 std::shared_ptr<typename Thread::Poller> poller;
292 auto thr = m_owner.GetThread();
294 if (poller_uid > thr->m_pollers.size())
return;
295 poller = thr->m_pollers[poller_uid];
300 std::lock_guard<wpi::mutex> lock(poller->poll_mutex);
301 poller->cancelling =
true;
303 poller->poll_cond.notify_one();
307 template <
typename... Args>
308 void DoStart(Args&&... args) {
309 auto thr = m_owner.GetThread();
310 if (!thr) m_owner.Start(
new Thread(std::forward<Args>(args)...));
313 template <
typename... Args>
314 unsigned int DoAdd(Args&&... args) {
315 static_cast<Derived*
>(
this)->Start();
316 auto thr = m_owner.GetThread();
317 return thr->m_listeners.emplace_back(std::forward<Args>(args)...);
320 template <
typename... Args>
321 void Send(
unsigned int only_listener, Args&&... args) {
322 auto thr = m_owner.GetThread();
323 if (!thr || thr->m_listeners.empty())
return;
324 thr->m_queue.emplace(std::piecewise_construct,
325 std::make_tuple(only_listener),
326 std::forward_as_tuple(std::forward<Args>(args)...));
327 thr->m_cond.notify_one();
330 typename wpi::SafeThreadOwner<Thread>::Proxy GetThread()
const {
331 return m_owner.GetThread();
340 #endif // NTCORE_CALLBACKMANAGER_H_ Definition: SafeThread.h:20
Definition: CallbackManager.h:54
Definition: CallbackManager.h:74
Definition: IEntryNotifier.h:16
Definition: CallbackManager.h:168
Definition: CallbackManager.h:30