1 module neton.lease.Lessor; 2 3 import std.typecons; 4 import std.math; 5 import core.sync.mutex; 6 import core.sync.rwmutex; 7 import std.bitmanip; 8 import core.thread; 9 10 import hunt.util.DateTime; 11 import hunt.logging; 12 import hunt.Exceptions; 13 import hunt.collection; 14 15 import neton.protocol.neton; 16 import neton.server.NetonRpcServer; 17 import neton.rpcservice.Command; 18 19 import neton.lease.Heap; 20 import neton.lease.LeaseQueue; 21 22 alias int64 = long; 23 alias LeaseID = long; 24 alias float64 = long; 25 26 // NoLease is a special LeaseID representing the absence of a lease. 27 const LeaseID NoLease = LeaseID(0); 28 29 // MaxLeaseTTL is the maximum lease TTL value 30 const long MaxLeaseTTL = 9000000000; 31 32 enum long FOREVER = long.max; 33 34 enum string leaseBucketName = "lease"; 35 36 // maximum number of leases to revoke per second; configurable for tests 37 enum int leaseRevokeRate = 1000; 38 39 // maximum number of lease checkpoints recorded to the consensus log per second; configurable for tests 40 enum int leaseCheckpointRate = 1000; 41 42 // maximum number of lease checkpoints to batch into a single consensus log entry 43 enum int maxLeaseCheckpointBatchSize = 1000; 44 45 enum string ErrNotPrimary = "not a primary lessor"; 46 enum string ErrLeaseNotFound = "lease not found"; 47 enum string ErrLeaseExists = "lease already exists"; 48 enum string ErrLeaseTTLTooLarge = "too large lease TTL"; 49 50 // TxnDelete is a TxnWrite that only permits deletes. Defined here 51 // to avoid circular dependency with mvcc. 52 interface TxnDelete 53 { 54 Tuple!(int64, int64) DeleteRange(byte[] key, byte[] end); 55 void End(); 56 } 57 58 // RangeDeleter is a TxnDelete constructor. 59 alias RangeDeleter = TxnDelete delegate(); 60 // type RangeDeleter func() TxnDelete 61 62 // Checkpointer permits checkpointing of lease remaining TTLs to the consensus log. Defined here to 63 // avoid circular dependency with mvcc. 64 alias Checkpointer = void delegate( /* context.Context ctx, */ LeaseCheckpointRequest lc); 65 // type Checkpointer func(ctx , lc *) 66 67 // Lessor owns leases. It can grant, revoke, renew and modify leases for lessee. 68 // type Lessor interface { 69 // // SetRangeDeleter lets the lessor create TxnDeletes to the store. 70 // // Lessor deletes the items in the revoked or expired lease by creating 71 // // new TxnDeletes. 72 // SetRangeDeleter(rd RangeDeleter) 73 74 // SetCheckpointer(cp Checkpointer) 75 76 // // Grant grants a lease that expires at least after TTL seconds. 77 // Grant(id LeaseID, ttl int64) (*Lease, error) 78 // // Revoke revokes a lease with given ID. The item attached to the 79 // // given lease will be removed. If the ID does not exist, an error 80 // // will be returned. 81 // Revoke(id LeaseID) error 82 83 // // Checkpoint applies the remainingTTL of a lease. The remainingTTL is used in Promote to set 84 // // the expiry of leases to less than the full TTL when possible. 85 // Checkpoint(id LeaseID, remainingTTL int64) error 86 87 // // Attach attaches given leaseItem to the lease with given LeaseID. 88 // // If the lease does not exist, an error will be returned. 89 // Attach(id LeaseID, items []LeaseItem) error 90 91 // // GetLease returns LeaseID for given item. 92 // // If no lease found, NoLease value will be returned. 93 // GetLease(item LeaseItem) LeaseID 94 95 // // Detach detaches given leaseItem from the lease with given LeaseID. 96 // // If the lease does not exist, an error will be returned. 97 // Detach(id LeaseID, items []LeaseItem) error 98 99 // // Promote promotes the lessor to be the primary lessor. Primary lessor manages 100 // // the expiration and renew of leases. 101 // // Newly promoted lessor renew the TTL of all lease to extend + previous TTL. 102 // Promote(extend time.Duration) 103 104 // // Demote demotes the lessor from being the primary lessor. 105 // Demote() 106 107 // // Renew renews a lease with given ID. It returns the renewed TTL. If the ID does not exist, 108 // // an error will be returned. 109 // Renew(id LeaseID) (int64, error) 110 111 // // Lookup gives the lease at a given lease id, if any 112 // Lookup(id LeaseID) *Lease 113 114 // // Leases lists all leases. 115 // Leases() []*Lease 116 117 // // ExpiredLeasesC returns a chan that is used to receive expired leases. 118 // ExpiredLeasesC() <-chan []*Lease 119 120 // // Recover recovers the lessor state from the given backend and RangeDeleter. 121 // Recover(b backend.Backend, rd RangeDeleter) 122 123 // // Stop stops the lessor for managing leases. The behavior of calling Stop multiple 124 // // times is undefined. 125 // Stop() 126 // } 127 128 // lessor implements Lessor interface. 129 // TODO: use clockwork for testability. 130 class Lessor 131 { 132 // mu sync.RWMutex 133 Mutex _mutex; 134 // demotec is set when the lessor is the primary. 135 // demotec will be closed if the lessor is demoted. 136 // demotec chan struct{} 137 bool _isPrimary = false; 138 139 Lease[LeaseID] leaseMap; 140 Heap!LeaseQueue leaseHeap; 141 Heap!LeaseQueue leaseCheckpointHeap; 142 LeaseID[LeaseItem] itemMap; 143 144 // When a lease expires, the lessor will delete the 145 // leased range (or key) by the RangeDeleter. 146 RangeDeleter rd; 147 148 // When a lease's deadline should be persisted to preserve the remaining TTL across leader 149 // elections and restarts, the lessor will checkpoint the lease by the Checkpointer. 150 Checkpointer cp; 151 152 // backend to persist leases. We only persist lease ID and expiry for now. 153 // The leased items can be recovered by iterating all the keys in kv. 154 // backend.Backend b; 155 156 // minLeaseTTL is the minimum lease TTL that can be granted for a lease. Any 157 // requests for shorter TTLs are extended to the minimum TTL. 158 int64 minLeaseTTL; 159 160 // expiredC chan []*Lease 161 // stopC is a channel whose closure indicates that the lessor should be stopped. 162 // stopC chan struct{} 163 // doneC is a channel whose closure indicates that the lessor is stopped. 164 // doneC chan struct{} 165 166 // lg *zap.Logger 167 168 // Wait duration between lease checkpoints. 169 long checkpointInterval; 170 171 this(int64 ttl, long interval) 172 { 173 _mutex = new Mutex(); 174 minLeaseTTL = ttl; 175 checkpointInterval = interval; 176 } 177 178 // isPrimary indicates if this lessor is the primary lessor. The primary 179 // lessor manages lease expiration and renew. 180 // 181 // in etcd, raft leader is the primary. Thus there might be two primary 182 // leaders at the same time (raft allows concurrent leader but with different term) 183 // for at most a leader election timeout. 184 // The old primary leader cannot affect the correctness since its proposal has a 185 // smaller term and will not be committed. 186 // 187 // TODO: raft follower do not forward lease management proposals. There might be a 188 // very small window (within second normally which depends on go scheduling) that 189 // a raft follow is the primary between the raft leader demotion and lessor demotion. 190 // Usually this should not be a problem. Lease should not be that sensitive to timing. 191 192 bool isPrimary() 193 { 194 // implementationMissing(); 195 return _isPrimary; 196 // return this.demotec != null; 197 } 198 199 void SetRangeDeleter(RangeDeleter rd) 200 { 201 _mutex.lock(); 202 scope (exit) 203 _mutex.unlock(); 204 205 this.rd = rd; 206 } 207 208 void SetCheckpointer(Checkpointer cp) 209 { 210 _mutex.lock(); 211 scope (exit) 212 _mutex.unlock(); 213 214 this.cp = cp; 215 } 216 217 Lease Grant(LeaseID id, int64 ttl) 218 { 219 if (id == NoLease) 220 { 221 return null; 222 } 223 224 if (ttl > MaxLeaseTTL) 225 { 226 return null; 227 } 228 229 // TODO: when lessor is under high load, it should give out lease 230 // with longer TTL to reduce renew load. 231 Lease l = new Lease(); 232 l.ID = id; 233 l.ttl = ttl; // itemSet: make(map[LeaseItem]struct{}), 234 // revokec: make(chan struct{}), 235 236 _mutex.lock(); 237 scope (exit) 238 _mutex.unlock(); 239 240 if (id in this.leaseMap) 241 { 242 logWarning("------LeaseID Exists------ : ",id); 243 return this.leaseMap[id]; 244 // return null; 245 } 246 247 if (l.ttl < this.minLeaseTTL) 248 { 249 l.ttl = this.minLeaseTTL; 250 } 251 252 if (this.isPrimary()) 253 { 254 l.refresh(0); 255 } 256 else 257 { 258 l.forever(); 259 } 260 261 this.leaseMap[id] = l; 262 LeaseWithTime item = {id: 263 l.ID, time : l.expiry}; 264 this.leaseHeap.Push(item); 265 logDebug("---- grant id : ",id," ttl: ",ttl); 266 267 // l.persistTo( /* this.b */ ); 268 269 ///@gxc 270 // leaseTotalTTLs.Observe(float64(l.ttl)); 271 // leaseGranted.Inc(); 272 273 if (this.isPrimary()) 274 { 275 // this.scheduleCheckpointIfNeeded(l); 276 } 277 278 return l; 279 } 280 281 string Revoke(LeaseID id) 282 { 283 logWarning("revoke Id : ", id); 284 _mutex.lock(); 285 286 auto l = (id in this.leaseMap); 287 if (l == null) 288 { 289 this._mutex.unlock(); 290 return ErrLeaseNotFound; 291 } 292 // scope (exit) 293 // close(l.revokec); 294 // unlock before doing external work 295 this._mutex.unlock(); 296 297 // if (this.rd is null) 298 // { 299 // return null; 300 // } 301 302 // auto txn = this.rd(); 303 304 // sort keys so deletes are in same order among all members, 305 // otherwise the backened hashes will be different 306 if(this.isPrimary()) 307 { 308 auto keys = l.Keys(); 309 // sort.StringSlice(keys).Sort(); 310 foreach (key; keys) 311 { 312 // txn.DeleteRange(cast(byte[])(key), null); 313 logWarning("revoke lease attach's key : ", key); 314 RpcRequest rreq; 315 rreq.CMD = RpcReqCommand.DeleteRangeRequest; 316 rreq.Key = key; 317 NetonRpcServer.instance.Propose(rreq); 318 } 319 } 320 321 _mutex.lock(); 322 scope (exit) 323 _mutex.unlock(); 324 // delete(this.leaseMap, l.ID); 325 this.leaseMap.remove(l.ID); 326 // lease deletion needs to be in the same backend transaction with the 327 // kv deletion. Or we might end up with not executing the revoke or not 328 // deleting the keys if etcdserver fails in between. 329 // this.b.BatchTx().UnsafeDelete(leaseBucketName, int64ToBytes(int64(l.ID))); 330 331 // txn.End(); 332 333 // leaseRevoked.Inc(); 334 return null; 335 } 336 337 string Checkpoint(LeaseID id, int64 remainingTTL) 338 { 339 _mutex.lock(); 340 scope (exit) 341 _mutex.unlock(); 342 343 auto l = (id in this.leaseMap); 344 if (l != null) 345 { 346 // when checkpointing, we only update the remainingTTL, Promote is responsible for applying this to lease expiry 347 l.remainingTTL = remainingTTL; 348 if (this.isPrimary()) 349 { 350 // schedule the next checkpoint as needed 351 this.scheduleCheckpointIfNeeded(*l); 352 } 353 } 354 return null; 355 } 356 357 // Renew renews an existing lease. If the given lease does not exist or 358 // has expired, an error will be returned. 359 int64 Renew(LeaseID id) 360 { 361 _mutex.lock(); 362 scope (exit) 363 _mutex.unlock(); 364 365 // if (!this.isPrimary()) 366 // { 367 // // forward renew request to primary instead of returning error. 368 // return -1; 369 // } 370 371 // auto demotec = this.demotec; 372 373 auto l = (id in this.leaseMap); 374 if (l == null) 375 { 376 logWarning("not found in leaseMap----"); 377 return -1; /* , ErrLeaseNotFound */ 378 } 379 380 if (l.expired()) 381 { 382 // unlock = func() {} 383 ///@gxc 384 // select 385 // { 386 // // A expired lease might be pending for revoking or going through 387 // // quorum to be revoked. To be accurate, renew request must wait for the 388 // // deletion to complete. 389 // case < - l.revokec : return - 1, ErrLeaseNotFound // The expired lease might fail to be revoked if the primary changes. 390 // // The caller will retry on ErrNotPrimary. 391 // case < - demotec 392 // : return - 1, ErrNotPrimarycase < - this.stopC : return - 1, ErrNotPrimary 393 // } 394 logWarning("lease is expire----"); 395 return -1; 396 } 397 398 // Clear remaining TTL when we renew if it is set 399 // By applying a RAFT entry only when the remainingTTL is already set, we limit the number 400 // of RAFT entries written per lease to a max of 2 per checkpoint interval. 401 if (this.cp != null && l.remainingTTL > 0) 402 { 403 LeaseCheckpoint[] cps; 404 LeaseCheckpoint ckp = new LeaseCheckpoint(); 405 ckp.ID = int64(l.ID); 406 ckp.remainingTTL = 0; 407 408 cps ~= ckp; 409 LeaseCheckpointRequest req = new LeaseCheckpointRequest(); 410 req.checkpoints ~= ckp; 411 this.cp( /* context.Background(), */ req); 412 } 413 414 l.refresh(0); 415 LeaseWithTime item = {id: 416 l.ID, time : l.expiry}; 417 this.leaseHeap.Push(item); 418 419 // leaseRenewed.Inc(); 420 return l.ttl /* , null */ ; 421 } 422 423 Lease Lookup(LeaseID id) 424 { 425 this._mutex.lock(); 426 scope (exit) 427 this._mutex.unlock(); 428 auto l = (id in this.leaseMap); 429 return *l; 430 } 431 432 Lease[] unsafeLeases() 433 { 434 Lease[] leases; 435 foreach (LeaseID k, Lease v; this.leaseMap) 436 { 437 leases ~= v; 438 } 439 return leases; 440 } 441 442 Lease[] Leases() 443 { 444 this._mutex.lock(); 445 auto ls = this.unsafeLeases(); 446 this._mutex.unlock(); 447 // sort.Sort(leasesByExpiry(ls)); 448 return ls; 449 } 450 451 void Promote(long extend) 452 { 453 _mutex.lock(); 454 scope (exit) 455 _mutex.unlock(); 456 457 _isPrimary = true; 458 // this.demotec = make(chan struct 459 // { 460 // }); 461 462 // refresh the expiries of all leases. 463 foreach (LeaseID k, Lease l; this.leaseMap) 464 { 465 l.refresh(extend); 466 LeaseWithTime item = {id: 467 l.ID, time : l.expiry}; 468 this.leaseHeap.Push(item); 469 } 470 471 if ((this.leaseMap.length) < leaseRevokeRate) 472 { 473 // no possibility of lease pile-up 474 return; 475 } 476 477 // adjust expiries in case of overlap 478 auto leases = this.unsafeLeases(); 479 // sort.Sort(leasesByExpiry(leases)); 480 481 auto baseWindow = leases[0].Remaining(); 482 auto nextWindow = baseWindow + 1 /* time.Second */ ; 483 auto expires = 0; 484 // have fewer expires than the total revoke rate so piled up leases 485 // don't consume the entire revoke limit 486 auto targetExpiresPerSecond = (3 * leaseRevokeRate) / 4; 487 foreach (l; leases) 488 { 489 auto remaining = l.Remaining(); 490 if (remaining > nextWindow) 491 { 492 baseWindow = remaining; 493 nextWindow = baseWindow + 1 /* time.Second */ ; 494 expires = 1; 495 continue; 496 } 497 expires++; 498 if (expires <= targetExpiresPerSecond) 499 { 500 continue; 501 } 502 auto rateDelay = float64(1) * (float64(expires) / float64(targetExpiresPerSecond)); 503 // If leases are extended by n seconds, leases n seconds ahead of the 504 // base window should be extended by only one second. 505 rateDelay -= float64(remaining - baseWindow); 506 auto delay = (rateDelay); 507 nextWindow = baseWindow + delay; 508 l.refresh(delay + extend); 509 LeaseWithTime item = {id: 510 l.ID, time : l.expiry}; 511 this.leaseHeap.Push(item); 512 this.scheduleCheckpointIfNeeded(l); 513 } 514 } 515 516 void Demote() 517 { 518 _mutex.lock(); 519 scope (exit) 520 _mutex.unlock(); 521 522 // set the expiries of all leases to forever 523 foreach (LeaseID k, Lease l; this.leaseMap) 524 { 525 l.forever(); 526 } 527 528 this.clearScheduledLeasesCheckpoints(); 529 530 _isPrimary = false; 531 // if (this.demotec != null) 532 // { 533 // close(this.demotec); 534 // this.demotec = null; 535 // } 536 } 537 538 // Attach attaches items to the lease with given ID. When the lease 539 // expires, the attached items will be automatically removed. 540 // If the given lease does not exist, an error will be returned. 541 string Attach(LeaseID id, LeaseItem[] items) 542 { 543 _mutex.lock(); 544 scope (exit) 545 _mutex.unlock(); 546 547 auto l = (id in this.leaseMap); 548 if (l == null) 549 { 550 return ErrLeaseNotFound; 551 } 552 553 l._mutex.writer().lock(); 554 foreach (it; items) 555 { 556 l.itemSet.add(it.Key); 557 this.itemMap[it] = id; 558 } 559 l._mutex.writer().unlock(); 560 return null; 561 } 562 563 LeaseID GetLease(LeaseItem item) 564 { 565 this._mutex.lock(); 566 auto id = this.itemMap[item]; 567 this._mutex.unlock(); 568 return id; 569 } 570 571 // Detach detaches items from the lease with given ID. 572 // If the given lease does not exist, an error will be returned. 573 string Detach(LeaseID id, LeaseItem[] items) 574 { 575 _mutex.lock(); 576 scope (exit) 577 _mutex.unlock(); 578 579 auto l = (id in this.leaseMap); 580 if (l == null) 581 { 582 return ErrLeaseNotFound; 583 } 584 585 l._mutex.writer().lock(); 586 foreach (it; items) 587 { 588 // delete(l.itemSet, it); 589 l.itemSet.remove(it.Key); 590 // delete(this.itemMap, it); 591 this.itemMap.remove(it); 592 } 593 l._mutex.writer().unlock(); 594 return null; 595 } 596 597 void Recover( /* backend.Backend b, */ RangeDeleter rd) 598 { 599 _mutex.lock(); 600 scope (exit) 601 _mutex.unlock(); 602 603 // this.b = b; 604 this.rd = rd; 605 this.leaseMap.clear; 606 this.itemMap.clear; 607 this.initAndRecover(); 608 } 609 610 // ExpiredLeasesC() <-chan []*Lease { 611 // return this.expiredC; 612 // } 613 614 // void Stop() { 615 // close(this.stopC) 616 // <-this.doneC 617 // } 618 619 void runLoop() 620 { 621 // if(this.leaseHeap.length() > 0) 622 // { 623 // auto item = this.leaseHeap.get!LeaseWithTime(0); 624 // logWarning("begin lessor check... : "); 625 // } 626 try 627 { 628 this.revokeExpiredLeases(); 629 // this.checkpointScheduledLeases(); 630 } 631 catch (Throwable e) 632 { 633 logError("lease runloop error : ", e.msg); 634 } 635 636 } 637 638 // revokeExpiredLeases finds all leases past their expiry and sends them to epxired channel for 639 // to be revoked. 640 void revokeExpiredLeases() 641 { 642 Lease[] ls; 643 644 // rate limit 645 auto revokeLimit = leaseRevokeRate / 2; 646 647 this._mutex.lock(); 648 if (this.isPrimary()) 649 { 650 ls = this.findExpiredLeases(revokeLimit); 651 } 652 this._mutex.unlock(); 653 654 if ((ls.length) != 0) 655 { 656 new Thread(() { 657 logWarning("TO DO Hanle expired Leases's len : ", ls.length); 658 foreach (l; ls) 659 { 660 logWarning("TO DO Hanle expired Lease ID : ", l.ID); 661 RpcRequest rreq; 662 rreq.CMD = RpcReqCommand.LeaseRevokeRequest; 663 rreq.LeaseID = l.ID; 664 NetonRpcServer.instance().Propose(rreq); 665 } 666 }).start(); 667 } 668 } 669 670 // checkpointScheduledLeases finds all scheduled lease checkpoints that are due and 671 // submits them to the checkpointer to persist them to the consensus log. 672 void checkpointScheduledLeases() 673 { 674 LeaseCheckpoint[] cps; 675 676 // rate limit 677 for (int i = 0; i < leaseCheckpointRate / 2; i++) 678 { 679 _mutex.lock(); 680 if (this.isPrimary()) 681 { 682 cps = this.findDueScheduledCheckpoints(maxLeaseCheckpointBatchSize); 683 } 684 this._mutex.unlock(); 685 686 if ((cps.length) != 0) 687 { 688 LeaseCheckpointRequest req = new LeaseCheckpointRequest(); 689 req.checkpoints = cps; 690 if (this.cp != null) 691 this.cp(req); 692 } 693 if ((cps.length) < maxLeaseCheckpointBatchSize) 694 { 695 return; 696 } 697 } 698 } 699 700 void clearScheduledLeasesCheckpoints() 701 { 702 this.leaseCheckpointHeap.clear(); 703 } 704 705 // expireExists returns true if expiry items exist. 706 // It pops only when expiry item exists. 707 // "next" is true, to indicate that it may exist in next attempt. 708 Tuple!(Lease, bool, bool) expireExists() 709 { 710 Tuple!(Lease, bool, bool) result; 711 result[0] = null; 712 result[1] = false; 713 result[2] = false; 714 if (this.leaseHeap.length() == 0) 715 { 716 result[0] = null; 717 result[1] = false; 718 result[2] = false; 719 return result; 720 } 721 722 723 auto item = this.leaseHeap.get!LeaseWithTime(0); 724 // logInfo("---item id : ",item.id," time :",item.time); 725 726 auto exsit = (item.id in this.leaseMap); 727 if (exsit == null) 728 { 729 this.leaseHeap.Pop!LeaseWithTime(); // O(log N) 730 return result; 731 } 732 result[0] = *exsit; 733 if (result[0] is null) 734 { 735 736 // lease has expired or been revoked 737 // no need to revoke (nothing is expiry) 738 this.leaseHeap.Pop!LeaseWithTime(); // O(log N) 739 result[1] = false; 740 result[2] = true; 741 return result; 742 } 743 744 if (time() < item.time) /* expiration time */ 745 { 746 // Candidate expirations are caught up, reinsert this item 747 // and no need to revoke (nothing is expiry) 748 result[1] = false; 749 result[2] = false; 750 return result; 751 } 752 // if the lease is actually expired, add to the removal list. If it is not expired, we can ignore it because another entry will have been inserted into the heap 753 754 this.leaseHeap.Pop!LeaseWithTime(); // O(log N) 755 result[1] = true; 756 result[2] = false; 757 return result; 758 } 759 760 // findExpiredLeases loops leases in the leaseMap until reaching expired limit 761 // and returns the expired leases that needed to be revoked. 762 Lease[] findExpiredLeases(int limit) 763 { 764 Lease[] leases; 765 766 while (1) 767 { 768 Tuple!(Lease, bool, bool) result = this.expireExists(); 769 Lease l = result[0]; 770 bool ok = result[1]; 771 bool next = result[2]; 772 if (!ok && !next) 773 { 774 break; 775 } 776 if (!ok) 777 { 778 continue; 779 } 780 if (next) 781 { 782 continue; 783 } 784 785 if ((l !is null) && l.expired()) 786 { 787 leases ~= l; 788 789 // reach expired limit 790 if ((leases.length) == limit) 791 { 792 break; 793 } 794 } 795 } 796 797 return leases; 798 } 799 800 void scheduleCheckpointIfNeeded(Lease lease) 801 { 802 if (this.cp is null) 803 { 804 return; 805 } 806 807 if (lease.RemainingTTL() > int64(this.checkpointInterval /* .Seconds() */ )) 808 { 809 // if (this.lg != null) { 810 // this.lg.Debug("Scheduling lease checkpoint", 811 // zap.Int64("leaseID", int64(lease.ID)), 812 // zap.Duration("intervalSeconds", this.checkpointInterval), 813 // ); 814 // } 815 LeaseWithTime item = { 816 id: 817 lease.ID, time : time() + (this.checkpointInterval) /* .UnixNano() */ 818 819 }; 820 this.leaseCheckpointHeap.Push(item); 821 } 822 } 823 824 LeaseCheckpoint[] findDueScheduledCheckpoints(int checkpointLimit) 825 { 826 if (this.cp is null) 827 { 828 return null; 829 } 830 831 auto now = time(); 832 LeaseCheckpoint[] cps; 833 while (this.leaseCheckpointHeap.length() > 0 && (cps.length) < checkpointLimit) 834 { 835 auto lt = this.leaseCheckpointHeap.get!LeaseWithTime(0); 836 if (lt.time > now /* .UnixNano() */ ) 837 { 838 return cps; 839 } 840 this.leaseCheckpointHeap.Pop!LeaseWithTime(); 841 auto l = (lt.id in this.leaseMap); 842 bool ok; 843 if (l == null) 844 { 845 continue; 846 } 847 if (!(now < (l.expiry))) 848 { 849 continue; 850 } 851 auto remainingTTL = int64((l.expiry - (now) /* .Seconds() */ )); 852 if (remainingTTL >= l.ttl) 853 { 854 continue; 855 } 856 // if (this.lg != null) 857 { 858 logDebug("Checkpointing lease --", "leaseID : ", lt.id, 859 "remainingTTL : ", remainingTTL); 860 } 861 LeaseCheckpoint item = new LeaseCheckpoint(); 862 item.ID = int64(lt.id); 863 item.remainingTTL = remainingTTL; 864 cps ~= item; 865 } 866 return cps; 867 } 868 869 void init(Lease l) 870 { 871 logDebug(" -------****------> init add lease : ", l.ID); 872 this.leaseMap[l.ID] = l; 873 } 874 875 void initAndRecover() 876 { 877 ///@gxc 878 implementationMissing(); 879 // auto tx = this.b.BatchTx(); 880 // tx.Lock(); 881 882 // tx.UnsafeCreateBucket(leaseBucketName); 883 // _, vs : = tx.UnsafeRange(leaseBucketName, int64ToBytes(0), int64ToBytes(math.MaxInt64), 0); 884 // // TODO: copy vs and do decoding outside tx lock if lock contention becomes an issue. 885 // for (int i = 0; i < vs.length; i++) 886 // { 887 // leasepb.Lease lpb; 888 // auto err = lpb.Unmarshal(vs[i]); 889 // if (err != null) 890 // { 891 // tx.Unlock(); 892 // panic("failed to unmarshal lease proto item"); 893 // } 894 // auto ID = LeaseID(lpb.ID); 895 // if (lpb.TTL < this.minLeaseTTL) 896 // { 897 // lpb.TTL = this.minLeaseTTL; 898 // } 899 // this.leaseMap[ID] = Lease 900 // { 901 // ID: 902 // ID, ttl : lpb.TTL, // itemSet will be filled in when recover key-value pairs 903 // // set expiry to forever, refresh when promoted 904 // itemSet : make(map[LeaseItem]struct 905 // { 906 // } 907 // ), expiry : forever, revokec : make(chan struct 908 // { 909 // }), 910 // }; 911 // } 912 // this.leaseHeap.Init(); 913 // this.leaseCheckpointHeap.Init(); 914 // tx.Unlock(); 915 916 // this.b.ForceCommit(); 917 } 918 919 } 920 921 struct LessorConfig 922 { 923 int64 MinLeaseTTL; 924 long CheckpointInterval; 925 } 926 927 Lessor NewLessor(LessorConfig cfg) 928 { 929 return newLessor(cfg); 930 } 931 932 Lessor newLessor(LessorConfig cfg) 933 { 934 auto checkpointInterval = cfg.CheckpointInterval; 935 if (checkpointInterval == 0) 936 { 937 checkpointInterval = 5 * 60; 938 } 939 // l := &lessor{ 940 // leaseMap: make(map[LeaseID]*Lease), 941 // itemMap: make(map[LeaseItem]LeaseID), 942 // leaseHeap: make(LeaseQueue, 0), 943 // leaseCheckpointHeap: make(LeaseQueue, 0), 944 // b: b, 945 // minLeaseTTL: cfg.MinLeaseTTL, 946 // checkpointInterval: checkpointInterval, 947 // // expiredC is a small buffered chan to avoid unnecessary blocking. 948 // expiredC: make(chan []*Lease, 16), 949 // stopC: make(chan struct{}), 950 // doneC: make(chan struct{}), 951 // lg: lg, 952 // } 953 auto l = new Lessor(cfg.MinLeaseTTL, checkpointInterval); 954 // l.initAndRecover(); // go l.runLoop() 955 956 return l; 957 } 958 959 alias leasesByExpiry = Lease[]; 960 961 int Len(leasesByExpiry le) 962 { 963 return cast(int)(le.length); 964 } 965 966 bool Less(leasesByExpiry le, int i, int j) 967 { 968 return le[i].Remaining() < le[j].Remaining(); 969 } 970 971 void Swap(leasesByExpiry le, int i, int j) 972 { 973 auto temp = le[i]; 974 le[i] = le[j]; 975 le[j] = temp; 976 } 977 978 class Lease 979 { 980 LeaseID ID; 981 int64 ttl; // time to live of the lease in seconds 982 int64 remainingTTL; // remaining time to live in seconds, if zero valued it is considered unset and the full ttl should be used 983 // expiryMu protects concurrent accesses to expiry 984 Mutex expiryMu; 985 // expiry is time when lease should expire. no expiration when expiry.IsZero() is true 986 long expiry; 987 988 // mu protects concurrent accesses to itemSet 989 ReadWriteMutex _mutex; 990 HashSet!string itemSet; 991 // revokec chan struct{} 992 993 this() 994 { 995 itemSet = new HashSet!string(); 996 expiryMu = new Mutex(); 997 _mutex = new ReadWriteMutex(); 998 } 999 1000 bool expired() 1001 { 1002 return this.Remaining() <= 0; 1003 } 1004 1005 void persistTo( /* backend.Backend b */ ) 1006 { 1007 implementationMissing(); 1008 // auto key = int64ToBytes(int64(this.ID)); 1009 1010 // auto lpb = new leasepb.Lease(); 1011 // lpb.ID = int64(this.ID); 1012 // lpb.TTL = this.ttl; 1013 // lpb.RemainingTTL = this.remainingTTL; 1014 1015 // auto val = lpb.Marshal(); 1016 // if (val is null) 1017 // { 1018 // panic("failed to marshal lease proto item"); 1019 // } 1020 1021 // b.BatchTx().Lock(); 1022 // b.BatchTx().UnsafePut(leaseBucketName, key, val); 1023 // b.BatchTx().Unlock(); 1024 } 1025 1026 // TTL returns the TTL of the Lease. 1027 int64 TTL() 1028 { 1029 return this.ttl; 1030 } 1031 1032 // RemainingTTL returns the last checkpointed remaining TTL of the lease. 1033 // TODO(jpbetz): do not expose this utility method 1034 int64 RemainingTTL() 1035 { 1036 if (this.remainingTTL > 0) 1037 { 1038 return this.remainingTTL; 1039 } 1040 return this.ttl; 1041 } 1042 1043 // refresh refreshes the expiry of the lease. 1044 void refresh(long extend) 1045 { 1046 auto newExpiry = time() + (extend + (this.RemainingTTL()) * 1); 1047 this.expiryMu.lock(); 1048 scope (exit) 1049 this.expiryMu.unlock(); 1050 this.expiry = newExpiry; 1051 } 1052 1053 // forever sets the expiry of lease to be forever. 1054 void forever() 1055 { 1056 this.expiryMu.lock(); 1057 scope (exit) 1058 this.expiryMu.unlock(); 1059 this.expiry = FOREVER; 1060 } 1061 1062 // Keys returns all the keys attached to the lease. 1063 string[] Keys() 1064 { 1065 this._mutex.reader.lock(); 1066 string[] keys; 1067 foreach (k; this.itemSet) 1068 { 1069 keys ~= k; 1070 } 1071 this._mutex.reader.unlock(); 1072 return keys; 1073 } 1074 1075 // Remaining returns the remaining time of the lease. 1076 long Remaining() 1077 { 1078 this.expiryMu.lock(); 1079 scope (exit) 1080 this.expiryMu.unlock(); 1081 if (this.expiry == long.init) 1082 { 1083 return long.max; 1084 } 1085 return /* time.Until */ (this.expiry - time()); 1086 } 1087 1088 } 1089 1090 struct LeaseItem 1091 { 1092 string Key; 1093 } 1094 1095 byte[] int64ToBytes(int64 n) 1096 { 1097 // bytes := make([]byte, 8) 1098 // binary.BigEndian.PutUint64(bytes, uint64(n)) 1099 byte[] bytes; 1100 auto d = nativeToBigEndian(n); 1101 foreach (b; d) 1102 { 1103 bytes ~= cast(byte) b; 1104 } 1105 return bytes; 1106 } 1107 1108 // FakeLessor is a fake implementation of Lessor interface. 1109 // Used for testing only. 1110 // type FakeLessor struct{} 1111 1112 // func (fl *FakeLessor) SetRangeDeleter(dr RangeDeleter) {} 1113 1114 // func (fl *FakeLessor) SetCheckpointer(cp Checkpointer) {} 1115 1116 // func (fl *FakeLessor) Grant(id LeaseID, ttl int64) (*Lease, error) { return null, null } 1117 1118 // func (fl *FakeLessor) Revoke(id LeaseID) error { return null } 1119 1120 // func (fl *FakeLessor) Checkpoint(id LeaseID, remainingTTL int64) error { return null } 1121 1122 // func (fl *FakeLessor) Attach(id LeaseID, items []LeaseItem) error { return null } 1123 1124 // func (fl *FakeLessor) GetLease(item LeaseItem) LeaseID { return 0 } 1125 // func (fl *FakeLessor) Detach(id LeaseID, items []LeaseItem) error { return null } 1126 1127 // func (fl *FakeLessor) Promote(extend time.Duration) {} 1128 1129 // func (fl *FakeLessor) Demote() {} 1130 1131 // func (fl *FakeLessor) Renew(id LeaseID) (int64, error) { return 10, null } 1132 1133 // func (fl *FakeLessor) Lookup(id LeaseID) *Lease { return null } 1134 1135 // func (fl *FakeLessor) Leases() []*Lease { return null } 1136 1137 // func (fl *FakeLessor) ExpiredLeasesC() <-chan []*Lease { return null } 1138 1139 // func (fl *FakeLessor) Recover(b backend.Backend, rd RangeDeleter) {} 1140 1141 // func (fl *FakeLessor) Stop() {}