module neton.server.WatchServer;

import core.thread;
import core.sync.mutex;
import std.array;
import std.conv;
import std.algorithm.mutation;

import hunt.logging;
import hunt.raft;
import hunt.net;
import hunt.event.timer.Common;
import hunt.util.Timer;
import hunt.event.EventLoop;
import hunt.event.timer;

import neton.server.NetonRpcServer;
import neton.store.Watcher;
import neton.store.Event;

import neton.util.Future;

import neton.rpcservice;
import neton.protocol.neton;
import neton.protocol.neton;
import neton.protocol.netonrpc;
import grpc;

/**
 * Singleton that keeps the list of registered watchers and, from a timer
 * callback, forwards the pending events of each notified watcher to the
 * request stream registered under that watcher's hash in NetonRpcServer.
 *
 * Every access to the watcher list is guarded by an internal mutex.
 */
class WatchServer
{
    private __gshared WatchServer _gserver;
    private Watcher[] _watchers;
    private Mutex _mutex;
    private EventLoop _eventLoop;
    private ITimer _timer;

    /**
     * Creates the mutex, a private event loop and a timer bound to that loop
     * whose tick handler is scanWatchers, then runs the event loop.
     * The timer itself is only started by run().
     */
    private this()
    {
        _mutex = new Mutex();
        _eventLoop = new EventLoop();
        // presumably a 100 ms tick interval - confirm against hunt's Timer
        _timer = new Timer(_eventLoop, 100.msecs).onTick(&scanWatchers);
        // NOTE(review): whether run(-1) blocks the calling thread depends on
        // the hunt EventLoop implementation in use - confirm.
        _eventLoop.run(-1);
    }

    /**
     * Returns the shared instance, creating it on first use.
     *
     * The null check and the construction happen inside a synchronized
     * statement so that concurrent first callers cannot each build their own
     * WatchServer (and with it a second event loop and timer).
     */
    static WatchServer instance()
    {
        synchronized
        {
            if (_gserver is null)
                _gserver = new WatchServer();
            return _gserver;
        }
    }

    /// Starts the scan timer.
    void run()
    {
        _timer.start();
    }

    /**
     * Appends a watcher to the watcher list.
     *
     * Params:
     *   w = watcher to register
     */
    void addWatcher(ref Watcher w)
    {
        _mutex.lock();
        scope (exit)
            _mutex.unlock();

        _watchers ~= w;
    }

    /**
     * Calls Remove() on every watcher whose watchId equals `hash`, drops
     * those watchers from the list and logs the resulting list length.
     *
     * Params:
     *   hash = watch id of the watcher(s) to remove
     */
    void removeWatcher(size_t hash)
    {
        _mutex.lock();
        scope (exit)
            _mutex.unlock();

        foreach (w; _watchers)
        {
            if (w.watchId == hash)
                w.Remove();
        }

        // remove() compacts the array in place and returns the shortened
        // slice, which has to be stored back.
        _watchers = remove!(a => a.watchId == hash)(_watchers);
        logInfo("---watchers len : ", _watchers.length);
    }

    /**
     * Timer tick handler. For every watcher that reports a pending
     * notification, looks up the request registered under the watcher's hash
     * and, if one is found and has the expected type, writes a WatchResponse
     * carrying the watcher's events to it and clears those events.
     *
     * Watchers without a registered request, or whose request cannot be cast
     * to the expected Future type, are skipped and their events are left
     * untouched.
     *
     * Params:
     *   sender = tick source; unused
     */
    void scanWatchers(Object sender)
    {
        // NOTE(review): the mutex stays held for the whole scan, including
        // the stream write below.
        _mutex.lock();
        scope (exit)
            _mutex.unlock();

        foreach (w; _watchers)
        {
            if (!w.haveNotify)
                continue;

            logInfo("----- scaned notify key: ", w.key, " hash :", w.hash);
            auto h = NetonRpcServer.instance().getRequest(w.hash);
            if (h is null)
                continue;

            auto handler = cast(Future!(ServerReaderWriter!(WatchRequest,
                    WatchResponse), WatchInfo))(h);
            if (handler is null)
            {
                logWarning("--- watch handler convert fail ---");
                continue;
            }

            WatchResponse respon = new WatchResponse();
            WatchInfo watchInfo = handler.ExtraData();
            respon.header = watchInfo.header;
            respon.created = false;
            respon.watchId = watchInfo.watchId;

            // NOTE(review): if header is a reference type, this increment is
            // also visible through respon.header - confirm that is intended.
            handler.ExtraData().header.revision += 1;

            auto es = w.events();
            foreach (e; es)
            {
                auto event = new neton.protocol.neton.Event();
                auto kv = new KeyValue();
                kv.key = cast(ubyte[])((e.nodeOriginKey()));
                kv.value = cast(ubyte[])(e.rpcValue());
                event.kv = kv;
                // prevKv is set to the same KeyValue object as kv.
                event.prevKv = kv;
                if (e.action() == EventAction.Delete)
                    event.type = neton.protocol.neton.Event.EventType.DELETE;
                else
                    event.type = neton.protocol.neton.Event.EventType.PUT;
                respon.events ~= event;
                logInfo("--- -> notify event key: ", e.nodeOriginKey(),
                        " value : ", e.rpcValue(), " type :", e.action());
            }

            w.clearEvent();
            handler.data().write(respon);
        }
    }
}