WPILibC++  unspecified
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Pages
Dispatcher.h
1 /*----------------------------------------------------------------------------*/
2 /* Copyright (c) FIRST 2015. All Rights Reserved. */
3 /* Open Source Software - may be modified and shared by FRC teams. The code */
4 /* must be accompanied by the FIRST BSD license file in the root directory of */
5 /* the project. */
6 /*----------------------------------------------------------------------------*/
7 
8 #ifndef NT_DISPATCHER_H_
9 #define NT_DISPATCHER_H_
10 
11 #include <atomic>
12 #include <chrono>
13 #include <condition_variable>
14 #include <functional>
15 #include <memory>
16 #include <mutex>
17 #include <string>
18 #include <vector>
19 
20 #include "llvm/StringRef.h"
21 
22 #include "atomic_static.h"
23 #include "NetworkConnection.h"
24 #include "Notifier.h"
25 #include "Storage.h"
26 
27 class NetworkAcceptor;
28 class NetworkStream;
29 
30 namespace nt {
31 
33  friend class DispatcherTest;
34  public:
35  virtual ~DispatcherBase();
36 
37  void StartServer(StringRef persist_filename,
38  std::unique_ptr<NetworkAcceptor> acceptor);
39  void StartClient(std::function<std::unique_ptr<NetworkStream>()> connect);
40  void Stop();
41  void SetUpdateRate(double interval);
42  void SetIdentity(llvm::StringRef name);
43  void Flush();
44  std::vector<ConnectionInfo> GetConnections() const;
45  void NotifyConnections(ConnectionListenerCallback callback) const;
46 
47  bool active() const { return m_active; }
48 
49  DispatcherBase(const DispatcherBase&) = delete;
50  DispatcherBase& operator=(const DispatcherBase&) = delete;
51 
52  protected:
53  DispatcherBase(Storage& storage, Notifier& notifier);
54 
55  private:
56  void DispatchThreadMain();
57  void ServerThreadMain();
58  void ClientThreadMain(
59  std::function<std::unique_ptr<NetworkStream>()> connect);
60 
61  bool ClientHandshake(
62  NetworkConnection& conn,
63  std::function<std::shared_ptr<Message>()> get_msg,
64  std::function<void(llvm::ArrayRef<std::shared_ptr<Message>>)> send_msgs);
65  bool ServerHandshake(
66  NetworkConnection& conn,
67  std::function<std::shared_ptr<Message>()> get_msg,
68  std::function<void(llvm::ArrayRef<std::shared_ptr<Message>>)> send_msgs);
69 
70  void ClientReconnect(unsigned int proto_rev = 0x0300);
71 
72  void QueueOutgoing(std::shared_ptr<Message> msg, NetworkConnection* only,
73  NetworkConnection* except);
74 
75  Storage& m_storage;
76  Notifier& m_notifier;
77  bool m_server = false;
78  std::string m_persist_filename;
79  std::thread m_dispatch_thread;
80  std::thread m_clientserver_thread;
81 
82  std::unique_ptr<NetworkAcceptor> m_server_acceptor;
83 
84  // Mutex for user-accessible items
85  mutable std::mutex m_user_mutex;
86  std::vector<std::shared_ptr<NetworkConnection>> m_connections;
87  std::string m_identity;
88 
89  std::atomic_bool m_active; // set to false to terminate threads
90  std::atomic_uint m_update_rate; // periodic dispatch update rate, in ms
91 
92  // Condition variable for forced dispatch wakeup (flush)
93  std::mutex m_flush_mutex;
94  std::condition_variable m_flush_cv;
95  std::chrono::steady_clock::time_point m_last_flush;
96  bool m_do_flush = false;
97 
98  // Condition variable for client reconnect (uses user mutex)
99  std::condition_variable m_reconnect_cv;
100  unsigned int m_reconnect_proto_rev = 0x0300;
101  bool m_do_reconnect = true;
102 };
103 
104 class Dispatcher : public DispatcherBase {
105  friend class DispatcherTest;
106  public:
107  static Dispatcher& GetInstance() {
108  ATOMIC_STATIC(Dispatcher, instance);
109  return instance;
110  }
111 
112  void StartServer(StringRef persist_filename, const char* listen_address,
113  unsigned int port);
114  void StartClient(const char* server_name, unsigned int port);
115 
116  private:
117  Dispatcher();
118  Dispatcher(Storage& storage, Notifier& notifier)
119  : DispatcherBase(storage, notifier) {}
120 
121  ATOMIC_STATIC_DECL(Dispatcher)
122 };
123 
124 
125 } // namespace nt
126 
127 #endif // NT_DISPATCHER_H_
Definition: Dispatcher.h:32
Definition: NetworkStream.h:15
Definition: NetworkAcceptor.h:13
Definition: Dispatcher.h:104
Definition: Storage.h:34
ArrayRef - Represent a constant reference to an array (0 or more elements consecutively in memory)...
Definition: ArrayRef.h:54
Definition: NetworkConnection.h:26
Definition: Notifier.h:19
StringRef - Represent a constant reference to a string, i.e.
Definition: StringRef.h:39