WPILibC++  unspecified
Storage.h
1 /*----------------------------------------------------------------------------*/
2 /* Copyright (c) FIRST 2015. All Rights Reserved. */
3 /* Open Source Software - may be modified and shared by FRC teams. The code */
4 /* must be accompanied by the FIRST BSD license file in the root directory of */
5 /* the project. */
6 /*----------------------------------------------------------------------------*/
7 
8 #ifndef NT_STORAGE_H_
9 #define NT_STORAGE_H_
10 
11 #include <atomic>
12 #include <condition_variable>
13 #include <cstddef>
14 #include <functional>
15 #include <memory>
16 #include <mutex>
17 
18 #include "llvm/DenseMap.h"
19 #include "llvm/SmallSet.h"
20 #include "llvm/StringMap.h"
21 #include "Message.h"
22 #include "ntcore_cpp.h"
23 #include "SequenceNumber.h"
24 
25 #include "IStorage.h"
26 
27 namespace llvm {
28 class raw_ostream;
29 }
30 
31 namespace wpi {
32 class Logger;
33 class raw_istream;
34 }
35 
36 namespace nt {
37 
38 class IEntryNotifier;
39 class INetworkConnection;
40 class IRpcServer;
41 class IStorageTest;
42 
43 class Storage : public IStorage {
44  friend class StorageTest;
45 
46  public:
47  Storage(IEntryNotifier& notifier, IRpcServer& rpcserver, wpi::Logger& logger);
48  Storage(const Storage&) = delete;
49  Storage& operator=(const Storage&) = delete;
50 
51  ~Storage();
52 
53  // Accessors required by Dispatcher. An interface is used for
54  // generation of outgoing messages to break a dependency loop between
55  // Storage and Dispatcher.
56  void SetDispatcher(IDispatcher* dispatcher, bool server) override;
57  void ClearDispatcher() override;
58 
59  // Required for wire protocol 2.0 to get the entry type of an entry when
60  // receiving entry updates (because the length/type is not provided in the
61  // message itself). Not used in wire protocol 3.0.
62  NT_Type GetMessageEntryType(unsigned int id) const override;
63 
64  void ProcessIncoming(std::shared_ptr<Message> msg, INetworkConnection* conn,
65  std::weak_ptr<INetworkConnection> conn_weak) override;
66  void GetInitialAssignments(
67  INetworkConnection& conn,
68  std::vector<std::shared_ptr<Message>>* msgs) override;
69  void ApplyInitialAssignments(
70  INetworkConnection& conn, llvm::ArrayRef<std::shared_ptr<Message>> msgs,
71  bool new_server,
72  std::vector<std::shared_ptr<Message>>* out_msgs) override;
73 
74  // User functions. These are the actual implementations of the corresponding
75  // user API functions in ntcore_cpp.
76  std::shared_ptr<Value> GetEntryValue(StringRef name) const;
77  std::shared_ptr<Value> GetEntryValue(unsigned int local_id) const;
78 
79  bool SetDefaultEntryValue(StringRef name, std::shared_ptr<Value> value);
80  bool SetDefaultEntryValue(unsigned int local_id,
81  std::shared_ptr<Value> value);
82 
83  bool SetEntryValue(StringRef name, std::shared_ptr<Value> value);
84  bool SetEntryValue(unsigned int local_id, std::shared_ptr<Value> value);
85 
86  void SetEntryTypeValue(StringRef name, std::shared_ptr<Value> value);
87  void SetEntryTypeValue(unsigned int local_id, std::shared_ptr<Value> value);
88 
89  void SetEntryFlags(StringRef name, unsigned int flags);
90  void SetEntryFlags(unsigned int local_id, unsigned int flags);
91 
92  unsigned int GetEntryFlags(StringRef name) const;
93  unsigned int GetEntryFlags(unsigned int local_id) const;
94 
95  void DeleteEntry(StringRef name);
96  void DeleteEntry(unsigned int local_id);
97 
98  void DeleteAllEntries();
99 
100  std::vector<EntryInfo> GetEntryInfo(int inst, StringRef prefix,
101  unsigned int types);
102 
103  unsigned int AddListener(
104  StringRef prefix,
105  std::function<void(const EntryNotification& event)> callback,
106  unsigned int flags) const;
107  unsigned int AddListener(
108  unsigned int local_id,
109  std::function<void(const EntryNotification& event)> callback,
110  unsigned int flags) const;
111 
112  unsigned int AddPolledListener(unsigned int poller_uid, StringRef prefix,
113  unsigned int flags) const;
114  unsigned int AddPolledListener(unsigned int poller_uid, unsigned int local_id,
115  unsigned int flags) const;
116 
117  // Index-only
118  unsigned int GetEntry(StringRef name);
119  std::vector<unsigned int> GetEntries(StringRef prefix, unsigned int types);
120  EntryInfo GetEntryInfo(int inst, unsigned int local_id) const;
121  std::string GetEntryName(unsigned int local_id) const;
122  NT_Type GetEntryType(unsigned int local_id) const;
123  unsigned long long GetEntryLastChange(unsigned int local_id) const;
124 
125  // Filename-based save/load functions. Used both by periodic saves and
126  // accessible directly via the user API.
127  const char* SavePersistent(StringRef filename, bool periodic) const override;
128  const char* LoadPersistent(
129  StringRef filename,
130  std::function<void(std::size_t line, const char* msg)> warn) override;
131 
132  const char* SaveEntries(StringRef filename, StringRef prefix) const;
133  const char* LoadEntries(
134  StringRef filename, StringRef prefix,
135  std::function<void(std::size_t line, const char* msg)> warn);
136 
137  // Stream-based save/load functions (exposed for testing purposes). These
138  // implement the guts of the filename-based functions.
139  void SavePersistent(llvm::raw_ostream& os, bool periodic) const;
140  bool LoadEntries(wpi::raw_istream& is, StringRef prefix, bool persistent,
141  std::function<void(std::size_t line, const char* msg)> warn);
142 
143  void SaveEntries(llvm::raw_ostream& os, StringRef prefix) const;
144 
145  // RPC configuration needs to come through here as RPC definitions are
146  // actually special Storage value types.
147  void CreateRpc(unsigned int local_id, StringRef def, unsigned int rpc_uid);
148  unsigned int CallRpc(unsigned int local_id, StringRef params);
149  bool GetRpcResult(unsigned int local_id, unsigned int call_uid,
150  std::string* result);
151  bool GetRpcResult(unsigned int local_id, unsigned int call_uid,
152  std::string* result, double timeout, bool* timed_out);
153  void CancelRpcResult(unsigned int local_id, unsigned int call_uid);
154 
155  private:
156  // Data for each table entry.
157  struct Entry {
158  Entry(llvm::StringRef name_) : name(name_) {}
159  bool IsPersistent() const { return (flags & NT_PERSISTENT) != 0; }
160 
161  // We redundantly store the name so that it's available when accessing the
162  // raw Entry* via the ID map.
163  std::string name;
164 
165  // The current value and flags.
166  std::shared_ptr<Value> value;
167  unsigned int flags{0};
168 
169  // Unique ID for this entry as used in network messages. The value is
170  // assigned by the server, so on the client this is 0xffff until an
171  // entry assignment is received back from the server.
172  unsigned int id{0xffff};
173 
174  // Local ID.
175  unsigned int local_id{UINT_MAX};
176 
177  // Sequence number for update resolution.
178  SequenceNumber seq_num;
179 
180  // If value has been written locally. Used during initial handshake
181  // on client to determine whether or not to accept remote changes.
182  bool local_write{false};
183 
184  // RPC handle.
185  unsigned int rpc_uid{UINT_MAX};
186 
187  // Last UID used when calling this RPC (primarily for client use). This
188  // is incremented for each call.
189  unsigned int rpc_call_uid{0};
190  };
191 
193  typedef std::vector<Entry*> IdMap;
194  typedef std::vector<std::unique_ptr<Entry>> LocalMap;
195  typedef std::pair<unsigned int, unsigned int> RpcIdPair;
198 
199  mutable std::mutex m_mutex;
200  EntriesMap m_entries;
201  IdMap m_idmap;
202  LocalMap m_localmap;
203  RpcResultMap m_rpc_results;
204  RpcBlockingCallSet m_rpc_blocking_calls;
205  // If any persistent values have changed
206  mutable bool m_persistent_dirty = false;
207 
208  // condition variable and termination flag for blocking on a RPC result
209  std::atomic_bool m_terminating;
210  std::condition_variable m_rpc_results_cond;
211 
212  // configured by dispatcher at startup
213  IDispatcher* m_dispatcher = nullptr;
214  bool m_server = true;
215 
216  IEntryNotifier& m_notifier;
217  IRpcServer& m_rpc_server;
218  wpi::Logger& m_logger;
219 
220  void ProcessIncomingEntryAssign(std::shared_ptr<Message> msg,
221  INetworkConnection* conn);
222  void ProcessIncomingEntryUpdate(std::shared_ptr<Message> msg,
223  INetworkConnection* conn);
224  void ProcessIncomingFlagsUpdate(std::shared_ptr<Message> msg,
225  INetworkConnection* conn);
226  void ProcessIncomingEntryDelete(std::shared_ptr<Message> msg,
227  INetworkConnection* conn);
228  void ProcessIncomingClearEntries(std::shared_ptr<Message> msg,
229  INetworkConnection* conn);
230  void ProcessIncomingExecuteRpc(std::shared_ptr<Message> msg,
231  INetworkConnection* conn,
232  std::weak_ptr<INetworkConnection> conn_weak);
233  void ProcessIncomingRpcResponse(std::shared_ptr<Message> msg,
234  INetworkConnection* conn);
235 
236  bool GetPersistentEntries(
237  bool periodic,
238  std::vector<std::pair<std::string, std::shared_ptr<Value>>>* entries)
239  const;
240  bool GetEntries(StringRef prefix,
241  std::vector<std::pair<std::string, std::shared_ptr<Value>>>*
242  entries) const;
243  void SetEntryValueImpl(Entry* entry, std::shared_ptr<Value> value,
244  std::unique_lock<std::mutex>& lock, bool local);
245  void SetEntryFlagsImpl(Entry* entry, unsigned int flags,
246  std::unique_lock<std::mutex>& lock, bool local);
247  void DeleteEntryImpl(Entry* entry, std::unique_lock<std::mutex>& lock,
248  bool local);
249 
250  // Must be called with m_mutex held
251  template <typename F>
252  void DeleteAllEntriesImpl(bool local, F should_delete);
253  void DeleteAllEntriesImpl(bool local);
254  Entry* GetOrNew(StringRef name);
255 };
256 
257 } // namespace nt
258 
259 #endif // NT_STORAGE_H_
void SetEntryFlags(StringRef name, unsigned int flags)
Set Entry Flags.
Definition: ntcore_cpp.cpp:156
Definition: Path.inc:27
bool SetEntryValue(StringRef name, std::shared_ptr< Value > value)
Set Entry Value.
Definition: ntcore_cpp.cpp:130
unsigned int GetEntryFlags(StringRef name)
Get Entry Flags.
Definition: ntcore_cpp.cpp:169
bool SetDefaultEntryValue(StringRef name, std::shared_ptr< Value > value)
Set Default Entry Value Returns copy of current entry value if it exists.
Definition: ntcore_cpp.cpp:117
NetworkTables Entry Notification.
Definition: ntcore_cpp.h:165
Definition: Storage.h:43
Definition: SocketError.cpp:18
const char * LoadEntries(NT_Inst inst, StringRef filename, StringRef prefix, std::function< void(size_t line, const char *msg)> warn)
Load table values from a file.
Definition: ntcore_cpp.cpp:946
Definition: IStorage.h:26
void CancelRpcResult(NT_Entry entry, NT_RpcCall call)
Ignore the result of a RPC call.
Definition: ntcore_cpp.cpp:617
ArrayRef - Represent a constant reference to an array (0 or more elements consecutively in memory)...
Definition: ArrayRef.h:32
std::vector< NT_Entry > GetEntries(NT_Inst inst, StringRef prefix, unsigned int types)
Get Entry Handles.
Definition: ntcore_cpp.cpp:65
NT_RpcCall CallRpc(NT_Entry entry, StringRef params)
Call a RPC function.
Definition: ntcore_cpp.cpp:575
void SetEntryTypeValue(StringRef name, std::shared_ptr< Value > value)
Set Entry Type and Value.
Definition: ntcore_cpp.cpp:143
const char * LoadPersistent(StringRef filename, std::function< void(size_t line, const char *msg)> warn)
Load persistent values from a file.
Definition: ntcore_cpp.cpp:924
void CreateRpc(NT_Entry entry, StringRef def, std::function< void(const RpcAnswer &answer)> callback)
Create a callback-based RPC entry point.
Definition: ntcore_cpp.cpp:476
Definition: raw_istream.h:23
NT_Entry GetEntry(NT_Inst inst, StringRef name)
Get Entry Handle.
Definition: ntcore_cpp.cpp:55
Definition: SequenceNumber.h:14
Definition: IEntryNotifier.h:15
NT_Type GetEntryType(NT_Entry entry)
Gets the type for the specified entry, or unassigned if non existent.
Definition: ntcore_cpp.cpp:86
NetworkTables Entry Information.
Definition: ntcore_cpp.h:30
std::vector< EntryInfo > GetEntryInfo(StringRef prefix, unsigned int types)
Get Entry Information.
Definition: ntcore_cpp.cpp:207
const char * SavePersistent(StringRef filename)
Save persistent values to a file.
Definition: ntcore_cpp.cpp:913
Definition: IDispatcher.h:21
void DeleteAllEntries()
Delete All Entries.
Definition: ntcore_cpp.cpp:195
std::shared_ptr< Value > GetEntryValue(StringRef name)
Get Entry Value.
Definition: ntcore_cpp.cpp:104
Definition: Logger.h:30
Definition: IRpcServer.h:18
Definition: INetworkConnection.h:18
bool GetRpcResult(NT_Entry entry, NT_RpcCall call, std::string *result)
Get the result (return value) of a RPC call.
Definition: ntcore_cpp.cpp:587
const char * SaveEntries(NT_Inst inst, StringRef filename, StringRef prefix)
Save table values to a file.
Definition: ntcore_cpp.cpp:939
void DeleteEntry(StringRef name)
Delete Entry.
Definition: ntcore_cpp.cpp:182
This class implements an extremely fast bulk output stream that can only output to a stream...
Definition: raw_ostream.h:33
StringRef - Represent a constant reference to a string, i.e.
Definition: StringRef.h:42
Definition: IEntryNotifier.h:17
std::string GetEntryName(NT_Entry entry)
Gets the name of the specified entry.
Definition: ntcore_cpp.cpp:77
unsigned long long GetEntryLastChange(NT_Entry entry)
Gets the last time the entry was changed.
Definition: ntcore_cpp.cpp:95