WPILibC++  unspecified
Storage.h
1 /*----------------------------------------------------------------------------*/
2 /* Copyright (c) 2015-2018 FIRST. All Rights Reserved. */
3 /* Open Source Software - may be modified and shared by FRC teams. The code */
4 /* must be accompanied by the FIRST BSD license file in the root directory of */
5 /* the project. */
6 /*----------------------------------------------------------------------------*/
7 
8 #ifndef NTCORE_STORAGE_H_
9 #define NTCORE_STORAGE_H_
10 
11 #include <stdint.h>
12 
13 #include <atomic>
14 #include <cstddef>
15 #include <functional>
16 #include <memory>
17 #include <string>
18 #include <utility>
19 #include <vector>
20 
21 #include <wpi/DenseMap.h>
22 #include <wpi/SmallSet.h>
23 #include <wpi/StringMap.h>
24 #include <wpi/condition_variable.h>
25 #include <wpi/mutex.h>
26 
27 #include "IStorage.h"
28 #include "Message.h"
29 #include "SequenceNumber.h"
30 #include "ntcore_cpp.h"
31 
32 namespace wpi {
33 class Logger;
34 class raw_istream;
35 class raw_ostream;
36 } // namespace wpi
37 
38 namespace nt {
39 
40 class IEntryNotifier;
41 class INetworkConnection;
42 class IRpcServer;
43 class IStorageTest;
44 
45 class Storage : public IStorage {
46  friend class StorageTest;
47 
48  public:
49  Storage(IEntryNotifier& notifier, IRpcServer& rpcserver, wpi::Logger& logger);
50  Storage(const Storage&) = delete;
51  Storage& operator=(const Storage&) = delete;
52 
53  ~Storage();
54 
55  // Accessors required by Dispatcher. An interface is used for
56  // generation of outgoing messages to break a dependency loop between
57  // Storage and Dispatcher.
58  void SetDispatcher(IDispatcher* dispatcher, bool server) override;
59  void ClearDispatcher() override;
60 
61  // Required for wire protocol 2.0 to get the entry type of an entry when
62  // receiving entry updates (because the length/type is not provided in the
63  // message itself). Not used in wire protocol 3.0.
64  NT_Type GetMessageEntryType(unsigned int id) const override;
65 
66  void ProcessIncoming(std::shared_ptr<Message> msg, INetworkConnection* conn,
67  std::weak_ptr<INetworkConnection> conn_weak) override;
68  void GetInitialAssignments(
69  INetworkConnection& conn,
70  std::vector<std::shared_ptr<Message>>* msgs) override;
71  void ApplyInitialAssignments(
72  INetworkConnection& conn, wpi::ArrayRef<std::shared_ptr<Message>> msgs,
73  bool new_server,
74  std::vector<std::shared_ptr<Message>>* out_msgs) override;
75 
76  // User functions. These are the actual implementations of the corresponding
77  // user API functions in ntcore_cpp.
78  std::shared_ptr<Value> GetEntryValue(StringRef name) const;
79  std::shared_ptr<Value> GetEntryValue(unsigned int local_id) const;
80 
81  bool SetDefaultEntryValue(StringRef name, std::shared_ptr<Value> value);
82  bool SetDefaultEntryValue(unsigned int local_id,
83  std::shared_ptr<Value> value);
84 
85  bool SetEntryValue(StringRef name, std::shared_ptr<Value> value);
86  bool SetEntryValue(unsigned int local_id, std::shared_ptr<Value> value);
87 
88  void SetEntryTypeValue(StringRef name, std::shared_ptr<Value> value);
89  void SetEntryTypeValue(unsigned int local_id, std::shared_ptr<Value> value);
90 
91  void SetEntryFlags(StringRef name, unsigned int flags);
92  void SetEntryFlags(unsigned int local_id, unsigned int flags);
93 
94  unsigned int GetEntryFlags(StringRef name) const;
95  unsigned int GetEntryFlags(unsigned int local_id) const;
96 
97  void DeleteEntry(StringRef name);
98  void DeleteEntry(unsigned int local_id);
99 
100  void DeleteAllEntries();
101 
102  std::vector<EntryInfo> GetEntryInfo(int inst, const Twine& prefix,
103  unsigned int types);
104 
105  unsigned int AddListener(
106  const Twine& prefix,
107  std::function<void(const EntryNotification& event)> callback,
108  unsigned int flags) const;
109  unsigned int AddListener(
110  unsigned int local_id,
111  std::function<void(const EntryNotification& event)> callback,
112  unsigned int flags) const;
113 
114  unsigned int AddPolledListener(unsigned int poller_uid, const Twine& prefix,
115  unsigned int flags) const;
116  unsigned int AddPolledListener(unsigned int poller_uid, unsigned int local_id,
117  unsigned int flags) const;
118 
119  // Index-only
120  unsigned int GetEntry(const Twine& name);
121  std::vector<unsigned int> GetEntries(const Twine& prefix, unsigned int types);
122  EntryInfo GetEntryInfo(int inst, unsigned int local_id) const;
123  std::string GetEntryName(unsigned int local_id) const;
124  NT_Type GetEntryType(unsigned int local_id) const;
125  uint64_t GetEntryLastChange(unsigned int local_id) const;
126 
127  // Filename-based save/load functions. Used both by periodic saves and
128  // accessible directly via the user API.
129  const char* SavePersistent(const Twine& filename,
130  bool periodic) const override;
131  const char* LoadPersistent(
132  const Twine& filename,
133  std::function<void(size_t line, const char* msg)> warn) override;
134 
135  const char* SaveEntries(const Twine& filename, const Twine& prefix) const;
136  const char* LoadEntries(
137  const Twine& filename, const Twine& prefix,
138  std::function<void(size_t line, const char* msg)> warn);
139 
140  // Stream-based save/load functions (exposed for testing purposes). These
141  // implement the guts of the filename-based functions.
142  void SavePersistent(wpi::raw_ostream& os, bool periodic) const;
143  bool LoadEntries(wpi::raw_istream& is, const Twine& prefix, bool persistent,
144  std::function<void(size_t line, const char* msg)> warn);
145 
146  void SaveEntries(wpi::raw_ostream& os, const Twine& prefix) const;
147 
148  // RPC configuration needs to come through here as RPC definitions are
149  // actually special Storage value types.
150  void CreateRpc(unsigned int local_id, StringRef def, unsigned int rpc_uid);
151  unsigned int CallRpc(unsigned int local_id, StringRef params);
152  bool GetRpcResult(unsigned int local_id, unsigned int call_uid,
153  std::string* result);
154  bool GetRpcResult(unsigned int local_id, unsigned int call_uid,
155  std::string* result, double timeout, bool* timed_out);
156  void CancelRpcResult(unsigned int local_id, unsigned int call_uid);
157 
158  private:
159  // Data for each table entry.
160  struct Entry {
161  explicit Entry(wpi::StringRef name_) : name(name_) {}
162  bool IsPersistent() const { return (flags & NT_PERSISTENT) != 0; }
163 
164  // We redundantly store the name so that it's available when accessing the
165  // raw Entry* via the ID map.
166  std::string name;
167 
168  // The current value and flags.
169  std::shared_ptr<Value> value;
170  unsigned int flags{0};
171 
172  // Unique ID for this entry as used in network messages. The value is
173  // assigned by the server, so on the client this is 0xffff until an
174  // entry assignment is received back from the server.
175  unsigned int id{0xffff};
176 
177  // Local ID.
178  unsigned int local_id{UINT_MAX};
179 
180  // Sequence number for update resolution.
181  SequenceNumber seq_num;
182 
183  // If value has been written locally. Used during initial handshake
184  // on client to determine whether or not to accept remote changes.
185  bool local_write{false};
186 
187  // RPC handle.
188  unsigned int rpc_uid{UINT_MAX};
189 
190  // Last UID used when calling this RPC (primarily for client use). This
191  // is incremented for each call.
192  unsigned int rpc_call_uid{0};
193  };
194 
196  typedef std::vector<Entry*> IdMap;
197  typedef std::vector<std::unique_ptr<Entry>> LocalMap;
198  typedef std::pair<unsigned int, unsigned int> RpcIdPair;
201 
202  mutable wpi::mutex m_mutex;
203  EntriesMap m_entries;
204  IdMap m_idmap;
205  LocalMap m_localmap;
206  RpcResultMap m_rpc_results;
207  RpcBlockingCallSet m_rpc_blocking_calls;
208  // If any persistent values have changed
209  mutable bool m_persistent_dirty = false;
210 
211  // condition variable and termination flag for blocking on a RPC result
212  std::atomic_bool m_terminating;
213  wpi::condition_variable m_rpc_results_cond;
214 
215  // configured by dispatcher at startup
216  IDispatcher* m_dispatcher = nullptr;
217  bool m_server = true;
218 
219  IEntryNotifier& m_notifier;
220  IRpcServer& m_rpc_server;
221  wpi::Logger& m_logger;
222 
223  void ProcessIncomingEntryAssign(std::shared_ptr<Message> msg,
224  INetworkConnection* conn);
225  void ProcessIncomingEntryUpdate(std::shared_ptr<Message> msg,
226  INetworkConnection* conn);
227  void ProcessIncomingFlagsUpdate(std::shared_ptr<Message> msg,
228  INetworkConnection* conn);
229  void ProcessIncomingEntryDelete(std::shared_ptr<Message> msg,
230  INetworkConnection* conn);
231  void ProcessIncomingClearEntries(std::shared_ptr<Message> msg,
232  INetworkConnection* conn);
233  void ProcessIncomingExecuteRpc(std::shared_ptr<Message> msg,
234  INetworkConnection* conn,
235  std::weak_ptr<INetworkConnection> conn_weak);
236  void ProcessIncomingRpcResponse(std::shared_ptr<Message> msg,
237  INetworkConnection* conn);
238 
239  bool GetPersistentEntries(
240  bool periodic,
241  std::vector<std::pair<std::string, std::shared_ptr<Value>>>* entries)
242  const;
243  bool GetEntries(const Twine& prefix,
244  std::vector<std::pair<std::string, std::shared_ptr<Value>>>*
245  entries) const;
246  void SetEntryValueImpl(Entry* entry, std::shared_ptr<Value> value,
247  std::unique_lock<wpi::mutex>& lock, bool local);
248  void SetEntryFlagsImpl(Entry* entry, unsigned int flags,
249  std::unique_lock<wpi::mutex>& lock, bool local);
250  void DeleteEntryImpl(Entry* entry, std::unique_lock<wpi::mutex>& lock,
251  bool local);
252 
253  // Must be called with m_mutex held
254  template <typename F>
255  void DeleteAllEntriesImpl(bool local, F should_delete);
256  void DeleteAllEntriesImpl(bool local);
257  Entry* GetOrNew(const Twine& name);
258 };
259 
260 } // namespace nt
261 
262 #endif // NTCORE_STORAGE_H_
This class implements an extremely fast bulk output stream that can only output to a stream...
Definition: raw_ostream.h:45
const char * SaveEntries(NT_Inst inst, const Twine &filename, const Twine &prefix)
Save table values to a file.
Definition: ntcore_cpp.cpp:941
void SetEntryFlags(StringRef name, unsigned int flags)
Set Entry Flags.
Definition: ntcore_cpp.cpp:157
bool SetEntryValue(StringRef name, std::shared_ptr< Value > value)
Set Entry Value.
Definition: ntcore_cpp.cpp:131
unsigned int GetEntryFlags(StringRef name)
Get Entry Flags.
Definition: ntcore_cpp.cpp:170
bool SetDefaultEntryValue(StringRef name, std::shared_ptr< Value > value)
Set Default Entry Value Returns copy of current entry value if it exists.
Definition: ntcore_cpp.cpp:118
NetworkTables Entry Notification.
Definition: ntcore_cpp.h:171
Definition: Storage.h:45
ArrayRef - Represent a constant reference to an array (0 or more elements consecutively in memory)...
Definition: ArrayRef.h:41
namespace to hold default to_json function
Definition: json_binary_writer.cpp:39
Definition: IStorage.h:26
void CancelRpcResult(NT_Entry entry, NT_RpcCall call)
Ignore the result of a RPC call.
Definition: ntcore_cpp.cpp:619
NT_Entry GetEntry(NT_Inst inst, const Twine &name)
Get Entry Handle.
Definition: ntcore_cpp.cpp:56
const char * LoadEntries(NT_Inst inst, const Twine &filename, const Twine &prefix, std::function< void(size_t line, const char *msg)> warn)
Load table values from a file.
Definition: ntcore_cpp.cpp:949
NT_RpcCall CallRpc(NT_Entry entry, StringRef params)
Call a RPC function.
Definition: ntcore_cpp.cpp:577
void SetEntryTypeValue(StringRef name, std::shared_ptr< Value > value)
Set Entry Type and Value.
Definition: ntcore_cpp.cpp:144
const char * LoadPersistent(StringRef filename, std::function< void(size_t line, const char *msg)> warn)
Load persistent values from a file.
Definition: ntcore_cpp.cpp:926
void CreateRpc(NT_Entry entry, StringRef def, std::function< void(const RpcAnswer &answer)> callback)
Create a callback-based RPC entry point.
Definition: ntcore_cpp.cpp:478
Definition: raw_istream.h:26
std::vector< NT_Entry > GetEntries(NT_Inst inst, const Twine &prefix, unsigned int types)
Get Entry Handles.
Definition: ntcore_cpp.cpp:66
Definition: SequenceNumber.h:14
Definition: IStorage.h:21
NT_Type GetEntryType(NT_Entry entry)
Gets the type for the specified entry, or unassigned if non existent.
Definition: ntcore_cpp.cpp:87
NetworkTables Entry Information.
Definition: ntcore_cpp.h:35
std::vector< EntryInfo > GetEntryInfo(StringRef prefix, unsigned int types)
Get Entry Information.
Definition: ntcore_cpp.cpp:208
const char * SavePersistent(StringRef filename)
Save persistent values to a file.
Definition: ntcore_cpp.cpp:915
Definition: IDispatcher.h:21
StringRef - Represent a constant reference to a string, i.e.
Definition: StringRef.h:49
void DeleteAllEntries()
Delete All Entries.
Definition: ntcore_cpp.cpp:196
std::shared_ptr< Value > GetEntryValue(StringRef name)
Get Entry Value.
Definition: ntcore_cpp.cpp:105
Definition: Logger.h:30
Definition: IRpcServer.h:18
uint64_t GetEntryLastChange(NT_Entry entry)
Gets the last time the entry was changed.
Definition: ntcore_cpp.cpp:96
Definition: INetworkConnection.h:18
bool GetRpcResult(NT_Entry entry, NT_RpcCall call, std::string *result)
Get the result (return value) of a RPC call.
Definition: ntcore_cpp.cpp:589
Twine - A lightweight data structure for efficiently representing the concatenation of temporary valu...
Definition: Twine.h:79
void DeleteEntry(StringRef name)
Delete Entry.
Definition: ntcore_cpp.cpp:183
Definition: IEntryNotifier.h:18
std::string GetEntryName(NT_Entry entry)
Gets the name of the specified entry.
Definition: ntcore_cpp.cpp:78