WPILibC++  unspecified
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Pages
NetworkConnection.h
1 /*----------------------------------------------------------------------------*/
2 /* Copyright (c) FIRST 2015. All Rights Reserved. */
3 /* Open Source Software - may be modified and shared by FRC teams. The code */
4 /* must be accompanied by the FIRST BSD license file in the root directory of */
5 /* the project. */
6 /*----------------------------------------------------------------------------*/
7 
8 #ifndef NT_NETWORKCONNECTION_H_
9 #define NT_NETWORKCONNECTION_H_
10 
11 #include <atomic>
12 #include <chrono>
13 #include <memory>
14 #include <thread>
15 
16 #include "support/ConcurrentQueue.h"
17 #include "Message.h"
18 #include "ntcore_cpp.h"
19 
20 class NetworkStream;
21 
22 namespace nt {
23 
24 class Notifier;
25 
26 class NetworkConnection {
27  public:
28  enum State { kCreated, kInit, kHandshake, kSynchronized, kActive, kDead };
29 
30  typedef std::function<bool(
31  NetworkConnection& conn,
32  std::function<std::shared_ptr<Message>()> get_msg,
33  std::function<void(llvm::ArrayRef<std::shared_ptr<Message>>)> send_msgs)>
34  HandshakeFunc;
35  typedef std::function<void(std::shared_ptr<Message> msg,
36  NetworkConnection* conn)> ProcessIncomingFunc;
37  typedef std::vector<std::shared_ptr<Message>> Outgoing;
38  typedef ConcurrentQueue<Outgoing> OutgoingQueue;
39 
40  NetworkConnection(std::unique_ptr<NetworkStream> stream,
41  Notifier& notifier,
42  HandshakeFunc handshake,
43  Message::GetEntryTypeFunc get_entry_type);
45 
46  // Set the input processor function (a ProcessIncomingFunc, which receives a Message and the connection pointer); it is copied into m_process_incoming. This must be called before Start().
47  void set_process_incoming(ProcessIncomingFunc func) {
48  m_process_incoming = func;
49  }
50 
51  void Start();
52  void Stop();
53 
54  ConnectionInfo info() const;
55 
56  bool active() const { return m_active; }  // Reads the atomic m_active flag.
57  NetworkStream& stream() { return *m_stream; }  // Dereferences the owned m_stream; the unique_ptr is not null-checked here.
58 
59  void QueueOutgoing(std::shared_ptr<Message> msg);
60  void PostOutgoing(bool keep_alive);
61 
62  unsigned int uid() const { return m_uid; }  // Returns m_uid; presumably assigned from the static s_uid counter -- TODO confirm in the .cpp.
63 
64  unsigned int proto_rev() const { return m_proto_rev; }  // Atomic read of m_proto_rev.
65  void set_proto_rev(unsigned int proto_rev) { m_proto_rev = proto_rev; }  // Atomic write to m_proto_rev.
66 
67  State state() const { return static_cast<State>(m_state.load()); }  // m_state is held as std::atomic_int, so the loaded int is cast back to State.
68  void set_state(State state) { m_state = static_cast<int>(state); }  // Stores the enumerator's int value into the atomic m_state.
69 
70  std::string remote_id() const;
71  void set_remote_id(StringRef remote_id);
72 
73  unsigned long long last_update() const { return m_last_update; }  // Atomic read of m_last_update; its units are not visible in this header.
74 
75  NetworkConnection(const NetworkConnection&) = delete;
76  NetworkConnection& operator=(const NetworkConnection&) = delete;
77 
78  private:
79  void ReadThreadMain();
80  void WriteThreadMain();
81 
82  static std::atomic_uint s_uid;
83 
84  unsigned int m_uid;
85  std::unique_ptr<NetworkStream> m_stream;
86  Notifier& m_notifier;
87  OutgoingQueue m_outgoing;
88  HandshakeFunc m_handshake;
89  Message::GetEntryTypeFunc m_get_entry_type;
90  ProcessIncomingFunc m_process_incoming;
91  std::thread m_read_thread;
92  std::thread m_write_thread;
93  std::atomic_bool m_active;
94  std::atomic_uint m_proto_rev;
95  std::atomic_int m_state;
96  mutable std::mutex m_remote_id_mutex;
97  std::string m_remote_id;
98  std::atomic_ullong m_last_update;
99  std::chrono::steady_clock::time_point m_last_post;
100 
101  std::mutex m_pending_mutex;
102  Outgoing m_pending_outgoing;
103  std::vector<std::pair<std::size_t, std::size_t>> m_pending_update;
104 
105  // Condition variables for shutdown
106  std::mutex m_shutdown_mutex;
107  std::condition_variable m_read_shutdown_cv;
108  std::condition_variable m_write_shutdown_cv;
109  bool m_read_shutdown = false;
110  bool m_write_shutdown = false;
111 };
112 
113 } // namespace nt
114 
115 #endif // NT_NETWORKCONNECTION_H_
Definition: NetworkStream.h:15
NetworkTables Connection Information.
Definition: ntcore_cpp.h:43
ArrayRef - Represent a constant reference to an array (0 or more elements consecutively in memory), i.e. a start pointer and a length.
Definition: ArrayRef.h:54
Definition: Notifier.h:17
Definition: NetworkConnection.h:26
Definition: Notifier.h:19
StringRef - Represent a constant reference to a string, i.e. a character array and a length, which need not be null terminated.
Definition: StringRef.h:39