module neton.store.WatcherHub;

import neton.store.EventHistory;
import std.algorithm.searching;
import std.range;
import std.container.dlist;
import core.sync.mutex;
import std.experimental.allocator;
import hunt.logging;
import std.string;
import neton.store.Watcher;
import neton.store.Event;

// A WatcherHub contains all subscribed watchers.
// _watchersMap maps a watched path to the list of watchers registered on it.
// EventHistory keeps old events for the hub. It is used to give a watcher a
// continuous event history; otherwise a watcher might miss an event that
// happens between the end of one watch command and the start of the next.
alias WatcherList = DList!Watcher;

class WatcherHub
{

private:
    // Intended to hold the current number of watchers.
    // NOTE(review): nothing in this file ever updates it.
    ulong _count;

    // Guards _watchersMap and the watcher lists stored in it.
    Mutex _mutex;
    WatcherList[string] _watchersMap;
    EventHistory _eventHistory;

public:
    // Creates a WatcherHub. The capacity determines how many events are kept
    // in the event history.
    // Typically only a small history is needed (smaller than 20K).
    // Ideally it should be smaller than
    // 20K/s [max throughput] * 2 * 50ms [RTT] = 2000.
    this(int capacity)
    {
        _eventHistory = new EventHistory(capacity);
        _mutex = new Mutex;
    }

    // Removes the first watcher registered under `key` whose uuid equals
    // `uuid`. If that leaves the list for `key` empty, the map entry is
    // dropped as well. Unknown keys are ignored.
    //
    // The map is modified under _mutex, like in watch() and notifyWatchers().
    // core.sync.mutex.Mutex is recursive, so calling this while the same
    // thread already holds the lock does not self-deadlock.
    void remove(string key, string uuid)
    {
        synchronized (_mutex)
        {
            if (auto listPtr = key in _watchersMap)
            {
                for (auto range = (*listPtr)[]; !range.empty; range.popFront())
                {
                    if (range.front.uuid == uuid)
                    {
                        // Safe to unlink here: we leave the loop right away
                        // and never advance the (now stale) range again.
                        listPtr.stableLinearRemove(take(range, 1));
                        break;
                    }
                }

                if ((*listPtr)[].empty)
                {
                    // No watcher left on this path; delete the list.
                    _watchersMap.remove(key);
                }
            }
        }
        logInfo("---- remove fuc call : ", key);
    }

    // Watch function returns a Watcher.
    // If recursive is true, the first change after index under key will be sent to the watcher.
    // If recursive is false, the first change after index at key will be sent to the watcher.
    // If index is zero, watch will start from the current index + 1.
    //
    // When the event history already holds a matching event, a clone of it
    // (stamped with storeIndex) is inserted into the new watcher and the
    // watcher is returned without being registered in the hub. Otherwise the
    // watcher is appended to the list for `key` and given remove() as its
    // removal callback.
    Watcher watch(string key, bool recursive, bool stream, ulong index, ulong storeIndex)
    {
        auto event = _eventHistory.scan(key, recursive, index);

        auto w = theAllocator.make!Watcher(stream, recursive, index, storeIndex);

        synchronized (_mutex)
        {
            // If the event exists in the known history, set the neton index
            // and return immediately.
            if (event !is null)
            {
                logInfo("---- find event from watcher : ", key);
                auto ne = event.Clone();
                ne.setNetonIndex(storeIndex);
                w.insertEvent(ne);
                w.setKey(key);
                return w;
            }

            if (auto listPtr = key in _watchersMap)
            {
                // Add the new watcher to the back of the existing list,
                // operating on the stored list itself rather than a copy.
                logInfo("---- key in _watchersMap : ", key);
                listPtr.insertBack(w);
            }
            else
            {
                // Create a new list and add the new watcher.
                WatcherList wl;
                wl.insertBack(w);
                _watchersMap[key] = wl;
                logInfo("---- new key in _watchersMap : ", wl[]);
            }

            w.setRemoveFunc(key, &this.remove);
        }

        return w;
    }

    // Records the event in the event history without notifying any watcher.
    void add(Event e)
    {
        _eventHistory.addEvent(e);
    }

    // notify accepts an event, records it in the event history and notifies
    // the watchers registered on every prefix of the event's node key.
    void notify(Event ne)
    {
        auto e = _eventHistory.addEvent(ne); // add event into the eventHistory

        auto segments = split(e.nodeKey(), "/");
        logInfo("---- segments : ", segments);

        // Walk through all the segments of the path and notify the watchers.
        // If the path is "/foo/bar", the segments are ["", "foo", "bar"] and
        // the accumulated strings are "/", "//foo" and "//foo/bar"; dropping
        // the first character of anything longer than one character yields
        // the watched paths "/", "/foo" and "/foo/bar".
        string path;
        foreach (segment; segments)
        {
            path ~= "/";
            path ~= segment;
            // notify the watchers who are interested in changes of this path
            notifyWatchers(e, path.length > 1 ? path[1 .. $] : path, false);
        }
    }

    // Delivers event `e` to every watcher registered on `nodePath`.
    // A watcher whose notify() returns true and which is not a stream
    // watcher is marked as removed and unlinked from the list. When the list
    // ends up empty, its map entry is deleted.
    void notifyWatchers(Event e, string nodePath, bool deleted)
    {
        synchronized (_mutex)
        {
            auto listPtr = nodePath in _watchersMap;
            if (listPtr is null)
                return;

            // True when the event happened exactly at the watched path
            // (as opposed to somewhere below it).
            immutable bool originalPath = (e.nodeKey() == nodePath);

            auto range = (*listPtr)[];
            while (!range.empty)
            {
                auto w = range.front;
                // Remember the current position and advance BEFORE a possible
                // unlink: popFront() on a range whose front node has already
                // been removed from the list operates on an invalidated range.
                auto current = range;
                range.popFront();

                if (w.notify(e, originalPath, deleted) && !w.stream)
                {
                    w.setRemoved(true);
                    listPtr.stableLinearRemove(take(current, 1));
                }
            }

            if ((*listPtr)[].empty)
            {
                // All watchers in the list have been notified and removed;
                // the list itself can be deleted.
                _watchersMap.remove(nodePath);
            }
        }
    }
}