WPILibC++  unspecified
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Pages
Dispatcher.h
1 /*----------------------------------------------------------------------------*/
2 /* Copyright (c) FIRST 2015. All Rights Reserved. */
3 /* Open Source Software - may be modified and shared by FRC teams. The code */
4 /* must be accompanied by the FIRST BSD license file in the root directory of */
5 /* the project. */
6 /*----------------------------------------------------------------------------*/
7 
8 #ifndef NT_DISPATCHER_H_
9 #define NT_DISPATCHER_H_
10 
11 #include <atomic>
12 #include <chrono>
13 #include <condition_variable>
14 #include <functional>
15 #include <memory>
16 #include <mutex>
17 #include <string>
18 #include <vector>
19 
20 #include "llvm/StringRef.h"
21 
22 #include "support/atomic_static.h"
23 #include "NetworkConnection.h"
24 #include "Notifier.h"
25 #include "Storage.h"
26 
27 namespace wpi {
28 class NetworkAcceptor;
29 class NetworkStream;
30 }
31 
32 namespace nt {
33 
35  friend class DispatcherTest;
36 
37  public:
38  typedef std::function<std::unique_ptr<wpi::NetworkStream>()> Connector;
39 
40  virtual ~DispatcherBase();
41 
42  unsigned int GetNetworkMode() const;
43  void StartServer(llvm::StringRef persist_filename,
44  std::unique_ptr<wpi::NetworkAcceptor> acceptor);
45  void StartClient();
46  void Stop();
47  void SetUpdateRate(double interval);
48  void SetIdentity(llvm::StringRef name);
49  void Flush();
50  std::vector<ConnectionInfo> GetConnections() const;
51  void NotifyConnections(ConnectionListenerCallback callback) const;
52 
53  void SetConnector(Connector connector);
54  void SetConnector(std::vector<Connector>&& connectors);
55 
56  void SetConnectorOverride(Connector connector);
57  void ClearConnectorOverride();
58 
59  bool active() const { return m_active; }
60 
61  DispatcherBase(const DispatcherBase&) = delete;
62  DispatcherBase& operator=(const DispatcherBase&) = delete;
63 
64  protected:
65  DispatcherBase(Storage& storage, Notifier& notifier);
66 
67  private:
68  void DispatchThreadMain();
69  void ServerThreadMain();
70  void ClientThreadMain();
71 
72  bool ClientHandshake(
73  NetworkConnection& conn,
74  std::function<std::shared_ptr<Message>()> get_msg,
75  std::function<void(llvm::ArrayRef<std::shared_ptr<Message>>)> send_msgs);
76  bool ServerHandshake(
77  NetworkConnection& conn,
78  std::function<std::shared_ptr<Message>()> get_msg,
79  std::function<void(llvm::ArrayRef<std::shared_ptr<Message>>)> send_msgs);
80 
81  void ClientReconnect(unsigned int proto_rev = 0x0300);
82 
83  void QueueOutgoing(std::shared_ptr<Message> msg, NetworkConnection* only,
84  NetworkConnection* except);
85 
86  Storage& m_storage;
87  Notifier& m_notifier;
88  unsigned int m_networkMode = NT_NET_MODE_NONE;
89  std::string m_persist_filename;
90  std::thread m_dispatch_thread;
91  std::thread m_clientserver_thread;
92 
93  std::unique_ptr<wpi::NetworkAcceptor> m_server_acceptor;
94  Connector m_client_connector_override;
95  std::vector<Connector> m_client_connectors;
96 
97  // Mutex for user-accessible items
98  mutable std::mutex m_user_mutex;
99  std::vector<std::shared_ptr<NetworkConnection>> m_connections;
100  std::string m_identity;
101 
102  std::atomic_bool m_active; // set to false to terminate threads
103  std::atomic_uint m_update_rate; // periodic dispatch update rate, in ms
104 
105  // Condition variable for forced dispatch wakeup (flush)
106  std::mutex m_flush_mutex;
107  std::condition_variable m_flush_cv;
108  std::chrono::steady_clock::time_point m_last_flush;
109  bool m_do_flush = false;
110 
111  // Condition variable for client reconnect (uses user mutex)
112  std::condition_variable m_reconnect_cv;
113  unsigned int m_reconnect_proto_rev = 0x0300;
114  bool m_do_reconnect = true;
115 };
116 
117 class Dispatcher : public DispatcherBase {
118  friend class DispatcherTest;
119 
120  public:
121  static Dispatcher& GetInstance() {
122  ATOMIC_STATIC(Dispatcher, instance);
123  return instance;
124  }
125 
126  void StartServer(StringRef persist_filename, const char* listen_address,
127  unsigned int port);
128 
129  void SetServer(const char* server_name, unsigned int port);
130  void SetServer(ArrayRef<std::pair<StringRef, unsigned int>> servers);
131 
132  void SetServerOverride(const char* server_name, unsigned int port);
133  void ClearServerOverride();
134 
135  private:
136  Dispatcher();
137  Dispatcher(Storage& storage, Notifier& notifier)
138  : DispatcherBase(storage, notifier) {}
139 
140  ATOMIC_STATIC_DECL(Dispatcher)
141 };
142 
143 } // namespace nt
144 
145 #endif // NT_DISPATCHER_H_
Definition: Dispatcher.h:34
Definition: Dispatcher.h:117
Definition: Storage.h:35
Definition: NetworkConnection.h:28
Definition: Notifier.h:19