module neton.store.Watcher;

import neton.store.Event;
import std.uuid;
import hunt.logging;

// Minimal watcher contract: access to the buffered events, the index the
// watcher was created at, and a way to detach the watcher.
interface WatcherInter {
    Event[] events();
    ulong StartIndex(); // The netonIndex at which the Watcher was created
    void Remove();
}

// Callback invoked by Watcher.Remove(); it receives the watcher's key and
// uuid, in that order.
alias RemoveFunc = void delegate(string,string);

// Maximum number of undelivered events a watcher buffers. Once the buffer
// holds this many events, the next matching notification removes the watcher.
const int MAX_EVENTS = 100;

// A Watcher buffers the events it is interested in until they are collected
// through events() and discarded with clearEvent().
class Watcher : WatcherInter {

    // Creates a watcher that only has a freshly generated uuid; every other
    // field keeps its default value.
    this()
    {
        _uuid = randomUUID().toString;
    }

    // Creates a watcher with a freshly generated uuid.
    //   stream     - stored as-is and exposed through the `stream` property
    //   recursive  - when true, notify() accepts events regardless of the
    //                originalPath/deleted flags
    //   sinceIndex - notify() ignores events whose Index() is below this value
    //   startIndex - value reported by StartIndex()
    this(bool stream,bool recursive,ulong sinceIndex,ulong startIndex)
    {
        _uuid = randomUUID().toString;
        _stream = stream;
        _recursive = recursive;
        _sinceIndex = sinceIndex;
        _startIndex = startIndex;
    }

    // Returns the currently buffered events (the internal array, not a copy).
    Event[] events()
    {
        return _events;
    }

    // Drops every buffered event.
    void clearEvent()
    {
        _events.length = 0;
    }

    // Appends an event unconditionally: unlike notify(), this performs no
    // interest check and does not enforce MAX_EVENTS.
    void insertEvent(Event e)
    {
        _events ~= e;
    }

    // True when at least one event is buffered.
    @property bool haveNotify()
    {
        return _events.length != 0;
    }

    // Unique id generated when the watcher was constructed.
    @property string uuid()
    {
        return _uuid;
    }

    // The netonIndex at which the Watcher was created.
    @property ulong StartIndex()
    {
        return _startIndex;
    }

    // The `stream` flag passed to the constructor.
    @property bool stream()
    {
        return _stream;
    }

    // True once Remove() has run the remove callback, or after
    // setRemoved(true).
    @property bool removed()
    {
        return _removed;
    }

    // Overrides the removed flag. Setting it back to false allows a later
    // Remove() to invoke the remove callback again.
    void setRemoved(bool f)
    {
        _removed = f;
    }

    // Sets the key handed to the remove callback and reported by `key`.
    void setKey(string key)
    {
        _key = key;
    }

    // The key set through setKey() or setRemoveFunc().
    @property string key()
    {
        return _key;
    }

    // Stores a caller-defined watch id.
    void setWatchId(size_t wid)
    {
        _watchId = wid;
    }

    // The watch id stored by setWatchId().
    @property size_t watchId()
    {
        return _watchId;
    }

    // Stores a caller-defined hash value.
    void setHash(size_t hash)
    {
        _hash = hash;
    }

    // The hash value stored by setHash().
    @property size_t hash()
    {
        return _hash;
    }

    // notify function notifies the watcher. If the watcher is interested in the
    // given path, the function returns true.
    //   e            - the event to deliver
    //   originalPath - true when the event happened at the watched path itself
    //   deleted      - true to force the notification because of a deletion
    // Returns true when the watcher was interested in the event (even if the
    // event had to be dropped because the buffer was full), false otherwise.
    bool notify(Event e, bool originalPath, bool deleted)
    {
        // The watcher is interested in the path in three cases and under one
        // condition. The condition is that the event happens at or after the
        // watcher's sinceIndex.

        // 1. The path at which the event happens is the path the watcher is
        // watching. For example if the watcher is watching "/foo" and the event
        // happens at "/foo", the watcher must be interested in that event.

        // 2. The watcher is a recursive watcher; it is interested in events that
        // happen below its watching path. For example if watcher A watches
        // "/foo" and it is a recursive one, it is interested in the event that
        // happens at "/foo/bar".

        // 3. When we delete a directory, we need to force notify all the
        // watchers that watch the file we need to delete.
        // For example a watcher is watching "/foo/bar" and we delete "/foo". The
        // watcher should get notified even if "/foo" is not the path it is
        // watching.
        //logInfo("---- have a notify2 : ", _recursive," e.index :", e.Index()," _sinceIndex : ",_sinceIndex );
        if ((_recursive || originalPath || deleted) && e.Index() >= _sinceIndex)
        {
            // We cannot block here if the event array is full, otherwise neton
            // will hang. The event array is full when the rate of notifications
            // is higher than our send rate.
            // If this happens, we remove the watcher; the event is dropped.

            logInfo("---- have a notify : ", _key);
            if (_events.length >= MAX_EVENTS)
            {
                logWarning("---- too many events : ", _key);
                Remove();
                return true;
            }
            _events ~= e;
            return true;
        }
        return false;
    }


    // Remove removes the watcher from watcherHub.
    // Buffered events are always discarded. The actual remove function is
    // guaranteed to only be executed once: the callback is skipped when the
    // watcher is already flagged as removed, so repeated calls (for example an
    // overflow in notify() followed by an explicit removal) do not run it again.
    void Remove()
    {
        _events.length = 0;
        if (_remove !is null && !_removed)
        {
            // Flag first so that a re-entrant Remove() triggered from inside
            // the callback becomes a no-op.
            _removed = true;
            _remove(_key,_uuid);
        }
    }

    // Registers the callback run by Remove() and the key passed to it.
    void setRemoveFunc(string key,RemoveFunc func)
    {
        _key = key;
        _remove = func;
    }

private :
    Event[] _events;        // buffered, not yet collected events
    bool _stream;
    bool _recursive;
    ulong _sinceIndex;      // minimum event index accepted by notify()
    ulong _startIndex;

    bool _removed;          // set once the remove callback has been run
    RemoveFunc _remove;     // may be null when no callback was registered
    string _uuid;
    string _key;
    size_t _watchId;
    size_t _hash;
}