1 module neton.store.WatcherHub;
2 
3 import neton.store.EventHistory;
4 import std.algorithm.searching;
5 import std.range;
6 import std.container.dlist;
7 import core.sync.mutex;
8 import std.experimental.allocator;
9 import hunt.logging;
10 import std.string;
11 import neton.store.Watcher;
12 import neton.store.Event;
// A WatcherHub contains all subscribed watchers.
// _watchersMap is a map with the watched path as key and the list of
// watchers registered on that path as value.
// EventHistory keeps the old events for the WatcherHub. It is used to help
// a watcher get a continuous event history; otherwise a watcher might miss
// events that happen between the end of the first watch command and the
// start of the second command.
// List of the watchers registered on a single key.
alias WatcherList = DList!Watcher;

/**
 * Registry of watchers keyed by the path they watch, together with the
 * event history used to answer watches for events that already happened.
 *
 * All accesses to the watcher map made by this class are serialized through
 * `_mutex` (a recursive `core.sync.mutex.Mutex`).
 */
class WatcherHub {

    private :
        // current number of watchers.
        // NOTE(review): nothing in this class ever updates this field.
        ulong   _count;

        Mutex   _mutex;                   // guards _watchersMap and the lists stored in it
        WatcherList[string] _watchersMap; // watched path -> watchers registered on it
        EventHistory      _eventHistory;

    public :
    /**
     * Creates a WatcherHub.
     *
     * Params:
     *   capacity = passed to the EventHistory constructor; determines how
     *              many events are kept in the event history.
     *
     * Typically only a small history is needed (smaller than 20K).
     * Ideally it should be smaller than
     * 20K/s [max throughput] * 2 * 50ms [RTT] = 2000.
     */
    this(int capacity)
    {
        _eventHistory = new EventHistory(capacity);
        _mutex = new Mutex;
    }

    /**
     * Unregisters the first watcher under `key` whose uuid equals `uuid`.
     * The map entry for `key` is dropped once its list becomes empty.
     * Unknown keys and unknown uuids are ignored.
     *
     * This is the callback handed to watchers by `watch`. It takes the hub
     * mutex because it mutates the same structures as `watch` and
     * `notifyWatchers`; the mutex is recursive, so a call made by a thread
     * that already holds it does not deadlock.
     */
    void remove(string key, string uuid)
    {
        synchronized(_mutex)
        {
            auto slot = key in _watchersMap;
            if (slot !is null)
            {
                for (auto range = (*slot)[]; !range.empty; range.popFront())
                {
                    if (range.front.uuid == uuid)
                    {
                        // Stop right after unlinking: `range` still refers
                        // to the removed node and must not be advanced.
                        (*slot).stableLinearRemove(take(range, 1));
                        break;
                    }
                }

                if ((*slot)[].empty)
                {
                    // no watcher is left for this key, so delete the list
                    _watchersMap.remove(key);
                }
            }
        }
        logInfo("---- remove fuc call  : ", key);
    }

    /**
     * Creates a Watcher for `key`.
     *
     * The event history is scanned first with (key, recursive, index). If it
     * yields an event, a clone of that event stamped with `storeIndex` is
     * inserted into the new watcher, which is returned without being
     * registered in the hub. Otherwise the watcher is appended to the list
     * for `key` and given `remove` as its removal callback.
     *
     * Params:
     *   key        = path to watch
     *   recursive  = forwarded to the history scan and to the Watcher
     *   stream     = forwarded to the Watcher; non-stream watchers are
     *                unregistered by `notifyWatchers` after their first
     *                accepted event
     *   index      = forwarded to the history scan and to the Watcher
     *   storeIndex = index stamped on an event served from history
     *
     * Returns: the newly allocated Watcher.
     */
    Watcher watch(string key,bool recursive,bool stream, ulong index,ulong storeIndex) {

        auto event = _eventHistory.scan(key, recursive, index);

        auto w = theAllocator.make!Watcher(stream,recursive,index,storeIndex);

        synchronized(_mutex)
        {
            // If the event exists in the known history, append the netonIndex and return immediately
            if (event !is null) {
                logInfo("---- find event from watcher : ", key);
                auto ne = event.Clone();
                ne.setNetonIndex(storeIndex);
                w.insertEvent(ne);
                w.setKey(key);
                return w;
            }

            auto slot = key in _watchersMap;
            if (slot !is null) { // add the new watcher to the back of the list
                logInfo("---- key in _watchersMap : ", key);
                (*slot).insertBack(w);
            } else { // create a new list and add the new watcher
                WatcherList wl;
                wl.insertBack(w);
                _watchersMap[key] = wl;
                logInfo("---- new key in _watchersMap : ", wl[]);
            }

            w.setRemoveFunc(key,&this.remove);
        }

        return w;
    }

    /// Records `e` in the event history without notifying any watcher.
    void add(Event e) {
        _eventHistory.addEvent(e);
    }

    /**
     * Records `ne` in the event history and notifies the watchers registered
     * on every prefix of the event's node key.
     *
     * If the key is "/foo/bar", watchers on "/", "/foo" and "/foo/bar" are
     * notified, in that order.
     */
    void notify(Event ne) {
        auto e = _eventHistory.addEvent(ne); // add event into the eventHistory

        auto segments = split(e.nodeKey(), "/");
        logInfo("---- segments : ", segments);

        // Grow the prefix one segment at a time. An absolute key starts with
        // an empty segment, so the first prefix is "" and stands for the root.
        string prefix;
        foreach (i, segment; segments) {
            if (i > 0)
                prefix ~= "/";
            prefix ~= segment;
            // notify the watchers who are interested in changes of the current path
            notifyWatchers(e, prefix.length > 0 ? prefix : "/", false);
        }
    }

    /**
     * Delivers `e` to every watcher registered on `nodePath`.
     *
     * Each watcher's `notify` is called with a flag saying whether
     * `nodePath` is the event's own key, and with `deleted`. A watcher whose
     * `notify` returns true and that is not a stream watcher is marked
     * removed and unlinked from the list. The map entry is dropped once the
     * list is empty.
     */
    void notifyWatchers(Event e, string nodePath , bool deleted) {
        synchronized(_mutex)
        {
            auto slot = nodePath in _watchersMap;
            if (slot is null)
                return;

            immutable bool originalPath = (e.nodeKey() == nodePath);

            auto range = (*slot)[];
            while (!range.empty)
            {
                auto w = range.front;
                if (w.notify(e, originalPath, deleted) && !w.stream)
                {
                    w.setRemoved(true);
                    // Continue from the range returned by the removal;
                    // advancing a range whose front node was unlinked trips
                    // DList's "Invalidated state" assertion.
                    range = (*slot).stableLinearRemove(take(range, 1));
                }
                else
                {
                    range.popFront();
                }
            }

            if ((*slot)[].empty) {
                // if we have notified all watchers in the list
                // we can delete the list
                _watchersMap.remove(nodePath);
            }
        }
    }
}
165