WPILibC++  unspecified
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Pages
Dispatcher.h
1 /*----------------------------------------------------------------------------*/
2 /* Copyright (c) FIRST 2015. All Rights Reserved. */
3 /* Open Source Software - may be modified and shared by FRC teams. The code */
4 /* must be accompanied by the FIRST BSD license file in the root directory of */
5 /* the project. */
6 /*----------------------------------------------------------------------------*/
7 
8 #ifndef NT_DISPATCHER_H_
9 #define NT_DISPATCHER_H_
10 
11 #include <atomic>
12 #include <chrono>
13 #include <condition_variable>
14 #include <functional>
15 #include <memory>
16 #include <mutex>
17 #include <string>
18 #include <vector>
19 
20 #include "llvm/StringRef.h"
21 
22 #include "support/atomic_static.h"
23 #include "NetworkConnection.h"
24 #include "Notifier.h"
25 #include "Storage.h"
26 
27 namespace wpi {
28 class NetworkAcceptor;
29 class NetworkStream;
30 }
31 
32 namespace nt {
33 
// DispatcherBase: declares the server/client start-stop API, connection
// queries, and the connector configuration used by the dispatcher, together
// with the thread handles and synchronization members that back them. Only
// declarations are visible here; the member functions are defined elsewhere.
//
// NOTE(review): the opening `class DispatcherBase {` line (listing line 34) is
// absent from this extract; the constructor/destructor names below and the
// closing `};` indicate these members belong to DispatcherBase.
//
// The friend declaration gives the DispatcherTest class access to the
// private members.
35  friend class DispatcherTest;
36 
37  public:
    // Callable taking no arguments that returns an owned wpi::NetworkStream.
    // Presumably invoked to open a client connection -- confirm in the .cpp.
38  typedef std::function<std::unique_ptr<wpi::NetworkStream>()> Connector;
39 
    // Virtual so the object can be destroyed through a base-class pointer.
40  virtual ~DispatcherBase();
41 
    // Server start takes the persistence file name and ownership of the
    // acceptor; client start takes no arguments.
42  void StartServer(llvm::StringRef persist_filename,
43  std::unique_ptr<wpi::NetworkAcceptor> acceptor);
44  void StartClient();
45  void Stop();
    // NOTE(review): the unit of `interval` is not stated here, while
    // m_update_rate below is documented as milliseconds -- confirm the
    // conversion in the definition.
46  void SetUpdateRate(double interval);
47  void SetIdentity(llvm::StringRef name);
48  void Flush();
    // Both are const; m_user_mutex is declared mutable, presumably so these
    // can lock it -- confirm in the definitions.
49  std::vector<ConnectionInfo> GetConnections() const;
50  void NotifyConnections(ConnectionListenerCallback callback) const;
51 
    // Set either a single connector or a list of connectors (the list is
    // taken by rvalue reference).
52  void SetConnector(Connector connector);
53  void SetConnector(std::vector<Connector>&& connectors);
54 
    // Set / clear the connector stored separately from the regular list
    // (see m_client_connector_override).
55  void SetConnectorOverride(Connector connector);
56  void ClearConnectorOverride();
57 
    // Returns the current value of the atomic m_active flag.
58  bool active() const { return m_active; }
59 
    // Non-copyable: copy construction and copy assignment are deleted.
60  DispatcherBase(const DispatcherBase&) = delete;
61  DispatcherBase& operator=(const DispatcherBase&) = delete;
62 
63  protected:
    // Protected constructor: only derived classes (and the friend test class)
    // can construct a DispatcherBase.
64  DispatcherBase(Storage& storage, Notifier& notifier);
65 
66  private:
    // Thread entry points -- presumably run on m_dispatch_thread and
    // m_clientserver_thread; confirm in the definitions.
67  void DispatchThreadMain();
68  void ServerThreadMain();
69  void ClientThreadMain();
70 
    // Handshake routines. Each receives the connection, a callable that
    // yields the next Message, and a callable that accepts a batch of
    // Messages; each returns bool (meaning of the result is defined in the
    // .cpp, not visible here).
71  bool ClientHandshake(
72  NetworkConnection& conn,
73  std::function<std::shared_ptr<Message>()> get_msg,
74  std::function<void(llvm::ArrayRef<std::shared_ptr<Message>>)> send_msgs);
75  bool ServerHandshake(
76  NetworkConnection& conn,
77  std::function<std::shared_ptr<Message>()> get_msg,
78  std::function<void(llvm::ArrayRef<std::shared_ptr<Message>>)> send_msgs);
79 
    // proto_rev defaults to 0x0300, the same value m_reconnect_proto_rev is
    // initialized with below.
80  void ClientReconnect(unsigned int proto_rev = 0x0300);
81 
    // `only` and `except` are raw, non-owning connection pointers; presumably
    // they restrict or exclude recipients of `msg` -- confirm in the .cpp.
82  void QueueOutgoing(std::shared_ptr<Message> msg, NetworkConnection* only,
83  NetworkConnection* except);
84 
    // References supplied to the constructor; this class does not own them.
85  Storage& m_storage;
86  Notifier& m_notifier;
87  bool m_server = false;
88  std::string m_persist_filename;
    // NOTE(review): std::thread is used here but <thread> is not among the
    // headers this file includes directly; it must currently arrive through
    // another include. Consider including <thread> explicitly.
89  std::thread m_dispatch_thread;
90  std::thread m_clientserver_thread;
91 
92  std::unique_ptr<wpi::NetworkAcceptor> m_server_acceptor;
93  Connector m_client_connector_override;
94  std::vector<Connector> m_client_connectors;
95 
96  // Mutex for user-accessible items
    // Declared mutable so it can be locked from const member functions.
97  mutable std::mutex m_user_mutex;
98  std::vector<std::shared_ptr<NetworkConnection>> m_connections;
99  std::string m_identity;
100 
    // Neither atomic has an in-class initializer; presumably both are set in
    // the constructor defined elsewhere -- confirm.
101  std::atomic_bool m_active; // set to false to terminate threads
102  std::atomic_uint m_update_rate; // periodic dispatch update rate, in ms
103 
104  // Condition variable for forced dispatch wakeup (flush)
105  std::mutex m_flush_mutex;
106  std::condition_variable m_flush_cv;
107  std::chrono::steady_clock::time_point m_last_flush;
108  bool m_do_flush = false;
109 
110  // Condition variable for client reconnect (uses user mutex)
111  std::condition_variable m_reconnect_cv;
112  unsigned int m_reconnect_proto_rev = 0x0300;
113  bool m_do_reconnect = true;
114 };
115 
// Dispatcher: DispatcherBase subclass exposed through a static GetInstance()
// accessor, adding server/start overloads that take host names, listen
// addresses and port numbers instead of connector/acceptor objects.
// Constructors are private, so instances are created only by this class
// itself or by the friend test class.
116 class Dispatcher : public DispatcherBase {
117  friend class DispatcherTest;
118 
119  public:
    // Returns a reference to the `instance` object introduced by the
    // ATOMIC_STATIC macro. The macro's definition is not visible in this
    // file (presumably from support/atomic_static.h); its initialization and
    // thread-safety guarantees should be confirmed there.
120  static Dispatcher& GetInstance() {
121  ATOMIC_STATIC(Dispatcher, instance);
122  return instance;
123  }
124 
    // NOTE(review): this declaration has the same name as
    // DispatcherBase::StartServer(llvm::StringRef, std::unique_ptr<...>), so
    // it hides that base overload for calls made through a Dispatcher unless
    // the call is explicitly qualified.
    // NOTE(review): StringRef / ArrayRef are unqualified here while the base
    // class spells llvm::StringRef / llvm::ArrayRef; they presumably resolve
    // through using-declarations brought in by another header -- confirm.
125  void StartServer(StringRef persist_filename, const char* listen_address,
126  unsigned int port);
127 
    // Configure the server either as a single (name, port) pair or as a list
    // of (name, port) pairs.
128  void SetServer(const char* server_name, unsigned int port);
129  void SetServer(ArrayRef<std::pair<StringRef, unsigned int>> servers);
130 
131  void SetServerOverride(const char* server_name, unsigned int port);
132  void ClearServerOverride();
133 
134  private:
    // Default constructor is defined elsewhere; which Storage/Notifier it
    // binds to is not visible from this header.
135  Dispatcher();
    // Forwards the supplied storage and notifier references straight to the
    // DispatcherBase constructor.
136  Dispatcher(Storage& storage, Notifier& notifier)
137  : DispatcherBase(storage, notifier) {}
138 
    // Companion macro to ATOMIC_STATIC used in GetInstance(); expansion not
    // visible here.
139  ATOMIC_STATIC_DECL(Dispatcher)
140 };
141 
142 } // namespace nt
143 
144 #endif // NT_DISPATCHER_H_
Definition: Dispatcher.h:34
Definition: Dispatcher.h:116
Definition: Storage.h:35
Definition: NetworkConnection.h:28
Definition: Notifier.h:19