WPILibC++  unspecified
Storage.h
1 /*----------------------------------------------------------------------------*/
2 /* Copyright (c) FIRST 2015-2018. All Rights Reserved. */
3 /* Open Source Software - may be modified and shared by FRC teams. The code */
4 /* must be accompanied by the FIRST BSD license file in the root directory of */
5 /* the project. */
6 /*----------------------------------------------------------------------------*/
7 
8 #ifndef NTCORE_STORAGE_H_
9 #define NTCORE_STORAGE_H_
10 
11 #include <stdint.h>
12 
13 #include <atomic>
14 #include <cstddef>
15 #include <functional>
16 #include <memory>
17 #include <string>
18 #include <utility>
19 #include <vector>
20 
21 #include <llvm/DenseMap.h>
22 #include <llvm/SmallSet.h>
23 #include <llvm/StringMap.h>
24 #include <support/condition_variable.h>
25 #include <support/mutex.h>
26 
27 #include "IStorage.h"
28 #include "Message.h"
29 #include "SequenceNumber.h"
30 #include "ntcore_cpp.h"
31 
32 namespace llvm {
33 class raw_ostream;
34 } // namespace llvm
35 
36 namespace wpi {
37 class Logger;
38 class raw_istream;
39 } // namespace wpi
40 
41 namespace nt {
42 
43 class IEntryNotifier;
44 class INetworkConnection;
45 class IRpcServer;
46 class IStorageTest;
47 
48 class Storage : public IStorage {
49  friend class StorageTest;
50 
51  public:
52  Storage(IEntryNotifier& notifier, IRpcServer& rpcserver, wpi::Logger& logger);
53  Storage(const Storage&) = delete;
54  Storage& operator=(const Storage&) = delete;
55 
56  ~Storage();
57 
58  // Accessors required by Dispatcher. An interface is used for
59  // generation of outgoing messages to break a dependency loop between
60  // Storage and Dispatcher.
61  void SetDispatcher(IDispatcher* dispatcher, bool server) override;
62  void ClearDispatcher() override;
63 
64  // Required for wire protocol 2.0 to get the entry type of an entry when
65  // receiving entry updates (because the length/type is not provided in the
66  // message itself). Not used in wire protocol 3.0.
67  NT_Type GetMessageEntryType(unsigned int id) const override;
68 
69  void ProcessIncoming(std::shared_ptr<Message> msg, INetworkConnection* conn,
70  std::weak_ptr<INetworkConnection> conn_weak) override;
71  void GetInitialAssignments(
72  INetworkConnection& conn,
73  std::vector<std::shared_ptr<Message>>* msgs) override;
74  void ApplyInitialAssignments(
75  INetworkConnection& conn, llvm::ArrayRef<std::shared_ptr<Message>> msgs,
76  bool new_server,
77  std::vector<std::shared_ptr<Message>>* out_msgs) override;
78 
79  // User functions. These are the actual implementations of the corresponding
80  // user API functions in ntcore_cpp.
81  std::shared_ptr<Value> GetEntryValue(StringRef name) const;
82  std::shared_ptr<Value> GetEntryValue(unsigned int local_id) const;
83 
84  bool SetDefaultEntryValue(StringRef name, std::shared_ptr<Value> value);
85  bool SetDefaultEntryValue(unsigned int local_id,
86  std::shared_ptr<Value> value);
87 
88  bool SetEntryValue(StringRef name, std::shared_ptr<Value> value);
89  bool SetEntryValue(unsigned int local_id, std::shared_ptr<Value> value);
90 
91  void SetEntryTypeValue(StringRef name, std::shared_ptr<Value> value);
92  void SetEntryTypeValue(unsigned int local_id, std::shared_ptr<Value> value);
93 
94  void SetEntryFlags(StringRef name, unsigned int flags);
95  void SetEntryFlags(unsigned int local_id, unsigned int flags);
96 
97  unsigned int GetEntryFlags(StringRef name) const;
98  unsigned int GetEntryFlags(unsigned int local_id) const;
99 
100  void DeleteEntry(StringRef name);
101  void DeleteEntry(unsigned int local_id);
102 
103  void DeleteAllEntries();
104 
105  std::vector<EntryInfo> GetEntryInfo(int inst, const Twine& prefix,
106  unsigned int types);
107 
108  unsigned int AddListener(
109  const Twine& prefix,
110  std::function<void(const EntryNotification& event)> callback,
111  unsigned int flags) const;
112  unsigned int AddListener(
113  unsigned int local_id,
114  std::function<void(const EntryNotification& event)> callback,
115  unsigned int flags) const;
116 
117  unsigned int AddPolledListener(unsigned int poller_uid, const Twine& prefix,
118  unsigned int flags) const;
119  unsigned int AddPolledListener(unsigned int poller_uid, unsigned int local_id,
120  unsigned int flags) const;
121 
122  // Index-only
123  unsigned int GetEntry(const Twine& name);
124  std::vector<unsigned int> GetEntries(const Twine& prefix, unsigned int types);
125  EntryInfo GetEntryInfo(int inst, unsigned int local_id) const;
126  std::string GetEntryName(unsigned int local_id) const;
127  NT_Type GetEntryType(unsigned int local_id) const;
128  uint64_t GetEntryLastChange(unsigned int local_id) const;
129 
130  // Filename-based save/load functions. Used both by periodic saves and
131  // accessible directly via the user API.
132  const char* SavePersistent(const Twine& filename,
133  bool periodic) const override;
134  const char* LoadPersistent(
135  const Twine& filename,
136  std::function<void(size_t line, const char* msg)> warn) override;
137 
138  const char* SaveEntries(const Twine& filename, const Twine& prefix) const;
139  const char* LoadEntries(
140  const Twine& filename, const Twine& prefix,
141  std::function<void(size_t line, const char* msg)> warn);
142 
143  // Stream-based save/load functions (exposed for testing purposes). These
144  // implement the guts of the filename-based functions.
145  void SavePersistent(llvm::raw_ostream& os, bool periodic) const;
146  bool LoadEntries(wpi::raw_istream& is, const Twine& prefix, bool persistent,
147  std::function<void(size_t line, const char* msg)> warn);
148 
149  void SaveEntries(llvm::raw_ostream& os, const Twine& prefix) const;
150 
151  // RPC configuration needs to come through here as RPC definitions are
152  // actually special Storage value types.
153  void CreateRpc(unsigned int local_id, StringRef def, unsigned int rpc_uid);
154  unsigned int CallRpc(unsigned int local_id, StringRef params);
155  bool GetRpcResult(unsigned int local_id, unsigned int call_uid,
156  std::string* result);
157  bool GetRpcResult(unsigned int local_id, unsigned int call_uid,
158  std::string* result, double timeout, bool* timed_out);
159  void CancelRpcResult(unsigned int local_id, unsigned int call_uid);
160 
161  private:
162  // Data for each table entry.
163  struct Entry {
164  explicit Entry(llvm::StringRef name_) : name(name_) {}
165  bool IsPersistent() const { return (flags & NT_PERSISTENT) != 0; }
166 
167  // We redundantly store the name so that it's available when accessing the
168  // raw Entry* via the ID map.
169  std::string name;
170 
171  // The current value and flags.
172  std::shared_ptr<Value> value;
173  unsigned int flags{0};
174 
175  // Unique ID for this entry as used in network messages. The value is
176  // assigned by the server, so on the client this is 0xffff until an
177  // entry assignment is received back from the server.
178  unsigned int id{0xffff};
179 
180  // Local ID.
181  unsigned int local_id{UINT_MAX};
182 
183  // Sequence number for update resolution.
184  SequenceNumber seq_num;
185 
186  // If value has been written locally. Used during initial handshake
187  // on client to determine whether or not to accept remote changes.
188  bool local_write{false};
189 
190  // RPC handle.
191  unsigned int rpc_uid{UINT_MAX};
192 
193  // Last UID used when calling this RPC (primarily for client use). This
194  // is incremented for each call.
195  unsigned int rpc_call_uid{0};
196  };
197 
199  typedef std::vector<Entry*> IdMap;
200  typedef std::vector<std::unique_ptr<Entry>> LocalMap;
201  typedef std::pair<unsigned int, unsigned int> RpcIdPair;
204 
205  mutable wpi::mutex m_mutex;
206  EntriesMap m_entries;
207  IdMap m_idmap;
208  LocalMap m_localmap;
209  RpcResultMap m_rpc_results;
210  RpcBlockingCallSet m_rpc_blocking_calls;
211  // If any persistent values have changed
212  mutable bool m_persistent_dirty = false;
213 
214  // condition variable and termination flag for blocking on a RPC result
215  std::atomic_bool m_terminating;
216  wpi::condition_variable m_rpc_results_cond;
217 
218  // configured by dispatcher at startup
219  IDispatcher* m_dispatcher = nullptr;
220  bool m_server = true;
221 
222  IEntryNotifier& m_notifier;
223  IRpcServer& m_rpc_server;
224  wpi::Logger& m_logger;
225 
226  void ProcessIncomingEntryAssign(std::shared_ptr<Message> msg,
227  INetworkConnection* conn);
228  void ProcessIncomingEntryUpdate(std::shared_ptr<Message> msg,
229  INetworkConnection* conn);
230  void ProcessIncomingFlagsUpdate(std::shared_ptr<Message> msg,
231  INetworkConnection* conn);
232  void ProcessIncomingEntryDelete(std::shared_ptr<Message> msg,
233  INetworkConnection* conn);
234  void ProcessIncomingClearEntries(std::shared_ptr<Message> msg,
235  INetworkConnection* conn);
236  void ProcessIncomingExecuteRpc(std::shared_ptr<Message> msg,
237  INetworkConnection* conn,
238  std::weak_ptr<INetworkConnection> conn_weak);
239  void ProcessIncomingRpcResponse(std::shared_ptr<Message> msg,
240  INetworkConnection* conn);
241 
242  bool GetPersistentEntries(
243  bool periodic,
244  std::vector<std::pair<std::string, std::shared_ptr<Value>>>* entries)
245  const;
246  bool GetEntries(const Twine& prefix,
247  std::vector<std::pair<std::string, std::shared_ptr<Value>>>*
248  entries) const;
249  void SetEntryValueImpl(Entry* entry, std::shared_ptr<Value> value,
250  std::unique_lock<wpi::mutex>& lock, bool local);
251  void SetEntryFlagsImpl(Entry* entry, unsigned int flags,
252  std::unique_lock<wpi::mutex>& lock, bool local);
253  void DeleteEntryImpl(Entry* entry, std::unique_lock<wpi::mutex>& lock,
254  bool local);
255 
256  // Must be called with m_mutex held
257  template <typename F>
258  void DeleteAllEntriesImpl(bool local, F should_delete);
259  void DeleteAllEntriesImpl(bool local);
260  Entry* GetOrNew(const Twine& name);
261 };
262 
263 } // namespace nt
264 
265 #endif // NTCORE_STORAGE_H_
const char * SaveEntries(NT_Inst inst, const Twine &filename, const Twine &prefix)
Save table values to a file.
Definition: ntcore_cpp.cpp:941
void SetEntryFlags(StringRef name, unsigned int flags)
Set Entry Flags.
Definition: ntcore_cpp.cpp:157
Definition: Path.inc:31
bool SetEntryValue(StringRef name, std::shared_ptr< Value > value)
Set Entry Value.
Definition: ntcore_cpp.cpp:131
unsigned int GetEntryFlags(StringRef name)
Get Entry Flags.
Definition: ntcore_cpp.cpp:170
bool SetDefaultEntryValue(StringRef name, std::shared_ptr< Value > value)
Set Default Entry Value Returns copy of current entry value if it exists.
Definition: ntcore_cpp.cpp:118
NetworkTables Entry Notification.
Definition: ntcore_cpp.h:169
Definition: Storage.h:48
Twine - A lightweight data structure for efficiently representing the concatenation of temporary valu...
Definition: Twine.h:79
Definition: SocketError.cpp:17
Definition: IStorage.h:26
void CancelRpcResult(NT_Entry entry, NT_RpcCall call)
Ignore the result of a RPC call.
Definition: ntcore_cpp.cpp:619
ArrayRef - Represent a constant reference to an array (0 or more elements consecutively in memory)...
Definition: ArrayRef.h:32
NT_Entry GetEntry(NT_Inst inst, const Twine &name)
Get Entry Handle.
Definition: ntcore_cpp.cpp:56
const char * LoadEntries(NT_Inst inst, const Twine &filename, const Twine &prefix, std::function< void(size_t line, const char *msg)> warn)
Load table values from a file.
Definition: ntcore_cpp.cpp:949
NT_RpcCall CallRpc(NT_Entry entry, StringRef params)
Call a RPC function.
Definition: ntcore_cpp.cpp:577
void SetEntryTypeValue(StringRef name, std::shared_ptr< Value > value)
Set Entry Type and Value.
Definition: ntcore_cpp.cpp:144
const char * LoadPersistent(StringRef filename, std::function< void(size_t line, const char *msg)> warn)
Load persistent values from a file.
Definition: ntcore_cpp.cpp:926
void CreateRpc(NT_Entry entry, StringRef def, std::function< void(const RpcAnswer &answer)> callback)
Create a callback-based RPC entry point.
Definition: ntcore_cpp.cpp:478
Definition: raw_istream.h:21
std::vector< NT_Entry > GetEntries(NT_Inst inst, const Twine &prefix, unsigned int types)
Get Entry Handles.
Definition: ntcore_cpp.cpp:66
Definition: SequenceNumber.h:14
Definition: IEntryNotifier.h:16
NT_Type GetEntryType(NT_Entry entry)
Gets the type for the specified entry, or unassigned if non existent.
Definition: ntcore_cpp.cpp:87
NetworkTables Entry Information.
Definition: ntcore_cpp.h:34
std::vector< EntryInfo > GetEntryInfo(StringRef prefix, unsigned int types)
Get Entry Information.
Definition: ntcore_cpp.cpp:208
const char * SavePersistent(StringRef filename)
Save persistent values to a file.
Definition: ntcore_cpp.cpp:915
Definition: IDispatcher.h:21
void DeleteAllEntries()
Delete All Entries.
Definition: ntcore_cpp.cpp:196
std::shared_ptr< Value > GetEntryValue(StringRef name)
Get Entry Value.
Definition: ntcore_cpp.cpp:105
Definition: Logger.h:30
Definition: IRpcServer.h:18
uint64_t GetEntryLastChange(NT_Entry entry)
Gets the last time the entry was changed.
Definition: ntcore_cpp.cpp:96
Definition: INetworkConnection.h:18
bool GetRpcResult(NT_Entry entry, NT_RpcCall call, std::string *result)
Get the result (return value) of a RPC call.
Definition: ntcore_cpp.cpp:589
void DeleteEntry(StringRef name)
Delete Entry.
Definition: ntcore_cpp.cpp:183
This class implements an extremely fast bulk output stream that can only output to a stream...
Definition: raw_ostream.h:33
StringRef - Represent a constant reference to a string, i.e.
Definition: StringRef.h:42
Definition: IEntryNotifier.h:18
std::string GetEntryName(NT_Entry entry)
Gets the name of the specified entry.
Definition: ntcore_cpp.cpp:78