1 module neton.server.NetonHttpServer;
2 
3 import neton.network.NodeClient;
4 import neton.server.NetonConfig;
5 import neton.server.Health;
6 import neton.wal.Wal;
7 import neton.snap.SnapShotter;
8 import neton.wal.Record;
9 import neton.wal.Util;
10 import neton.store.Event;
11 import neton.store.Watcher;
12 import neton.store.Util;
13 import neton.store.Store;
14 import neton.network;
15 
16 import hunt.io.ByteBuffer;
17 import hunt.logging.ConsoleLogger;
18 import hunt.util.Serialize;
19 import hunt.util.Timer;
20 import hunt.event.timer;
21 import hunt.event.EventLoop;
22 import hunt.event.timer.Common;
23 import hunt.raft;
24 import hunt.net;
25 
26 import core.time;
27 import core.thread;
28 import core.sync.mutex;
29 
30 import std.conv;
31 import std.file;
32 import std.stdio;
33 import std.json;
34 import std.algorithm.mutation;
35 import std.string;
36 
37 enum defaultSnapCount = 10;
38 enum snapshotCatchUpEntriesN = 10000;
39 
40 class NetonHttpServer : MessageReceiver
41 {
42     private static NetonHttpServer _gserver;
43 
44     this()
45     {
46     }
47 
48     void publishSnapshot(Snapshot snap)
49     {
50         if (IsEmptySnap(snap))
51             return;
52 
53         if (snap.Metadata.Index <= _appliedIndex)
54         {
55             logError("snapshot index [%d] should > progress.appliedIndex [%d] + 1",
56                     snap.Metadata.Index, _appliedIndex);
57         }
58 
59         _confState = snap.Metadata.CS;
60         _snapshotIndex = snap.Metadata.Index;
61         _appliedIndex = snap.Metadata.Index;
62     }
63 
64     void saveSnap(Snapshot snap)
65     {
66         // must save the snapshot index to the WAL before saving the
67         // snapshot to maintain the invariant that we only Open the
68         // wal at previously-saved snapshot indexes.
69         WalSnapshot walSnap = {
70         index:
71             snap.Metadata.Index, term : snap.Metadata.Term,
72         };
73         _wal.SaveSnapshot(walSnap);
74         _snapshotter.SaveSnap(snap);
75 
76     }
77 
78     Entry[] entriesToApply(Entry[] ents)
79     {
80         if (ents.length == 0)
81             return null;
82 
83         auto firstIdx = ents[0].Index;
84         if (firstIdx > _appliedIndex + 1)
85         {
86             logError("first index of committed entry[%d] should <= progress.appliedIndex[%d] 1",
87                     firstIdx, _appliedIndex);
88         }
89 
90         if (_appliedIndex - firstIdx + 1 < ents.length)
91             return ents[_appliedIndex - firstIdx + 1 .. $];
92 
93         return null;
94     }
95 
96     string makeJsonString(Event e)
97     {
98         if (e.error.length > 0)
99             return e.error;
100         JSONValue res;
101         try
102         {
103             JSONValue j;
104             j["action"] = e.action;
105             j["netonIndex"] = Store.instance.Index();
106             JSONValue node;
107             node = e.getNodeValue();
108 
109             j["node"] = node;
110             return j.toString();
111         }
112         catch (Exception e)
113         {
114             logError("catch %s", e.msg);
115             res["error"] = e.msg;
116         }
117         return res.toString;
118     }
119 
120     bool publishEntries(Entry[] ents)
121     {
122         bool iswatch = false;
123         for (auto i = 0; i < ents.length; i++)
124         {
125             switch (ents[i].Type)
126             {
127             case EntryType.EntryNormal:
128                 if (ents[i].Data.length == 0)
129                     break;
130 
131                 RequestCommand command = unserialize!RequestCommand(cast(byte[]) ents[i].Data);
132 
133                 string res;
134                 iswatch = false;
135                 switch (command.Method)
136                 {
137                 case RequestMethod.METHOD_GET:
138                     {
139                         //string value;
140                         auto param = tryGetJsonFormat(command.Params);
141                         logInfo("http GET param : ", param);
142                         bool recursive = false;
143                         if (param.type == JSONType.object && "recursive" in param)
144                             recursive = param["recursive"].str == "true" ? true : false;
145                         if (param.type == JSONType.object && ("wait" in param)
146                                 && param["wait"].str == "true")
147                         {
148                             ulong waitIndex = 0;
149                             if ("waitIndex" in param)
150                                 waitIndex = to!ulong(param["waitIndex"].str);
151                             auto w = Store.instance.Watch(command.Key, recursive, false, waitIndex);
152                             w.setWatchId(command.Hash);
153                             _watchers ~= w;
154                             iswatch = true;
155                         }
156                         else
157                         {
158                             auto e = Store.instance.Get(command.Key, recursive, false);
159                             //value = e.getNodeValue();
160                             res = makeJsonString(e);
161                         }
162 
163                     }
164                     break;
165                 case RequestMethod.METHOD_PUT:
166                     {
167                         if (isRemained(command.Key))
168                             res = "the " ~ command.Key ~ " is remained";
169                         else
170                         {
171                             auto param = tryGetJsonFormat(command.Params);
172                             logInfo("http PUT param : ", param);
173                             bool dir = false;
174                             if (param.type == JSONType.object && "dir" in param)
175                                 dir = param["dir"].str == "true" ? true : false;
176 
177                             if (dir)
178                             {
179                                 auto e = Store.instance.CreateDir(command.Key);
180                                 res = makeJsonString(e);
181                             }
182                             else
183                             {
184                                 string value;
185                                 if (param.type == JSONType.object && "value" in param)
186                                     value = param["value"].str;
187                                 auto e = Store.instance.Set(command.Key, false, value);
188                                 res = makeJsonString(e);
189                             }
190                         }
191 
192                     }
193                     break;
194                 case RequestMethod.METHOD_UPDATESERVICE:
195                     {
196                         auto param = tryGetJsonFormat(command.Params);
197                         logInfo("http update service param : ", param);
198                         {
199                             if (param.type == JSONType.object)
200                                 auto e = Store.instance.Set(command.Key, false, param.toString);
201                             //res = makeJsonString(e);
202                         }
203 
204                     }
205                     break;
206                 case RequestMethod.METHOD_DELETE:
207                     {
208                         auto param = tryGetJsonFormat(command.Params);
209                         logInfo("http DELETE param : ", param);
210                         bool recursive = false;
211                         if (param.type == JSONType.object && "recursive" in param)
212                             recursive = param["recursive"].str == "true" ? true : false;
213                         auto e = Store.instance.Delete(command.Key, recursive);
214                         res = makeJsonString(e);
215                     }
216                     break;
217                 case RequestMethod.METHOD_POST:
218                     {
219                         auto param = tryGetJsonFormat(command.Params);
220                         logInfo("http post param : ", param);
221                         bool recursive = false;
222                         if (param.type == JSONType.object)
223                         {
224                             if (startsWith(command.Key, "/register"))
225                             {
226                                 auto e = Store.instance.Register(param);
227                                 res = makeJsonString(e);
228                                 if (e.isOk())
229                                 {
230                                     auto nd = e.getNodeValue();
231                                     if ("key" in nd && "value" in nd)
232                                     {
233                                         // addHealthCheck(nd["key"].str,nd["value"]);
234                                     }
235                                 }
236                             }
237                             else if (startsWith(command.Key, "/deregister"))
238                             {
239                                 auto e = Store.instance.Deregister(param);
240                                 res = makeJsonString(e);
241                                 if (e.isOk)
242                                 {
243                                     auto nd = e.getNodeValue();
244                                     if ("key" in nd)
245                                     {
246                                         // removeHealthCheck(nd["key"].str);
247                                     }
248                                 }
249                             }
250                             else
251                             {
252                                 res = "can not sovle " ~ command.Key;
253                             }
254                         }
255                         else
256                         {
257                             res = "can not sovle " ~ command.Key;
258                         }
259 
260                     }
261                     break;
262                 default:
263                     break;
264                 }
265 
266                 //if leader
267                 //if(_node.isLeader())
268                 {
269                     auto http = (command.Hash in _request);
270                     if (http != null)
271                     {
272                         logInfo("  http request params : ", http.params());
273                         if (!iswatch)
274                         {
275                             http.do_response(res);
276                             http.close();
277                             _request.remove(command.Hash);
278                         }
279 
280                         logInfo("  http request array length : ", _request.length);
281                     }
282                 }
283                 // else
284                 // {
285                 // 	//logInfo("not leader handle http request : ",_ID);
286                 // }
287 
288                 break;
289                 //next
290             case EntryType.EntryConfChange:
291                 ConfChange cc = unserialize!ConfChange(cast(byte[]) ents[i].Data);
292                 _confState = _node.ApplyConfChange(cc);
293                 switch (cc.Type)
294                 {
295                 case ConfChangeType.ConfChangeAddNode:
296                     if (cc.Context.length > 0)
297                         addPeer(cc.NodeID, cc.Context);
298                     break;
299                 case ConfChangeType.ConfChangeRemoveNode:
300                     if (cc.NodeID == _ID)
301                     {
302                         logWarning(_ID, " I've been removed from the cluster! Shutting down.");
303                         return false;
304                     }
305                     logWarning(_ID, " del node ", cc.NodeID);
306                     delPeer(cc.NodeID);
307                     break;
308                 default:
309                     break;
310                 }
311                 break;
312             default:
313 
314             }
315 
316             _appliedIndex = ents[i].Index;
317 
318         }
319 
320         return true;
321     }
322 
323     void maybeTriggerSnapshot()
324     {
325         if (_appliedIndex - _snapshotIndex <= defaultSnapCount)
326             return;
327 
328         logInfo("start snapshot [applied index: %d | last snapshot index: %d]",
329                 _appliedIndex, _snapshotIndex);
330 
331         auto data = loadSnapshot().Data;
332         Snapshot snap;
333         auto err = _storage.CreateSnapshot(_appliedIndex, &_confState, cast(string) data, snap);
334         if (err != ErrNil)
335         {
336             logError(err);
337         }
338 
339         saveSnap(snap);
340 
341         long compactIndex = 1;
342         if (_appliedIndex > snapshotCatchUpEntriesN)
343             compactIndex = _appliedIndex - snapshotCatchUpEntriesN;
344 
345         _storage.Compact(compactIndex);
346         logInfo("compacted log at index ", compactIndex);
347         _snapshotIndex = _appliedIndex;
348     }
349 
350     void Propose(RequestCommand command, HttpBase h)
351     {
352         auto err = _node.Propose(cast(string) serialize(command));
353         if (err != ErrNil)
354         {
355             logError("---------", err);
356         }
357         else
358         {
359             //logInfo("--------- request hash ",command.Hash);
360             _request[command.Hash] = h;
361         }
362     }
363 
364     void Propose(RequestCommand command)
365     {
366         auto err = _node.Propose(cast(string) serialize(command));
367         if (err != ErrNil)
368         {
369             logError("---------", err);
370         }
371     }
372 
373     void ReadIndex(RequestCommand command, HttpBase h)
374     {
375         _node.ReadIndex(cast(string) serialize(command));
376         _request[command.Hash] = h;
377     }
378 
379     void ProposeConfChange(ConfChange cc)
380     {
381         auto err = _node.ProposeConfChange(cc);
382         if (err != ErrNil)
383         {
384             logError(err);
385         }
386     }
387 
388     Snapshot loadSnapshot()
389     {
390         auto snapshot = _snapshotter.loadSnap();
391 
392         return snapshot;
393     }
394 
395     // openWAL returns a WAL ready for reading.
396     void openWAL(Snapshot snapshot)
397     {
398         if (isEmptyDir(_waldir))
399         {
400             mkdir(_waldir);
401 
402             auto wal = new WAL(_waldir, null);
403 
404             if (wal is null)
405             {
406                 logError("raftexample: create wal error ", _ID);
407             }
408             wal.Close();
409         }
410 
411         WalSnapshot walsnap;
412 
413         walsnap.index = snapshot.Metadata.Index;
414         walsnap.term = snapshot.Metadata.Term;
415 
416         logInfo("loading WAL at term ", walsnap.term, " and index ", walsnap.index);
417 
418         _wal = new WAL(_waldir, walsnap, true);
419 
420         if (_wal is null)
421         {
422             logError("raftexample: error loading wal ", _ID);
423         }
424     }
425 
426     // replayWAL replays WAL entries into the raft instance.
427     void replayWAL()
428     {
429         logInfo("replaying WAL of member ", _ID);
430         auto snapshot = loadSnapshot();
431         openWAL(snapshot);
432 
433         //Snapshot *shot = null;
434         HardState hs;
435         Entry[] ents;
436         byte[] metadata;
437 
438         _wal.ReadAll(metadata, hs, ents);
439 
440         _storage = new MemoryStorage();
441 
442         if (!IsEmptySnap(snapshot))
443         {
444             logInfo("******* exsit snapshot : ", snapshot);
445             _storage.ApplySnapshot(snapshot);
446             _confState = snapshot.Metadata.CS;
447             _snapshotIndex = snapshot.Metadata.Index;
448             _appliedIndex = snapshot.Metadata.Index;
449         }
450 
451         _storage.setHadrdState(hs);
452 
453         _storage.Append(ents);
454         if (ents.length > 0)
455         {
456             _lastIndex = ents[$ - 1].Index;
457         }
458     }
459 
460     void start(bool join)
461     {
462         _ID = NetonConfig.instance.selfConf.id;
463         _snapdir = snapDirPath(_ID);
464         _waldir = walDirPath(_ID);
465 
466         _mutex = new Mutex();
467 
468         if (!Exist(_snapdir))
469         {
470             mkdir(_snapdir);
471         }
472         _snapshotter = new Snapshotter(_snapdir);
473 
474         bool oldwal = isEmptyDir(_waldir);
475 
476         replayWAL();
477 
478         Config conf = new Config();
479 
480         Store.instance.Init(_ID , null);
481 
482         conf._ID = _ID;
483         conf._ElectionTick = 10;
484         conf._HeartbeatTick = 1;
485         conf._storage = _storage;
486         conf._MaxSizePerMsg = 1024 * 1024;
487         conf._MaxInflightMsgs = 256;
488 
489         _buffer.length = 1024;
490 
491         Peer[] peers;
492         Peer slf = {ID:
493         NetonConfig.instance.selfConf.id};
494         peers ~= slf;
495         foreach (peer; NetonConfig.instance.peersConf)
496         {
497             Peer p = {ID:
498             peer.id};
499             peers ~= p;
500         }
501 
502         if (!oldwal)
503         {
504             _node = new RawNode(conf);
505         }
506         else
507         {
508             if (join)
509             {
510                 peers.length = 0;
511             }
512             logInfo("self conf : ", conf, "   peers conf : ", peers);
513             _node = new RawNode(conf, peers);
514         }
515 
516         // API server (HTTP)
517         initApiServer();
518 
519         // Node server (TCP)
520         initNodeServer();
521 
522         //
523         foreach (peer; NetonConfig.instance.peersConf)
524         {
525             addPeer(peer.id, peer.ip ~ ":" ~ to!string(peer.nodeport));
526         }
527 
528         EventLoop timerLoop = new EventLoop();
529 
530         new Timer(timerLoop, 100.msecs).onTick(&ready).start();
531 
532         new Timer(timerLoop, 100.msecs).onTick(
533                 &scanWatchers).start();
534 
535         new Timer(timerLoop, 1000.msecs).onTick(&onTimer).start();
536 
537         timerLoop.runAsync(-1);
538     }
539 
540     private void initApiServer() {
541         _http = NetUtil.createNetServer!(ThreadMode.Single)();
542 
543         _http.setHandler(new class NetConnectionHandler {
544             private HttpBase _httpBase;
545 
546             override void connectionOpened(Connection connection) {
547                 infof("Connection created: %s", connection.getRemoteAddress());
548                 _httpBase = new HttpBase(connection, this.outer);
549             }
550 
551             override void connectionClosed(Connection connection) {
552                 infof("Connection closed: %s", connection.getRemoteAddress());
553                 _httpBase.onClose();
554             }
555 
556             override void messageReceived(Connection connection, Object message) {
557                 tracef("message type: %s", typeid(message).name);
558                 // string str = format("data received: %s", message.toString());
559                 // tracef(str);
560                 ByteBuffer buffer = cast(ByteBuffer)message;
561                 byte[] data = buffer.getRemaining();
562                 _httpBase.onRead(cast(ubyte[])data);
563             }
564 
565             override void exceptionCaught(Connection connection, Throwable t) {
566                 warning(t);
567             }
568 
569             override void failedOpeningConnection(int connectionId, Throwable t) {
570                 error(t);
571             }
572 
573             override void failedAcceptingConnection(int connectionId, Throwable t) {
574                 error(t);
575             }			
576         });
577 
578         _http.listen("0.0.0.0", cast(int)NetonConfig.instance.selfConf.apiport);
579     }
580 
581     private void initNodeServer() {
582         // _server = new NetServer!(ServerHandler, MessageReceiver)(_ID, this);
583         _server = NetUtil.createNetServer!(ThreadMode.Single)();
584 
585         _server.setHandler(new class NetConnectionHandler {
586             private ServerHandler _serverHandler;
587 
588             override void connectionOpened(Connection connection) {
589                 infof("Connection created: %s", connection.getRemoteAddress());
590                 _serverHandler = new ServerHandler(this.outer);
591             }
592 
593             override void connectionClosed(Connection connection) {
594                 infof("Connection closed: %s", connection.getRemoteAddress());
595                 _serverHandler.onClose();
596             }
597 
598             override void messageReceived(Connection connection, Object message) {
599                 tracef("message type: %s", typeid(message).name);
600                 // string str = format("data received: %s", message.toString());
601                 // tracef(str);
602                 ByteBuffer buffer = cast(ByteBuffer)message;
603                 byte[] data = buffer.getRemaining();
604                 _serverHandler.onRead(cast(ubyte[])data);
605             }
606 
607             override void exceptionCaught(Connection connection, Throwable t) {
608                 warning(t);
609             }
610 
611             override void failedOpeningConnection(int connectionId, Throwable t) {
612                 error(t);
613             }
614 
615             override void failedAcceptingConnection(int connectionId, Throwable t) {
616                 error(t);
617             }			
618         });
619 
620         _server.listen("0.0.0.0", cast(int)NetonConfig.instance.selfConf.nodeport);
621     }
622 
623     bool addPeer(ulong ID, string data)
624     {
625         if (ID in _clients)
626             return false;
627 
628         auto client = new NodeClient(this._ID, ID);
629         string[] hostport = split(data, ":");
630         // client.connect(hostport[0], to!int(hostport[1]), (Result!NetSocket result) {
631         //     if (result.failed())
632         //     {
633 
634         //         logWarning("connect fail --> : ", data);
635         //         new Thread(() {
636         //             Thread.sleep(dur!"seconds"(1));
637         //             addPeer(ID, data);
638         //         }).start();
639         //         return;
640         //     }
641         //     _clients[ID] = client;
642         //     logInfo(this._ID, " client connected ", hostport[0], " ", hostport[1]);
643         //     // return true;
644         // });
645 
646         client.connect(hostport[0], to!int(hostport[1]), (AsyncResult!Connection result) {
647             if (result.failed())
648             {
649 
650                 logWarning("connect fail --> : ", data);
651                 new Thread(() {
652                     Thread.sleep(dur!"seconds"(1));
653                     addPeer(ID, data);
654                 }).start();
655                 return;
656             }
657             _clients[ID] = client;
658             logInfo(this._ID, " client connected ", hostport[0], " ", hostport[1]);
659             // return true;
660         });
661 
662         return true;
663     }
664 
665     bool delPeer(ulong ID)
666     {
667         if (ID !in _clients)
668             return false;
669 
670         logInfo(_ID, " client disconnect ", ID);
671         _clients[ID].close();
672         _clients.remove(ID);
673 
674         return true;
675     }
676 
677     void send(Message[] msg)
678     {
679         foreach (m; msg)
680             _clients[m.To].write(m);
681     }
682 
683     void step(Message msg)
684     {
685         _mutex.lock();
686         scope (exit)
687             _mutex.unlock();
688 
689         _node.Step(msg);
690     }
691 
692     void onTimer(Object sender)
693     {
694         _mutex.lock();
695         scope (exit)
696             _mutex.unlock();
697 
698         _node.Tick();
699     }
700 
701     void ready(Object sender)
702     {
703         _mutex.lock();
704         scope (exit)
705             _mutex.unlock();
706 
707         Ready rd = _node.ready();
708         if (!rd.containsUpdates())
709         {
710             // logInfo("----- read not update");
711             return;
712         }
713         //logInfo("------ready ------ ",_ID);
714         _wal.Save(rd.hs, rd.Entries);
715         if (!IsEmptySnap(rd.snap))
716         {
717             saveSnap(rd.snap);
718             _storage.ApplySnapshot(rd.snap);
719             publishSnapshot(rd.snap);
720         }
721         _storage.Append(rd.Entries);
722         // logInfo("------ready ------ ",_ID);
723 
724         send(rd.Messages);
725         if (!publishEntries(entriesToApply(rd.CommittedEntries)))
726         {
727             // _poll.stop();
728             logError("----- poll stop");
729             return;
730         }
731 
732         //for readindex
733         foreach (r; rd.ReadStates)
734         {
735             string res;
736             bool iswatch = false;
737             if (r.Index >= _appliedIndex)
738             {
739                 RequestCommand command = unserialize!RequestCommand(cast(byte[]) r.RequestCtx);
740                 auto h = command.Hash in _request;
741                 if (h == null)
742                 {
743                     continue;
744                 }
745                 auto param = tryGetJsonFormat(command.Params);
746                 logInfo("http GET param : ", param);
747                 bool recursive = false;
748                 if (param.type == JSONType.object && "recursive" in param)
749                     recursive = param["recursive"].str == "true" ? true : false;
750                 if (param.type == JSONType.object && ("wait" in param)
751                         && param["wait"].str == "true")
752                 {
753                     ulong waitIndex = 0;
754                     if ("waitIndex" in param)
755                         waitIndex = to!ulong(param["waitIndex"].str);
756                     auto w = Store.instance.Watch(command.Key, recursive, false, waitIndex);
757                     w.setWatchId(command.Hash);
758                     _watchers ~= w;
759                     iswatch = true;
760                 }
761                 else
762                 {
763                     auto e = Store.instance.Get(command.Key, recursive, false);
764                     //value = e.getNodeValue();
765                     res = makeJsonString(e);
766                 }
767                 if (!iswatch)
768                 {
769                     h.do_response(res);
770                     h.close();
771                     _request.remove(command.Hash);
772                 }
773             }
774         }
775 
776         maybeTriggerSnapshot();
777         _node.Advance(rd);
778 
779         // if(_node.isLeader())
780         // {
781         // 	if(leader() != _lastLeader)
782         // 	{
783         // 		logWarning("-----*****start health check *****-----");
784         // 		_lastLeader = leader();
785         // 		starHealthCheck();
786         // 		loadServices(SERVICE_PREFIX[0..$-1]);
787         // 	}
788         // }
789         // else
790         // {
791         // 	if(_healths.length > 0)
792         // 	{
793         // 		logWarning("-----*****stop health check *****-----");
794         // 		synchronized(_mutex)
795         // 		{
796         // 			if(_healthPoll !is null)
797         // 			{
798         // 				_healthPoll.stop();
799         // 				foreach(key;_healths.keys)
800         // 				{
801         // 					_healthPoll.delTimer(_healths[key].timerFd);
802         // 				}
803         // 			}
804         // 			_healths.clear;
805         // 			_healthPoll = null;
806         // 		}
807         // 	}
808         // }
809     }
810 
811     void scanWatchers(Object sender)
812     {
813         //if(_node.isLeader())
814         {
815             foreach (w; _watchers)
816             {
817                 if (w.haveNotify)
818                 {
819                     logInfo("----- scaned notify key: ", w.key, " hash :", w.watchId);
820                     auto http = (w.watchId in _request);
821                     if (http != null)
822                     {
823                         auto es = w.events();
824                         foreach (e; es)
825                         {
826                             auto res = makeJsonString(e);
827                             //logInfo("----- response msg : ",res);
828                             http.do_response(res);
829                             http.close();
830                             break;
831                         }
832                         _request.remove(w.watchId);
833                     }
834                     removeWatcher(w.watchId);
835                 }
836             }
837         }
838     }
839 
840     void handleHttpClose(size_t hash)
841     {
842         auto http = (hash in _request);
843         if (http != null)
844             _request.remove(hash);
845         removeWatcher(hash);
846     }
847 
848     void removeWatcher(size_t hash)
849     {
850         foreach (w; _watchers)
851         {
852             if (w.watchId == hash)
853                 w.Remove();
854         }
855         auto wl = remove!(a => a.watchId == hash)(_watchers);
856         move(wl, _watchers);
857         logInfo("---watchers len : ", _watchers.length);
858     }
859 
860     static NetonHttpServer instance()
861     {
862         if (_gserver is null)
863             _gserver = new NetonHttpServer();
864         return _gserver;
865     }
866 
867     ulong leader()
868     {
869         return _node._raft._lead;
870     }
871 
872     void saveHttp(HttpBase h)
873     {
874         _request[h.toHash] = h;
875     }
876 
877     // void starHealthCheck()
878     // {
879     // 	_healthPoll = new Epoll(100);
880     // 	_healthPoll.start();
881     // }
882 
883     // void addHealthCheck(string key,ref JSONValue value)
884     // {
885     // 	if(!_node.isLeader)
886     // 		return;
887     // 	if(_healthPoll !is null)
888     // 	{
889     // 		 synchronized(_mutex)
890     // 		 {
891     // 			 auto health = new Health(key,value);
892     // 			if(key in _healths)
893     // 			{
894     // 				auto oldHlt = _healths[key];
895     // 				_healthPoll.delTimer(oldHlt.timerFd);
896     // 			}
897     // 			_healths[key] = health;
898     // 			_healthPoll.addTimer(&health.onTimer,health.interval_ms(),WheelType.WHEEL_PERIODIC);
899     // 		 }
900     // 	}
901     // 	logInfo("-----*****health check num : ",_healths.length);
902     // }
903 
904     // void removeHealthCheck(string key)
905     // {
906     // 	if(!_node.isLeader)
907     // 		return;
908     // 	synchronized(_mutex)
909     // 	{
910     // 		if(key in _healths)
911     // 		{
912     // 			auto health = _healths[key];
913     // 			if(_healthPoll !is null)
914     // 			{
915     // 				_healthPoll.delTimer(health.timerFd);
916     // 			}
917     // 			_healths.remove(key);
918     // 		}
919     // 	}
920 
921     // 	logInfo("-----*****health check num : ",_healths.length);
922     // }
923 
924     // void loadServices(string key)
925     // {
926 
927     //     JSONValue node = Store.instance().getJsonValue(key);
928     //     if(node.type != JSONType.null_)
929     //     {
930     //         auto dir = node["dir"].str == "true" ? true:false;
931     //         if(!dir)
932     //         {
933     //             if(startsWith(key,SERVICE_PREFIX))
934     // 			{
935     // 				auto val = tryGetJsonFormat(node["value"].str);
936     // 				addHealthCheck(key,val);
937     // 			}
938     //             else
939     // 			{
940     // 			}
941     //             return ;
942     //         }
943     //         else
944     //         {
945     //             auto children = node["children"].str;
946     //             if(children.length == 0)
947     //             {
948     //                 return ;
949     //             }
950     //             else
951     //             {
952     //                 JSONValue[] subs;
953     //                 auto segments = split(children, ";");
954     //                 foreach(subkey;segments)
955     //                 {
956     //                     if(subkey.length != 0)
957     //                     {
958     //                         loadServices(subkey);
959     //                     }
960     //                 }
961     //                 return ;
962     //             }
963 
964     //         }
965     //     }
966     //     else
967     //     {
968     //         return ;
969     //     }
970     // }
971 
972     MemoryStorage _storage;
973     ulong _ID;
974     // NetServer!(ServerHandler, MessageReceiver) _server;
975     // NetServer!(HttpBase, NetonHttpServer) _http;
976     NetServer _server;
977     NetServer _http;
978 
979     NodeClient[ulong] _clients;
980     RawNode _node;
981     byte[] _buffer;
982 
983     bool _join;
984     ulong _lastIndex;
985     ConfState _confState;
986     ulong _snapshotIndex;
987     ulong _appliedIndex;
988 
989     HttpBase[ulong] _request;
990 
991     string _waldir; // path to WAL directory
992     string _snapdir; // path to snapshot directory
993     Snapshotter _snapshotter;
994     WAL _wal;
995     Watcher[] _watchers;
996 
997     // Poll 									_healthPoll;	 	//eventloop for health check
998     Health[string] _healths;
999     ulong _lastLeader = 0;
1000     Mutex _mutex;
1001 }