1 module neton.server.Health;
2 
3 import std.json;
4 import std.net.curl;
5 import std.stdio;
6 import core.time;
7 import std.parallelism;
8 import neton.store.Event;
9 
10 // import zhang2018.dreactor.time.Timer;
11 import hunt.event.timer;
12 import hunt.logging;
13 import neton.server.NetonHttpServer;
14 import neton.network.Http;
15 
/**
 * Health-check descriptor for a single registered service.
 *
 * Holds the service key, its raw JSON description and the check settings
 * extracted from that JSON (`check.interval`, `check.timeout`, `check.http`,
 * and the top-level `status`).
 */
class Health
{
    /**
     * Params:
     *   key   = identifier of the service this check belongs to
     *   value = JSON description of the service; parsed immediately
     */
    this(string key, JSONValue value)
    {
        _key = key;
        _value = value;
        parseValue();
    }

    /**
     * Extracts the check settings from the stored JSON value.
     *
     * Fields that are absent keep their defaults. Negative `interval` or
     * `timeout` values are rejected (logged, default kept) because a negative
     * interval would otherwise wrap around when stored as `ulong`.
     * Any parsing error (for example a field of the wrong JSON type) is
     * logged instead of being silently discarded; fields parsed before the
     * error keep their new values.
     */
    void parseValue()
    {
        try
        {
            if (_value.type != JSONType.object)
                return;

            if (auto checkPtr = "check" in _value)
            {
                if (checkPtr.type == JSONType.object)
                {
                    if (auto intervalPtr = "interval" in *checkPtr)
                    {
                        const long seconds = intervalPtr.integer;
                        if (seconds >= 0)
                            _interval = cast(ulong) seconds;
                        else
                            logWarning(_key, "  -- ignoring negative check interval : ", seconds);
                    }

                    if (auto timeoutPtr = "timeout" in *checkPtr)
                    {
                        const long seconds = timeoutPtr.integer;
                        if (seconds >= 0)
                            _timeout = seconds;
                        else
                            logWarning(_key, "  -- ignoring negative check timeout : ", seconds);
                    }

                    if (auto httpPtr = "http" in *checkPtr)
                    {
                        _http_url = httpPtr.str;
                    }
                }
            }

            if (auto statusPtr = "status" in _value)
            {
                _sState = cast(ServiceState)(statusPtr.str);
            }
        }
        catch (Exception e)
        {
            // Best effort: keep whatever was parsed so far, but leave a trace.
            logWarning(_key, "  -- parse health check value exception  : ", e.msg);
        }
    }

    /**
     * Timer callback: remembers the timer that fired and, when an HTTP URL
     * is configured, queues `makeCheck` for this instance on the task pool.
     */
    void onTimer(AbstractTimer fd)
    {
        _timer = fd;
        if (_http_url.length > 0)
        {
            taskPool.put(task!(makeCheck, Health)(this));
        }
    }

    /// Check interval converted from seconds to milliseconds (saturating).
    ulong interval_ms()
    {
        // Saturate instead of wrapping around on absurdly large intervals.
        if (_interval > ulong.max / 1000)
            return ulong.max;
        return _interval * 1000;
    }

    /// Timer last passed to `onTimer` (null until the first tick).
    @property AbstractTimer timerFd()
    {
        return _timer;
    }

    /// URL probed by the HTTP check; empty when none is configured.
    @property string http_url()
    {
        return _http_url;
    }

    /// Timeout of a single check, in seconds.
    @property long timeout()
    {
        return _timeout;
    }

    /// Key this health check was created with.
    @property string key()
    {
        return _key;
    }

    /// Last known service state.
    @property ServiceState state()
    {
        return _sState;
    }

    /// Overwrites the last known service state.
    @property void set_state(ServiceState st)
    {
        _sState = st;
    }

    /// JSON description this health check was created with.
    @property JSONValue value()
    {
        return _value;
    }

private:
    string _key;
    JSONValue _value;
    ulong _interval = 10; // seconds
    long _timeout = 10; // seconds
    string _http_url;

    ServiceState _sState;
    AbstractTimer _timer;
}
126 
/**
 * Performs one HTTP health check for `h` and queues exactly one state update.
 *
 * The service is reported as `ServiceState.Passing` only when the request
 * completes without an exception and the last status line received has code
 * 200; every other outcome is reported as `ServiceState.Critical`.
 *
 * The status-line callback can fire more than once per request (one call per
 * response status line, e.g. for followed redirects), so the callback only
 * records the code and the verdict is taken after `perform()` has finished.
 * This avoids queueing several conflicting updates for a single check.
 *
 * Params:
 *   h = health check to run; its `http_url` and `timeout` (seconds) are used
 */
void makeCheck(Health h)
{
    // Assume failure until a completed request proves otherwise.
    ServiceState result = ServiceState.Critical;
    try
    {
        ushort lastCode = 0;

        auto http = HTTP(h.http_url);
        // NOTE(review): TLS host/peer verification is disabled, so the check
        // accepts any certificate — confirm this is intended.
        http.verifyHost = false;
        http.verifyPeer = false;
        http.operationTimeout = dur!"seconds"(h.timeout);
        // The response body is irrelevant; just acknowledge every chunk.
        http.onReceive = (ubyte[] data) { return data.length; };
        http.onReceiveStatusLine = (HTTP.StatusLine sl) { lastCode = sl.code; };
        http.perform();

        if (lastCode == 200)
            result = ServiceState.Passing;
    }
    catch (Exception e)
    {
        logWarning(h.key, "  -- do health check. exception  : ", e.msg);
    }

    taskPool.put(task!(updateServiceState, Health, ServiceState)(h, result));
}
156 
/**
 * Records a new state on `h` and proposes the change through
 * `NetonHttpServer`.
 *
 * Nothing happens when `h` already has the requested state. Otherwise the
 * state is stored on `h`, written into the "status" field of the service's
 * JSON value, and an update-service command carrying that JSON is proposed.
 * Exceptions are logged and not propagated.
 *
 * Params:
 *   h     = health check whose state changed
 *   state = newly observed state
 */
void updateServiceState(Health h, ServiceState state)
{
    try
    {
        if (h.state != state)
        {
            h.set_state(state);

            auto payload = h.value();
            payload["status"] = state;

            RequestCommand command = {
                Method : RequestMethod.METHOD_UPDATESERVICE,
                Key : h.key,
                Hash : h.toHash(),
                Params : payload.toString
            };
            NetonHttpServer.instance().Propose(command);
        }
    }
    catch (Exception e)
    {
        logWarning(h.key, "  -- update service state exception  : ", e.msg);
    }
}