1 module neton.store.Store;
2 
3 import core.sync.mutex;
4 import neton.store.WatcherHub;
5 import neton.store.RocksdbStore;
6 import neton.store.Event;
7 import neton.store.Watcher;
8 import hunt.logging;
9 import std.json;
10 import std.uni;
11 import std.algorithm.searching;
12 import neton.store.Util;
13 
14 import neton.protocol.neton;
15 import neton.protocol.netonrpc;
16 import neton.rpcservice.Command;
17 
18 import neton.lease;
19 
20 alias Event = neton.store.Event.Event;
21 
// Options bundle for TTL handling. Only referenced from the commented-out
// Update/Create/CompareAndSwap signatures in StoreInter within this file.
struct TTLOptionSet
{
    // Expiry setting; the unit is not established by this file.
    uint ExpireTime = 0;

    // Refresh flag; defaults to false.
    bool Refresh = false;
}
27 
// Operations a store implementation has to provide.
interface StoreInter
{
    // ---- state ---------------------------------------------------------

    // Current index of the store.
    ulong Index();

    // ---- key operations ------------------------------------------------

    // Produce a get event for `nodePath`.
    Event Get(string nodePath, bool recursive, bool sorted);

    // Write `value` at `nodePath` and return the resulting event.
    Event Set(string nodePath, bool dir, string value);

    // Delete the node at `nodePath`; `recursive` defaults to false.
    Event Delete(string nodePath, bool recursive = false);

    // ---- watching ------------------------------------------------------

    // Obtain a watcher for `prefix`.
    Watcher Watch(string prefix, bool recursive, bool stream, ulong sinceIndex);

    // ---- not part of the interface (kept for reference) ----------------
    //
    // Version() int
    //
    // Event Update(string nodePath , string newValue,TTLOptionSet expireOpts );
    // Event Create(string nodePath , bool dir, string value , bool unique,
    // 	TTLOptionSet expireOpts);
    // Event CompareAndSwap(string nodePath ,string prevValue , ulong  prevIndex,
    // 	string value ,TTLOptionSet expireOpts);
    // Event CompareAndDelete(string nodePath ,string prevValue , ulong prevIndex);
    //
    // Save() ([]byte, error)
    // Recovery(state []byte) error
    //
    // Clone() Store
    // SaveNoCopy() ([]byte, error)
    //
    // JsonStats() []byte
    // DeleteExpiredKeys(cutoff time.Time)
    //
    // HasTTLKeys() bool
}
56 
// Process-wide store front end: forwards reads and writes to a RocksdbStore,
// keeps a monotonically increasing index, and publishes an Event to the
// WatcherHub for every successful mutation.
//
// Obtain the shared object through `Store.instance()` and call `Init` before
// any other method; `_kvStore` is null until `Init` has run.
class Store : StoreInter
{
    private
    {
        __gshared Store _gstore;
        RocksdbStore _kvStore;
        WatcherHub _watcherHub;
        ulong _currentIndex;
        // NOTE(review): declared but never constructed or locked in this class.
        Mutex _mtx;
    }

    // Creates the backing RocksdbStore from `ID` and lessor `l`.
    void Init(ulong ID, Lessor l)
    {
        _kvStore = new RocksdbStore(ID, l);
    }

    // Private: instances are only created by `instance()`.
    private this()
    {
        _currentIndex = 0;
        _watcherHub = new WatcherHub(1000);
    }

    // Shared tail of the write paths: stamps `e` with the current index, then
    // notifies watchers when the write succeeded (`ok`) or records `error` on
    // the event otherwise. Returns `e`.
    private Event finishWrite(Event e, bool ok, string error)
    {
        e.setNetonIndex(_currentIndex);

        if (ok)
            _watcherHub.notify(e);
        else
            e.setErrorMsg(error);

        return e;
    }

    // Index retrieves the current index of the store.
    ulong Index()
    {
        return _currentIndex;
    }

    // Get returns a get event for `nodePath`, stamped with the current index.
    // `recursive` is forwarded to the Event; `sorted` is currently unused.
    Event Get(string nodePath, bool recursive, bool sorted)
    {
        auto e = new Event(EventAction.Get, nodePath, 0, recursive);
        e.setNetonIndex(_currentIndex);
        return e;
    }

    // Set writes `value` at `nodePath` and returns the resulting event.
    // The index is advanced whether or not the write succeeded; on failure
    // the event carries the backend's error message and watchers are not
    // notified. `dir` is currently unused.
    Event Set(string nodePath, bool dir, string value)
    {
        string error;
        auto ok = _kvStore.set(nodePath, value, error);

        // The event is built after the write and after the index bump.
        _currentIndex++;
        return finishWrite(new Event(EventAction.Set, nodePath, _currentIndex), ok, error);
    }

    // CreateDir creates a directory at `nodePath`. Index and notification
    // handling are the same as in `Set`.
    Event CreateDir(string nodePath)
    {
        string error;
        auto ok = _kvStore.createDir(nodePath, error);

        _currentIndex++;
        return finishWrite(new Event(EventAction.Create, nodePath, _currentIndex), ok, error);
    }

    // Watch registers a watcher on `key` (normalised with getSafeKey).
    // A `sinceIndex` of 0 means "start from the next index".
    // The watch is always registered as recursive; the caller's `recursive`
    // argument is overridden. May return null if the hub returns null.
    Watcher Watch(string key, bool recursive, bool stream, ulong sinceIndex)
    {
        if (sinceIndex == 0)
        {
            sinceIndex = _currentIndex + 1;
        }
        // WatcherHub does not know about the current index, so we need to pass it in
        recursive = true  /* _kvStore.isDir(key) */ ;
        return _watcherHub.watch(getSafeKey(key), recursive, stream, sinceIndex, _currentIndex);
    }

    // Delete deletes the node at the given path.
    // The index is advanced first and the event is built before removal.
    // If the event reports a directory and `recursive` is false, nothing is
    // removed and the event carries an error message instead.
    Event Delete(string nodePath, bool recursive = false)
    {
        _currentIndex++;
        auto e = new Event(EventAction.Delete, nodePath, _currentIndex);
        e.setNetonIndex(_currentIndex);

        if (e.dir && recursive == false)
        {
            e.setErrorMsg(nodePath ~ " is dir , please use recursive option");
        }
        else
        {
            // Watchers are notified before the key is removed.
            _watcherHub.notify(e);

            _kvStore.Remove(nodePath, recursive);
        }

        return e;
    }

    // Register stores the whole `server` document under
    // SERVICE_PREFIX ~ lower(name) ~ "/" ~ lower(id), where name and id come
    // from `server["service"]`. `server["status"]` is set to
    // ServiceState.Passing before the document is written.
    // NOTE(review): when `server["service"]` is not a JSON object the key
    // stays at the bare SERVICE_PREFIX and the document is still written
    // there - confirm whether that is intended.
    Event Register(ref JSONValue server)
    {
        auto service = server["service"];
        string key = SERVICE_PREFIX;
        if (service.type == JSONType.object)
        {
            auto id = toLower(service["id"].str);
            auto name = toLower(service["name"].str);
            key ~= name;
            key ~= "/";
            key ~= id;
        }
        server["status"] = ServiceState.Passing;

        string error;
        auto ok = _kvStore.set(key, server.toString, error);

        _currentIndex++;
        return finishWrite(new Event(EventAction.Register, key, _currentIndex), ok, error);
    }

    // Deregister removes the entry stored under
    // SERVICE_PREFIX ~ lower(name) ~ "/" ~ lower(id), with name and id read
    // directly from `server`. The index only advances when an entry exists;
    // otherwise the returned event carries "service not found!" and the
    // unchanged current index.
    Event Deregister(ref JSONValue server)
    {
        string key = SERVICE_PREFIX;
        if (server.type == JSONType.object)
        {
            auto id = toLower(server["id"].str);
            auto name = toLower(server["name"].str);
            key ~= name;
            key ~= "/";
            key ~= id;
        }

        // Built before the removal, provisionally stamped with the next index.
        auto e = new Event(EventAction.Deregister, key, _currentIndex + 1);
        e.setNetonIndex(_currentIndex + 1);

        auto value = getStringValue(key);
        if (value.length > 0)
        {
            _currentIndex++;
            _kvStore.Remove(key, false);
            _watcherHub.notify(e);
        }
        else
        {
            e.setNetonIndex(_currentIndex);
            e.setErrorMsg("service not found!");
        }

        return e;
    }

    /// rpc interface

    // Handles the three range commands. The lookup key is `req.Key` as-is for
    // RangeRequest, getConfigKey(req.Key) for ConfigRangeRequest and
    // getRegistryKey(req.Key) for RegistryRangeRequest. Any other command
    // yields null.
    RangeResponse get(RpcRequest req)
    {
        string lookupKey;
        if (req.CMD == RpcReqCommand.RangeRequest)
            lookupKey = req.Key;
        else if (req.CMD == RpcReqCommand.ConfigRangeRequest)
            lookupKey = getConfigKey(req.Key);
        else if (req.CMD == RpcReqCommand.RegistryRangeRequest)
            lookupKey = getRegistryKey(req.Key);
        else
            return null;

        auto e = new Event(EventAction.Get, req.Key, lookupKey, 0);
        e.setNetonIndex(_currentIndex);

        RangeResponse respon = new RangeResponse();
        respon.kvs = e.getKeyValues();
        respon.count = respon.kvs.length;
        return respon;
    }

    // Forwards a put to the backend. For a plain PutRequest the normalised key
    // is first checked with isRemained (presumably a reserved-key check -
    // confirm in neton.store.Util); such keys are rejected with null.
    // When the backend returns a response the index is advanced and a Set
    // event is published; a null backend response is passed through untouched.
    PutResponse put(RpcRequest req)
    {
        if (req.CMD == RpcReqCommand.PutRequest)
        {
            auto safeKey = getSafeKey(req.Key);

            if (isRemained(safeKey))
            {
                logErrorf("%s is remained !",req.Key);
                return null;
            }
        }

        auto respon = _kvStore.put(req);
        if (respon is null)
            return null;

        // The node path is only needed once there is an event to publish.
        string nodePath;
        if (req.CMD == RpcReqCommand.ConfigPutRequest)
            nodePath = getConfigKey(req.Key);
        else if (req.CMD == RpcReqCommand.RegistryPutRequest)
            nodePath = getRegistryKey(req.Key);
        else
            nodePath = getSafeKey(req.Key);

        _currentIndex++;
        auto e = new Event(EventAction.Set, req.Key, nodePath, _currentIndex);
        e.setNetonIndex(_currentIndex);
        _watcherHub.notify(e);

        return respon;
    }

    // Forwards a delete-range to the backend. The index is advanced and the
    // Delete event is built before the backend call. A KeyValue holding
    // `req.Key` and the event's rpcValue() is appended to `prevKvs`, and
    // watchers are notified only when at least one key was deleted.
    // Returns null if the backend returns null.
    DeleteRangeResponse deleteRange(RpcRequest req)
    {
        _currentIndex++;

        string nodePath;
        if (req.CMD == RpcReqCommand.ConfigDeleteRangeRequest)
            nodePath = getConfigKey(req.Key);
        else if (req.CMD == RpcReqCommand.RegistryDeleteRangeRequest)
            nodePath = getRegistryKey(req.Key);
        else
            nodePath = getSafeKey(req.Key);

        auto e = new Event(EventAction.Delete, req.Key, nodePath, _currentIndex);
        e.setNetonIndex(_currentIndex);

        auto respon = _kvStore.deleteRange(req);
        // Guard against a null backend response, as `put` does, instead of
        // dereferencing it below.
        if (respon is null)
            return null;

        auto kv = new KeyValue();
        kv.key = cast(ubyte[])(req.Key);
        kv.value = cast(ubyte[])(e.rpcValue());
        respon.prevKvs ~= kv;

        if (respon.deleted > 0)
            _watcherHub.notify(e);

        return respon;
    }

    // ---- lease operations: thin forwards to the backing store -------------

    long generateLeaseID()
    {
        return _kvStore.generateLeaseID();
    }

    Lease grantLease(long leaseid, long ttl)
    {
        return _kvStore.grantLease(leaseid, ttl);
    }

    bool revokeLease(long leaseid)
    {
        return _kvStore.revokeLease(leaseid);
    }

    LeaseTimeToLiveResponse leaseTimeToLive(long leaseid)
    {
        return _kvStore.leaseTimeToLive(leaseid);
    }

    LeaseLeasesResponse leaseLeases()
    {
        return _kvStore.leaseLeases();
    }

    LeaseKeepAliveResponse renewLease(RpcRequest req)
    {
        return _kvStore.renewLease(req);
    }

    // Returns the shared Store, creating it on first use.
    // NOTE(review): the lazy creation is not synchronised; confirm the first
    // call happens before other threads are started.
    static Store instance()
    {
        if (_gstore is null)
            _gstore = new Store();
        return _gstore;
    }

    // Raw string value stored at `key`, as returned by the backing store.
    string getStringValue(string key)
    {
        return _kvStore.getStringValue(key);
    }

    // JSON value stored at `key`, as returned by the backing store.
    JSONValue getJsonValue(string key)
    {
        return _kvStore.getJsonValue(key);
    }

}