1 module neton.server.WatchServer;
2 
3 import core.thread;
4 import core.sync.mutex;
5 import std.array;
6 import std.conv;
7 import std.algorithm.mutation;
8 
9 import hunt.logging;
10 import hunt.raft;
11 import hunt.net;
12 import hunt.event.timer.Common;
13 import hunt.util.Timer;
14 import hunt.event.EventLoop;
15 import hunt.event.timer;
16 
17 import neton.server.NetonRpcServer;
18 import neton.store.Watcher;
19 import neton.store.Event;
20 
21 import neton.util.Future;
22 
23 import neton.rpcservice;
24 import neton.protocol.neton;
25 import neton.protocol.neton;
26 import neton.protocol.netonrpc;
27 import grpc;
28 
/**
 * Registry of store watchers that is scanned on a 100 ms timer; any watcher
 * reporting pending notifications has its events written to the matching
 * gRPC watch stream obtained from `NetonRpcServer`.
 *
 * One shared instance is created lazily by `instance()`. Every method that
 * reads or modifies the watcher list does so while holding `_mutex`.
 */
class WatchServer
{
    private __gshared WatchServer _gserver;
    private Watcher[] _watchers;
    private Mutex _mutex;
    private EventLoop _eventLoop;
    private ITimer _timer;

    /// Builds the mutex, the event loop and the 100 ms scan timer, then runs the loop.
    private this()
    {
        _mutex = new Mutex();
        _eventLoop = new EventLoop();
        _timer = new Timer(_eventLoop, 100.msecs).onTick(&scanWatchers);
        // NOTE(review): whether run(-1) returns immediately or blocks the
        // constructing thread depends on the hunt EventLoop version — confirm.
        _eventLoop.run(-1);
    }

    /**
     * Returns the shared server, constructing it on first use.
     *
     * NOTE(review): the null check is not synchronised — confirm that the
     * first call cannot race with another thread.
     */
    static WatchServer instance()
    {
        if (_gserver !is null)
            return _gserver;

        _gserver = new WatchServer();
        return _gserver;
    }

    /// Starts the timer that drives `scanWatchers`.
    void run()
    {
        _timer.start();
    }

    /**
     * Appends a watcher to the scanned list.
     *
     * Params:
     *     w = watcher to register
     */
    void addWatcher(ref Watcher w)
    {
        synchronized (_mutex)
        {
            _watchers ~= w;
        }
    }

    /**
     * Calls `Remove()` on every watcher whose `watchId` equals `hash`, then
     * drops those watchers from the list and logs the remaining count.
     *
     * Params:
     *     hash = watch id of the watcher(s) to drop
     */
    void removeWatcher(size_t hash)
    {
        synchronized (_mutex)
        {
            // First pass: let each matching watcher run its own Remove().
            foreach (watcher; _watchers)
            {
                if (watcher.watchId == hash)
                    watcher.Remove();
            }

            // Second pass: compact the list and keep the shortened slice.
            _watchers = remove!(a => a.watchId == hash)(_watchers);
            logInfo("---watchers len : ", _watchers.length);
        }
    }

    /**
     * Timer callback: for each watcher with pending notifications, looks up
     * the request registered under the watcher's hash, converts the watcher's
     * events into a `WatchResponse` and writes it to the stream.
     *
     * Watchers without a registered request, or whose request is not a watch
     * future, are skipped and their events are left untouched.
     *
     * Params:
     *     sender = object passed by the timer; not used here
     */
    void scanWatchers(Object sender)
    {
        alias WatchStream = ServerReaderWriter!(WatchRequest, WatchResponse);
        alias WatchFuture = Future!(WatchStream, WatchInfo);
        alias RpcEvent = neton.protocol.neton.Event;

        synchronized (_mutex)
        {
            foreach (watcher; _watchers)
            {
                if (!watcher.haveNotify)
                    continue;

                logInfo("----- scaned notify key: ", watcher.key, " hash :", watcher.hash);

                auto request = NetonRpcServer.instance().getRequest(watcher.hash);
                if (request is null)
                    continue;

                auto future = cast(WatchFuture)(request);
                if (future is null)
                {
                    logWarning("--- watch handler convert fail ---");
                    continue;
                }

                auto response = new WatchResponse();
                WatchInfo info = future.ExtraData();
                response.header = info.header;
                response.created = false;
                response.watchId = info.watchId;

                // Bump the revision stored with the future for this notification.
                future.ExtraData().header.revision += 1;

                foreach (e; watcher.events())
                {
                    auto rpcEvent = new RpcEvent();
                    auto pair = new KeyValue();
                    pair.key = cast(ubyte[])((e.nodeOriginKey()));
                    pair.value = cast(ubyte[])(e.rpcValue());
                    rpcEvent.kv = pair;
                    // The same KeyValue is used for both the current and the previous entry.
                    rpcEvent.prevKv = pair;
                    rpcEvent.type = (e.action() == EventAction.Delete)
                        ? RpcEvent.EventType.DELETE
                        : RpcEvent.EventType.PUT;
                    response.events ~= rpcEvent;
                    logInfo("--- -> notify event key: ", e.nodeOriginKey(),
                            " value : ", e.rpcValue(), " type :", e.action());
                }

                watcher.clearEvent();
                future.data().write(response);
            }
        }
    }
}