1 module neton.server.NetonRpcServer;
2 
3 import hunt.raft;
4 import hunt.net;
5 import neton.network.NetServer;
6 import neton.network.NodeClient;
7 import neton.network.ServerHandler;
8 import neton.network.Interface;
9 import neton.store.Store;
10 
11 import core.time;
12 import core.thread;
13 import core.sync.mutex;
14 import std.string;
15 
16 import hunt.io.ByteBuffer;
17 import hunt.logging;
18 import hunt.util.Serialize;
19 import hunt.util.Timer;
20 import hunt.event.EventLoop;
21 import hunt.event.timer;
22 import hunt.event.timer.Common;
23 import neton.server.NetonConfig;
24 import neton.server.PeerServers;
25 import neton.server.WatchServer;
26 
27 import std.conv;
28 import neton.wal.Wal;
29 import neton.snap.SnapShotter;
30 import neton.wal.Record;
31 import neton.wal.Util;
32 import std.file;
33 import std.stdio;
34 import std.json;
35 import neton.store.Event;
36 import neton.store.Watcher;
37 import neton.store.Util;
38 import std.algorithm.mutation;
39 
40 import neton.rpcservice;
41 import neton.protocol.neton;
42 import neton.protocol.neton;
43 import neton.protocol.netonrpc;
44 import grpc;
45 
46 import neton.util.Future;
47 import neton.util.Queue;
48 
49 import neton.lease;
50 
/// Threshold used by maybeTriggerSnapshot(): a new snapshot is taken once the
/// applied index is more than this many entries past the last snapshot index.
enum defaultSnapCount = 10;
/// Used by maybeTriggerSnapshot() when compacting: the log is compacted at
/// (applied index - this value), or at index 1 while the applied index is
/// not yet larger than this value.
enum snapshotCatchUpEntriesN = 10000;
53 
54 alias Event = neton.store.Event.Event;
55 class NetonRpcServer : MessageReceiver
56 {
    private
    {
        /// Process-wide singleton returned by instance().
        __gshared NetonRpcServer _gserver;

        /// In-memory raft log storage; created and filled by replayWAL().
        MemoryStorage _storage;
        /// This node's id, read from NetonConfig in start().
        ulong _ID;
        // NetServer!(ServerHandler, MessageReceiver) _server;
        /// Network server created in initRpcServer(); listens on the node port.
        NetServer _server;

        /// The raft state machine driven by ready()/onTimer()/step().
        RawNode _node;

        // NOTE(review): _join is never assigned in this file — confirm it is still needed.
        bool _join;
        /// Index of the last entry read back from the WAL in replayWAL().
        ulong _lastIndex;
        /// Latest configuration state, taken from snapshots and applied conf changes.
        ConfState _confState;
        /// Index of the most recent snapshot (loaded, received or created locally).
        ulong _snapshotIndex;
        /// Index of the last entry applied by publishEntries() or covered by a snapshot.
        ulong _appliedIndex;

        Object[ulong] _request; /// key is hashId

        string _waldir; // path to WAL directory
        string _snapdir; // path to snapshot directory
        /// Reads and writes snapshot files under _snapdir.
        Snapshotter _snapshotter;
        /// Write-ahead log opened by openWAL().
        WAL _wal;

        /// Leader id observed at the end of the previous ready() pass.
        ulong _lastLeader = 0;
        /// Guards the raft node and the state touched by the timer callbacks.
        Mutex _mutex;

        /// Lease manager; promoted/demoted in ready() when leadership changes.
        Lessor _lessor;
        /// Requests waiting to be proposed to raft; filled by Propose(), drained by onPropose().
        Queue!RpcRequest _proposeQueue;
    }
87 
88 
89     void start(bool join)
90     {
91         _ID = NetonConfig.instance.selfConf.id;
92         _snapdir = snapDirPath(_ID);
93         _waldir = walDirPath(_ID);
94 
95         _mutex = new Mutex();
96 
97         if (!Exist(_snapdir))
98         {
99             mkdir(_snapdir);
100         }
101         _snapshotter = new Snapshotter(_snapdir);
102 
103         bool oldwal = isEmptyDir(_waldir);
104 
105         replayWAL();
106 
107         Config conf = new Config();
108 
109         LessorConfig lessorConf;
110         lessorConf.MinLeaseTTL = 1;
111         _lessor = NewLessor(lessorConf);
112 
113         Store.instance.Init(_ID, _lessor);
114 
115         conf._ID = _ID;
116         conf._ElectionTick = 10;
117         conf._HeartbeatTick = 1;
118         conf._storage = _storage;
119         conf._MaxSizePerMsg = 1024 * 1024;
120         conf._MaxInflightMsgs = 256;
121 
122         Peer[] peers;
123         Peer slf = {ID:
124         NetonConfig.instance.selfConf.id};
125         peers ~= slf;
126         foreach (peer; NetonConfig.instance.peersConf)
127         {
128             Peer p = {ID:
129             peer.id};
130             peers ~= p;
131         }
132 
133         if (!oldwal)
134         {
135             _node = new RawNode(conf);
136         }
137         else
138         {
139             if (join)
140             {
141                 peers.length = 0;
142             }
143             logInfo("self conf : ", conf, "   peers conf : ", peers);
144             _node = new RawNode(conf, peers);
145         }
146 
147         // _server = new NetServer!(ServerHandler, MessageReceiver)(_ID, this);
148         // _server.listen("0.0.0.0", NetonConfig.instance.selfConf.nodeport);
149 
150         initRpcServer();
151 
152         PeerServers.instance().setID(_ID);
153         foreach (peer; NetonConfig.instance.peersConf)
154         {
155             PeerServers.instance().addPeer(peer.id, peer.ip ~ ":" ~ to!string(peer.nodeport));
156         }
157 
158         WatchServer.instance().run();
159 
160         EventLoop timerLoop = new EventLoop();
161         new Timer(timerLoop, 100.msecs).onTick(&ready).start();
162         new Timer(timerLoop, 1.seconds).onTick(&onTimer).start();
163         new Timer(timerLoop, 200.msecs).onTick(&onLessor).start();
164         new Timer(timerLoop, 100.msecs).onTick(&onPropose)
165             .start();
166 
167         timerLoop.runAsync(-1);
168     }
169 
170     private void initRpcServer() {
171         _server = NetUtil.createNetServer!(ThreadMode.Single)();
172 
173         _server.setHandler(new class NetConnectionHandler {
174             private ServerHandler _serverHandler;
175 
176             override void connectionOpened(Connection connection) {
177                 infof("Connection created: %s", connection.getRemoteAddress());
178                 _serverHandler = new ServerHandler(this.outer);
179             }
180 
181             override void connectionClosed(Connection connection) {
182                 infof("Connection closed: %s", connection.getRemoteAddress());
183                 _serverHandler.onClose();
184             }
185 
186             override void messageReceived(Connection connection, Object message) {
187                 tracef("message type: %s", typeid(message).name);
188                 // string str = format("data received: %s", message.toString());
189                 // tracef(str);
190                 ByteBuffer buffer = cast(ByteBuffer)message;
191                 byte[] data = buffer.getRemaining();
192                 _serverHandler.onRead(cast(ubyte[])data);
193             }
194 
195             override void exceptionCaught(Connection connection, Throwable t) {
196                 warning(t);
197             }
198 
199             override void failedOpeningConnection(int connectionId, Throwable t) {
200                 error(t);
201             }
202 
203             override void failedAcceptingConnection(int connectionId, Throwable t) {
204                 error(t);
205             }			
206         });
207 
208         _server.listen("0.0.0.0", cast(int)NetonConfig.instance.selfConf.nodeport);
209     }
210 
211 
212     bool publishEntries(Entry[] ents)
213     {
214 
215         bool iswatch = false;
216         for (auto i = 0; i < ents.length; i++)
217         {
218             switch (ents[i].Type)
219             {
220             case EntryType.EntryNormal:
221                 {
222                     if (ents[i].Data.length == 0)
223                         break;
224 
225                     RpcRequest command = unserialize!RpcRequest(cast(byte[]) ents[i].Data);
226                     logDebug("publish CMD : ", command, " ID :", _ID);
227                     neton.store.Event.Event resultEvent;
228                     auto h = (command.Hash in _request);
229                     iswatch = false;
230                     switch (command.CMD)
231                     {
232                     case RpcReqCommand.RangeRequest:
233                     case RpcReqCommand.ConfigRangeRequest:
234                     case RpcReqCommand.RegistryRangeRequest:
235                         {
236                             auto respon = Store.instance.get(command);
237                             if (h != null)
238                             {
239                                 auto handler = cast(Future!(RangeRequest, RangeResponse))(*h);
240                                 if (handler !is null)
241                                 {
242                                     handler.done(respon);
243                                     _request.remove(command.Hash);
244 
245                                 }
246                                 else
247                                 {
248                                     logDebug("convert rpc handler is null ");
249                                 }
250                             }
251                             else
252                             {
253                                 // logDebug("rpc handler is null ");
254                             }
255                         }
256                         break;
257                     case RpcReqCommand.PutRequest:
258                     case RpcReqCommand.ConfigPutRequest:
259                     case RpcReqCommand.RegistryPutRequest:
260                         {
261                             auto respon = Store.instance.put(command);
262                             if (h != null)
263                             {
264                                 auto handler = cast(Future!(PutRequest, PutResponse))(*h);
265                                 if (handler !is null)
266                                 {
267                                     handler.done(respon);
268                                     _request.remove(command.Hash);
269                                 }
270                                 else
271                                 {
272                                     logDebug("convert rpc handler is null ");
273                                 }
274                             }
275                             else
276                             {
277                                 // logDebug("rpc handler is null ");
278                             }
279                         }
280                         break;
281                     case RpcReqCommand.DeleteRangeRequest:
282                     case RpcReqCommand.ConfigDeleteRangeRequest:
283                     case RpcReqCommand.RegistryDeleteRangeRequest:
284                         {
285                             auto respon = Store.instance.deleteRange(command);
286                             if (h != null)
287                             {
288                                 auto handler = cast(Future!(DeleteRangeRequest,
289                                         DeleteRangeResponse))(*h);
290                                 if (handler !is null)
291                                 {
292                                     handler.done(respon);
293                                     _request.remove(command.Hash);
294                                 }
295                                 else
296                                 {
297                                     logDebug("convert rpc handler is null ");
298                                 }
299                             }
300                             else
301                             {
302                                 // logDebug("rpc handler is null ");
303                             }
304                         }
305                         break;
306                     case RpcReqCommand.WatchRequest:
307                         {
308                             auto w = Store.instance.Watch(command.Key, false, true, 0);
309                             w.setHash(command.Hash);
310                             w.setWatchId(command.Value.to!long);
311                             WatchServer.instance().addWatcher(w);
312                         }
313                         break;
314                     case RpcReqCommand.LeaseGenIDRequest:
315                         {
316                             if (_node.isLeader) /// leader gen leaseID
317                             {
318                                 long leaseID = Store.instance.generateLeaseID();
319                                 command.LeaseID = leaseID;
320                                 command.CMD = RpcReqCommand.LeaseGrantRequest;
321                                 Propose(command);
322                             }
323                         }
324                         break;
325                     case RpcReqCommand.LeaseGrantRequest:
326                         {
327                             Lease l = Store.instance.grantLease(command.LeaseID, command.TTL);
328                             if (h != null)
329                             {
330                                 LeaseGrantResponse respon = new LeaseGrantResponse();
331                                 respon.ID = command.LeaseID;
332                                 if (l is null)
333                                     respon.error = "grant fail";
334                                 else
335                                     respon.TTL = l.ttl;
336 
337                                 auto handler = cast(Future!(LeaseGrantRequest, LeaseGrantResponse))(
338                                         *h);
339                                 if (handler !is null)
340                                 {
341                                     handler.done(respon);
342                                     _request.remove(command.Hash);
343                                 }
344                                 else
345                                 {
346                                     logDebug("convert rpc handler is null ");
347                                 }
348                             }
349                             else
350                             {
351                                 // logDebug("rpc handler is null ");
352                             }
353                         }
354                         break;
355                     case RpcReqCommand.LeaseRevokeRequest:
356                         {
357                             auto ok = Store.instance.revokeLease(command.LeaseID);
358                             if (!ok)
359                             {
360                                 logWarning("revoke lease fail : ", command.LeaseID);
361                             }
362                             if (h != null)
363                             {
364                                 LeaseRevokeResponse respon = new LeaseRevokeResponse();
365 
366                                 auto handler = cast(Future!(LeaseRevokeRequest,
367                                         LeaseRevokeResponse))(*h);
368                                 if (handler !is null)
369                                 {
370                                     handler.done(respon);
371                                     _request.remove(command.Hash);
372                                 }
373                                 else
374                                 {
375                                     logDebug("convert rpc handler is null ");
376                                 }
377                             }
378                             else
379                             {
380                                 // logDebug("rpc handler is null ");
381                             }
382                         }
383                         break;
384                     case RpcReqCommand.LeaseTimeToLiveRequest:
385                         {
386                             auto respon = Store.instance.leaseTimeToLive(command.LeaseID);
387                             if (respon is null)
388                             {
389                                 logWarning("LeaseTimeToLiveRequest fail : ", command.LeaseID);
390                             }
391                             if (h != null)
392                             {
393                                 auto handler = cast(Future!(LeaseTimeToLiveRequest,
394                                         LeaseTimeToLiveResponse))(*h);
395                                 if (handler !is null)
396                                 {
397                                     logDebug("LeaseTimeToLiveRequest reponse ID:", respon.ID, "  ttl :",
398                                             respon.TTL, " grantedTTL:", respon.grantedTTL);
399                                     handler.done(respon);
400                                     _request.remove(command.Hash);
401                                 }
402                                 else
403                                 {
404                                     logDebug("convert rpc handler is null ");
405                                 }
406                             }
407                             else
408                             {
409                                 // logDebug("rpc handler is null ");
410                             }
411                         }
412                         break;
413                     case RpcReqCommand.LeaseLeasesRequest:
414                         {
415                             auto respon = Store.instance.leaseLeases();
416                             if (respon is null)
417                             {
418                                 logWarning("LeaseLeasesRequest fail ");
419                             }
420                             if (h != null)
421                             {
422                                 auto handler = cast(Future!(LeaseLeasesRequest,
423                                         LeaseLeasesResponse))(*h);
424                                 if (handler !is null)
425                                 {
426                                     handler.done(respon);
427                                     _request.remove(command.Hash);
428                                 }
429                                 else
430                                 {
431                                     logDebug("convert rpc handler is null ");
432                                 }
433                             }
434                             else
435                             {
436                                 // logDebug("rpc handler is null ");
437                             }
438                         }
439                         break;
440                     case RpcReqCommand.LeaseKeepAliveRequest:
441                         {
442                             auto respon = Store.instance.renewLease(command);
443                             if (respon is null)
444                             {
445                                 logWarning("LeaseKeepAliveRequest fail : ", _ID);
446                             }
447                             if (h != null)
448                             {
449                                 auto handler = cast(Future!(ServerReaderWriter!(LeaseKeepAliveRequest,
450                                         LeaseKeepAliveResponse), LeaseKeepAliveResponse))(*h);
451                                 if (handler !is null)
452                                 {
453                                     handler.data().write(respon);
454                                     _request.remove(command.Hash);
455                                 }
456                                 else
457                                 {
458                                     logDebug("convert rpc handler is null ");
459                                 }
460                             }
461                             else
462                             {
463                                 // logDebug("rpc handler is null ");
464                             }
465                         }
466                         break;
467                     case RpcReqCommand.WatchCancelRequest:
468                         {
469                             WatchServer.instance().removeWatcher(command.Value.to!long);
470                             if (h != null)
471                             {
472                                 auto handler = cast(Future!(ServerReaderWriter!(WatchRequest,
473                                         WatchResponse), WatchInfo))(*h);
474                                 if (handler !is null)
475                                 {
476                                     WatchResponse respon = new WatchResponse();
477                                     WatchInfo watchInfo = handler.ExtraData();
478                                     respon.header = watchInfo.header;
479                                     respon.created = false;
480                                     respon.canceled = true;
481                                     respon.watchId = watchInfo.watchId;
482                                     logDebug("watch cancel --------------: ",
483                                             respon.watchId, " revision :", respon.header.revision);
484                                     handler.data().write(respon);
485                                     _request.remove(command.Hash);
486                                 }
487                                 else
488                                 {
489                                     logDebug("convert rpc handler is null ");
490                                 }
491                             }
492                             else
493                             {
494                                 // logDebug("rpc handler is null ");
495                             }
496                         }
497                         break;
498                     default:
499                         break;
500                     }
501 
502                     break;
503                 }
504             case EntryType.EntryConfChange:
505                 {
506                     ConfChange cc = unserialize!ConfChange(cast(byte[]) ents[i].Data);
507                     _confState = _node.ApplyConfChange(cc);
508                     switch (cc.Type)
509                     {
510                     case ConfChangeType.ConfChangeAddNode:
511                         if (cc.Context.length > 0)
512                             PeerServers.instance().addPeer(cc.NodeID, cc.Context);
513                         break;
514                     case ConfChangeType.ConfChangeRemoveNode:
515                         if (cc.NodeID == _ID)
516                         {
517                             logWarning(_ID, " I've been removed from the cluster! Shutting down.");
518                             return false;
519                         }
520                         logWarning(_ID, " del node ", cc.NodeID);
521                         PeerServers.instance().delPeer(cc.NodeID);
522                         break;
523                     default:
524                         break;
525                     }
526                     break;
527                 }
528             default:
529 
530             }
531 
532             _appliedIndex = ents[i].Index;
533 
534         }
535 
536         return true;
537     }
538 
539     void ready(Object sender)
540     {
541         _mutex.lock();
542         scope (exit)
543             _mutex.unlock();
544 
545         Ready rd = _node.ready();
546         if (!rd.containsUpdates())
547         {
548             // logInfo("----- read not update");
549             return;
550         }
551         // logInfo("------ready ------ ", _ID);
552         _wal.Save(rd.hs, rd.Entries);
553         if (!IsEmptySnap(rd.snap))
554         {
555             saveSnap(rd.snap);
556             _storage.ApplySnapshot(rd.snap);
557             publishSnapshot(rd.snap);
558         }
559         _storage.Append(rd.Entries);
560         // logInfo("------ready ------ ",_ID);
561 
562         PeerServers.instance().send(rd.Messages);
563         if (!publishEntries(entriesToApply(rd.CommittedEntries)))
564         {
565             // _poll.stop();
566             logError("----- poll stop");
567             return;
568         }
569 
570         //for readindex
571         foreach (r; rd.ReadStates)
572         {
573             string res;
574             if (r.Index >= _appliedIndex)
575             {
576                 RpcRequest command = unserialize!RpcRequest(cast(byte[]) r.RequestCtx);
577                 auto h = command.Hash in _request;
578                 if (h == null)
579                 {
580                     continue;
581                 }
582                 if (command.CMD == RpcReqCommand.RangeRequest 
583                     || command.CMD == RpcReqCommand.ConfigRangeRequest
584                     || command.CMD == RpcReqCommand.RegistryRangeRequest)
585                 {
586                     auto respon = Store.instance.get(command);
587 
588                     foreach (kv; respon.kvs)
589                         logDebug("KeyValue pair (%s , %s)".format(cast(string)(kv.key),
590                                 cast(string)(kv.value)));
591                     logDebug("handler keyValue len : ", respon.count);
592 
593                     auto handler = cast(Future!(RangeRequest, RangeResponse))(*h);
594                     if (handler !is null)
595                     {
596                         logDebug("response key: ", cast(string)(handler.data().key));
597                         handler.done(respon);
598                         _request.remove(command.Hash);
599                     }
600                     else
601                     {
602                         logDebug("convert rpc handler is null ");
603                     }
604                 }
605                 else if (command.CMD == RpcReqCommand.WatchRequest)
606                 {
607                     auto w = Store.instance.Watch(command.Key, false, true, 0);
608                     w.setHash(command.Hash);
609                     w.setWatchId(command.Value.to!long);
610                     WatchServer.instance().addWatcher(w);
611                 }
612 
613             }
614         }
615 
616         maybeTriggerSnapshot();
617         _node.Advance(rd);
618 
619         if (leader() != _lastLeader)
620         {
621             _lastLeader = leader();
622             if (_node.isLeader())
623             {
624                 if (_lessor !is null)
625                     _lessor.Promote(1);
626             }
627             else
628                 _lessor.Demote();
629         }
630 
631     }
632 
633     void onTimer(Object sender)
634     {
635         _mutex.lock();
636         scope (exit)
637             _mutex.unlock();
638 
639         _node.Tick();
640     }
641 
642     void onPropose(Object)
643     {
644         auto req = _proposeQueue.pop();
645         if (req != req.init)
646         {
647             _mutex.lock();
648             scope (exit)
649                 _mutex.unlock();
650             auto err = _node.Propose(cast(string) serialize(req));
651             if (err != ErrNil)
652             {
653                 logError("---------", err);
654             }
655         }
656     }
657 
658     void onLessor(Object sender)
659     {
660         _mutex.lock();
661         scope (exit)
662             _mutex.unlock();
663 
664         _lessor.runLoop();
665     }
666 
667     void Propose(RpcRequest command, Object h)
668     {
669         logDebug("***** RpcRequest.CMD : ", command.CMD, " key : ", command.Key);
670         if (command.CMD == RpcReqCommand.WatchCancelRequest)
671         {
672             if (command.Hash !in _request)
673                 _request[command.Hash] = h;
674         }
675         else
676             _request[command.Hash] = h;
677         Propose(command);
678     }
679 
680     void Propose(RpcRequest command)
681     {
682         _proposeQueue.push(command);
683     }
684 
685     void ReadIndex(RpcRequest command, Object h)
686     {
687         _mutex.lock();
688         scope (exit)
689             _mutex.unlock();
690 
691         _node.ReadIndex(cast(string) serialize(command));
692         _request[command.Hash] = h;
693     }
694 
695     void step(Message msg)
696     {
697         _mutex.lock();
698         scope (exit)
699             _mutex.unlock();
700 
701         _node.Step(msg);
702     }
703 
704     Object getRequest(size_t hash)
705     {
706         auto obj = (hash in _request);
707         if (obj != null)
708         {
709             return *obj;
710         }
711         else
712             return null;
713     }
714 
715     ulong leader()
716     {
717         _mutex.lock();
718         scope (exit)
719             _mutex.unlock();
720         return _node._raft._lead;
721     }
722 
723     static NetonRpcServer instance()
724     {
725         if (_gserver is null)
726             _gserver = new NetonRpcServer();
727         return _gserver;
728     }
729 
730 private:
731 
732     this()
733     {
734         _proposeQueue = new Queue!RpcRequest();
735     }
736 
737     void publishSnapshot(Snapshot snap)
738     {
739         if (IsEmptySnap(snap))
740             return;
741 
742         if (snap.Metadata.Index <= _appliedIndex)
743         {
744             logError("snapshot index [%d] should > progress.appliedIndex [%d] + 1",
745                     snap.Metadata.Index, _appliedIndex);
746         }
747 
748         _confState = snap.Metadata.CS;
749         _snapshotIndex = snap.Metadata.Index;
750         _appliedIndex = snap.Metadata.Index;
751     }
752 
753     void saveSnap(Snapshot snap)
754     {
755         // must save the snapshot index to the WAL before saving the
756         // snapshot to maintain the invariant that we only Open the
757         // wal at previously-saved snapshot indexes.
758         WalSnapshot walSnap = {
759         index:
760             snap.Metadata.Index, term : snap.Metadata.Term,
761         };
762         _wal.SaveSnapshot(walSnap);
763         _snapshotter.SaveSnap(snap);
764 
765     }
766 
767     Entry[] entriesToApply(Entry[] ents)
768     {
769         if (ents.length == 0)
770             return null;
771 
772         auto firstIdx = ents[0].Index;
773         if (firstIdx > _appliedIndex + 1)
774         {
775             logError("first index of committed entry[%d] should <= progress.appliedIndex[%d] 1",
776                     firstIdx, _appliedIndex);
777         }
778 
779         if (_appliedIndex - firstIdx + 1 < ents.length)
780             return ents[_appliedIndex - firstIdx + 1 .. $];
781 
782         return null;
783     }
784 
785     void ProposeConfChange(ConfChange cc)
786     {
787         _mutex.lock();
788         scope (exit)
789             _mutex.unlock();
790 
791         auto err = _node.ProposeConfChange(cc);
792         if (err != ErrNil)
793         {
794             logError(err);
795         }
796     }
797 
798     Snapshot loadSnapshot()
799     {
800         auto snapshot = _snapshotter.loadSnap();
801 
802         return snapshot;
803     }
804 
805     // openWAL returns a WAL ready for reading.
806     void openWAL(Snapshot snapshot)
807     {
808         if (isEmptyDir(_waldir))
809         {
810             mkdir(_waldir);
811 
812             auto wal = new WAL(_waldir, null);
813 
814             if (wal is null)
815             {
816                 logError("raftexample: create wal error ", _ID);
817             }
818             wal.Close();
819         }
820 
821         WalSnapshot walsnap;
822 
823         walsnap.index = snapshot.Metadata.Index;
824         walsnap.term = snapshot.Metadata.Term;
825 
826         logInfo("loading WAL at term ", walsnap.term, " and index ", walsnap.index);
827 
828         _wal = new WAL(_waldir, walsnap, true);
829 
830         if (_wal is null)
831         {
832             logError("raftexample: error loading wal ", _ID);
833         }
834     }
835 
836     // replayWAL replays WAL entries into the raft instance.
837     void replayWAL()
838     {
839         logInfo("replaying WAL of member ", _ID);
840         auto snapshot = loadSnapshot();
841         openWAL(snapshot);
842 
843         //Snapshot *shot = null;
844         HardState hs;
845         Entry[] ents;
846         byte[] metadata;
847 
848         _wal.ReadAll(metadata, hs, ents);
849 
850         _storage = new MemoryStorage();
851 
852         if (!IsEmptySnap(snapshot))
853         {
854             logInfo("******* exsit snapshot : ", snapshot);
855             _storage.ApplySnapshot(snapshot);
856             _confState = snapshot.Metadata.CS;
857             _snapshotIndex = snapshot.Metadata.Index;
858             _appliedIndex = snapshot.Metadata.Index;
859         }
860 
861         _storage.setHadrdState(hs);
862 
863         _storage.Append(ents);
864         if (ents.length > 0)
865         {
866             _lastIndex = ents[$ - 1].Index;
867         }
868     }
869 
870     void maybeTriggerSnapshot()
871     {
872         if (_appliedIndex - _snapshotIndex <= defaultSnapCount)
873             return;
874 
875         logInfof("start snapshot [applied index: %d | last snapshot index: %d]",
876                 _appliedIndex, _snapshotIndex);
877 
878         auto data = loadSnapshot().Data;
879         Snapshot snap;
880         auto err = _storage.CreateSnapshot(_appliedIndex, &_confState, cast(string) data, snap);
881         if (err != ErrNil)
882         {
883             logError(err);
884         }
885 
886         saveSnap(snap);
887 
888         long compactIndex = 1;
889         if (_appliedIndex > snapshotCatchUpEntriesN)
890             compactIndex = _appliedIndex - snapshotCatchUpEntriesN;
891 
892         _storage.Compact(compactIndex);
893         logInfo("compacted log at index ", compactIndex);
894         _snapshotIndex = _appliedIndex;
895     }
896 
897 }