WPILibC++  unspecified
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Pages
Storage.h
/*----------------------------------------------------------------------------*/
/* Copyright (c) FIRST 2015. All Rights Reserved.                             */
/* Open Source Software - may be modified and shared by FRC teams. The code   */
/* must be accompanied by the FIRST BSD license file in the root directory of */
/* the project.                                                               */
/*----------------------------------------------------------------------------*/
7 
8 #ifndef NT_STORAGE_H_
9 #define NT_STORAGE_H_
10 
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <fstream>
#include <functional>
#include <iosfwd>
#include <memory>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

#include "llvm/DenseMap.h"
#include "llvm/SmallSet.h"
#include "llvm/StringMap.h"
#include "support/atomic_static.h"
#include "Message.h"
#include "Notifier.h"
#include "ntcore_cpp.h"
#include "RpcServer.h"
#include "SequenceNumber.h"
29 
30 namespace nt {
31 
32 class NetworkConnection;
33 class StorageTest;
34 
35 class Storage {
36  friend class StorageTest;
37 
38  public:
39  static Storage& GetInstance() {
40  ATOMIC_STATIC(Storage, instance);
41  return instance;
42  }
43  ~Storage();
44 
45  // Accessors required by Dispatcher. A function pointer is used for
46  // generation of outgoing messages to break a dependency loop between
47  // Storage and Dispatcher; in operation this is always set to
48  // Dispatcher::QueueOutgoing.
49  typedef std::function<void(std::shared_ptr<Message> msg,
50  NetworkConnection* only,
51  NetworkConnection* except)>
52  QueueOutgoingFunc;
53  void SetOutgoing(QueueOutgoingFunc queue_outgoing, bool server);
54  void ClearOutgoing();
55 
56  // Required for wire protocol 2.0 to get the entry type of an entry when
57  // receiving entry updates (because the length/type is not provided in the
58  // message itself). Not used in wire protocol 3.0.
59  NT_Type GetEntryType(unsigned int id) const;
60 
61  void ProcessIncoming(std::shared_ptr<Message> msg, NetworkConnection* conn,
62  std::weak_ptr<NetworkConnection> conn_weak);
63  void GetInitialAssignments(NetworkConnection& conn,
64  std::vector<std::shared_ptr<Message>>* msgs);
65  void ApplyInitialAssignments(NetworkConnection& conn,
66  llvm::ArrayRef<std::shared_ptr<Message>> msgs,
67  bool new_server,
68  std::vector<std::shared_ptr<Message>>* out_msgs);
69 
70  // User functions. These are the actual implementations of the corresponding
71  // user API functions in ntcore_cpp.
72  std::shared_ptr<Value> GetEntryValue(StringRef name) const;
73  bool SetDefaultEntryValue(StringRef name, std::shared_ptr<Value> value);
74  bool SetEntryValue(StringRef name, std::shared_ptr<Value> value);
75  void SetEntryTypeValue(StringRef name, std::shared_ptr<Value> value);
76  void SetEntryFlags(StringRef name, unsigned int flags);
77  unsigned int GetEntryFlags(StringRef name) const;
78  void DeleteEntry(StringRef name);
79  void DeleteAllEntries();
80  std::vector<EntryInfo> GetEntryInfo(StringRef prefix, unsigned int types);
81  void NotifyEntries(StringRef prefix,
82  EntryListenerCallback only = nullptr) const;
83 
84  // Filename-based save/load functions. Used both by periodic saves and
85  // accessible directly via the user API.
86  const char* SavePersistent(StringRef filename, bool periodic) const;
87  const char* LoadPersistent(
88  StringRef filename,
89  std::function<void(std::size_t line, const char* msg)> warn);
90 
91  // Stream-based save/load functions (exposed for testing purposes). These
92  // implement the guts of the filename-based functions.
93  void SavePersistent(std::ostream& os, bool periodic) const;
94  bool LoadPersistent(
95  std::istream& is,
96  std::function<void(std::size_t line, const char* msg)> warn);
97 
98  // RPC configuration needs to come through here as RPC definitions are
99  // actually special Storage value types.
100  void CreateRpc(StringRef name, StringRef def, RpcCallback callback);
101  void CreatePolledRpc(StringRef name, StringRef def);
102 
103  unsigned int CallRpc(StringRef name, StringRef params);
104  bool GetRpcResult(bool blocking, unsigned int call_uid, std::string* result);
105  bool GetRpcResult(bool blocking, unsigned int call_uid, double time_out,
106  std::string* result);
107  void CancelBlockingRpcResult(unsigned int call_uid);
108 
109  private:
110  Storage();
111  Storage(Notifier& notifier, RpcServer& rpcserver);
112  Storage(const Storage&) = delete;
113  Storage& operator=(const Storage&) = delete;
114 
115  // Data for each table entry.
116  struct Entry {
117  Entry(llvm::StringRef name_)
118  : name(name_), flags(0), id(0xffff), rpc_call_uid(0) {}
119  bool IsPersistent() const { return (flags & NT_PERSISTENT) != 0; }
120 
121  // We redundantly store the name so that it's available when accessing the
122  // raw Entry* via the ID map.
123  std::string name;
124 
125  // The current value and flags.
126  std::shared_ptr<Value> value;
127  unsigned int flags;
128 
129  // Unique ID for this entry as used in network messages. The value is
130  // assigned by the server, so on the client this is 0xffff until an
131  // entry assignment is received back from the server.
132  unsigned int id;
133 
134  // Sequence number for update resolution.
135  SequenceNumber seq_num;
136 
137  // RPC callback function. Null if either not an RPC or if the RPC is
138  // polled.
139  RpcCallback rpc_callback;
140 
141  // Last UID used when calling this RPC (primarily for client use). This
142  // is incremented for each call.
143  unsigned int rpc_call_uid;
144  };
145 
146  typedef llvm::StringMap<std::unique_ptr<Entry>> EntriesMap;
147  typedef std::vector<Entry*> IdMap;
148  typedef llvm::DenseMap<std::pair<unsigned int, unsigned int>, std::string>
149  RpcResultMap;
150  typedef llvm::SmallSet<unsigned int, 12> RpcBlockingCallSet;
151 
152  mutable std::mutex m_mutex;
153  EntriesMap m_entries;
154  IdMap m_idmap;
155  RpcResultMap m_rpc_results;
156  RpcBlockingCallSet m_rpc_blocking_calls;
157  // If any persistent values have changed
158  mutable bool m_persistent_dirty = false;
159 
160  // condition variable and termination flag for blocking on a RPC result
161  std::atomic_bool m_terminating;
162  std::condition_variable m_rpc_results_cond;
163 
164  // configured by dispatcher at startup
165  QueueOutgoingFunc m_queue_outgoing;
166  bool m_server = true;
167 
168  // references to singletons (we don't grab them directly for testing purposes)
169  Notifier& m_notifier;
170  RpcServer& m_rpc_server;
171 
172  bool GetPersistentEntries(
173  bool periodic,
174  std::vector<std::pair<std::string, std::shared_ptr<Value>>>* entries)
175  const;
176  void DeleteAllEntriesImpl();
177 
178  ATOMIC_STATIC_DECL(Storage)
179 };
180 
181 } // namespace nt
182 
183 #endif // NT_STORAGE_H_
Definition: Storage.h:35
Definition: NetworkConnection.h:28
Definition: SequenceNumber.h:14
Definition: Notifier.h:19
Definition: RpcServer.h:25