module neton.rpcservice.KVService;

import neton.protocol.neton;
import neton.protocol.neton;
import neton.protocol.netonrpc;
import grpc;
import hunt.logging;
import neton.server.NetonRpcServer;
import neton.util.Future;
import neton.rpcservice.Command;
import std.stdio;

/**
 * gRPC key/value service implementation built on `KVBase`.
 *
 * Each handler validates the request key, wraps the request in a `Future`,
 * describes the operation in an `RpcRequest`, hands both to the
 * `NetonRpcServer` singleton, and then takes the handler's response from
 * `Future.get()`.
 */
class KVService : KVBase
{
    /**
     * Handles a read of a single key.
     *
     * Params:
     *   req      = incoming request; `req.key` must be non-empty.
     *   response = receives the value produced by the future, or a freshly
     *              constructed `RangeResponse` when the future yields null.
     *
     * Returns: a status carrying `StatusCode.INVALID_ARGUMENT` when the key is
     *          empty, otherwise `Status.OK`.
     */
    override Status Range(RangeRequest req, ref RangeResponse response)
    {
        logDebug("--------------");

        if (req.key.length < 1)
        {
            logError("get key is null");
            return new Status(StatusCode.INVALID_ARGUMENT);
        }

        auto pending = new Future!(RangeRequest, RangeResponse)(req);

        RpcRequest rpcReq;
        rpcReq.CMD = RpcReqCommand.RangeRequest;
        rpcReq.Key = cast(string) req.key;
        // The future's hash is stored on the request; presumably the server
        // uses it to match the completion back to this future -- confirm.
        rpcReq.Hash = pending.toHash();

        // Reads are submitted through ReadIndex, unlike the Propose path used
        // by Put and DeleteRange.
        NetonRpcServer.instance().ReadIndex(rpcReq, pending);
        logDebug("waiting ..... : ", rpcReq);

        // NOTE(review): get() presumably blocks until the server completes
        // the future -- verify against neton.util.Future.
        response = pending.get();
        logDebug("waiting done..... : ", response);

        // A null result is not treated as an error here: the caller still
        // receives an (empty) response object together with Status.OK.
        if (response is null)
        {
            response = new RangeResponse();
        }

        return Status.OK;
    }

    /**
     * Handles a write of a key/value pair, optionally tied to a lease.
     *
     * Params:
     *   req      = incoming request; `req.key` must be non-empty. `req.value`
     *              and `req.lease` are copied into the RPC request.
     *   response = receives whatever the future yields (may be null).
     *
     * Returns: `Status.OK` when the future yields a non-null response;
     *          a status carrying `StatusCode.INVALID_ARGUMENT` when the key is
     *          empty or the future yields null.
     */
    override Status Put(PutRequest req, ref PutResponse response)
    {
        logDebug("--------------");

        if (req.key.length < 1)
        {
            logError("put key is null");
            return new Status(StatusCode.INVALID_ARGUMENT);
        }

        auto pending = new Future!(PutRequest, PutResponse)(req);

        RpcRequest rpcReq;
        rpcReq.CMD = RpcReqCommand.PutRequest;
        rpcReq.Key = cast(string) req.key;
        rpcReq.Value = cast(string) req.value;
        rpcReq.LeaseID = req.lease;
        rpcReq.Hash = pending.toHash();

        NetonRpcServer.instance().Propose(rpcReq, pending);
        logDebug("waiting ..... : ", rpcReq);

        response = pending.get();
        logDebug("waiting done.....");

        // NOTE(review): a null result is reported with the same code as an
        // empty key; confirm this is the intended status for that case.
        if (response is null)
        {
            return new Status(StatusCode.INVALID_ARGUMENT);
        }

        return Status.OK;
    }

    /**
     * Handles deletion of a key.
     *
     * Params:
     *   req      = incoming request; `req.key` must be non-empty.
     *   response = receives whatever the future yields (may be null).
     *
     * Returns: `Status.OK` when the future yields a non-null response;
     *          a status carrying `StatusCode.INVALID_ARGUMENT` when the key is
     *          empty or the future yields null.
     */
    override Status DeleteRange(DeleteRangeRequest req, ref DeleteRangeResponse response)
    {
        logDebug("--------------");

        if (req.key.length < 1)
        {
            logError("delete key is null");
            return new Status(StatusCode.INVALID_ARGUMENT);
        }

        auto pending = new Future!(DeleteRangeRequest, DeleteRangeResponse)(req);

        RpcRequest rpcReq;
        rpcReq.CMD = RpcReqCommand.DeleteRangeRequest;
        rpcReq.Key = cast(string) req.key;
        rpcReq.Hash = pending.toHash();

        NetonRpcServer.instance().Propose(rpcReq, pending);
        logDebug("waiting ..... : ", rpcReq);

        response = pending.get();
        logDebug("waiting done.....");

        // NOTE(review): a null result is reported with the same code as an
        // empty key; confirm this is the intended status for that case.
        if (response is null)
        {
            return new Status(StatusCode.INVALID_ARGUMENT);
        }

        return Status.OK;
    }
}