module neton.network.NodeClient;

import neton.network.Interface;
import neton.server.NetonConfig;
import neton.server.PeerServers;

import hunt.Functions;
import hunt.logging;
import hunt.raft;
import hunt.net;
import hunt.util.Serialize;

import core.stdc.string;
import core.thread;

import std.conv;
import std.format;
import std.bitmanip;
import std.stdint;

/// Callback type that receives the outcome (success or failure) of a connect attempt.
alias AsyncConnectHandler = NetEventHandler!(AsyncResult!Connection);

/**
 * Outbound client used by node `srcID` to send messages to peer node `dstID`.
 *
 * Messages are serialized and framed with a 4-byte big-endian length prefix
 * before being written to the underlying connection.
 */
class NodeClient : MessageTransfer
{
    /// Optional callback notified when the connect attempt succeeds or fails.
    private AsyncConnectHandler _connectionHandler;

    /// Currently open connection, or null when not connected.
    private Connection _conn;

    /**
     * Creates the network client and installs the connection event handler.
     *
     * Params:
     *   srcID = id of the local node (used in log output only).
     *   dstID = id of the remote peer this client talks to.
     */
    this(ulong srcID, ulong dstID)
    {
        this.srcID = srcID;
        this.dstID = dstID;
        client = NetUtil.createNetClient();

        client.setHandler(new class NetConnectionHandler {

            override void connectionOpened(Connection connection) {
                infof("Connection created: %s", connection.getRemoteAddress());
                _conn = connection;
                if (_connectionHandler !is null) {
                    _connectionHandler(succeededResult(connection));
                }
            }

            override void connectionClosed(Connection connection) {
                infof("Connection closed: %s", connection.getRemoteAddress());

                // Drop the cached connection so write() stops using a closed
                // socket. The identity check keeps a newer connection intact
                // if one has already replaced the closed one.
                if (_conn is connection) {
                    _conn = null;
                }

                // Hand the peer's configured address back to PeerServers.
                // NOTE(review): presumably this schedules a reconnect - confirm
                // against PeerServers.addPeer.
                auto conf = NetonConfig.instance().getConf(dstID);
                PeerServers.instance().addPeer(dstID, conf.ip ~ ":" ~ to!string(conf.nodeport));
            }

            override void messageReceived(Connection connection, Object message) {
                tracef("message type: %s", typeid(message).name);
                // Pass the payload as an argument, never as the format string:
                // a '%' in the received text would otherwise be parsed as a
                // format specifier and could throw inside this callback.
                tracef("data received: %s", message.toString());
            }

            override void exceptionCaught(Connection connection, Throwable t) {
                warning(t);
            }

            override void failedOpeningConnection(int connectionId, Throwable t) {
                warning(t);
                client.close();

                if (_connectionHandler !is null) {
                    _connectionHandler(failedResult!(Connection)(t));
                }
            }

            override void failedAcceptingConnection(int connectionId, Throwable t) {
                warning(t);
            }
        });

        // auto conf = new hunt.net.Config.Config();
        // CallBackHandler cbHandler = new CallBackHandler(dstID);
        // conf.setHandler(cbHandler);
        // client.setConfig(conf);
    }

    /**
     * Starts connecting to the peer.
     *
     * Params:
     *   host    = remote host name or address.
     *   port    = remote port.
     *   handler = optional callback invoked with the connect result; it
     *             replaces any previously registered callback.
     */
    void connect(string host, int port, AsyncConnectHandler handler = null)
    {
        _connectionHandler = handler;
        client.connect(host, port);
    }

    /**
     * Serializes `msg` and sends it as one frame: a 4-byte big-endian length
     * followed by the serialized bytes. If no connection is open, the message
     * is dropped and a warning is logged.
     */
    void write(Message msg)
    {
        // Snapshot the field once so the null check and the write below use
        // the same reference even if the close callback clears _conn meanwhile.
        Connection conn = _conn;
        if (conn is null)
        {
            logWarning(srcID, " not connect now. ", dstID);
            return;
        }

        // logDebug(srcID , " sendto " , dstID , " " , msg);
        ubyte[] data = cast(ubyte[]) serialize(msg);
        int len = cast(int) data.length;
        ubyte[4] head = nativeToBigEndian(len);

        conn.write(head ~ data);
    }

    /// Closes the underlying network client.
    void close()
    {
        // sock.close();
        client.close();
    }

private:
    ulong srcID;      /// id of the local node
    ulong dstID;      /// id of the remote peer
    NetClient client; /// underlying network client
    // conn sock = null;
}


// Legacy standalone handler, kept for reference only; the anonymous
// NetConnectionHandler installed in NodeClient's constructor is what runs.
//
// class CallBackHandler : NetConnectionHandler
// {
// 	private
// 	{
// 		ulong _peerId;
// 	}

// 	this(ulong peerid)
// 	{
// 		_peerId = peerid;
// 	}

// 	override void connectionOpened(Connection connection)
// 	{
// 		logInfo("open node client -----");
// 	}

// 	override void connectionClosed(Connection connection)
// 	{
// 		logInfo("close node client -----");
// 		auto conf = NetonConfig.instance().getConf(_peerId);
// 		PeerServers.instance().addPeer(_peerId,conf.ip ~ ":" ~ to!string(conf.nodeport) );
// 	}

// 	override void messageReceived(Connection connection, Object message){}

// 	override void exceptionCaught(Connection connection, Throwable t) {}

// 	override void failedOpeningConnection(int connectionId, Throwable t)
// 	{
// 	}

// 	override void failedAcceptingConnection(int connectionId, Throwable t)
// 	{
// 	}
// }