module neton.store.Store;

import core.sync.mutex;
import neton.store.WatcherHub;
import neton.store.RocksdbStore;
import neton.store.Event;
import neton.store.Watcher;
import hunt.logging;
import std.json;
import std.uni;
import std.algorithm.searching;
import neton.store.Util;

import neton.protocol.neton;
import neton.protocol.netonrpc;
import neton.rpcservice.Command;

import neton.lease;

alias Event = neton.store.Event.Event;

/// Expiry options for a key. In this file it is referenced only by the
/// commented-out members of StoreInter.
struct TTLOptionSet
{
    uint ExpireTime;
    bool Refresh;
}

/// Operations a store implementation has to provide.
interface StoreInter
{
    //Version() int
    ulong Index();

    Event Get(string nodePath, bool recursive, bool sorted);
    Event Set(string nodePath, bool dir, string value);
    // Event Update(string nodePath , string newValue,TTLOptionSet expireOpts );
    // Event Create(string nodePath , bool dir, string value , bool unique,
    //              TTLOptionSet expireOpts);
    // Event CompareAndSwap(string nodePath ,string prevValue , ulong prevIndex,
    //                      string value ,TTLOptionSet expireOpts);
    Event Delete(string nodePath, bool recursive = false);
    // Event CompareAndDelete(string nodePath ,string prevValue , ulong prevIndex);

    Watcher Watch(string prefix, bool recursive, bool stream, ulong sinceIndex);

    // Save() ([]byte, error)
    // Recovery(state []byte) error

    // Clone() Store
    // SaveNoCopy() ([]byte, error)

    // JsonStats() []byte
    // DeleteExpiredKeys(cutoff time.Time)

    // HasTTLKeys() bool
}

/// Store that forwards reads and writes to a RocksdbStore, keeps a
/// monotonically increasing index, and publishes an Event to the WatcherHub
/// for successful modifications.
///
/// Obtain the shared object through instance() and call Init() before any
/// operation that touches the key/value backend: `_kvStore` is only assigned
/// in Init().
class Store : StoreInter
{
    private
    {
        __gshared Store _gstore;
        RocksdbStore _kvStore;
        WatcherHub _watcherHub;
        ulong _currentIndex;
        Mutex _mtx; // declared but neither created nor locked in this file
    }

    /// Creates the key/value backend for node `ID`, bound to lessor `l`.
    void Init(ulong ID, Lessor l)
    {
        _kvStore = new RocksdbStore(ID, l);
    }

    private this()
    {
        _currentIndex = 0;
        _watcherHub = new WatcherHub(1000);
    }

    /// Index retrieves the current index of the store.
    ulong Index()
    {
        return _currentIndex;
    }

    /// Get returns a get event for `nodePath`, stamped with the current index.
    ///
    /// `recursive` is forwarded to the Event; `sorted` is accepted for
    /// interface compatibility but is not used by this implementation.
    Event Get(string nodePath, bool recursive, bool sorted)
    {
        // nodePath = getSafeKey(nodePath);
        auto e = new Event(EventAction.Get, nodePath, 0, recursive);
        e.setNetonIndex(_currentIndex);
        //e.Node.loadInternalNode(n, recursive, sorted, s.clock)

        return e;
    }

    /// Stores `value` at `nodePath` and returns the resulting Set event.
    ///
    /// The index is advanced whether or not the write succeeded. Watchers are
    /// notified on success; on failure the backend's error text is attached
    /// to the returned event. `dir` is not used by this implementation.
    Event Set(string nodePath, bool dir, string value)
    {
        // nodePath = getSafeKey(nodePath);
        string error;
        auto ok = _kvStore.set(nodePath, value, error);

        _currentIndex++;
        auto e = new Event(EventAction.Set, nodePath, _currentIndex);
        e.setNetonIndex(_currentIndex);

        //logInfo("---- set event : ", e);
        if (ok)
            _watcherHub.notify(e);
        else
            e.setErrorMsg(error);

        //logInfo("---- notify finish : ", e.getNodeValue());
        return e;
    }

    /// Creates a directory at `nodePath` and returns the resulting Create
    /// event. Index handling and error reporting match Set().
    Event CreateDir(string nodePath)
    {
        // nodePath = getSafeKey(nodePath);

        string error;
        auto ok = _kvStore.createDir(nodePath, error);

        _currentIndex++;
        auto e = new Event(EventAction.Create, nodePath, _currentIndex);
        e.setNetonIndex(_currentIndex);

        if (ok)
            _watcherHub.notify(e);
        else
            e.setErrorMsg(error);

        return e;
    }

    /// Registers a watcher on `key` (normalised with getSafeKey).
    ///
    /// A `sinceIndex` of 0 means "start after the current index". The
    /// `recursive` argument is currently ignored: every watch is registered
    /// as recursive. Returns whatever the WatcherHub returns, which may be
    /// null.
    Watcher Watch(string key, bool recursive, bool stream, ulong sinceIndex)
    {
        // key = getSafeKey(key);

        if (sinceIndex == 0)
        {
            sinceIndex = _currentIndex + 1;
        }
        // WatcherHub does not know about the current index, so we need to pass it in
        recursive = true /* _kvStore.isDir(key) */ ;
        return _watcherHub.watch(getSafeKey(key), recursive, stream, sinceIndex, _currentIndex);
    }

    /// Delete deletes the node at the given path.
    ///
    /// The index is always advanced. If the event reports a directory and
    /// `recursive` is false, nothing is removed and an error message is set
    /// on the event. Otherwise watchers are notified first and the node is
    /// removed afterwards.
    Event Delete(string nodePath, bool recursive = false)
    {
        // nodePath = getSafeKey(nodePath);

        _currentIndex++;
        auto e = new Event(EventAction.Delete, nodePath, _currentIndex);
        e.setNetonIndex(_currentIndex);

        if (e.dir && recursive == false)
        {
            e.setErrorMsg(nodePath ~ " is dir , please use recursive option");
        }
        else
        {
            _watcherHub.notify(e);

            _kvStore.Remove(nodePath, recursive);
        }

        return e;
    }

    /// Registers the service described by `server["service"]` under
    /// SERVICE_PREFIX ~ name ~ "/" ~ id (both lower-cased) and marks it
    /// ServiceState.Passing.
    ///
    /// If `server["service"]` is not a JSON object the request is rejected:
    /// nothing is written, the index is left untouched and the returned event
    /// carries an error message. Missing or ill-typed members still raise the
    /// JSONException thrown by std.json.
    Event Register(ref JSONValue server)
    {
        auto service = server["service"];
        if (service.type != JSONType.object)
        {
            // Without an id/name the key would be the bare prefix; refuse
            // instead of overwriting it.
            auto rejected = new Event(EventAction.Register, SERVICE_PREFIX, _currentIndex);
            rejected.setNetonIndex(_currentIndex);
            rejected.setErrorMsg("invalid service definition!");
            return rejected;
        }

        string id = toLower(service["id"].str);
        string name = toLower(service["name"].str);
        string key = SERVICE_PREFIX;
        key ~= name;
        key ~= "/";
        key ~= id;

        server["status"] = ServiceState.Passing;

        string error;
        auto ok = _kvStore.set(key, server.toString, error);

        _currentIndex++;
        auto e = new Event(EventAction.Register, key, _currentIndex);
        e.setNetonIndex(_currentIndex);

        //logInfo("---- set event : ", e);
        if (ok)
            _watcherHub.notify(e);
        else
            e.setErrorMsg(error);

        //logInfo("---- notify finish : ", e.getNodeValue());
        return e;
    }

    /// Removes the service identified by `server["name"]` / `server["id"]`.
    ///
    /// The index is advanced only when a stored value was found and removed.
    /// If `server` is not a JSON object, or no value is stored for the
    /// service key, the store is left untouched and the returned event
    /// carries an error message and the unchanged index.
    Event Deregister(ref JSONValue server)
    {
        immutable wellFormed = server.type == JSONType.object;

        string id, name, key = SERVICE_PREFIX;
        if (wellFormed)
        {
            id = toLower(server["id"].str);
            name = toLower(server["name"].str);
            key ~= name;
            key ~= "/";
            key ~= id;
        }

        auto e = new Event(EventAction.Deregister, key, _currentIndex + 1);
        e.setNetonIndex(_currentIndex + 1);

        if (!wellFormed)
        {
            // Never look up or remove the bare prefix key.
            e.setNetonIndex(_currentIndex);
            e.setErrorMsg("invalid service definition!");
            return e;
        }

        auto value = getStringValue(key);
        if (value.length > 0)
        {
            _currentIndex++;
            _kvStore.Remove(key, false);
            _watcherHub.notify(e);
        }
        else
        {
            e.setNetonIndex(_currentIndex);
            e.setErrorMsg("service not found!");
        }

        return e;
    }

    /// rpc interface
    ///
    /// Answers a range request. The lookup path is `req.Key` itself for
    /// RangeRequest, getConfigKey(req.Key) for ConfigRangeRequest and
    /// getRegistryKey(req.Key) for RegistryRangeRequest. Any other command
    /// yields null.
    RangeResponse get(RpcRequest req)
    {
        if (req.CMD == RpcReqCommand.RangeRequest)
            return rangeResponseFor(req.Key, req.Key);

        if (req.CMD == RpcReqCommand.ConfigRangeRequest)
            return rangeResponseFor(req.Key, getConfigKey(req.Key));

        if (req.CMD == RpcReqCommand.RegistryRangeRequest)
            return rangeResponseFor(req.Key, getRegistryKey(req.Key));

        return null;
    }

    /// Builds a RangeResponse from a Get event created for `key` / `nodePath`
    /// and stamped with the current index.
    private RangeResponse rangeResponseFor(string key, string nodePath)
    {
        auto e = new Event(EventAction.Get, key, nodePath, 0);
        e.setNetonIndex(_currentIndex);

        auto respon = new RangeResponse();
        respon.kvs = e.getKeyValues();
        respon.count = respon.kvs.length;
        return respon;
    }

    /// Executes a put request against the backend.
    ///
    /// A plain PutRequest whose safe key satisfies isRemained() is logged and
    /// refused with null. When the backend returns a response, the index is
    /// advanced and watchers are notified with a Set event; a null backend
    /// response is returned as is, with no index change.
    PutResponse put(RpcRequest req)
    {
        if (req.CMD == RpcReqCommand.PutRequest)
        {
            auto safeKey = getSafeKey(req.Key);

            if (isRemained(safeKey))
            {
                logErrorf("%s is remained !",req.Key);
                return null;
            }
        }

        auto respon = _kvStore.put(req);

        string nodePath;
        if (req.CMD == RpcReqCommand.ConfigPutRequest)
        {
            nodePath = getConfigKey(req.Key);
        }
        else if (req.CMD == RpcReqCommand.RegistryPutRequest)
        {
            nodePath = getRegistryKey(req.Key);
        }
        else
        {
            nodePath = getSafeKey(req.Key);
        }

        if (respon !is null)
        {
            _currentIndex++;
            auto e = new Event(EventAction.Set, req.Key, nodePath, _currentIndex);
            e.setNetonIndex(_currentIndex);

            _watcherHub.notify(e);
        }
        return respon;
    }

    /// Executes a delete-range request against the backend.
    ///
    /// The index is advanced and the Delete event is created before the
    /// backend call. One KeyValue made of `req.Key` and the event's
    /// rpcValue() is appended to the response's `prevKvs`, and watchers are
    /// notified only when the backend reports at least one deletion.
    /// Returns null if the backend returns no response.
    DeleteRangeResponse deleteRange(RpcRequest req)
    {
        _currentIndex++;

        string nodePath;
        if (req.CMD == RpcReqCommand.ConfigDeleteRangeRequest)
        {
            nodePath = getConfigKey(req.Key);
        }
        else if (req.CMD == RpcReqCommand.RegistryDeleteRangeRequest)
        {
            nodePath = getRegistryKey(req.Key);
        }
        else
        {
            nodePath = getSafeKey(req.Key);
        }

        auto e = new Event(EventAction.Delete, req.Key, nodePath, _currentIndex);
        e.setNetonIndex(_currentIndex);

        auto respon = _kvStore.deleteRange(req);
        if (respon is null)
        {
            // put() treats a null backend response as possible; do the same
            // here instead of dereferencing it.
            return null;
        }

        auto kv = new KeyValue();
        kv.key = cast(ubyte[])(req.Key);
        kv.value = cast(ubyte[])(e.rpcValue());
        respon.prevKvs ~= kv;

        if (respon.deleted > 0)
            _watcherHub.notify(e);

        return respon;
    }

    /// Forwards to RocksdbStore.generateLeaseID.
    long generateLeaseID()
    {
        return _kvStore.generateLeaseID();
    }

    /// Forwards to RocksdbStore.grantLease.
    Lease grantLease(long leaseid, long ttl)
    {
        return _kvStore.grantLease(leaseid, ttl);
    }

    /// Forwards to RocksdbStore.revokeLease.
    bool revokeLease(long leaseid)
    {
        return _kvStore.revokeLease(leaseid);
    }

    /// Forwards to RocksdbStore.leaseTimeToLive.
    LeaseTimeToLiveResponse leaseTimeToLive(long leaseid)
    {
        return _kvStore.leaseTimeToLive(leaseid);
    }

    /// Forwards to RocksdbStore.leaseLeases.
    LeaseLeasesResponse leaseLeases()
    {
        return _kvStore.leaseLeases();
    }

    /// Forwards to RocksdbStore.renewLease.
    LeaseKeepAliveResponse renewLease(RpcRequest req)
    {
        return _kvStore.renewLease(req);
    }

    /// Returns the shared Store, creating it on first use.
    /// No locking is performed here.
    static Store instance()
    {
        if (_gstore is null)
            _gstore = new Store();
        return _gstore;
    }

    /// Forwards to RocksdbStore.getStringValue.
    string getStringValue(string key)
    {
        return _kvStore.getStringValue(key);
    }

    /// Forwards to RocksdbStore.getJsonValue.
    JSONValue getJsonValue(string key)
    {
        return _kvStore.getJsonValue(key);
    }

}