1 module neton.server.Health; 2 3 import std.json; 4 import std.net.curl; 5 import std.stdio; 6 import core.time; 7 import std.parallelism; 8 import neton.store.Event; 9 10 // import zhang2018.dreactor.time.Timer; 11 import hunt.event.timer; 12 import hunt.logging; 13 import neton.server.NetonHttpServer; 14 import neton.network.Http; 15 16 class Health 17 { 18 this(string key, JSONValue value) 19 { 20 _key = key; 21 _value = value; 22 parseValue(); 23 } 24 25 void parseValue() 26 { 27 try 28 { 29 if (JSONType.object == _value.type) 30 { 31 if ("check" in _value) 32 { 33 auto checkObj = _value["check"]; 34 if (JSONType.object == checkObj.type) 35 { 36 if ("interval" in checkObj) 37 { 38 _interval = checkObj["interval"].integer; 39 } 40 41 if ("timeout" in checkObj) 42 { 43 _timeout = checkObj["timeout"].integer; 44 } 45 46 if ("http" in checkObj) 47 { 48 _http_url = checkObj["http"].str; 49 } 50 } 51 } 52 53 if ("status" in _value) 54 { 55 _sState = cast(ServiceState)(_value["status"].str); 56 } 57 } 58 } 59 catch (Exception e) 60 { 61 62 } 63 } 64 65 void onTimer(AbstractTimer fd) 66 { 67 _timer = fd; 68 //logWarning(_key," -- do health check."); 69 if (_http_url.length > 0) 70 { 71 //logWarning(_key," -- do health check. url : ",_http_url," timeout : ",_timeout); 72 taskPool.put(task!(makeCheck, Health)(this)); 73 } 74 } 75 76 ulong interval_ms() 77 { 78 return _interval * 1000; 79 } 80 81 @property AbstractTimer timerFd() 82 { 83 return _timer; 84 } 85 86 @property string http_url() 87 { 88 return _http_url; 89 } 90 91 @property long timeout() 92 { 93 return _timeout; 94 } 95 96 @property string key() 97 { 98 return _key; 99 } 100 101 @property ServiceState state() 102 { 103 return _sState; 104 } 105 106 @property void set_state(ServiceState st) 107 { 108 _sState = st; 109 } 110 111 @property JSONValue value() 112 { 113 return _value; 114 } 115 116 private: 117 string _key; 118 JSONValue _value; 119 ulong _interval = 10; 120 long _timeout = 10; 121 string _http_url; 122 123 ServiceState _sState; 124 AbstractTimer _timer; 125 } 126 127 void makeCheck(Health h) 128 { 129 //logWarning(h.key," -- do health check. 2 url : ",h.http_url); 130 try 131 { 132 auto http = HTTP(h.http_url); 133 http.verifyHost = false; 134 http.verifyPeer = false; 135 http.operationTimeout = dur!"seconds"(h.timeout); 136 http.onReceive = (ubyte[] data) { return data.length; }; 137 http.onReceiveStatusLine = (HTTP.StatusLine sl) { 138 //logWarning(h.key," -- do health check. response code : ",sl.code); 139 if (sl.code != 200) 140 taskPool.put(task!(updateServiceState, Health, 141 ServiceState)(h, ServiceState.Critical)); 142 else 143 taskPool.put(task!(updateServiceState, Health, 144 ServiceState)(h, ServiceState.Passing)); 145 146 }; 147 auto code = http.perform(); 148 //logWarning(h.key," -- do health check. perform code : ",code); 149 } 150 catch (Exception e) 151 { 152 logWarning(h.key, " -- do health check. exception : ", e.msg); 153 taskPool.put(task!(updateServiceState, Health, ServiceState)(h, ServiceState.Critical)); 154 } 155 } 156 157 void updateServiceState(Health h, ServiceState state) 158 { 159 try 160 { 161 //logWarning(h.key," -- update service state : ",state); 162 if (h.state == state) 163 return; 164 h.set_state(state); 165 auto val = h.value(); 166 val["status"] = state; 167 168 RequestCommand command = { 169 Method: 170 RequestMethod.METHOD_UPDATESERVICE, Key : h.key, Hash : h.toHash(), 171 Params : val.toString}; 172 NetonHttpServer.instance().Propose(command); 173 } 174 catch (Exception e) 175 { 176 logWarning(h.key, " -- update service state exception : ", e.msg); 177 } 178 }