module neton.server.NetonHttpServer;

import neton.network.NodeClient;
import neton.server.NetonConfig;
import neton.server.Health;
import neton.wal.Wal;
import neton.snap.SnapShotter;
import neton.wal.Record;
import neton.wal.Util;
import neton.store.Event;
import neton.store.Watcher;
import neton.store.Util;
import neton.store.Store;
import neton.network;

import hunt.io.ByteBuffer;
import hunt.logging.ConsoleLogger;
import hunt.util.Serialize;
import hunt.util.Timer;
import hunt.event.timer;
import hunt.event.EventLoop;
import hunt.event.timer.Common;
import hunt.raft;
import hunt.net;

import core.time;
import core.thread;
import core.sync.mutex;

import std.conv;
import std.file;
import std.stdio;
import std.json;
import std.algorithm.mutation;
import std.string;

/// A snapshot is triggered once more than this many entries were applied
/// since the last snapshot (see maybeTriggerSnapshot).
enum defaultSnapCount = 10;

/// Number of already-applied entries kept in the in-memory raft log when it is
/// compacted after a snapshot (see maybeTriggerSnapshot).
enum snapshotCatchUpEntriesN = 10000;

/**
 * Raft-backed server node.
 *
 * Owns the raft node, its WAL / snapshot persistence, the peer connections,
 * and the table of in-flight HTTP requests. HTTP requests are proposed to
 * raft (or issued as read-index requests) and answered when the matching
 * entry / read state comes back through `ready`.
 */
class NetonHttpServer : MessageReceiver
{
    // `static` class fields are thread-local in D; the singleton must be
    // `__gshared` so every thread (timer loop, network threads) sees the same
    // started server instead of lazily creating its own empty one.
    private __gshared NetonHttpServer _gserver;

    /// A read-index request waiting for the state machine to reach `index`.
    private static struct PendingRead
    {
        ulong index;
        RequestCommand command;
    }

    this()
    {
    }

    /**
     * Adopts the metadata of an incoming snapshot as the current progress
     * (conf state, snapshot index, applied index). Empty snapshots are ignored.
     */
    void publishSnapshot(Snapshot snap)
    {
        if (IsEmptySnap(snap))
            return;

        if (snap.Metadata.Index <= _appliedIndex)
        {
            // errorf (not logError): the message carries %d format specifiers.
            errorf("snapshot index [%d] should > progress.appliedIndex [%d] + 1",
                    snap.Metadata.Index, _appliedIndex);
        }

        _confState = snap.Metadata.CS;
        _snapshotIndex = snap.Metadata.Index;
        _appliedIndex = snap.Metadata.Index;
    }

    /// Persists `snap`: first its index/term marker in the WAL, then the snapshot itself.
    void saveSnap(Snapshot snap)
    {
        // must save the snapshot index to the WAL before saving the
        // snapshot to maintain the invariant that we only Open the
        // wal at previously-saved snapshot indexes.
        WalSnapshot walSnap = {
            index: snap.Metadata.Index,
            term: snap.Metadata.Term,
        };
        _wal.SaveSnapshot(walSnap);
        _snapshotter.SaveSnap(snap);
    }

    /**
     * Returns the suffix of `ents` that has not been applied yet
     * (entries with Index > _appliedIndex), or null when there is nothing to apply.
     */
    Entry[] entriesToApply(Entry[] ents)
    {
        if (ents.length == 0)
            return null;

        auto firstIdx = ents[0].Index;
        if (firstIdx > _appliedIndex + 1)
        {
            errorf("first index of committed entry[%d] should <= progress.appliedIndex[%d] + 1",
                    firstIdx, _appliedIndex);
            // There is a gap between what we applied and what was committed.
            // Return explicitly instead of relying on the unsigned underflow
            // of the offset computation below.
            return null;
        }

        immutable offset = _appliedIndex - firstIdx + 1;
        if (offset < ents.length)
            return ents[cast(size_t) offset .. $];

        return null;
    }

    /**
     * Renders a store event as the HTTP response body.
     *
     * Returns the raw error text when the event carries an error, otherwise a
     * JSON object with "action", "netonIndex" and "node". If building that JSON
     * throws, a JSON object with a single "error" member is returned.
     */
    string makeJsonString(Event e)
    {
        if (e.error.length > 0)
            return e.error;

        JSONValue res;
        try
        {
            JSONValue j;
            j["action"] = e.action;
            j["netonIndex"] = Store.instance.Index();
            j["node"] = e.getNodeValue();
            return j.toString();
        }
        catch (Exception ex) // renamed: the original shadowed parameter `e`
        {
            logError("catch %s", ex.msg);
            res["error"] = ex.msg;
        }
        return res.toString;
    }

    /**
     * Applies committed raft entries to the store / cluster membership.
     *
     * Returns: false when this node has been removed from the cluster and
     * must stop, true otherwise.
     */
    bool publishEntries(Entry[] ents)
    {
        foreach (ref ent; ents)
        {
            switch (ent.Type)
            {
            case EntryType.EntryNormal:
                // Entries without payload carry no command; only their index is recorded.
                if (ent.Data.length > 0)
                    applyCommand(cast(byte[]) ent.Data);
                break;

            case EntryType.EntryConfChange:
                if (!applyConfChange(cast(byte[]) ent.Data))
                    return false;
                break;

            default:
                break;
            }

            _appliedIndex = ent.Index;
        }

        return true;
    }

    /// Decodes one committed command, executes it and answers the waiting HTTP request, if any.
    private void applyCommand(byte[] data)
    {
        RequestCommand command = unserialize!RequestCommand(data);

        bool isWatch;
        string res = executeCommand(command, isWatch);

        auto http = (command.Hash in _request);
        if (http is null)
            return; // request was not received by this node, or the client is gone

        auto handler = *http;
        logInfo(" http request params : ", handler.params());
        // A watch stays pending until scanWatchers sees a notification.
        if (!isWatch)
            finishRequest(handler, command.Hash, res);

        logInfo(" http request array length : ", _request.length);
    }

    /// Dispatches `command` by method. Sets `isWatch` when a watcher was registered instead of producing a response.
    private string executeCommand(RequestCommand command, out bool isWatch)
    {
        string res;
        switch (command.Method)
        {
        case RequestMethod.METHOD_GET:
            isWatch = handleGet(command, res);
            break;
        case RequestMethod.METHOD_PUT:
            res = handlePut(command);
            break;
        case RequestMethod.METHOD_UPDATESERVICE:
            handleUpdateService(command);
            break;
        case RequestMethod.METHOD_DELETE:
            res = handleDelete(command);
            break;
        case RequestMethod.METHOD_POST:
            res = handlePost(command);
            break;
        default:
            break;
        }
        return res;
    }

    /// True when `param` is an object whose member `name` is the string "true" or JSON `true`.
    private static bool jsonFlag(JSONValue param, string name)
    {
        if (param.type != JSONType.object)
            return false;

        auto field = name in param;
        if (field is null)
            return false;

        // Calling .str on a non-string value throws; accept a real boolean too.
        if (field.type == JSONType.string)
            return field.str == "true";
        return field.type == JSONType.true_;
    }

    /**
     * Executes a GET: either registers a watcher (wait=true) or reads the key.
     *
     * Returns: true when a watcher was registered (no response yet),
     * false when `res` holds the response body.
     */
    private bool handleGet(RequestCommand command, out string res)
    {
        auto param = tryGetJsonFormat(command.Params);
        logInfo("http GET param : ", param);

        bool recursive = jsonFlag(param, "recursive");
        if (jsonFlag(param, "wait"))
        {
            ulong waitIndex = 0;
            if ("waitIndex" in param)
                waitIndex = to!ulong(param["waitIndex"].str);
            auto w = Store.instance.Watch(command.Key, recursive, false, waitIndex);
            w.setWatchId(command.Hash);
            _watchers ~= w;
            return true;
        }

        res = makeJsonString(Store.instance.Get(command.Key, recursive, false));
        return false;
    }

    /// Executes a PUT: creates a directory (dir=true) or sets a value.
    private string handlePut(RequestCommand command)
    {
        if (isRemained(command.Key))
            return "the " ~ command.Key ~ " is remained";

        auto param = tryGetJsonFormat(command.Params);
        logInfo("http PUT param : ", param);

        if (jsonFlag(param, "dir"))
            return makeJsonString(Store.instance.CreateDir(command.Key));

        string value;
        if (param.type == JSONType.object && "value" in param)
            value = param["value"].str;
        return makeJsonString(Store.instance.Set(command.Key, false, value));
    }

    /// Stores the JSON parameters under the key; produces no response body.
    private void handleUpdateService(RequestCommand command)
    {
        auto param = tryGetJsonFormat(command.Params);
        logInfo("http update service param : ", param);
        if (param.type == JSONType.object)
            Store.instance.Set(command.Key, false, param.toString);
    }

    /// Executes a DELETE, optionally recursive.
    private string handleDelete(RequestCommand command)
    {
        auto param = tryGetJsonFormat(command.Params);
        logInfo("http DELETE param : ", param);
        bool recursive = jsonFlag(param, "recursive");
        return makeJsonString(Store.instance.Delete(command.Key, recursive));
    }

    /// Executes a POST: service registration under /register or /deregister.
    private string handlePost(RequestCommand command)
    {
        auto param = tryGetJsonFormat(command.Params);
        logInfo("http post param : ", param);

        if (param.type == JSONType.object)
        {
            if (startsWith(command.Key, "/register"))
            {
                // Disabled hook: addHealthCheck(nd["key"].str, nd["value"]) on success.
                return makeJsonString(Store.instance.Register(param));
            }
            if (startsWith(command.Key, "/deregister"))
            {
                // Disabled hook: removeHealthCheck(nd["key"].str) on success.
                return makeJsonString(Store.instance.Deregister(param));
            }
        }
        return "can not sovle " ~ command.Key;
    }

    /// Sends `res` to the client, closes the connection and forgets the request.
    private void finishRequest(HttpBase handler, ulong hash, string res)
    {
        handler.do_response(res);
        handler.close();
        _request.remove(hash);
    }

    /**
     * Applies a committed configuration change.
     *
     * Returns: false when the change removes this very node.
     */
    private bool applyConfChange(byte[] data)
    {
        ConfChange cc = unserialize!ConfChange(data);
        _confState = _node.ApplyConfChange(cc);
        switch (cc.Type)
        {
        case ConfChangeType.ConfChangeAddNode:
            if (cc.Context.length > 0)
                addPeer(cc.NodeID, cc.Context);
            break;
        case ConfChangeType.ConfChangeRemoveNode:
            if (cc.NodeID == _ID)
            {
                logWarning(_ID, " I've been removed from the cluster! Shutting down.");
                return false;
            }
            logWarning(_ID, " del node ", cc.NodeID);
            delPeer(cc.NodeID);
            break;
        default:
            break;
        }
        return true;
    }

    /**
     * Takes a snapshot and compacts the in-memory log once more than
     * `defaultSnapCount` entries were applied since the last snapshot.
     */
    void maybeTriggerSnapshot()
    {
        if (_appliedIndex - _snapshotIndex <= defaultSnapCount)
            return;

        infof("start snapshot [applied index: %d | last snapshot index: %d]",
                _appliedIndex, _snapshotIndex);

        auto data = loadSnapshot().Data;
        Snapshot snap;
        auto err = _storage.CreateSnapshot(_appliedIndex, &_confState, cast(string) data, snap);
        if (err != ErrNil)
        {
            logError(err);
            // Do not persist an unfilled snapshot or compact the log on failure;
            // the next call retries because _snapshotIndex is left untouched.
            return;
        }

        saveSnap(snap);

        ulong compactIndex = 1;
        if (_appliedIndex > snapshotCatchUpEntriesN)
            compactIndex = _appliedIndex - snapshotCatchUpEntriesN;

        _storage.Compact(compactIndex);
        logInfo("compacted log at index ", compactIndex);
        _snapshotIndex = _appliedIndex;
    }

    /// Proposes `command` to raft and remembers `h` so it can be answered once the entry is applied.
    void Propose(RequestCommand command, HttpBase h)
    {
        auto err = _node.Propose(cast(string) serialize(command));
        if (err != ErrNil)
        {
            logError("---------", err);
        }
        else
        {
            _request[command.Hash] = h;
        }
    }

    /// Proposes `command` to raft without an HTTP request waiting for the result.
    void Propose(RequestCommand command)
    {
        auto err = _node.Propose(cast(string) serialize(command));
        if (err != ErrNil)
        {
            logError("---------", err);
        }
    }

    /// Issues a read-index request for `command`; `h` is answered from `ready`.
    void ReadIndex(RequestCommand command, HttpBase h)
    {
        _node.ReadIndex(cast(string) serialize(command));
        _request[command.Hash] = h;
    }

    /// Proposes a cluster configuration change; failures are only logged.
    void ProposeConfChange(ConfChange cc)
    {
        auto err = _node.ProposeConfChange(cc);
        if (err != ErrNil)
        {
            logError(err);
        }
    }

    /// Loads the snapshot from the snapshot directory via the snapshotter.
    Snapshot loadSnapshot()
    {
        return _snapshotter.loadSnap();
    }

    /// Opens the WAL at the position of `snapshot`, creating an initial WAL first when none exists.
    void openWAL(Snapshot snapshot)
    {
        if (isEmptyDir(_waldir))
        {
            // mkdirRecurse is a no-op for an existing directory, whereas mkdir
            // throws when the directory is present but empty.
            mkdirRecurse(_waldir);

            // `new` throws on failure and never yields null, so no null check.
            auto wal = new WAL(_waldir, null);
            wal.Close();
        }

        WalSnapshot walsnap;
        walsnap.index = snapshot.Metadata.Index;
        walsnap.term = snapshot.Metadata.Term;

        logInfo("loading WAL at term ", walsnap.term, " and index ", walsnap.index);

        _wal = new WAL(_waldir, walsnap, true);
    }

    /// Replays the snapshot and WAL entries into a fresh MemoryStorage for the raft instance.
    void replayWAL()
    {
        logInfo("replaying WAL of member ", _ID);
        auto snapshot = loadSnapshot();
        openWAL(snapshot);

        HardState hs;
        Entry[] ents;
        byte[] metadata;

        _wal.ReadAll(metadata, hs, ents);

        _storage = new MemoryStorage();

        if (!IsEmptySnap(snapshot))
        {
            logInfo("******* exsit snapshot : ", snapshot);
            _storage.ApplySnapshot(snapshot);
            _confState = snapshot.Metadata.CS;
            _snapshotIndex = snapshot.Metadata.Index;
            _appliedIndex = snapshot.Metadata.Index;
        }

        _storage.setHadrdState(hs);

        _storage.Append(ents);
        if (ents.length > 0)
        {
            _lastIndex = ents[$ - 1].Index;
        }
    }

    /**
     * Boots the node: restores state from disk, creates the raft node, starts
     * the API (HTTP) and node (TCP) servers, connects to the configured peers
     * and starts the timers driving `ready`, `scanWatchers` and `onTimer`.
     *
     * Params:
     *   join = when starting without an existing WAL, start with an empty peer
     *          list instead of the configured peers.
     */
    void start(bool join)
    {
        _ID = NetonConfig.instance.selfConf.id;
        _snapdir = snapDirPath(_ID);
        _waldir = walDirPath(_ID);

        _mutex = new Mutex();

        if (!Exist(_snapdir))
        {
            mkdir(_snapdir);
        }
        _snapshotter = new Snapshotter(_snapdir);

        // Must be sampled before replayWAL(), which creates the WAL.
        // (The original called this flag `oldwal` although true means "no WAL yet".)
        bool freshStart = isEmptyDir(_waldir);

        replayWAL();

        Config conf = new Config();

        Store.instance.Init(_ID, null);

        conf._ID = _ID;
        conf._ElectionTick = 10;
        conf._HeartbeatTick = 1;
        conf._storage = _storage;
        conf._MaxSizePerMsg = 1024 * 1024;
        conf._MaxInflightMsgs = 256;

        _buffer.length = 1024;

        Peer[] peers;
        Peer slf = {ID: NetonConfig.instance.selfConf.id};
        peers ~= slf;
        foreach (peer; NetonConfig.instance.peersConf)
        {
            Peer p = {ID: peer.id};
            peers ~= p;
        }

        if (!freshStart)
        {
            // Restart: membership comes from the replayed storage.
            _node = new RawNode(conf);
        }
        else
        {
            if (join)
            {
                peers.length = 0;
            }
            logInfo("self conf : ", conf, " peers conf : ", peers);
            _node = new RawNode(conf, peers);
        }

        // API server (HTTP)
        initApiServer();

        // Node server (TCP)
        initNodeServer();

        foreach (peer; NetonConfig.instance.peersConf)
        {
            addPeer(peer.id, peer.ip ~ ":" ~ to!string(peer.nodeport));
        }

        EventLoop timerLoop = new EventLoop();

        new Timer(timerLoop, 100.msecs).onTick(&ready).start();

        new Timer(timerLoop, 100.msecs).onTick(&scanWatchers).start();

        new Timer(timerLoop, 1000.msecs).onTick(&onTimer).start();

        timerLoop.runAsync(-1);
    }

    /// Starts the HTTP API listener on the configured apiport.
    private void initApiServer()
    {
        _http = NetUtil.createNetServer!(ThreadMode.Single)();

        _http.setHandler(new class NetConnectionHandler {
            // The handler object is shared by all connections, so the session
            // must be tracked per connection; a single field would be
            // overwritten by every newly opened connection.
            private HttpBase[Object] _sessions;

            private HttpBase sessionOf(Connection connection, bool forget)
            {
                synchronized (this)
                {
                    auto key = cast(Object) connection;
                    auto found = key in _sessions;
                    if (found is null)
                        return null;
                    auto session = *found;
                    if (forget)
                        _sessions.remove(key);
                    return session;
                }
            }

            override void connectionOpened(Connection connection) {
                infof("Connection created: %s", connection.getRemoteAddress());
                auto session = new HttpBase(connection, this.outer);
                synchronized (this)
                {
                    _sessions[cast(Object) connection] = session;
                }
            }

            override void connectionClosed(Connection connection) {
                infof("Connection closed: %s", connection.getRemoteAddress());
                auto session = sessionOf(connection, true);
                if (session !is null)
                    session.onClose();
            }

            override void messageReceived(Connection connection, Object message) {
                tracef("message type: %s", typeid(message).name);
                ByteBuffer buffer = cast(ByteBuffer) message;
                auto session = sessionOf(connection, false);
                if (buffer is null || session is null)
                {
                    warning("dropping message: unexpected message type or unknown connection");
                    return;
                }
                byte[] data = buffer.getRemaining();
                session.onRead(cast(ubyte[]) data);
            }

            override void exceptionCaught(Connection connection, Throwable t) {
                warning(t);
            }

            override void failedOpeningConnection(int connectionId, Throwable t) {
                error(t);
            }

            override void failedAcceptingConnection(int connectionId, Throwable t) {
                error(t);
            }
        });

        _http.listen("0.0.0.0", cast(int) NetonConfig.instance.selfConf.apiport);
    }

    /// Starts the raft peer (TCP) listener on the configured nodeport.
    private void initNodeServer()
    {
        _server = NetUtil.createNetServer!(ThreadMode.Single)();

        _server.setHandler(new class NetConnectionHandler {
            // Per-connection sessions; see initApiServer for the rationale.
            private ServerHandler[Object] _sessions;

            private ServerHandler sessionOf(Connection connection, bool forget)
            {
                synchronized (this)
                {
                    auto key = cast(Object) connection;
                    auto found = key in _sessions;
                    if (found is null)
                        return null;
                    auto session = *found;
                    if (forget)
                        _sessions.remove(key);
                    return session;
                }
            }

            override void connectionOpened(Connection connection) {
                infof("Connection created: %s", connection.getRemoteAddress());
                auto session = new ServerHandler(this.outer);
                synchronized (this)
                {
                    _sessions[cast(Object) connection] = session;
                }
            }

            override void connectionClosed(Connection connection) {
                infof("Connection closed: %s", connection.getRemoteAddress());
                auto session = sessionOf(connection, true);
                if (session !is null)
                    session.onClose();
            }

            override void messageReceived(Connection connection, Object message) {
                tracef("message type: %s", typeid(message).name);
                ByteBuffer buffer = cast(ByteBuffer) message;
                auto session = sessionOf(connection, false);
                if (buffer is null || session is null)
                {
                    warning("dropping message: unexpected message type or unknown connection");
                    return;
                }
                byte[] data = buffer.getRemaining();
                session.onRead(cast(ubyte[]) data);
            }

            override void exceptionCaught(Connection connection, Throwable t) {
                warning(t);
            }

            override void failedOpeningConnection(int connectionId, Throwable t) {
                error(t);
            }

            override void failedAcceptingConnection(int connectionId, Throwable t) {
                error(t);
            }
        });

        _server.listen("0.0.0.0", cast(int) NetonConfig.instance.selfConf.nodeport);
    }

    /**
     * Starts connecting to peer `ID` at `data` ("host:port").
     *
     * The client is registered in `_clients` only once the connection
     * succeeded; on failure the attempt is repeated after one second.
     *
     * Returns: false when the peer is already connected or `data` has no
     * port part, true when a connection attempt was started.
     */
    bool addPeer(ulong ID, string data)
    {
        if (ID in _clients)
            return false;

        string[] hostport = split(data, ":");
        if (hostport.length < 2)
        {
            logError("invalid peer address (expected host:port) : ", data);
            return false;
        }

        auto client = new NodeClient(this._ID, ID);
        client.connect(hostport[0], to!int(hostport[1]), (AsyncResult!Connection result) {
            if (result.failed())
            {
                logWarning("connect fail --> : ", data);
                new Thread(() {
                    Thread.sleep(dur!"seconds"(1));
                    addPeer(ID, data);
                }).start();
                return;
            }
            _clients[ID] = client;
            logInfo(this._ID, " client connected ", hostport[0], " ", hostport[1]);
        });

        return true;
    }

    /// Closes and forgets the connection to peer `ID`. Returns false when there is none.
    bool delPeer(ulong ID)
    {
        if (ID !in _clients)
            return false;

        logInfo(_ID, " client disconnect ", ID);
        _clients[ID].close();
        _clients.remove(ID);

        return true;
    }

    /// Sends raft messages to their target peers; messages for peers without a connection are dropped.
    void send(Message[] msg)
    {
        foreach (m; msg)
        {
            // Peers enter _clients only after their connect succeeded, so a
            // target may be missing; indexing blindly would throw RangeError.
            auto client = m.To in _clients;
            if (client is null)
            {
                logWarning(_ID, " no connection to peer ", m.To, ", message dropped");
                continue;
            }
            (*client).write(m);
        }
    }

    /// Feeds a message received from a peer into the raft node.
    void step(Message msg)
    {
        _mutex.lock();
        scope (exit)
            _mutex.unlock();

        _node.Step(msg);
    }

    /// Timer callback: advances the raft logical clock by one tick.
    void onTimer(Object sender)
    {
        _mutex.lock();
        scope (exit)
            _mutex.unlock();

        _node.Tick();
    }

    /**
     * Timer callback: processes one raft Ready batch — persist to WAL, apply
     * snapshot, append entries, send messages, apply committed entries,
     * answer read-index requests, maybe snapshot, then Advance.
     */
    void ready(Object sender)
    {
        _mutex.lock();
        scope (exit)
            _mutex.unlock();

        Ready rd = _node.ready();
        if (!rd.containsUpdates())
            return;

        _wal.Save(rd.hs, rd.Entries);
        if (!IsEmptySnap(rd.snap))
        {
            saveSnap(rd.snap);
            _storage.ApplySnapshot(rd.snap);
            publishSnapshot(rd.snap);
        }
        _storage.Append(rd.Entries);

        send(rd.Messages);
        if (!publishEntries(entriesToApply(rd.CommittedEntries)))
        {
            // _poll.stop();
            logError("----- poll stop");
            return;
        }

        // for readindex
        foreach (r; rd.ReadStates)
        {
            RequestCommand command = unserialize!RequestCommand(cast(byte[]) r.RequestCtx);
            _pendingReads ~= PendingRead(r.Index, command);
        }
        servePendingReads();

        maybeTriggerSnapshot();
        _node.Advance(rd);

        // if(_node.isLeader())
        // {
        //     if(leader() != _lastLeader)
        //     {
        //         logWarning("-----*****start health check *****-----");
        //         _lastLeader = leader();
        //         starHealthCheck();
        //         loadServices(SERVICE_PREFIX[0..$-1]);
        //     }
        // }
        // else
        // {
        //     if(_healths.length > 0)
        //     {
        //         logWarning("-----*****stop health check *****-----");
        //         synchronized(_mutex)
        //         {
        //             if(_healthPoll !is null)
        //             {
        //                 _healthPoll.stop();
        //                 foreach(key;_healths.keys)
        //                 {
        //                     _healthPoll.delTimer(_healths[key].timerFd);
        //                 }
        //             }
        //             _healths.clear;
        //             _healthPoll = null;
        //         }
        //     }
        // }
    }

    /**
     * Answers queued read-index requests whose index has been applied.
     *
     * A read may only be served once the state machine has applied at least
     * the read index. The original tested `r.Index >= _appliedIndex`, which
     * silently dropped reads as soon as the applied index had moved past the
     * read index, leaving the HTTP client hanging. Reads that cannot be
     * served yet stay queued for a later Ready batch.
     */
    private void servePendingReads()
    {
        PendingRead[] waiting;
        foreach (pending; _pendingReads)
        {
            auto http = pending.command.Hash in _request;
            if (http is null)
                continue; // client disconnected, nothing to answer

            if (pending.index > _appliedIndex)
            {
                waiting ~= pending;
                continue;
            }

            auto handler = *http;
            string res;
            if (!handleGet(pending.command, res))
                finishRequest(handler, pending.command.Hash, res);
        }
        _pendingReads = waiting;
    }

    /// Timer callback: answers the HTTP requests of watchers that have a notification, then removes them.
    void scanWatchers(Object sender)
    {
        // Iterate over a snapshot: removeWatcher() compacts _watchers in place,
        // which would make a loop over the live array skip or revisit elements.
        foreach (w; _watchers.dup)
        {
            if (!w.haveNotify)
                continue;

            logInfo("----- scaned notify key: ", w.key, " hash :", w.watchId);
            auto http = (w.watchId in _request);
            if (http != null)
            {
                auto handler = *http;
                // Only the first event is delivered; the connection is closed right after.
                foreach (e; w.events())
                {
                    handler.do_response(makeJsonString(e));
                    handler.close();
                    break;
                }
                _request.remove(w.watchId);
            }
            removeWatcher(w.watchId);
        }
    }

    /// Called when an HTTP connection closes: forgets its pending request and watcher.
    void handleHttpClose(size_t hash)
    {
        auto http = (hash in _request);
        if (http != null)
            _request.remove(hash);
        removeWatcher(hash);
    }

    /// Unregisters and drops every watcher whose watchId equals `hash`.
    void removeWatcher(size_t hash)
    {
        foreach (w; _watchers)
        {
            if (w.watchId == hash)
                w.Remove();
        }
        _watchers = remove!(a => a.watchId == hash)(_watchers);
        logInfo("---watchers len : ", _watchers.length);
    }

    /// Returns the process-wide server instance, creating it on first use.
    static NetonHttpServer instance()
    {
        synchronized (typeid(NetonHttpServer))
        {
            if (_gserver is null)
                _gserver = new NetonHttpServer();
            return _gserver;
        }
    }

    /// Returns the leader id as currently recorded by the raft node.
    ulong leader()
    {
        return _node._raft._lead;
    }

    /// Registers `h` as a pending request under its own hash.
    void saveHttp(HttpBase h)
    {
        _request[h.toHash] = h;
    }

    // void starHealthCheck()
    // {
    //     _healthPoll = new Epoll(100);
    //     _healthPoll.start();
    // }

    // void addHealthCheck(string key,ref JSONValue value)
    // {
    //     if(!_node.isLeader)
    //         return;
    //     if(_healthPoll !is null)
    //     {
    //         synchronized(_mutex)
    //         {
    //             auto health = new Health(key,value);
    //             if(key in _healths)
    //             {
    //                 auto oldHlt = _healths[key];
    //                 _healthPoll.delTimer(oldHlt.timerFd);
    //             }
    //             _healths[key] = health;
    //             _healthPoll.addTimer(&health.onTimer,health.interval_ms(),WheelType.WHEEL_PERIODIC);
    //         }
    //     }
    //     logInfo("-----*****health check num : ",_healths.length);
    // }

    // void removeHealthCheck(string key)
    // {
    //     if(!_node.isLeader)
    //         return;
    //     synchronized(_mutex)
    //     {
    //         if(key in _healths)
    //         {
    //             auto health = _healths[key];
    //             if(_healthPoll !is null)
    //             {
    //                 _healthPoll.delTimer(health.timerFd);
    //             }
    //             _healths.remove(key);
    //         }
    //     }
    //
    //     logInfo("-----*****health check num : ",_healths.length);
    // }

    // void loadServices(string key)
    // {
    //     JSONValue node = Store.instance().getJsonValue(key);
    //     if(node.type != JSONType.null_)
    //     {
    //         auto dir = node["dir"].str == "true" ? true:false;
    //         if(!dir)
    //         {
    //             if(startsWith(key,SERVICE_PREFIX))
    //             {
    //                 auto val = tryGetJsonFormat(node["value"].str);
    //                 addHealthCheck(key,val);
    //             }
    //             else
    //             {
    //             }
    //             return ;
    //         }
    //         else
    //         {
    //             auto children = node["children"].str;
    //             if(children.length == 0)
    //             {
    //                 return ;
    //             }
    //             else
    //             {
    //                 JSONValue[] subs;
    //                 auto segments = split(children, ";");
    //                 foreach(subkey;segments)
    //                 {
    //                     if(subkey.length != 0)
    //                     {
    //                         loadServices(subkey);
    //                     }
    //                 }
    //                 return ;
    //             }
    //         }
    //     }
    //     else
    //     {
    //         return ;
    //     }
    // }

    MemoryStorage _storage;
    ulong _ID;
    NetServer _server; // raft peer (TCP) listener
    NetServer _http; // HTTP API listener

    NodeClient[ulong] _clients; // connected peers by node id
    RawNode _node;
    byte[] _buffer;

    bool _join;
    ulong _lastIndex;
    ConfState _confState;
    ulong _snapshotIndex;
    ulong _appliedIndex;

    HttpBase[ulong] _request; // in-flight HTTP requests by request hash

    string _waldir; // path to WAL directory
    string _snapdir; // path to snapshot directory
    Snapshotter _snapshotter;
    WAL _wal;
    Watcher[] _watchers;

    // Read-index requests whose index has not been applied yet.
    private PendingRead[] _pendingReads;

    // Poll _healthPoll; //eventloop for health check
    Health[string] _healths;
    ulong _lastLeader = 0;
    Mutex _mutex;
}