1 module neton.rpcservice.WatchService; 2 3 import neton.protocol.neton; 4 import neton.protocol.neton; 5 import neton.protocol.netonrpc; 6 import grpc; 7 import hunt.logging; 8 import neton.server.NetonRpcServer; 9 import neton.util.Future; 10 import neton.rpcservice.Command; 11 import std.stdio; 12 import hunt.util.Serialize; 13 import core.sync.mutex; 14 import std.conv; 15 16 struct WatchInfo 17 { 18 bool created = false; 19 long watchId = 0; 20 ResponseHeader header; 21 } 22 23 __gshared size_t WATCH_ID = 0; 24 __gshared Mutex _gmutex; 25 26 shared static this() 27 { 28 _gmutex = new Mutex(); 29 } 30 31 size_t plusWatchId() 32 { 33 synchronized( _gmutex ) 34 { 35 ++WATCH_ID; 36 } 37 return WATCH_ID; 38 } 39 40 class WatchService : WatchBase 41 { 42 43 override Status Watch(ServerReaderWriter!(WatchRequest, WatchResponse) rw) 44 { 45 logDebug("--------------"); 46 47 WatchRequest watchReq; 48 49 while (rw.read(watchReq)) 50 { 51 logDebug("watch -----> : ", watchReq.requestUnionCase(), " last watchid : ",WATCH_ID); 52 if (watchReq.requestUnionCase() == WatchRequest.RequestUnionCase.createRequest) 53 { 54 auto f = new Future!(ServerReaderWriter!(WatchRequest, WatchResponse), WatchInfo)( 55 rw); 56 WatchResponse respon = new WatchResponse(); 57 respon.created = true; 58 respon.watchId = plusWatchId(); 59 auto header = new ResponseHeader(); 60 header.clusterId = 1; 61 header.memberId = 1; 62 header.raftTerm = 1; 63 header.revision = 1; 64 respon.header = header; 65 66 rw.write(respon); 67 68 RpcRequest rreq; 69 rreq.CMD = RpcReqCommand.WatchRequest; 70 rreq.Key = cast(string)(watchReq._createRequest.key); 71 rreq.Value = respon.watchId.to!string; 72 rreq.Hash = this.toHash() + respon.watchId; 73 WatchInfo info; 74 info.created = true; 75 info.watchId = respon.watchId; 76 info.header = header; 77 f.setExtraData(info); 78 NetonRpcServer.instance().ReadIndex(rreq, f); 79 } 80 else if (watchReq.requestUnionCase() == WatchRequest.RequestUnionCase.cancelRequest) 81 { 82 logDebug("watch cancel ID : ",watchReq._cancelRequest.watchId); 83 84 auto f = new Future!(ServerReaderWriter!(WatchRequest, WatchResponse), WatchInfo)( 85 rw); 86 auto watchID = watchReq._cancelRequest.watchId; 87 auto header = new ResponseHeader(); 88 header.clusterId = 1; 89 header.memberId = 1; 90 header.raftTerm = 1; 91 header.revision = 1; 92 93 RpcRequest rreq; 94 rreq.CMD = RpcReqCommand.WatchCancelRequest; 95 rreq.Value = watchID.to!string; 96 rreq.Hash = this.toHash() + watchID; 97 WatchInfo info; 98 info.created = false; 99 info.watchId = watchID; 100 info.header = header; 101 f.setExtraData(info); 102 NetonRpcServer.instance().Propose(rreq, f); 103 } 104 } 105 logWarning("watch service end : ", this.toHash()); 106 return Status.OK; 107 } 108 109 }