1 module neton.store.RocksdbStore;
2 
3 import core.stdc.stdio;
4 import std.string;
5 import std.stdio;
6 import std.json;
7 import std.experimental.allocator;
8 import std.file;
9 import std.conv : to;
10 import std.algorithm.searching;
11 import std.algorithm.mutation;
12 
13 import hunt.logging;
14 import hunt.util.DateTime;
15 import hunt.util.Serialize;
16 
17 import rocksdb.database;
18 import rocksdb.options;
19 
20 import neton.store.Util;
21 import neton.lease;
22 import neton.protocol.neton;
23 import neton.protocol.neton;
24 import neton.rpcservice.Command;
25 
26 /*
27  * service register/deregister store format
28  *
29  * key(/redis/redis1) : {
30 	 "dir" : "false",
31 	 "value" : "{
32 		 "service" : {
33 		 "id" : "redis1",
34 		 "name" : "redis",
35 		 "address" : "127.0.0.1",
36 		 "port" : 8000
37 	 	},
38 		"check" : {
39 			"http" : "http://127.0.0.1:8001",
40 			"interval" : 10,
41 			"timeout" : 30
42 		},
43 		"status" : "passing"
44 	 }"	 
45  }
46  */
47 
/* 
 * k/v store format
 *
 * key : {
	 	"dir" : "false",
		"key" : "the key exactly as the caller passed it",
		"value" : "some ..",
		"leaseID": long.init,
		"create_time" : 2333334422 ///unix seconds timestamp
 }
  key : {
	  "dir" : "true",
	  "children" : "/child1;/child2"
  }
 */
62 
63 /***
64    * ****************  Lease  **************
65    * key : /lease/{leaseID}
66    * value : {
67 	   			"dir" : "false",
68    *			"id"  : {leaseID},
69    *			"ttl" : 12344, ///time to live
70    *			"create_time" : 13444244, ///unix seconds timestamp
71    *	        "items":[ 
72    *	   					{
73    *	   						"key" : "attach key",
74    *							"time" : "attach time"
75    *						}
76    *					]
77    *         }
78    ***/
79 
80 /***    Gen Lease ID
81 	key : LEASE_GEN_ID_PREFIX
82 	value : {
83 		  		"ID" : 1234
84 			}
85  ****/
86 
87 class RocksdbStore
88 {
89 
90 	private
91 	{
92 		Database _rocksdb;
93 		Lessor _lessor;
94 	}
95 
96 	this(ulong ID, Lessor l)
97 	{
98 		auto opts = new DBOptions;
99 		opts.createIfMissing = true;
100 		opts.errorIfExists = false;
101 
102 		_rocksdb = new Database(opts, "rocks.db" ~ to!string(ID));
103 		_lessor = l;
104 		init(l);
105 	}
106 
107 	void init(Lessor lessor = null)
108 	{
109 		auto j = getJsonValue("/");
110 		if (j.type == JSONType.null_)
111 		{
112 			JSONValue dirvalue;
113 			dirvalue["dir"] = "true";
114 			dirvalue["children"] = "";
115 
116 			SetValue("/", dirvalue.toString);
117 		}
118 
119 		///init Lease
120 		if (lessor !is null)
121 		{
122 			auto leases = getJsonValue(LEASE_PREFIX[0 .. $ - 1]);
123 			if (leases.type != JSONType.null_)
124 			{
125 				auto lease_keys = leases["children"].str;
126 				if (lease_keys.length > 0)
127 				{
128 					auto leaseIDs = split(lease_keys, ";");
129 					foreach (leaseID; leaseIDs)
130 					{
131 						if (leaseID.length != 0)
132 						{
133 							auto leaseItem = getJsonValue(leaseID);
134 							if (leaseItem.type != JSONType.null_)
135 							{
136 								Lease l = new Lease();
137 								l.ID = leaseItem["id"].integer;
138 								l.ttl = leaseItem["ttl"].integer;
139 								l.expiry = l.ttl + leaseItem["create_time"].integer;
140 								foreach (item; leaseItem["items"].array)
141 								{
142 									l.itemSet.add(item["key"].str);
143 								}
144 								lessor.init(l);
145 							}
146 						}
147 					}
148 				}
149 			}
150 		}
151 
152 	}
153 
154 	bool isDir(string key)
155 	{
156 		key = getSafeKey(key);
157 		auto value = getJsonValue(key);
158 		if (value.type != JSONType.null_)
159 		{
160 			if ("dir" in value)
161 			{
162 				if (value["dir"].str == "true")
163 					return true;
164 			}
165 		}
166 		return false;
167 	}
168 
169 	bool Exsit(string key)
170 	{
171 		return getJsonValue(key).type != JSONType.null_;
172 	}
173 
174 	//递归创建目录
175 	void createDirWithRecur(string dir)
176 	{
177 		logInfo("---create dir : ", dir);
178 		auto parent = getParent(dir);
179 		if (parent != string.init)
180 		{
181 			if (!Exsit(parent))
182 			{
183 				createDirWithRecur(parent);
184 			}
185 			CreateAndAppendSubdir(parent, dir);
186 		}
187 		else
188 		{
189 			CreateAndAppendSubdir(dir, "");
190 		}
191 	}
192 
193 	//创建子目录 并添加key到父目录,父目录不存在则创建
194 	void CreateAndAppendSubdir(string parent, string child)
195 	{
196 		if (child.length > 0)
197 		{
198 			JSONValue subdir;
199 			subdir["dir"] = "true";
200 			subdir["children"] = "";
201 			SetValue(child, subdir.toString);
202 		}
203 
204 		auto j = getJsonValue(parent);
205 		if (j.type != JSONType.null_)
206 		{
207 			auto childs = j["children"].str;
208 			auto segments = split(childs, ";");
209 			if (find(segments, child).empty)
210 			{
211 				childs ~= ";";
212 				childs ~= child;
213 				j["children"] = childs;
214 
215 				SetValue(parent, j.toString);
216 			}
217 		}
218 		else
219 		{
220 			JSONValue dirvalue;
221 			dirvalue["dir"] = "true";
222 			dirvalue["children"] = child;
223 
224 			SetValue(parent, dirvalue.toString);
225 		}
226 	}
227 
228 	string Lookup(string key)
229 	{
230 		if (key.length == 0)
231 			return string.init;
232 		if (_rocksdb is null)
233 			return string.init;
234 
235 		return _rocksdb.getString(key);
236 	}
237 
238 	void Remove(string key, bool recursive = false)
239 	{
240 		key = getSafeKey(key);
241 		if (!recursive)
242 			_rocksdb.removeString(key);
243 		else
244 		{
245 			auto j = getJsonValue(key);
246 			if (j.type == JSONType.object && j["dir"].str == "true")
247 			{
248 				auto children = j["children"].str();
249 				auto segments = split(children, ";");
250 				foreach (subkey; segments)
251 				{
252 					if (subkey.length > 0)
253 						_rocksdb.removeString(subkey);
254 				}
255 			}
256 
257 			_rocksdb.removeString(key);
258 		}
259 		removekeyFromParent(key);
260 	}
261 
262 	JSONValue getJsonValue(string key)
263 	{
264 		auto jsonvalue = Lookup(key);
265 		// logInfo("---getJsonValue key : ",key,"  value : ",jsonvalue);
266 		JSONValue jvalue;
267 		if (jsonvalue.length == 0)
268 			return jvalue;
269 		try
270 		{
271 			//logInfo("parse json value : ",jsonvalue);
272 			jvalue = parseJSON(jsonvalue);
273 		}
274 		catch (Exception e)
275 		{
276 			logError("catch  error : %s", e.msg);
277 		}
278 
279 		return jvalue;
280 	}
281 
282 	string getStringValue(string key)
283 	{
284 		return Lookup(key);
285 	}
286 
287 	bool set(string originKey, string value, out string error, long leaseid = long.init)
288 	{
289 		//不能是目录
290 		auto nodePath = getSafeKey(originKey);
291 		auto node = getJsonValue(nodePath);
292 		if (node.type == JSONType.object && "dir" in node)
293 		{
294 			if (node["dir"].str == "true")
295 			{
296 				error ~= nodePath;
297 				error ~= "  is dir";
298 				logError("set error : ", error, "  info: ", node);
299 				return false;
300 			}
301 		}
302 		//父级目录要么不存在  要么必须是目录
303 		auto p = getParent(nodePath);
304 		if (p == string.init)
305 		{
306 			error = "the key is illegal";
307 			logError("set error : ", error, "  info: ", p);
308 			return false;
309 		}
310 		auto j = getJsonValue(p);
311 		if (j.type != JSONType.null_ && j["dir"].str != "true")
312 		{
313 			error ~= p;
314 			error ~= " not is dir";
315 			logError("set error : ", error, "  info: ", p);
316 			return false;
317 		}
318 
319 		setFileKeyValue(originKey, value, leaseid);
320 		return true;
321 	}
322 
323 	bool createDir(string path, out string error)
324 	{
325 		path = getSafeKey(path);
326 		//目录或文件存在
327 		auto node = getJsonValue(path);
328 		if (node.type != JSONType.null_)
329 		{
330 			error ~= path;
331 			error ~= " is exist";
332 			return false;
333 		}
334 
335 		//父级目录要么不存在  要么必须是目录
336 		auto p = getParent(path);
337 		if (p == string.init)
338 		{
339 			error = "the key is illegal";
340 			return false;
341 		}
342 		auto j = getJsonValue(p);
343 		if (j.type != JSONType.null_ && j["dir"].str != "true")
344 		{
345 			error ~= p;
346 			error ~= " not is dir";
347 			return false;
348 		}
349 
350 		createDirWithRecur(path);
351 		return true;
352 	}
353 
354 	Lease grantLease(long leaseid, long ttl)
355 	{
356 		if (_lessor !is null)
357 		{
358 			auto l = _lessor.Grant(leaseid, ttl);
359 			if (l is null)
360 				return null;
361 
362 			auto lease = getJsonValue(LEASE_PREFIX ~ leaseid.to!string);
363 			if (lease.type == JSONType.null_)
364 			{
365 				JSONValue newLease;
366 				newLease["dir"] = "false";
367 				newLease["ttl"] = l.ttl;
368 				newLease["id"] = leaseid;
369 				JSONValue[] items;
370 				newLease["items"] = items;
371 				newLease["create_time"] = time();
372 				setLeaseKeyValue(LEASE_PREFIX ~ leaseid.to!string, newLease.toString);
373 
374 				///update global leaseID
375 				JSONValue leaseID;
376 				leaseID["ID"] = leaseid;
377 				SetValue(LEASE_GEN_ID_PREFIX, leaseID.toString);
378 				return l;
379 			}
380 		}
381 		return null;
382 	}
383 
384 	bool revokeLease(long leaseid)
385 	{
386 		if (_lessor.Revoke(leaseid) is null)
387 			Remove(LEASE_PREFIX ~ leaseid.to!string);
388 		else
389 			return false;
390 		return true;
391 	}
392 
393 	bool attachToLease(string key, long leaseid)
394 	{
395 		auto lease = getJsonValue(LEASE_PREFIX ~ leaseid.to!string);
396 		if (lease.type != JSONType.null_)
397 		{
398 			if (_lessor !is null)
399 			{
400 				LeaseItem item = {Key:
401 				key};
402 				_lessor.Attach(leaseid, [item]);
403 			}
404 
405 			if (!canFind!((JSONValue b, string a) => b["key"].str == a)(lease["items"].array, key))
406 			{
407 				JSONValue item;
408 				item["key"] = key;
409 				item["time"] = time();
410 				lease["items"].array ~= item;
411 				SetValue(LEASE_PREFIX ~ leaseid.to!string, lease.toString);
412 			}
413 			return true;
414 		}
415 		return false;
416 	}
417 
418 	bool detachFromLease(string key, long leaseid)
419 	{
420 		auto lease = getJsonValue(LEASE_PREFIX ~ leaseid.to!string);
421 		if (lease.type != JSONType.null_)
422 		{
423 			if (_lessor !is null)
424 			{
425 				LeaseItem item = {Key:
426 				key};
427 				_lessor.Detach(leaseid, [item]);
428 			}
429 
430 			JSONValue[] oldItems = lease["items"].array;
431 			JSONValue[] newItems;
432 			foreach (JSONValue item; oldItems)
433 			{
434 				if (item["key"].str != key)
435 				{
436 					newItems ~= item;
437 				}
438 			}
439 			lease["items"] = newItems;
440 			SetValue(LEASE_PREFIX ~ leaseid.to!string, lease.toString);
441 			return true;
442 		}
443 		return false;
444 	}
445 
446 	bool foreverKey(string key)
447 	{
448 		auto value = getJsonValue(key);
449 		if (value.type != JSONType.null_)
450 		{
451 			value["leaseID"] = long.init;
452 			SetValue(key, value.toString);
453 			return true;
454 		}
455 		return false;
456 	}
457 
458 	long generateLeaseID()
459 	{
460 		auto value = getJsonValue(LEASE_GEN_ID_PREFIX);
461 		if (value.type != JSONType.null_)
462 		{
463 			return value["ID"].integer + 1;
464 		}
465 		else
466 			return 1;
467 	}
468 
469 	LeaseTimeToLiveResponse leaseTimeToLive(long leaseid)
470 	{
471 		auto leaseItem = getJsonValue(LEASE_PREFIX ~ leaseid.to!string);
472 		LeaseTimeToLiveResponse respon = new LeaseTimeToLiveResponse();
473 		if (leaseItem.type != JSONType.null_)
474 		{
475 			respon.ID = leaseid;
476 			auto remainTTL = (leaseItem["create_time"].integer + leaseItem["ttl"].integer - time());
477 			respon.TTL = remainTTL > 0 ? remainTTL : 0;
478 			respon.grantedTTL = leaseItem["ttl"].integer;
479 			foreach (item; leaseItem["items"].array)
480 			{
481 				respon.keys ~= cast(ubyte[])(item["key"].str);
482 			}
483 		}
484 		else
485 		{
486 			respon.ID = leaseid;
487 			respon.TTL = -1;
488 			respon.grantedTTL = 0;
489 		}
490 		return respon;
491 	}
492 
493 	LeaseLeasesResponse leaseLeases()
494 	{
495 		auto leases = getJsonValue(LEASE_PREFIX[0 .. $ - 1]);
496 		if (leases.type != JSONType.null_)
497 		{
498 			auto lease_keys = leases["children"].str;
499 			LeaseLeasesResponse response = new LeaseLeasesResponse();
500 			if (lease_keys.length > 0)
501 			{
502 				auto leaseIDs = split(lease_keys, ";");
503 				foreach (leaseID; leaseIDs)
504 				{
505 					if (leaseID.length != 0)
506 					{
507 						auto leaseItem = getJsonValue(leaseID);
508 						if (leaseItem.type != JSONType.null_)
509 						{
510 							LeaseStatus ls = new LeaseStatus();
511 							ls.ID = leaseItem["id"].integer;
512 							response.leases ~= ls;
513 						}
514 					}
515 				}
516 			}
517 			return response;
518 		}
519 		return null;
520 	}
521 
522 	LeaseKeepAliveResponse renewLease(RpcRequest req)
523 	{
524 		if (_lessor !is null)
525 		{
526 			auto newTTL = _lessor.Renew(req.LeaseID);
527 			if (newTTL > 0)
528 			{
529 				auto lease = getJsonValue(LEASE_PREFIX ~ req.LeaseID.to!string);
530 				if (lease.type != JSONType.null_)
531 				{
532 					lease["ttl"] = newTTL;
533 					lease["create_time"] = time();
534 
535 					setLeaseKeyValue(LEASE_PREFIX ~ req.LeaseID.to!string, lease.toString);
536 
537 					LeaseKeepAliveResponse respon = new LeaseKeepAliveResponse();
538 					respon.ID = req.LeaseID;
539 					respon.TTL = newTTL;
540 					return respon;
541 				}
542 			}
543 		}
544 		logWarning("-----renewLease fail --------");
545 		return null;
546 	}
547 
548 	PutResponse put(RpcRequest req)
549 	{
550 		string nodePath;
551 		if(req.CMD == RpcReqCommand.ConfigPutRequest)
552 		{
553 			nodePath = getConfigKey(req.Key);
554 		}
555 		else if(req.CMD == RpcReqCommand.RegistryPutRequest)
556 		{
557 			nodePath = getRegistryKey(req.Key);
558 		}
559 		else
560 		{
561 			nodePath = getSafeKey(req.Key);
562 		}
563 		// Set new value
564 		string error;
565 		if (req.LeaseID != 0)
566 		{
567 			if (attachToLease(nodePath, req.LeaseID))
568 			{
569 				auto ok = set(nodePath, req.Value, error, req.LeaseID);
570 				if (ok)
571 				{
572 					PutResponse respon = new PutResponse();
573 					auto kv = new KeyValue();
574 					kv.key = cast(ubyte[])(req.Key);
575 					kv.value = cast(ubyte[])(req.Value);
576 					respon.prevKv = kv;
577 					return respon;
578 				}
579 			}
580 		}
581 		else
582 		{
583 			auto ok = set(nodePath, req.Value, error);
584 			if (ok)
585 			{
586 				PutResponse respon = new PutResponse();
587 				auto kv = new KeyValue();
588 				kv.key = cast(ubyte[])(req.Key);
589 				kv.value = cast(ubyte[])(req.Value);
590 				respon.prevKv = kv;
591 				return respon;
592 			}
593 		}
594 		return null;
595 	}
596 
597 	DeleteRangeResponse deleteRange(RpcRequest req)
598 	{
599 		DeleteRangeResponse respon = new DeleteRangeResponse();
600 
601 		string nodePath;
602 		if(req.CMD == RpcReqCommand.ConfigDeleteRangeRequest)
603 		{
604 			nodePath = getConfigKey(req.Key);
605 		}
606 		else if(req.CMD == RpcReqCommand.RegistryDeleteRangeRequest)
607 		{
608 			nodePath = getRegistryKey(req.Key);
609 		}
610 		else
611 		{
612 			nodePath = getSafeKey(req.Key);
613 		}
614 
615 		if (Exsit(nodePath))
616 		{
617 			Remove(nodePath, true);
618 			respon.deleted = 1;
619 		}
620 		else
621 			respon.deleted = 0;
622 		return respon;
623 	}
624 
625 protected:
626 	void SetValue(string key, string value)
627 	{
628 		_rocksdb.putString(key, value);
629 	}
630 
631 	void setFileKeyValue(string originKey, string value, long leaseid = long.init)
632 	{
633 		auto key = getSafeKey(originKey);
634 
635 		auto p = getParent(key);
636 		if (p != string.init)
637 		{
638 			if (!Exsit(p))
639 				createDirWithRecur(p);
640 		}
641 		auto j = getJsonValue(p);
642 		auto children = j["children"].str();
643 		auto segments = split(children, ";");
644 		if (find(segments, key).empty)
645 		{
646 			children ~= ";";
647 			children ~= key;
648 			j["children"] = children;
649 			SetValue(p, j.toString);
650 		}
651 
652 		JSONValue filevalue;
653 		filevalue["dir"] = "false";
654 		filevalue["key"] = originKey;
655 		filevalue["value"] = value;
656 		filevalue["leaseID"] = leaseid;
657 		filevalue["create_time"] = time();
658 
659 		SetValue(key, filevalue.toString);
660 	}
661 
662 	void removekeyFromParent(string skey)
663 	{
664 		auto pkey = getParent(skey);
665 		if (pkey != string.init)
666 		{
667 			auto pnode = getJsonValue(pkey);
668 			if (pnode.type == JSONType.object && "children" in pnode)
669 			{
670 				auto children = pnode["children"].str;
671 				auto segments = split(children, ";");
672 				auto childs = remove!(a => a == skey)(segments);
673 				string newvalue;
674 				foreach (child; childs)
675 				{
676 					if (child != ";" && child.length > 0)
677 					{
678 						newvalue ~= child;
679 						newvalue ~= ";";
680 					}
681 				}
682 				pnode["children"] = newvalue;
683 				SetValue(pkey, pnode.toString);
684 			}
685 		}
686 	}
687 
688 	void setLeaseKeyValue(string key, string jsonValue)
689 	{
690 		auto p = getParent(key);
691 		if (p != string.init)
692 		{
693 			if (!Exsit(p))
694 				createDirWithRecur(p);
695 		}
696 		auto j = getJsonValue(p);
697 		auto children = j["children"].str();
698 		auto segments = split(children, ";");
699 		if (find(segments, key).empty)
700 		{
701 			children ~= ";";
702 			children ~= key;
703 			j["children"] = children;
704 			SetValue(p, j.toString);
705 		}
706 
707 		SetValue(key, jsonValue);
708 	}
709 
710 }