1 module neton.network.NodeClient;
2 
3 import neton.network.Interface;
4 import neton.server.NetonConfig;
5 import neton.server.PeerServers;
6 
7 import hunt.Functions;
8 import hunt.logging;
9 import hunt.raft;
10 import hunt.net;
11 import hunt.util.Serialize;
12 
13 import core.stdc.string;
14 import core.thread;
15 
16 import std.conv;
17 import std.format;
18 import std.bitmanip;
19 import std.stdint;
20 
21 alias AsyncConnectHandler = NetEventHandler!(AsyncResult!Connection);
22 
23 class NodeClient : MessageTransfer
24 {
25     private AsyncConnectHandler _connectionHandler;
26     private Connection _conn;
27 
28     ///
29     this(ulong srcID, ulong dstID)
30     {
31         this.srcID = srcID;
32         this.dstID = dstID;
33         client = NetUtil.createNetClient();
34 
35         client.setHandler(new class NetConnectionHandler {
36 
37             override void connectionOpened(Connection connection) {
38                 infof("Connection created: %s", connection.getRemoteAddress());
39                 _conn = connection;
40                 if(_connectionHandler !is null) {
41                     _connectionHandler(succeededResult(connection));
42                 }
43             }
44 
45             override void connectionClosed(Connection connection) {
46                 infof("Connection closed: %s", connection.getRemoteAddress());
47 
48                 auto conf = NetonConfig.instance().getConf(dstID);
49                 PeerServers.instance().addPeer(dstID, conf.ip ~ ":" ~ to!string(conf.nodeport) );
50             }
51 
52             override void messageReceived(Connection connection, Object message) {
53                 tracef("message type: %s", typeid(message).name);
54                 string str = format("data received: %s", message.toString());
55                 tracef(str);
56             }
57 
58             override void exceptionCaught(Connection connection, Throwable t) {
59                 warning(t);
60             }
61 
62             override void failedOpeningConnection(int connectionId, Throwable t) {
63                 warning(t);
64                 client.close();
65                 
66                 if(_connectionHandler !is null) {
67                     _connectionHandler(failedResult!(Connection)(t));
68                 } 
69             }
70 
71             override void failedAcceptingConnection(int connectionId, Throwable t) {
72                 warning(t);
73             }
74         });
75 
76         //  auto conf = new hunt.net.Config.Config();
77         // CallBackHandler cbHandler = new CallBackHandler(dstID);
78         // conf.setHandler(cbHandler);
79         // client.setConfig(conf);
80     }
81 
82     ///
83     void connect(string host, int port, AsyncConnectHandler handler = null)
84     {
85         _connectionHandler = handler;
86         client.connect(host, port);
87     }
88 
89     ///
90     void write(Message msg)
91     {
92         if (_conn is null)
93         {
94             logWarning(srcID, " not connect now. ", dstID);
95             return;
96         }
97 
98         // logDebug(srcID , " sendto " , dstID , " "  , msg);
99         ubyte[] data = cast(ubyte[]) serialize(msg);
100         int len = cast(int) data.length;
101         ubyte[4] head = nativeToBigEndian(len);
102 
103         _conn.write(head ~ data);
104     }
105 
106     void close()
107     {
108         // sock.close();
109         client.close();
110     }
111 
112 private:
113     ulong srcID;
114     ulong dstID;
115     NetClient client;
116     // conn sock = null;
117 }
118 
119 
120 // class CallBackHandler : NetConnectionHandler
121 // {
122 //     private
123 //     {
124 //         ulong _peerId;
125 //     }
126 
127 //     this(ulong peerid)
128 //     {
129 //         _peerId = peerid;
130 //     }
131     
132 //     override void connectionOpened(Connection connection)
133 //     {
134 //         logInfo("open node client -----");
135 //     }
136 
137 //     override void connectionClosed(Connection connection)
138 //     {
139 //         logInfo("close node client -----");
140 //         auto conf = NetonConfig.instance().getConf(_peerId);
141 //         PeerServers.instance().addPeer(_peerId,conf.ip ~ ":" ~ to!string(conf.nodeport) );
142 //     }
143 
144 //     override void messageReceived(Connection connection, Object message){}
145 
146 //     override void exceptionCaught(Connection connection, Throwable t) {}
147 
148 //     override void failedOpeningConnection(int connectionId, Throwable t)
149 //     {
150 //     }
151 
152 //     override void failedAcceptingConnection(int connectionId, Throwable t)
153 //     {
154 //     }
155 // }