WPILibC++  unspecified
Dispatcher.h
1 /*----------------------------------------------------------------------------*/
2 /* Copyright (c) 2015-2018 FIRST. All Rights Reserved. */
3 /* Open Source Software - may be modified and shared by FRC teams. The code */
4 /* must be accompanied by the FIRST BSD license file in the root directory of */
5 /* the project. */
6 /*----------------------------------------------------------------------------*/
7 
8 #ifndef NTCORE_DISPATCHER_H_
9 #define NTCORE_DISPATCHER_H_
10 
11 #include <atomic>
12 #include <chrono>
13 #include <functional>
14 #include <memory>
15 #include <string>
16 #include <thread>
17 #include <utility>
18 #include <vector>
19 
20 #include <wpi/StringRef.h>
21 #include <wpi/Twine.h>
22 #include <wpi/condition_variable.h>
23 #include <wpi/mutex.h>
24 
25 #include "IDispatcher.h"
26 #include "INetworkConnection.h"
27 
// Forward declarations of the wpi types this header uses only by reference
// (Logger) or as the pointee of a std::unique_ptr (NetworkAcceptor,
// NetworkStream). None of the #include lines above names a header for them
// directly.
28 namespace wpi {
29 class Logger;
30 class NetworkAcceptor;
31 class NetworkStream;
32 } // namespace wpi
33 
34 namespace nt {
35 
// Forward declarations of nt types that the class declarations below use
// only as reference parameters / reference members.
36 class IConnectionNotifier;
37 class IStorage;
38 class NetworkConnection;
39 
// DispatcherBase: IDispatcher implementation that declares the server/client
// start-stop API, connection queries, connection-listener registration, and
// the thread / connection / flush / reconnect state behind them.
//
// Only declarations are visible in this header; the member functions (other
// than active()) are defined elsewhere, so the notes below describe what the
// signatures and member declarations show and hedge everything else.
// The class is non-copyable (copy constructor and copy assignment deleted).
40 class DispatcherBase : public IDispatcher {
    // Gives the DispatcherTest class access to the private members below.
41  friend class DispatcherTest;
42 
43  public:
    // Callable taking no arguments and returning an owning pointer to a
    // wpi::NetworkStream.
44  typedef std::function<std::unique_ptr<wpi::NetworkStream>()> Connector;
45 
    // Takes storage, notifier and logger by reference. Presumably these are
    // bound to the m_storage / m_notifier / m_logger reference members (the
    // constructor body is not visible here), in which case all three must
    // outlive this object -- TODO confirm.
46  DispatcherBase(IStorage& storage, IConnectionNotifier& notifier,
47  wpi::Logger& logger);
48  virtual ~DispatcherBase();
49 
    // NOTE(review): presumably reports m_networkMode (initialized to
    // NT_NET_MODE_NONE below) -- confirm in the implementation file.
50  unsigned int GetNetworkMode() const;
    // `acceptor` is a std::unique_ptr passed by value, so the caller hands
    // over ownership. NOTE(review): presumably persist_filename is kept in
    // m_persist_filename and acceptor in m_server_acceptor -- confirm.
51  void StartServer(const Twine& persist_filename,
52  std::unique_ptr<wpi::NetworkAcceptor> acceptor);
53  void StartClient();
54  void Stop();
    // NOTE(review): the unit of `interval` is not stated in this header; the
    // stored m_update_rate is commented below as being in ms -- confirm the
    // conversion in the implementation file.
55  void SetUpdateRate(double interval);
56  void SetIdentity(const Twine& name);
57  void Flush();
    // Const queries about the current connections.
58  std::vector<ConnectionInfo> GetConnections() const;
59  bool IsConnected() const;
60 
    // Listener registration. Both functions are const and return an
    // unsigned int (presumably an identifier for the new listener -- TODO
    // confirm). AddListener takes a callback receiving a
    // ConnectionNotification; AddPolledListener takes a poller uid instead.
61  unsigned int AddListener(
62  std::function<void(const ConnectionNotification& event)> callback,
63  bool immediate_notify) const;
64  unsigned int AddPolledListener(unsigned int poller_uid,
65  bool immediate_notify) const;
66 
    // Connector setters take the Connector by value. NOTE(review): presumably
    // they write m_client_connector / m_client_connector_override and
    // ClearConnectorOverride resets the latter -- confirm.
67  void SetConnector(Connector connector);
68  void SetConnectorOverride(Connector connector);
69  void ClearConnectorOverride();
70 
    // Returns the current value of the atomic m_active flag.
71  bool active() const { return m_active; }
72 
    // Non-copyable.
73  DispatcherBase(const DispatcherBase&) = delete;
74  DispatcherBase& operator=(const DispatcherBase&) = delete;
75 
76  private:
    // NOTE(review): by their names these look like the entry points run on
    // m_dispatch_thread and m_clientserver_thread -- confirm where the
    // threads are started.
77  void DispatchThreadMain();
78  void ServerThreadMain();
79  void ClientThreadMain();
80 
    // Handshake routines with parallel signatures: the connection by
    // reference, a `get_msg` callable that yields a shared_ptr<Message>, and
    // a `send_msgs` callable that accepts an ArrayRef of shared_ptr<Message>.
    // The meaning of the bool result is not visible here (presumably
    // success/failure -- TODO confirm).
81  bool ClientHandshake(
82  NetworkConnection& conn,
83  std::function<std::shared_ptr<Message>()> get_msg,
84  std::function<void(wpi::ArrayRef<std::shared_ptr<Message>>)> send_msgs);
85  bool ServerHandshake(
86  NetworkConnection& conn,
87  std::function<std::shared_ptr<Message>()> get_msg,
88  std::function<void(wpi::ArrayRef<std::shared_ptr<Message>>)> send_msgs);
89 
    // Default proto_rev is 0x0300, the same value m_reconnect_proto_rev is
    // initialized with below.
90  void ClientReconnect(unsigned int proto_rev = 0x0300);
91 
    // Overrides a virtual function of a base class (marked `override`).
    // `only` and `except` are raw pointers; NOTE(review): presumably
    // non-owning connection filters that may be null -- confirm.
92  void QueueOutgoing(std::shared_ptr<Message> msg, INetworkConnection* only,
93  INetworkConnection* except) override;
94 
    // Reference members: they cannot be reseated after construction.
95  IStorage& m_storage;
96  IConnectionNotifier& m_notifier;
97  unsigned int m_networkMode = NT_NET_MODE_NONE;
98  std::string m_persist_filename;
    // std::thread members. NOTE(review): a std::thread that is still
    // joinable when destroyed calls std::terminate, so the destructor / Stop
    // presumably joins these -- confirm.
99  std::thread m_dispatch_thread;
100  std::thread m_clientserver_thread;
101 
102  std::unique_ptr<wpi::NetworkAcceptor> m_server_acceptor;
103  Connector m_client_connector_override;
104  Connector m_client_connector;
     // 8-bit counter starting at 0; unsigned arithmetic on it wraps at 256.
105  uint8_t m_connections_uid = 0;
106 
107  // Mutex for user-accessible items
     // Declared `mutable`, which permits locking it from const member
     // functions.
108  mutable wpi::mutex m_user_mutex;
109  std::vector<std::shared_ptr<INetworkConnection>> m_connections;
110  std::string m_identity;
111 
     // Neither atomic has a default member initializer here, so their initial
     // values must be set by the constructor (not visible in this header).
112  std::atomic_bool m_active; // set to false to terminate threads
113  std::atomic_uint m_update_rate; // periodic dispatch update rate, in ms
114 
115  // Condition variable for forced dispatch wakeup (flush)
116  wpi::mutex m_flush_mutex;
117  wpi::condition_variable m_flush_cv;
118  std::chrono::steady_clock::time_point m_last_flush;
119  bool m_do_flush = false;
120 
121  // Condition variable for client reconnect (uses user mutex)
122  wpi::condition_variable m_reconnect_cv;
123  unsigned int m_reconnect_proto_rev = 0x0300;
124  bool m_do_reconnect = true;
125 
126  protected:
     // The only protected member: derived classes can reach the logger
     // directly. Declared after every private member above.
127  wpi::Logger& m_logger;
128 };
129 
// Dispatcher: DispatcherBase subclass that adds entry points taking listen
// addresses, server names, ports and team numbers. Only the constructor is
// defined in this header; the other members are defined elsewhere.
130 class Dispatcher : public DispatcherBase {
     // Gives the DispatcherTest class access to this class's non-public
     // members.
131  friend class DispatcherTest;
132 
133  public:
     // Forwards all three references unchanged to the DispatcherBase
     // constructor; adds no state of its own.
134  Dispatcher(IStorage& storage, IConnectionNotifier& notifier,
135  wpi::Logger& logger)
136  : DispatcherBase(storage, notifier, logger) {}
137 
     // Same name as DispatcherBase::StartServer(const Twine&,
     // std::unique_ptr<wpi::NetworkAcceptor>). With no using-declaration in
     // this class, this overload hides the base one for calls made through a
     // Dispatcher; the base version needs a DispatcherBase:: qualifier.
     // NOTE(review): presumably builds an acceptor from listen_address/port
     // and delegates to the base overload -- confirm.
138  void StartServer(const Twine& persist_filename, const char* listen_address,
139  unsigned int port);
140 
     // Two SetServer overloads: a single (server_name, port), or an ArrayRef
     // of (StringRef, unsigned int) pairs -- presumably (name, port) per
     // entry, TODO confirm.
141  void SetServer(const char* server_name, unsigned int port);
142  void SetServer(ArrayRef<std::pair<StringRef, unsigned int>> servers);
     // NOTE(review): how `team` maps to server addresses is not visible in
     // this header -- see the implementation file.
143  void SetServerTeam(unsigned int team, unsigned int port);
144 
     // NOTE(review): by name these parallel DispatcherBase's
     // SetConnectorOverride / ClearConnectorOverride -- confirm that they
     // delegate to them.
145  void SetServerOverride(const char* server_name, unsigned int port);
146  void ClearServerOverride();
147 };
148 
149 } // namespace nt
150 
151 #endif // NTCORE_DISPATCHER_H_
Definition: Dispatcher.h:40
std::vector< ConnectionInfo > GetConnections()
Get information on the currently established network connections.
Definition: ntcore_cpp.cpp:893
void SetUpdateRate(double interval)
Set the periodic update rate.
Definition: ntcore_cpp.cpp:873
void SetServer(const char *server_name, unsigned int port)
Sets server address and port for client (without restarting client).
Definition: ntcore_cpp.cpp:823
Definition: Dispatcher.h:130
void Flush()
Flush Entries.
Definition: ntcore_cpp.cpp:884
ArrayRef - Represent a constant reference to an array (0 or more elements consecutively in memory), i.e. a start pointer and a length.
Definition: ArrayRef.h:41
Definition: IConnectionNotifier.h:17
namespace to hold default to_json function
Definition: json_binary_writer.cpp:39
Definition: IStorage.h:26
Definition: NetworkConnection.h:38
bool IsConnected(NT_Inst inst)
Return whether or not the instance is connected to another node.
Definition: ntcore_cpp.cpp:904
Definition: IStorage.h:21
NetworkTables Connection Notification.
Definition: ntcore_cpp.h:212
unsigned int GetNetworkMode()
Get the current network mode.
Definition: ntcore_cpp.cpp:734
void SetServerTeam(NT_Inst inst, unsigned int team, unsigned int port)
Sets server addresses and port for client (without restarting client).
Definition: ntcore_cpp.cpp:846
void StartServer(StringRef persist_filename, const char *listen_address, unsigned int port)
Starts a server using the specified filename, listening address, and port.
Definition: ntcore_cpp.cpp:745
Definition: IDispatcher.h:21
Definition: Logger.h:30
Definition: INetworkConnection.h:18
Twine - A lightweight data structure for efficiently representing the concatenation of temporary values as strings.
Definition: Twine.h:79
void StartClient()
Starts a client.
Definition: ntcore_cpp.cpp:768