1 module neton.lease.Lessor;
2 
3 import std.typecons;
4 import std.math;
5 import core.sync.mutex;
6 import core.sync.rwmutex;
7 import std.bitmanip;
8 import core.thread;
9 
10 import hunt.util.DateTime;
11 import hunt.logging;
12 import hunt.Exceptions;
13 import hunt.collection;
14 
15 import neton.protocol.neton;
16 import neton.server.NetonRpcServer;
17 import neton.rpcservice.Command;
18 
19 import neton.lease.Heap;
20 import neton.lease.LeaseQueue;
21 
22 alias int64 = long;
23 alias LeaseID = long;
24 alias float64 = long;
25 
26 // NoLease is a special LeaseID representing the absence of a lease.
27 const LeaseID NoLease = LeaseID(0);
28 
29 // MaxLeaseTTL is the maximum lease TTL value
30 const long MaxLeaseTTL = 9000000000;
31 
32 enum long FOREVER = long.max;
33 
34 enum string leaseBucketName = "lease";
35 
36 // maximum number of leases to revoke per second; configurable for tests
37 enum int leaseRevokeRate = 1000;
38 
39 // maximum number of lease checkpoints recorded to the consensus log per second; configurable for tests
40 enum int leaseCheckpointRate = 1000;
41 
42 // maximum number of lease checkpoints to batch into a single consensus log entry
43 enum int maxLeaseCheckpointBatchSize = 1000;
44 
45 enum string ErrNotPrimary = "not a primary lessor";
46 enum string ErrLeaseNotFound = "lease not found";
47 enum string ErrLeaseExists = "lease already exists";
48 enum string ErrLeaseTTLTooLarge = "too large lease TTL";
49 
50 // TxnDelete is a TxnWrite that only permits deletes. Defined here
51 // to avoid circular dependency with mvcc.
52 interface TxnDelete
53 {
54 	Tuple!(int64, int64) DeleteRange(byte[] key, byte[] end);
55 	void End();
56 }
57 
58 // RangeDeleter is a TxnDelete constructor.
59 alias RangeDeleter = TxnDelete delegate();
60 // type RangeDeleter func() TxnDelete
61 
62 // Checkpointer permits checkpointing of lease remaining TTLs to the consensus log. Defined here to
63 // avoid circular dependency with mvcc.
64 alias Checkpointer = void delegate( /* context.Context ctx, */ LeaseCheckpointRequest lc);
65 // type Checkpointer func(ctx , lc *)
66 
67 // Lessor owns leases. It can grant, revoke, renew and modify leases for lessee.
68 // type Lessor interface {
69 // 	// SetRangeDeleter lets the lessor create TxnDeletes to the store.
70 // 	// Lessor deletes the items in the revoked or expired lease by creating
71 // 	// new TxnDeletes.
72 // 	SetRangeDeleter(rd RangeDeleter)
73 
74 // 	SetCheckpointer(cp Checkpointer)
75 
76 // 	// Grant grants a lease that expires at least after TTL seconds.
77 // 	Grant(id LeaseID, ttl int64) (*Lease, error)
78 // 	// Revoke revokes a lease with given ID. The item attached to the
79 // 	// given lease will be removed. If the ID does not exist, an error
80 // 	// will be returned.
81 // 	Revoke(id LeaseID) error
82 
83 // 	// Checkpoint applies the remainingTTL of a lease. The remainingTTL is used in Promote to set
84 // 	// the expiry of leases to less than the full TTL when possible.
85 // 	Checkpoint(id LeaseID, remainingTTL int64) error
86 
87 // 	// Attach attaches given leaseItem to the lease with given LeaseID.
88 // 	// If the lease does not exist, an error will be returned.
89 // 	Attach(id LeaseID, items []LeaseItem) error
90 
91 // 	// GetLease returns LeaseID for given item.
92 // 	// If no lease found, NoLease value will be returned.
93 // 	GetLease(item LeaseItem) LeaseID
94 
95 // 	// Detach detaches given leaseItem from the lease with given LeaseID.
96 // 	// If the lease does not exist, an error will be returned.
97 // 	Detach(id LeaseID, items []LeaseItem) error
98 
99 // 	// Promote promotes the lessor to be the primary lessor. Primary lessor manages
100 // 	// the expiration and renew of leases.
101 // 	// Newly promoted lessor renew the TTL of all lease to extend + previous TTL.
102 // 	Promote(extend time.Duration)
103 
104 // 	// Demote demotes the lessor from being the primary lessor.
105 // 	Demote()
106 
107 // 	// Renew renews a lease with given ID. It returns the renewed TTL. If the ID does not exist,
108 // 	// an error will be returned.
109 // 	Renew(id LeaseID) (int64, error)
110 
111 // 	// Lookup gives the lease at a given lease id, if any
112 // 	Lookup(id LeaseID) *Lease
113 
114 // 	// Leases lists all leases.
115 // 	Leases() []*Lease
116 
117 // 	// ExpiredLeasesC returns a chan that is used to receive expired leases.
118 // 	ExpiredLeasesC() <-chan []*Lease
119 
120 // 	// Recover recovers the lessor state from the given backend and RangeDeleter.
121 // 	Recover(b backend.Backend, rd RangeDeleter)
122 
123 // 	// Stop stops the lessor for managing leases. The behavior of calling Stop multiple
124 // 	// times is undefined.
125 // 	Stop()
126 // }
127 
128 // lessor implements Lessor interface.
129 // TODO: use clockwork for testability.
130 class Lessor
131 {
132 	// mu sync.RWMutex
133 	Mutex _mutex;
134 	// demotec is set when the lessor is the primary.
135 	// demotec will be closed if the lessor is demoted.
136 	// demotec chan struct{}
137 	bool _isPrimary = false;
138 
139 	Lease[LeaseID] leaseMap;
140 	Heap!LeaseQueue leaseHeap;
141 	Heap!LeaseQueue leaseCheckpointHeap;
142 	LeaseID[LeaseItem] itemMap;
143 
144 	// When a lease expires, the lessor will delete the
145 	// leased range (or key) by the RangeDeleter.
146 	RangeDeleter rd;
147 
148 	// When a lease's deadline should be persisted to preserve the remaining TTL across leader
149 	// elections and restarts, the lessor will checkpoint the lease by the Checkpointer.
150 	Checkpointer cp;
151 
152 	// backend to persist leases. We only persist lease ID and expiry for now.
153 	// The leased items can be recovered by iterating all the keys in kv.
154 	// backend.Backend b;
155 
156 	// minLeaseTTL is the minimum lease TTL that can be granted for a lease. Any
157 	// requests for shorter TTLs are extended to the minimum TTL.
158 	int64 minLeaseTTL;
159 
160 	// expiredC chan []*Lease
161 	// stopC is a channel whose closure indicates that the lessor should be stopped.
162 	// stopC chan struct{}
163 	// doneC is a channel whose closure indicates that the lessor is stopped.
164 	// doneC chan struct{}
165 
166 	// lg *zap.Logger
167 
168 	// Wait duration between lease checkpoints.
169 	long checkpointInterval;
170 
171 	this(int64 ttl, long interval)
172 	{
173 		_mutex = new Mutex();
174 		minLeaseTTL = ttl;
175 		checkpointInterval = interval;
176 	}
177 
178 	// isPrimary indicates if this lessor is the primary lessor. The primary
179 	// lessor manages lease expiration and renew.
180 	//
181 	// in etcd, raft leader is the primary. Thus there might be two primary
182 	// leaders at the same time (raft allows concurrent leader but with different term)
183 	// for at most a leader election timeout.
184 	// The old primary leader cannot affect the correctness since its proposal has a
185 	// smaller term and will not be committed.
186 	//
187 	// TODO: raft follower do not forward lease management proposals. There might be a
188 	// very small window (within second normally which depends on go scheduling) that
189 	// a raft follow is the primary between the raft leader demotion and lessor demotion.
190 	// Usually this should not be a problem. Lease should not be that sensitive to timing.
191 
192 	bool isPrimary()
193 	{
194 		// implementationMissing();
195 		return _isPrimary;
196 		// return this.demotec != null;
197 	}
198 
199 	void SetRangeDeleter(RangeDeleter rd)
200 	{
201 		_mutex.lock();
202 		scope (exit)
203 			_mutex.unlock();
204 
205 		this.rd = rd;
206 	}
207 
208 	void SetCheckpointer(Checkpointer cp)
209 	{
210 		_mutex.lock();
211 		scope (exit)
212 			_mutex.unlock();
213 
214 		this.cp = cp;
215 	}
216 
217 	Lease Grant(LeaseID id, int64 ttl)
218 	{
219 		if (id == NoLease)
220 		{
221 			return null;
222 		}
223 
224 		if (ttl > MaxLeaseTTL)
225 		{
226 			return null;
227 		}
228 
229 		// TODO: when lessor is under high load, it should give out lease
230 		// with longer TTL to reduce renew load.
231 		Lease l = new Lease();
232 		l.ID = id;
233 		l.ttl = ttl; // itemSet: make(map[LeaseItem]struct{}),
234 		// revokec: make(chan struct{}),
235 
236 		_mutex.lock();
237 		scope (exit)
238 			_mutex.unlock();
239 
240 		if (id in this.leaseMap)
241 		{
242 			logWarning("------LeaseID Exists------ : ",id);
243 			return this.leaseMap[id];
244 			// return null;
245 		}
246 
247 		if (l.ttl < this.minLeaseTTL)
248 		{
249 			l.ttl = this.minLeaseTTL;
250 		}
251 
252 		if (this.isPrimary())
253 		{
254 			l.refresh(0);
255 		}
256 		else
257 		{
258 			l.forever();
259 		}
260 
261 		this.leaseMap[id] = l;
262 		LeaseWithTime item = {id:
263 		l.ID, time : l.expiry};
264 		this.leaseHeap.Push(item);
265 		logDebug("---- grant id : ",id," ttl: ",ttl);
266 
267 		// l.persistTo( /* this.b */ );
268 
269 		///@gxc
270 		// leaseTotalTTLs.Observe(float64(l.ttl));
271 		// leaseGranted.Inc();
272 
273 		if (this.isPrimary())
274 		{
275 			// this.scheduleCheckpointIfNeeded(l);
276 		}
277 
278 		return l;
279 	}
280 
281 	string Revoke(LeaseID id)
282 	{
283 		logWarning("revoke Id : ", id);
284 		_mutex.lock();
285 
286 		auto l = (id in this.leaseMap);
287 		if (l == null)
288 		{
289 			this._mutex.unlock();
290 			return ErrLeaseNotFound;
291 		}
292 		// scope (exit)
293 		// 	close(l.revokec);
294 		// unlock before doing external work
295 		this._mutex.unlock();
296 
297 		// if (this.rd is null)
298 		// {
299 		// 	return null;
300 		// }
301 
302 		// auto txn = this.rd();
303 
304 		// sort keys so deletes are in same order among all members,
305 		// otherwise the backened hashes will be different
306 		if(this.isPrimary())
307 		{
308 			auto keys = l.Keys();
309 			// sort.StringSlice(keys).Sort();
310 			foreach (key; keys)
311 			{
312 				// txn.DeleteRange(cast(byte[])(key), null);
313 				logWarning("revoke lease attach's key : ", key);
314 				RpcRequest rreq;
315 				rreq.CMD = RpcReqCommand.DeleteRangeRequest;
316 				rreq.Key = key;
317 				NetonRpcServer.instance.Propose(rreq);
318 			}
319 		}
320 
321 		_mutex.lock();
322 		scope (exit)
323 			_mutex.unlock();
324 		// delete(this.leaseMap, l.ID);
325 		this.leaseMap.remove(l.ID);
326 		// lease deletion needs to be in the same backend transaction with the
327 		// kv deletion. Or we might end up with not executing the revoke or not
328 		// deleting the keys if etcdserver fails in between.
329 		// this.b.BatchTx().UnsafeDelete(leaseBucketName, int64ToBytes(int64(l.ID)));
330 
331 		// txn.End();
332 
333 		// leaseRevoked.Inc();
334 		return null;
335 	}
336 
337 	string Checkpoint(LeaseID id, int64 remainingTTL)
338 	{
339 		_mutex.lock();
340 		scope (exit)
341 			_mutex.unlock();
342 
343 		auto l = (id in this.leaseMap);
344 		if (l != null)
345 		{
346 			// when checkpointing, we only update the remainingTTL, Promote is responsible for applying this to lease expiry
347 			l.remainingTTL = remainingTTL;
348 			if (this.isPrimary())
349 			{
350 				// schedule the next checkpoint as needed
351 				this.scheduleCheckpointIfNeeded(*l);
352 			}
353 		}
354 		return null;
355 	}
356 
357 	// Renew renews an existing lease. If the given lease does not exist or
358 	// has expired, an error will be returned.
359 	int64 Renew(LeaseID id)
360 	{
361 		_mutex.lock();
362 		scope (exit)
363 			_mutex.unlock();
364 
365 		// if (!this.isPrimary())
366 		// {
367 		// 	// forward renew request to primary instead of returning error.
368 		// 	return -1;
369 		// }
370 
371 		// auto demotec = this.demotec;
372 
373 		auto l = (id in this.leaseMap);
374 		if (l == null)
375 		{
376 			logWarning("not found in leaseMap----");
377 			return -1; /* , ErrLeaseNotFound */
378 		}
379 
380 		if (l.expired())
381 		{
382 			// unlock = func() {}
383 			///@gxc
384 			// select
385 			// {
386 			// 	// A expired lease might be pending for revoking or going through
387 			// 	// quorum to be revoked. To be accurate, renew request must wait for the
388 			// 	// deletion to complete.
389 			// 	case  <  - l.revokec : return  - 1, ErrLeaseNotFound // The expired lease might fail to be revoked if the primary changes.
390 			// 	// The caller will retry on ErrNotPrimary.
391 			// 	case  <  - demotec
392 			// 		: return  - 1, ErrNotPrimarycase <  - this.stopC : return  - 1, ErrNotPrimary
393 			// }
394 			logWarning("lease is expire----");
395 			return -1;
396 		}
397 
398 		// Clear remaining TTL when we renew if it is set
399 		// By applying a RAFT entry only when the remainingTTL is already set, we limit the number
400 		// of RAFT entries written per lease to a max of 2 per checkpoint interval.
401 		if (this.cp != null && l.remainingTTL > 0)
402 		{
403 			LeaseCheckpoint[] cps;
404 			LeaseCheckpoint ckp = new LeaseCheckpoint();
405 			ckp.ID = int64(l.ID);
406 			ckp.remainingTTL = 0;
407 
408 			cps ~= ckp;
409 			LeaseCheckpointRequest req = new LeaseCheckpointRequest();
410 			req.checkpoints ~= ckp;
411 			this.cp( /* context.Background(),  */ req);
412 		}
413 
414 		l.refresh(0);
415 		LeaseWithTime item = {id:
416 		l.ID, time : l.expiry};
417 		this.leaseHeap.Push(item);
418 
419 		// leaseRenewed.Inc();
420 		return l.ttl /* , null */ ;
421 	}
422 
423 	Lease Lookup(LeaseID id)
424 	{
425 		this._mutex.lock();
426 		scope (exit)
427 			this._mutex.unlock();
428 		auto l = (id in this.leaseMap);
429 		return *l;
430 	}
431 
432 	Lease[] unsafeLeases()
433 	{
434 		Lease[] leases;
435 		foreach (LeaseID k, Lease v; this.leaseMap)
436 		{
437 			leases ~= v;
438 		}
439 		return leases;
440 	}
441 
442 	Lease[] Leases()
443 	{
444 		this._mutex.lock();
445 		auto ls = this.unsafeLeases();
446 		this._mutex.unlock();
447 		// sort.Sort(leasesByExpiry(ls));
448 		return ls;
449 	}
450 
451 	void Promote(long extend)
452 	{
453 		_mutex.lock();
454 		scope (exit)
455 			_mutex.unlock();
456 
457 		_isPrimary = true;
458 		// this.demotec = make(chan struct
459 		// 	{
460 		// });
461 
462 		// refresh the expiries of all leases.
463 		foreach (LeaseID k, Lease l; this.leaseMap)
464 		{
465 			l.refresh(extend);
466 			LeaseWithTime item = {id:
467 			l.ID, time : l.expiry};
468 			this.leaseHeap.Push(item);
469 		}
470 
471 		if ((this.leaseMap.length) < leaseRevokeRate)
472 		{
473 			// no possibility of lease pile-up
474 			return;
475 		}
476 
477 		// adjust expiries in case of overlap
478 		auto leases = this.unsafeLeases();
479 		// sort.Sort(leasesByExpiry(leases));
480 
481 		auto baseWindow = leases[0].Remaining();
482 		auto nextWindow = baseWindow + 1 /* time.Second */ ;
483 		auto expires = 0;
484 		// have fewer expires than the total revoke rate so piled up leases
485 		// don't consume the entire revoke limit
486 		auto targetExpiresPerSecond = (3 * leaseRevokeRate) / 4;
487 		foreach (l; leases)
488 		{
489 			auto remaining = l.Remaining();
490 			if (remaining > nextWindow)
491 			{
492 				baseWindow = remaining;
493 				nextWindow = baseWindow + 1 /* time.Second */ ;
494 				expires = 1;
495 				continue;
496 			}
497 			expires++;
498 			if (expires <= targetExpiresPerSecond)
499 			{
500 				continue;
501 			}
502 			auto rateDelay = float64(1) * (float64(expires) / float64(targetExpiresPerSecond));
503 			// If leases are extended by n seconds, leases n seconds ahead of the
504 			// base window should be extended by only one second.
505 			rateDelay -= float64(remaining - baseWindow);
506 			auto delay = (rateDelay);
507 			nextWindow = baseWindow + delay;
508 			l.refresh(delay + extend);
509 			LeaseWithTime item = {id:
510 			l.ID, time : l.expiry};
511 			this.leaseHeap.Push(item);
512 			this.scheduleCheckpointIfNeeded(l);
513 		}
514 	}
515 
516 	void Demote()
517 	{
518 		_mutex.lock();
519 		scope (exit)
520 			_mutex.unlock();
521 
522 		// set the expiries of all leases to forever
523 		foreach (LeaseID k, Lease l; this.leaseMap)
524 		{
525 			l.forever();
526 		}
527 
528 		this.clearScheduledLeasesCheckpoints();
529 
530 		_isPrimary = false;
531 		// if (this.demotec != null)
532 		// {
533 		// 	close(this.demotec);
534 		// 	this.demotec = null;
535 		// }
536 	}
537 
538 	// Attach attaches items to the lease with given ID. When the lease
539 	// expires, the attached items will be automatically removed.
540 	// If the given lease does not exist, an error will be returned.
541 	string Attach(LeaseID id, LeaseItem[] items)
542 	{
543 		_mutex.lock();
544 		scope (exit)
545 			_mutex.unlock();
546 
547 		auto l = (id in this.leaseMap);
548 		if (l == null)
549 		{
550 			return ErrLeaseNotFound;
551 		}
552 
553 		l._mutex.writer().lock();
554 		foreach (it; items)
555 		{
556 			l.itemSet.add(it.Key);
557 			this.itemMap[it] = id;
558 		}
559 		l._mutex.writer().unlock();
560 		return null;
561 	}
562 
563 	LeaseID GetLease(LeaseItem item)
564 	{
565 		this._mutex.lock();
566 		auto id = this.itemMap[item];
567 		this._mutex.unlock();
568 		return id;
569 	}
570 
571 	// Detach detaches items from the lease with given ID.
572 	// If the given lease does not exist, an error will be returned.
573 	string Detach(LeaseID id, LeaseItem[] items)
574 	{
575 		_mutex.lock();
576 		scope (exit)
577 			_mutex.unlock();
578 
579 		auto l = (id in this.leaseMap);
580 		if (l == null)
581 		{
582 			return ErrLeaseNotFound;
583 		}
584 
585 		l._mutex.writer().lock();
586 		foreach (it; items)
587 		{
588 			// delete(l.itemSet, it);
589 			l.itemSet.remove(it.Key);
590 			// delete(this.itemMap, it);
591 			this.itemMap.remove(it);
592 		}
593 		l._mutex.writer().unlock();
594 		return null;
595 	}
596 
597 	void Recover( /* backend.Backend b, */ RangeDeleter rd)
598 	{
599 		_mutex.lock();
600 		scope (exit)
601 			_mutex.unlock();
602 
603 		// this.b = b;
604 		this.rd = rd;
605 		this.leaseMap.clear;
606 		this.itemMap.clear;
607 		this.initAndRecover();
608 	}
609 
610 	//  ExpiredLeasesC() <-chan []*Lease {
611 	// 	return this.expiredC;
612 	// }
613 
614 	//  void Stop() {
615 	// 	close(this.stopC)
616 	// 	<-this.doneC
617 	// }
618 
619 	void runLoop()
620 	{
621 		// if(this.leaseHeap.length() > 0)
622 		// {
623 		// 	auto item = this.leaseHeap.get!LeaseWithTime(0);
624 			// logWarning("begin lessor check... : ");
625 		// }
626 		try
627 		{
628 			this.revokeExpiredLeases();
629 			// this.checkpointScheduledLeases();
630 		}
631 		catch (Throwable e)
632 		{
633 			logError("lease runloop error : ", e.msg);
634 		}
635 
636 	}
637 
638 	// revokeExpiredLeases finds all leases past their expiry and sends them to epxired channel for
639 	// to be revoked.
640 	void revokeExpiredLeases()
641 	{
642 		Lease[] ls;
643 
644 		// rate limit
645 		auto revokeLimit = leaseRevokeRate / 2;
646 
647 		this._mutex.lock();
648 		if (this.isPrimary())
649 		{
650 			ls = this.findExpiredLeases(revokeLimit);
651 		}
652 		this._mutex.unlock();
653 
654 		if ((ls.length) != 0)
655 		{
656 			new Thread(() {
657 				logWarning("TO DO Hanle expired Leases's len : ", ls.length);
658 				foreach (l; ls)
659 				{
660 					logWarning("TO DO Hanle expired Lease ID : ", l.ID);
661 					RpcRequest rreq;
662 					rreq.CMD = RpcReqCommand.LeaseRevokeRequest;
663 					rreq.LeaseID = l.ID;
664 					NetonRpcServer.instance().Propose(rreq);
665 				}
666 			}).start();
667 		}
668 	}
669 
670 	// checkpointScheduledLeases finds all scheduled lease checkpoints that are due and
671 	// submits them to the checkpointer to persist them to the consensus log.
672 	void checkpointScheduledLeases()
673 	{
674 		LeaseCheckpoint[] cps;
675 
676 		// rate limit
677 		for (int i = 0; i < leaseCheckpointRate / 2; i++)
678 		{
679 			_mutex.lock();
680 			if (this.isPrimary())
681 			{
682 				cps = this.findDueScheduledCheckpoints(maxLeaseCheckpointBatchSize);
683 			}
684 			this._mutex.unlock();
685 
686 			if ((cps.length) != 0)
687 			{
688 				LeaseCheckpointRequest req = new LeaseCheckpointRequest();
689 				req.checkpoints = cps;
690 				if (this.cp != null)
691 					this.cp(req);
692 			}
693 			if ((cps.length) < maxLeaseCheckpointBatchSize)
694 			{
695 				return;
696 			}
697 		}
698 	}
699 
700 	void clearScheduledLeasesCheckpoints()
701 	{
702 		this.leaseCheckpointHeap.clear();
703 	}
704 
705 	// expireExists returns true if expiry items exist.
706 	// It pops only when expiry item exists.
707 	// "next" is true, to indicate that it may exist in next attempt.
708 	Tuple!(Lease, bool, bool) expireExists()
709 	{
710 		Tuple!(Lease, bool, bool) result;
711 		result[0] = null;
712 		result[1] = false;
713 		result[2] = false;
714 		if (this.leaseHeap.length() == 0)
715 		{
716 			result[0] = null;
717 			result[1] = false;
718 			result[2] = false;
719 			return result;
720 		}
721 
722 
723 		auto item = this.leaseHeap.get!LeaseWithTime(0);
724 		// logInfo("---item id : ",item.id," time :",item.time);
725 
726 		auto exsit = (item.id in this.leaseMap);
727 		if (exsit == null)
728 		{
729 			this.leaseHeap.Pop!LeaseWithTime(); // O(log N)
730 			return result;
731 		}	
732 		result[0] = *exsit;
733 		if (result[0] is null)
734 		{
735 
736 			// lease has expired or been revoked
737 			// no need to revoke (nothing is expiry)
738 			this.leaseHeap.Pop!LeaseWithTime(); // O(log N)
739 			result[1] = false;
740 			result[2] = true;
741 			return result;
742 		}
743 
744 		if (time() < item.time) /* expiration time */
745 		{
746 			// Candidate expirations are caught up, reinsert this item
747 			// and no need to revoke (nothing is expiry)
748 			result[1] = false;
749 			result[2] = false;
750 			return result;
751 		}
752 		// if the lease is actually expired, add to the removal list. If it is not expired, we can ignore it because another entry will have been inserted into the heap
753 
754 		this.leaseHeap.Pop!LeaseWithTime(); // O(log N)
755 		result[1] = true;
756 		result[2] = false;
757 		return result;
758 	}
759 
760 	// findExpiredLeases loops leases in the leaseMap until reaching expired limit
761 	// and returns the expired leases that needed to be revoked.
762 	Lease[] findExpiredLeases(int limit)
763 	{
764 		Lease[] leases;
765 
766 		while (1)
767 		{
768 			Tuple!(Lease, bool, bool) result = this.expireExists();
769 			Lease l = result[0];
770 			bool ok = result[1];
771 			bool next = result[2];
772 			if (!ok && !next)
773 			{
774 				break;
775 			}
776 			if (!ok)
777 			{
778 				continue;
779 			}
780 			if (next)
781 			{
782 				continue;
783 			}
784 
785 			if ((l !is null) && l.expired())
786 			{
787 				leases ~= l;
788 
789 				// reach expired limit
790 				if ((leases.length) == limit)
791 				{
792 					break;
793 				}
794 			}
795 		}
796 
797 		return leases;
798 	}
799 
800 	void scheduleCheckpointIfNeeded(Lease lease)
801 	{
802 		if (this.cp is null)
803 		{
804 			return;
805 		}
806 
807 		if (lease.RemainingTTL() > int64(this.checkpointInterval /* .Seconds() */ ))
808 		{
809 			// if (this.lg != null) {
810 			// 	this.lg.Debug("Scheduling lease checkpoint",
811 			// 		zap.Int64("leaseID", int64(lease.ID)),
812 			// 		zap.Duration("intervalSeconds", this.checkpointInterval),
813 			// 	);
814 			// }
815 			LeaseWithTime item = {
816 			id:
817 				lease.ID, time : time() + (this.checkpointInterval) /* .UnixNano() */
818 
819 			};
820 			this.leaseCheckpointHeap.Push(item);
821 		}
822 	}
823 
824 	LeaseCheckpoint[] findDueScheduledCheckpoints(int checkpointLimit)
825 	{
826 		if (this.cp is null)
827 		{
828 			return null;
829 		}
830 
831 		auto now = time();
832 		LeaseCheckpoint[] cps;
833 		while (this.leaseCheckpointHeap.length() > 0 && (cps.length) < checkpointLimit)
834 		{
835 			auto lt = this.leaseCheckpointHeap.get!LeaseWithTime(0);
836 			if (lt.time > now /* .UnixNano() */ )
837 			{
838 				return cps;
839 			}
840 			this.leaseCheckpointHeap.Pop!LeaseWithTime();
841 			auto l = (lt.id in this.leaseMap);
842 			bool ok;
843 			if (l == null)
844 			{
845 				continue;
846 			}
847 			if (!(now < (l.expiry)))
848 			{
849 				continue;
850 			}
851 			auto remainingTTL = int64((l.expiry - (now) /* .Seconds() */ ));
852 			if (remainingTTL >= l.ttl)
853 			{
854 				continue;
855 			}
856 			// if (this.lg != null)
857 			{
858 				logDebug("Checkpointing lease --", "leaseID : ", lt.id,
859 						"remainingTTL : ", remainingTTL);
860 			}
861 			LeaseCheckpoint item = new LeaseCheckpoint();
862 			item.ID = int64(lt.id);
863 			item.remainingTTL = remainingTTL;
864 			cps ~= item;
865 		}
866 		return cps;
867 	}
868 
869 	void init(Lease l)
870 	{
871 		logDebug(" -------****------> init add lease : ", l.ID);
872 		this.leaseMap[l.ID] = l;
873 	}
874 
875 	void initAndRecover()
876 	{
877 		///@gxc
878 		implementationMissing();
879 		// 	auto tx = this.b.BatchTx();
880 		// 	tx.Lock();
881 
882 		// 	tx.UnsafeCreateBucket(leaseBucketName);
883 		// 	_, vs :  = tx.UnsafeRange(leaseBucketName, int64ToBytes(0), int64ToBytes(math.MaxInt64), 0);
884 		// 	// TODO: copy vs and do decoding outside tx lock if lock contention becomes an issue.
885 		// 	for (int i = 0; i < vs.length; i++)
886 		// 	{
887 		// 		leasepb.Lease lpb;
888 		// 		auto err = lpb.Unmarshal(vs[i]);
889 		// 		if (err != null)
890 		// 		{
891 		// 			tx.Unlock();
892 		// 			panic("failed to unmarshal lease proto item");
893 		// 		}
894 		// 		auto ID = LeaseID(lpb.ID);
895 		// 		if (lpb.TTL < this.minLeaseTTL)
896 		// 		{
897 		// 			lpb.TTL = this.minLeaseTTL;
898 		// 		}
899 		// 		this.leaseMap[ID] = Lease
900 		// 		{
901 		// 		ID:
902 		// 			ID, ttl : lpb.TTL, // itemSet will be filled in when recover key-value pairs
903 		// 				// set expiry to forever, refresh when promoted
904 		// 		itemSet : make(map[LeaseItem]struct
905 		// 					{
906 		// 				}
907 		// ), expiry : forever, revokec : make(chan struct
908 		// 				{
909 		// 			}),
910 		// 		};
911 		// 	}
912 		// this.leaseHeap.Init();
913 		// this.leaseCheckpointHeap.Init();
914 		// tx.Unlock();
915 
916 		// this.b.ForceCommit();
917 	}
918 
919 }
920 
921 struct LessorConfig
922 {
923 	int64 MinLeaseTTL;
924 	long CheckpointInterval;
925 }
926 
927 Lessor NewLessor(LessorConfig cfg)
928 {
929 	return newLessor(cfg);
930 }
931 
932 Lessor newLessor(LessorConfig cfg)
933 {
934 	auto checkpointInterval = cfg.CheckpointInterval;
935 	if (checkpointInterval == 0)
936 	{
937 		checkpointInterval = 5 * 60;
938 	}
939 	// l := &lessor{
940 	// 	leaseMap:            make(map[LeaseID]*Lease),
941 	// 	itemMap:             make(map[LeaseItem]LeaseID),
942 	// 	leaseHeap:           make(LeaseQueue, 0),
943 	// 	leaseCheckpointHeap: make(LeaseQueue, 0),
944 	// 	b:                   b,
945 	// 	minLeaseTTL:         cfg.MinLeaseTTL,
946 	// 	checkpointInterval:  checkpointInterval,
947 	// 	// expiredC is a small buffered chan to avoid unnecessary blocking.
948 	// 	expiredC: make(chan []*Lease, 16),
949 	// 	stopC:    make(chan struct{}),
950 	// 	doneC:    make(chan struct{}),
951 	// 	lg:       lg,
952 	// }
953 	auto l = new Lessor(cfg.MinLeaseTTL, checkpointInterval);
954 	// l.initAndRecover(); // go l.runLoop()
955 
956 	return l;
957 }
958 
959 alias leasesByExpiry = Lease[];
960 
961 int Len(leasesByExpiry le)
962 {
963 	return cast(int)(le.length);
964 }
965 
966 bool Less(leasesByExpiry le, int i, int j)
967 {
968 	return le[i].Remaining() < le[j].Remaining();
969 }
970 
971 void Swap(leasesByExpiry le, int i, int j)
972 {
973 	auto temp = le[i];
974 	le[i] = le[j];
975 	le[j] = temp;
976 }
977 
978 class Lease
979 {
980 	LeaseID ID;
981 	int64 ttl; // time to live of the lease in seconds
982 	int64 remainingTTL; // remaining time to live in seconds, if zero valued it is considered unset and the full ttl should be used
983 	// expiryMu protects concurrent accesses to expiry
984 	Mutex expiryMu;
985 	// expiry is time when lease should expire. no expiration when expiry.IsZero() is true
986 	long expiry;
987 
988 	// mu protects concurrent accesses to itemSet
989 	ReadWriteMutex _mutex;
990 	HashSet!string itemSet;
991 	// revokec chan struct{}
992 
993 	this()
994 	{
995 		itemSet = new HashSet!string();
996 		expiryMu = new Mutex();
997 		_mutex = new ReadWriteMutex();
998 	}
999 
1000 	bool expired()
1001 	{
1002 		return this.Remaining() <= 0;
1003 	}
1004 
1005 	void persistTo( /* backend.Backend b */ )
1006 	{
1007 		implementationMissing();
1008 		// auto key = int64ToBytes(int64(this.ID));
1009 
1010 		// auto lpb = new leasepb.Lease();
1011 		// lpb.ID = int64(this.ID);
1012 		// lpb.TTL = this.ttl;
1013 		// lpb.RemainingTTL = this.remainingTTL;
1014 
1015 		// auto val = lpb.Marshal();
1016 		// if (val is null)
1017 		// {
1018 		// 	panic("failed to marshal lease proto item");
1019 		// }
1020 
1021 		// b.BatchTx().Lock();
1022 		// b.BatchTx().UnsafePut(leaseBucketName, key, val);
1023 		// b.BatchTx().Unlock();
1024 	}
1025 
1026 	// TTL returns the TTL of the Lease.
1027 	int64 TTL()
1028 	{
1029 		return this.ttl;
1030 	}
1031 
1032 	// RemainingTTL returns the last checkpointed remaining TTL of the lease.
1033 	// TODO(jpbetz): do not expose this utility method
1034 	int64 RemainingTTL()
1035 	{
1036 		if (this.remainingTTL > 0)
1037 		{
1038 			return this.remainingTTL;
1039 		}
1040 		return this.ttl;
1041 	}
1042 
1043 	// refresh refreshes the expiry of the lease.
1044 	void refresh(long extend)
1045 	{
1046 		auto newExpiry = time() + (extend + (this.RemainingTTL()) * 1);
1047 		this.expiryMu.lock();
1048 		scope (exit)
1049 			this.expiryMu.unlock();
1050 		this.expiry = newExpiry;
1051 	}
1052 
1053 	// forever sets the expiry of lease to be forever.
1054 	void forever()
1055 	{
1056 		this.expiryMu.lock();
1057 		scope (exit)
1058 			this.expiryMu.unlock();
1059 		this.expiry = FOREVER;
1060 	}
1061 
1062 	// Keys returns all the keys attached to the lease.
1063 	string[] Keys()
1064 	{
1065 		this._mutex.reader.lock();
1066 		string[] keys;
1067 		foreach (k; this.itemSet)
1068 		{
1069 			keys ~= k;
1070 		}
1071 		this._mutex.reader.unlock();
1072 		return keys;
1073 	}
1074 
1075 	// Remaining returns the remaining time of the lease.
1076 	long Remaining()
1077 	{
1078 		this.expiryMu.lock();
1079 		scope (exit)
1080 			this.expiryMu.unlock();
1081 		if (this.expiry == long.init)
1082 		{
1083 			return long.max;
1084 		}
1085 		return  /* time.Until */ (this.expiry - time());
1086 	}
1087 
1088 }
1089 
1090 struct LeaseItem
1091 {
1092 	string Key;
1093 }
1094 
1095 byte[] int64ToBytes(int64 n)
1096 {
1097 	// bytes := make([]byte, 8)
1098 	// binary.BigEndian.PutUint64(bytes, uint64(n))
1099 	byte[] bytes;
1100 	auto d = nativeToBigEndian(n);
1101 	foreach (b; d)
1102 	{
1103 		bytes ~= cast(byte) b;
1104 	}
1105 	return bytes;
1106 }
1107 
1108 // FakeLessor is a fake implementation of Lessor interface.
1109 // Used for testing only.
1110 // type FakeLessor struct{}
1111 
1112 // func (fl *FakeLessor) SetRangeDeleter(dr RangeDeleter) {}
1113 
1114 // func (fl *FakeLessor) SetCheckpointer(cp Checkpointer) {}
1115 
1116 // func (fl *FakeLessor) Grant(id LeaseID, ttl int64) (*Lease, error) { return null, null }
1117 
1118 // func (fl *FakeLessor) Revoke(id LeaseID) error { return null }
1119 
1120 // func (fl *FakeLessor) Checkpoint(id LeaseID, remainingTTL int64) error { return null }
1121 
1122 // func (fl *FakeLessor) Attach(id LeaseID, items []LeaseItem) error { return null }
1123 
1124 // func (fl *FakeLessor) GetLease(item LeaseItem) LeaseID            { return 0 }
1125 // func (fl *FakeLessor) Detach(id LeaseID, items []LeaseItem) error { return null }
1126 
1127 // func (fl *FakeLessor) Promote(extend time.Duration) {}
1128 
1129 // func (fl *FakeLessor) Demote() {}
1130 
1131 // func (fl *FakeLessor) Renew(id LeaseID) (int64, error) { return 10, null }
1132 
1133 // func (fl *FakeLessor) Lookup(id LeaseID) *Lease { return null }
1134 
1135 // func (fl *FakeLessor) Leases() []*Lease { return null }
1136 
1137 // func (fl *FakeLessor) ExpiredLeasesC() <-chan []*Lease { return null }
1138 
1139 // func (fl *FakeLessor) Recover(b backend.Backend, rd RangeDeleter) {}
1140 
1141 // func (fl *FakeLessor) Stop() {}