module neton.store.RocksdbStore;

import core.stdc.stdio;
import std.string;
import std.stdio;
import std.json;
import std.experimental.allocator;
import std.file;
import std.conv : to;
import std.algorithm.searching;
import std.algorithm.mutation;

import hunt.logging;
import hunt.util.DateTime;
import hunt.util.Serialize;

import rocksdb.database;
import rocksdb.options;

import neton.store.Util;
import neton.lease;
import neton.protocol.neton;
import neton.protocol.neton;
import neton.rpcservice.Command;

/*
 * service register/deregister store format
 *
 * key(/redis/redis1) : {
 *     "dir" : "false",
 *     "value" : "{
 *         "service" : {
 *             "id" : "redis1",
 *             "name" : "redis",
 *             "address" : "127.0.0.1",
 *             "port" : 8000
 *         },
 *         "check" : {
 *             "http" : "http://127.0.0.1:8001",
 *             "interval" : 10,
 *             "timeout" : 30
 *         },
 *         "status" : "passing"
 *     }"
 * }
 */

/*
 * k/v store format
 *
 * key : {
 *     "dir" : "false",
 *     "value" : "some ..",
 *     "leaseID" : long.init,
 *     "create_time" : 2333334422   /// unix seconds timestamp
 * }
 * key : {
 *     "dir" : "true",
 *     "children" : "/child1;/child2"
 * }
 */

/***
 * **************** Lease **************
 * key   : /lease/{leaseID}
 * value : {
 *     "dir" : "false",
 *     "id" : {leaseID},
 *     "ttl" : 12344,               /// time to live
 *     "create_time" : 13444244,    /// unix seconds timestamp
 *     "items" : [
 *         {
 *             "key" : "attach key",
 *             "time" : "attach time"
 *         }
 *     ]
 * }
 ***/

/*** Gen Lease ID
 * key   : LEASE_GEN_ID_PREFIX
 * value : {
 *     "ID" : 1234
 * }
 ****/

/**
 * Hierarchical key/value store persisted in RocksDB.
 *
 * Every key maps to a JSON document. Directory nodes carry `"dir" : "true"`
 * and a `";"`-separated `"children"` list of full child keys; file nodes carry
 * `"dir" : "false"` plus their payload. Lease records are kept under
 * `LEASE_PREFIX` and mirrored into the optional in-memory `Lessor`.
 */
class RocksdbStore
{

    private
    {
        Database _rocksdb;
        Lessor _lessor;
    }

    /**
     * Opens (creating if missing) the database `"rocks.db" ~ ID` and
     * initialises the root directory and the lessor from persisted leases.
     */
    this(ulong ID, Lessor l)
    {
        auto opts = new DBOptions;
        opts.createIfMissing = true;
        opts.errorIfExists = false;

        _rocksdb = new Database(opts, "rocks.db" ~ to!string(ID));
        _lessor = l;
        init(l);
    }

    /**
     * Creates the root directory `/` when it is absent and, when `lessor` is
     * given, feeds every lease persisted under the lease directory into it.
     */
    void init(Lessor lessor = null)
    {
        auto root = getJsonValue("/");
        if (root.type == JSONType.null_)
        {
            SetValue("/", newDirJson());
        }

        // Restore persisted leases into the lessor.
        if (lessor is null)
            return;

        auto leases = getJsonValue(LEASE_PREFIX[0 .. $ - 1]);
        if (leases.type == JSONType.null_)
            return;

        foreach (leaseKey; split(leases["children"].str, ";"))
        {
            if (leaseKey.length == 0)
                continue;

            auto leaseItem = getJsonValue(leaseKey);
            if (leaseItem.type == JSONType.null_)
                continue;

            Lease l = new Lease();
            l.ID = leaseItem["id"].integer;
            l.ttl = leaseItem["ttl"].integer;
            l.expiry = l.ttl + leaseItem["create_time"].integer;
            foreach (item; leaseItem["items"].array)
            {
                l.itemSet.add(item["key"].str);
            }
            lessor.init(l);
        }
    }

    /// Returns true when `key` (after `getSafeKey`) names a directory node.
    bool isDir(string key)
    {
        return isDirNode(getJsonValue(getSafeKey(key)));
    }

    /// Returns true when a parseable JSON document is stored under `key`.
    bool Exsit(string key)
    {
        return getJsonValue(key).type != JSONType.null_;
    }

    /// Creates directory `dir`, creating missing ancestors recursively.
    void createDirWithRecur(string dir)
    {
        logInfo("---create dir : ", dir);
        auto parent = getParent(dir);
        if (parent != string.init)
        {
            if (!Exsit(parent))
            {
                createDirWithRecur(parent);
            }
            CreateAndAppendSubdir(parent, dir);
        }
        else
        {
            CreateAndAppendSubdir(dir, "");
        }
    }

    /**
     * Creates the sub-directory `child` and registers its key in `parent`;
     * `parent` itself is created when it does not exist.
     *
     * An empty `child` only ensures that `parent` exists. An already existing
     * child directory is left untouched so that its own child list is not lost.
     */
    void CreateAndAppendSubdir(string parent, string child)
    {
        if (child.length > 0 && !isDirNode(getJsonValue(child)))
        {
            SetValue(child, newDirJson());
        }

        auto parentNode = getJsonValue(parent);
        if (parentNode.type == JSONType.null_)
        {
            SetValue(parent, newDirJson(child));
        }
        else if (child.length > 0)
        {
            // Never append an empty key: it would leave a stray ";" behind.
            linkChild(parent, parentNode, child);
        }
    }

    /// Returns the raw stored string for `key`, or `string.init` when the key is empty or no database is open.
    string Lookup(string key)
    {
        if (key.length == 0)
            return string.init;
        if (_rocksdb is null)
            return string.init;

        return _rocksdb.getString(key);
    }

    /**
     * Deletes `key` and unregisters it from its parent directory.
     *
     * With `recursive` set, the whole subtree below a directory is deleted
     * as well (every descendant, not only the direct children).
     */
    void Remove(string key, bool recursive = false)
    {
        key = getSafeKey(key);
        if (recursive)
        {
            removeSubtree(key);
        }
        else
        {
            _rocksdb.removeString(key);
        }
        removekeyFromParent(key);
    }

    /**
     * Returns the JSON document stored under `key`. A null `JSONValue` is
     * returned when nothing is stored or the stored text is not valid JSON
     * (the parse error is logged).
     */
    JSONValue getJsonValue(string key)
    {
        JSONValue jvalue;
        auto raw = Lookup(key);
        if (raw.length == 0)
            return jvalue;

        try
        {
            jvalue = parseJSON(raw);
        }
        catch (Exception e)
        {
            // Concatenating logger, like every other call in this file.
            logError("catch error : ", e.msg);
        }

        return jvalue;
    }

    /// Returns the raw stored string for `key` (see `Lookup`).
    string getStringValue(string key)
    {
        return Lookup(key);
    }

    /**
     * Stores `value` as a file node under `originKey`.
     *
     * Fails (returning false and filling `error`) when the key is a directory,
     * has no parent, or its parent exists but is not a directory.
     */
    bool set(string originKey, string value, out string error, long leaseid = long.init)
    {
        // The target must not be a directory.
        auto nodePath = getSafeKey(originKey);
        auto node = getJsonValue(nodePath);
        if (isDirNode(node))
        {
            error ~= nodePath;
            error ~= " is dir";
            logError("set error : ", error, " info: ", node);
            return false;
        }

        // The parent either does not exist yet or must be a directory.
        auto p = getParent(nodePath);
        if (p == string.init)
        {
            error = "the key is illegal";
            logError("set error : ", error, " info: ", p);
            return false;
        }
        auto parentNode = getJsonValue(p);
        if (parentNode.type != JSONType.null_ && !isDirNode(parentNode))
        {
            error ~= p;
            error ~= " not is dir";
            logError("set error : ", error, " info: ", p);
            return false;
        }

        setFileKeyValue(originKey, value, leaseid);
        return true;
    }

    /**
     * Creates the directory `path` (and missing ancestors).
     *
     * Fails (returning false and filling `error`) when `path` already exists,
     * has no parent, or its parent exists but is not a directory.
     */
    bool createDir(string path, out string error)
    {
        path = getSafeKey(path);

        // A directory or file already lives here.
        auto node = getJsonValue(path);
        if (node.type != JSONType.null_)
        {
            error ~= path;
            error ~= " is exist";
            return false;
        }

        // The parent either does not exist yet or must be a directory.
        auto p = getParent(path);
        if (p == string.init)
        {
            error = "the key is illegal";
            return false;
        }
        auto parentNode = getJsonValue(p);
        if (parentNode.type != JSONType.null_ && !isDirNode(parentNode))
        {
            error ~= p;
            error ~= " not is dir";
            return false;
        }

        createDirWithRecur(path);
        return true;
    }

    /**
     * Grants lease `leaseid` through the lessor and persists a new lease
     * record plus the last used lease id.
     *
     * Returns null when there is no lessor, the lessor refuses the grant, or
     * a record for `leaseid` is already stored.
     */
    Lease grantLease(long leaseid, long ttl)
    {
        if (_lessor is null)
            return null;

        auto l = _lessor.Grant(leaseid, ttl);
        if (l is null)
            return null;

        auto leaseKey = LEASE_PREFIX ~ leaseid.to!string;
        if (getJsonValue(leaseKey).type != JSONType.null_)
            return null;

        JSONValue newLease;
        newLease["dir"] = "false";
        newLease["ttl"] = l.ttl;
        newLease["id"] = leaseid;
        JSONValue[] items;
        newLease["items"] = items;
        newLease["create_time"] = time();
        setLeaseKeyValue(leaseKey, newLease.toString);

        // Remember the most recently granted id for generateLeaseID().
        JSONValue leaseID;
        leaseID["ID"] = leaseid;
        SetValue(LEASE_GEN_ID_PREFIX, leaseID.toString);
        return l;
    }

    /**
     * Revokes lease `leaseid` in the lessor and deletes its stored record.
     *
     * Returns false when there is no lessor or when `Revoke` returns a
     * non-null result (presumably an error object; confirm against Lessor).
     */
    bool revokeLease(long leaseid)
    {
        // Same guard as the other lease operations; previously a null lessor crashed here.
        if (_lessor is null)
            return false;

        if (_lessor.Revoke(leaseid) !is null)
            return false;

        Remove(LEASE_PREFIX ~ leaseid.to!string);
        return true;
    }

    /**
     * Attaches `key` to lease `leaseid`, both in the lessor (when present)
     * and in the stored lease record. Returns false when the lease record
     * does not exist.
     */
    bool attachToLease(string key, long leaseid)
    {
        auto leaseKey = LEASE_PREFIX ~ leaseid.to!string;
        auto lease = getJsonValue(leaseKey);
        if (lease.type == JSONType.null_)
            return false;

        if (_lessor !is null)
        {
            LeaseItem item = {Key:
            key};
            _lessor.Attach(leaseid, [item]);
        }

        // Record the key only once.
        if (!canFind!((JSONValue b, string a) => b["key"].str == a)(lease["items"].array, key))
        {
            JSONValue item;
            item["key"] = key;
            item["time"] = time();
            lease["items"].array ~= item;
            SetValue(leaseKey, lease.toString);
        }
        return true;
    }

    /**
     * Detaches `key` from lease `leaseid`, both in the lessor (when present)
     * and in the stored lease record. Returns false when the lease record
     * does not exist.
     */
    bool detachFromLease(string key, long leaseid)
    {
        auto leaseKey = LEASE_PREFIX ~ leaseid.to!string;
        auto lease = getJsonValue(leaseKey);
        if (lease.type == JSONType.null_)
            return false;

        if (_lessor !is null)
        {
            LeaseItem item = {Key:
            key};
            _lessor.Detach(leaseid, [item]);
        }

        JSONValue[] remaining;
        foreach (JSONValue item; lease["items"].array)
        {
            if (item["key"].str != key)
            {
                remaining ~= item;
            }
        }
        lease["items"] = remaining;
        SetValue(leaseKey, lease.toString);
        return true;
    }

    /// Resets the stored `leaseID` of `key` to `long.init`; returns false when the key does not exist.
    bool foreverKey(string key)
    {
        auto value = getJsonValue(key);
        if (value.type == JSONType.null_)
            return false;

        value["leaseID"] = long.init;
        SetValue(key, value.toString);
        return true;
    }

    /// Returns the persisted last lease id plus one, or 1 when none was stored yet.
    long generateLeaseID()
    {
        auto value = getJsonValue(LEASE_GEN_ID_PREFIX);
        if (value.type != JSONType.null_)
        {
            return value["ID"].integer + 1;
        }
        return 1;
    }

    /**
     * Reports the remaining and granted TTL and the attached keys of lease
     * `leaseid`. For an unknown lease, TTL is -1 and grantedTTL is 0.
     */
    LeaseTimeToLiveResponse leaseTimeToLive(long leaseid)
    {
        auto leaseItem = getJsonValue(LEASE_PREFIX ~ leaseid.to!string);
        LeaseTimeToLiveResponse respon = new LeaseTimeToLiveResponse();
        respon.ID = leaseid;

        if (leaseItem.type == JSONType.null_)
        {
            respon.TTL = -1;
            respon.grantedTTL = 0;
            return respon;
        }

        auto remainTTL = (leaseItem["create_time"].integer + leaseItem["ttl"].integer - time());
        respon.TTL = remainTTL > 0 ? remainTTL : 0;
        respon.grantedTTL = leaseItem["ttl"].integer;
        foreach (item; leaseItem["items"].array)
        {
            respon.keys ~= cast(ubyte[])(item["key"].str);
        }
        return respon;
    }

    /// Lists all stored leases; returns null when the lease directory does not exist.
    LeaseLeasesResponse leaseLeases()
    {
        auto leases = getJsonValue(LEASE_PREFIX[0 .. $ - 1]);
        if (leases.type == JSONType.null_)
            return null;

        LeaseLeasesResponse response = new LeaseLeasesResponse();
        foreach (leaseKey; split(leases["children"].str, ";"))
        {
            if (leaseKey.length == 0)
                continue;

            auto leaseItem = getJsonValue(leaseKey);
            if (leaseItem.type == JSONType.null_)
                continue;

            LeaseStatus ls = new LeaseStatus();
            ls.ID = leaseItem["id"].integer;
            response.leases ~= ls;
        }
        return response;
    }

    /**
     * Renews lease `req.LeaseID` through the lessor and persists the new TTL
     * and creation time. Returns null (and logs a warning) on any failure.
     */
    LeaseKeepAliveResponse renewLease(RpcRequest req)
    {
        if (_lessor !is null)
        {
            auto newTTL = _lessor.Renew(req.LeaseID);
            if (newTTL > 0)
            {
                auto leaseKey = LEASE_PREFIX ~ req.LeaseID.to!string;
                auto lease = getJsonValue(leaseKey);
                if (lease.type != JSONType.null_)
                {
                    lease["ttl"] = newTTL;
                    lease["create_time"] = time();

                    setLeaseKeyValue(leaseKey, lease.toString);

                    LeaseKeepAliveResponse respon = new LeaseKeepAliveResponse();
                    respon.ID = req.LeaseID;
                    respon.TTL = newTTL;
                    return respon;
                }
            }
        }
        logWarning("-----renewLease fail --------");
        return null;
    }

    /**
     * Stores `req.Value` under the key derived from `req.Key` and `req.CMD`,
     * attaching it to `req.LeaseID` first when that id is non-zero.
     * Returns null when the lease is unknown or the value cannot be set.
     */
    PutResponse put(RpcRequest req)
    {
        string nodePath;
        if (req.CMD == RpcReqCommand.ConfigPutRequest)
        {
            nodePath = getConfigKey(req.Key);
        }
        else if (req.CMD == RpcReqCommand.RegistryPutRequest)
        {
            nodePath = getRegistryKey(req.Key);
        }
        else
        {
            nodePath = getSafeKey(req.Key);
        }

        if (req.LeaseID != 0 && !attachToLease(nodePath, req.LeaseID))
            return null;

        // A zero lease id equals set()'s default, so one call covers both cases.
        string error;
        if (!set(nodePath, req.Value, error, req.LeaseID))
            return null;

        PutResponse respon = new PutResponse();
        auto kv = new KeyValue();
        kv.key = cast(ubyte[])(req.Key);
        kv.value = cast(ubyte[])(req.Value);
        respon.prevKv = kv;
        return respon;
    }

    /**
     * Deletes the node derived from `req.Key` and `req.CMD` together with its
     * subtree. `deleted` is 1 when the node existed, 0 otherwise.
     */
    DeleteRangeResponse deleteRange(RpcRequest req)
    {
        DeleteRangeResponse respon = new DeleteRangeResponse();

        string nodePath;
        if (req.CMD == RpcReqCommand.ConfigDeleteRangeRequest)
        {
            nodePath = getConfigKey(req.Key);
        }
        else if (req.CMD == RpcReqCommand.RegistryDeleteRangeRequest)
        {
            nodePath = getRegistryKey(req.Key);
        }
        else
        {
            nodePath = getSafeKey(req.Key);
        }

        if (Exsit(nodePath))
        {
            Remove(nodePath, true);
            respon.deleted = 1;
        }
        else
        {
            respon.deleted = 0;
        }
        return respon;
    }

protected:
    /// Writes `value` under `key` without any directory bookkeeping.
    void SetValue(string key, string value)
    {
        _rocksdb.putString(key, value);
    }

    /**
     * Writes a file node for `originKey`, creating missing parent directories
     * and registering the key in its parent. A key without a parent is
     * written without registration.
     */
    void setFileKeyValue(string originKey, string value, long leaseid = long.init)
    {
        auto key = getSafeKey(originKey);
        ensureLinkedToParent(key);

        JSONValue filevalue;
        filevalue["dir"] = "false";
        filevalue["key"] = originKey;
        filevalue["value"] = value;
        filevalue["leaseID"] = leaseid;
        filevalue["create_time"] = time();

        SetValue(key, filevalue.toString);
    }

    /// Removes `skey` from the `children` list of its parent directory, if it has one.
    void removekeyFromParent(string skey)
    {
        auto pkey = getParent(skey);
        if (pkey == string.init)
            return;

        auto pnode = getJsonValue(pkey);
        if (pnode.type != JSONType.object || !("children" in pnode))
            return;

        // Rebuild the list without skey and without empty segments.
        string newvalue;
        foreach (child; split(pnode["children"].str, ";"))
        {
            if (child == skey || child == ";" || child.length == 0)
                continue;
            newvalue ~= child;
            newvalue ~= ";";
        }
        pnode["children"] = newvalue;
        SetValue(pkey, pnode.toString);
    }

    /**
     * Writes the lease document `jsonValue` under `key`, creating missing
     * parent directories and registering the key in its parent.
     */
    void setLeaseKeyValue(string key, string jsonValue)
    {
        ensureLinkedToParent(key);
        SetValue(key, jsonValue);
    }

private:
    /// Serialises an empty directory node whose child list is `children`.
    static string newDirJson(string children = "")
    {
        JSONValue dirvalue;
        dirvalue["dir"] = "true";
        dirvalue["children"] = children;
        return dirvalue.toString;
    }

    /// True when `node` is a JSON object whose "dir" member is the string "true".
    static bool isDirNode(JSONValue node)
    {
        if (node.type != JSONType.object)
            return false;
        auto flag = "dir" in node;
        return flag !is null && flag.type == JSONType.string && flag.str == "true";
    }

    /// Appends `childKey` to the child list of directory `parentKey` unless it is already listed.
    void linkChild(string parentKey, JSONValue parentNode, string childKey)
    {
        if (!isDirNode(parentNode))
            return;

        string children;
        if (auto listed = "children" in parentNode)
        {
            children = listed.str;
        }
        if (canFind(split(children, ";"), childKey))
            return;

        children ~= ";";
        children ~= childKey;
        parentNode["children"] = children;
        SetValue(parentKey, parentNode.toString);
    }

    /// Creates the parent directory chain of `key` when missing and registers `key` in its parent.
    void ensureLinkedToParent(string key)
    {
        auto p = getParent(key);
        if (p == string.init)
            return; // no parent to register in

        if (!Exsit(p))
        {
            createDirWithRecur(p);
        }
        linkChild(p, getJsonValue(p), key);
    }

    /// Deletes `key` and, when it is a directory, every descendant listed below it.
    void removeSubtree(string key)
    {
        auto node = getJsonValue(key);

        // Delete first so that a malformed, cyclic child list cannot recurse forever.
        _rocksdb.removeString(key);

        if (!isDirNode(node))
            return;

        if (auto listed = "children" in node)
        {
            foreach (subkey; split(listed.str, ";"))
            {
                if (subkey.length > 0 && subkey != key)
                {
                    removeSubtree(subkey);
                }
            }
        }
    }

}