1 module neton.rpcservice.WatchService;
2 
3 import neton.protocol.neton;
4 import neton.protocol.neton;
5 import neton.protocol.netonrpc;
6 import grpc;
7 import hunt.logging;
8 import neton.server.NetonRpcServer;
9 import neton.util.Future;
10 import neton.rpcservice.Command;
11 import std.stdio;
12 import hunt.util.Serialize;
13 import core.sync.mutex;
14 import std.conv;
15 
16 struct WatchInfo
17 {
18     bool created = false;
19     long watchId = 0;
20     ResponseHeader header;
21 }
22 
23 __gshared size_t WATCH_ID = 0;
24 __gshared Mutex _gmutex;
25 
26 shared static this()
27 {
28     _gmutex = new Mutex();
29 }
30 
31 size_t plusWatchId()
32 {
33     synchronized( _gmutex )
34     {
35         ++WATCH_ID;
36     }
37     return WATCH_ID;
38 }
39 
40 class WatchService : WatchBase
41 {
42 
43     override Status Watch(ServerReaderWriter!(WatchRequest, WatchResponse) rw)
44     {
45         logDebug("--------------");
46 
47         WatchRequest watchReq;
48 
49         while (rw.read(watchReq))
50         {
51             logDebug("watch -----> : ", watchReq.requestUnionCase(), " last watchid : ",WATCH_ID);
52             if (watchReq.requestUnionCase() == WatchRequest.RequestUnionCase.createRequest)
53             {
54                 auto f = new Future!(ServerReaderWriter!(WatchRequest, WatchResponse), WatchInfo)(
55                         rw);
56                 WatchResponse respon = new WatchResponse();
57                 respon.created = true;
58                 respon.watchId = plusWatchId();
59                 auto header = new ResponseHeader();
60                 header.clusterId = 1;
61                 header.memberId = 1;
62                 header.raftTerm = 1;
63                 header.revision = 1;
64                 respon.header = header;
65                 
66                 rw.write(respon);
67 
68                 RpcRequest rreq;
69                 rreq.CMD = RpcReqCommand.WatchRequest;
70                 rreq.Key = cast(string)(watchReq._createRequest.key);
71                 rreq.Value = respon.watchId.to!string;
72                 rreq.Hash = this.toHash() + respon.watchId;
73                 WatchInfo info;
74                 info.created = true;
75                 info.watchId = respon.watchId;
76                 info.header = header;
77                 f.setExtraData(info);
78                 NetonRpcServer.instance().ReadIndex(rreq, f);
79             }
80             else if (watchReq.requestUnionCase() == WatchRequest.RequestUnionCase.cancelRequest)
81             {
82                 logDebug("watch cancel ID : ",watchReq._cancelRequest.watchId);
83 
84                 auto f = new Future!(ServerReaderWriter!(WatchRequest, WatchResponse), WatchInfo)(
85                         rw);
86                 auto watchID = watchReq._cancelRequest.watchId;
87                 auto header = new ResponseHeader();
88                 header.clusterId = 1;
89                 header.memberId = 1;
90                 header.raftTerm = 1;
91                 header.revision = 1;
92                 
93                 RpcRequest rreq;
94                 rreq.CMD = RpcReqCommand.WatchCancelRequest;
95                 rreq.Value = watchID.to!string;
96                 rreq.Hash = this.toHash() + watchID;
97                 WatchInfo info;
98                 info.created = false;
99                 info.watchId = watchID;
100                 info.header = header;
101                 f.setExtraData(info);
102                 NetonRpcServer.instance().Propose(rreq, f);
103             }
104         }
105         logWarning("watch service end : ", this.toHash());
106         return Status.OK;
107     }
108 
109 }