1 module neton.wal.Wal;
2 
3 import neton.wal.Record;
4 import neton.wal.Util;
5 import neton.wal.Decoder;
6 import neton.wal.Encoder;
7 import hunt.util.Serialize;	
8 import std.stdio;
9 import hunt.logging;
10 import std.file;
11 import std.experimental.allocator;
12 import hunt.raft;
13 import std.path;
14 
15 	// SegmentSizeBytes is the preallocated size of each wal segment file.
16 	// The actual size might be larger than this. In general, the default
17 	// value should be used, but this is defined as an exported variable
18 	// so that tests can set a different segment size.
// Segment size threshold in bytes: 64 * 1000 * 1000 (64MB). Declared const.
const long SegmentSizeBytes = 64_000_000;

//plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "wal")

// WAL error message strings, grouped as manifest constants of type string.
enum : string
{
	ErrMetadataConflict = "wal: conflicting metadata found",
	ErrFileNotFound     = "wal: file not found",
	ErrCRCMismatch      = "wal: crc mismatch",
	ErrSnapshotMismatch = "wal: snapshot mismatch",
	ErrSnapshotNotFound = "wal: snapshot not found",
}
28 	//crcTable            = crc32.MakeTable(crc32.Castagnoli)
29 
30 
// WAL is a logical representation of the stable storage.
// WAL is either in read mode or append mode but not both.
// A newly created WAL is in append mode, and ready for appending records.
// A just opened WAL is in read mode, and ready for reading records.
// The WAL will be ready for appending after reading out all the previous records.
//
// Failures are reported through logError; the constructors and methods do not
// throw on their own (calls into std.file / std.stdio may still throw).
class WAL
{
	string _dir; // the living directory of the underlay files

	// dirFile is a fd for the wal directory for syncing on Rename
	//dirFile *os.File

	byte[] _metadata;   // metadata recorded at the head of each WAL
	HardState _hdstate; // hardstate recorded at the head of WAL

	WalSnapshot _start; // snapshot to start reading
	Decoder _decoder;   // decoder to decode records

	ulong _enti;        // index of the last entry saved to the wal
	Encoder _encoder;   // encoder to encode records

	File[] _fw; // files opened for writing (the name is increasing)
	File[] _fr; // files opened for reading (the name is increasing)

	// Creates a WAL ready for appending records. The given metadata is
	// recorded at the head of each WAL file, and can be retrieved with ReadAll.
	//
	// Params:
	//   dirpath  = directory that will hold the wal files; must be empty
	//   metadata = opaque bytes written as the first record of every segment
	//
	// On any failure the error is logged (where the original logged it) and the
	// constructor returns early, leaving the object without an encoder.
	this(string dirpath, byte[] metadata)
	{
		if (!isEmptyDir(dirpath))
		{
			logError("wal dir not is empty...");
			return;
		}

		// keep temporary wal directory so WAL initialization appears atomic
		auto tmpdirpath = dirpath ~ ".tmp";
		if (Exist(tmpdirpath))
		{
			// A temp dir left behind by an interrupted run can still contain
			// a segment file; std.file.rmdir only removes empty directories.
			rmdirRecurse(tmpdirpath);
		}
		mkdir(tmpdirpath);

		auto filepath = tmpdirpath ~ "/" ~ walName(0, 0);
		auto fw = File(filepath, "wb+");
		if (!fw.isOpen())
		{
			return;
		}
		_fw ~= fw;
		_dir = dirpath;
		_metadata = metadata;

		_encoder = theAllocator.make!Encoder(_fw[$ - 1]);
		if (_encoder is null)
		{
			logError("theAllocator make encoder object error...");
			return;
		}

		// head of the first segment: metadata record followed by an empty snapshot record
		Record rec = {type: WalType.metadataType, data: cast(string) metadata};
		_encoder.encode(rec);

		WalSnapshot wsp;
		SaveSnapshot(wsp);

		renameDir(tmpdirpath, _dir);
	}

	// Opens the WAL found in dirpath, starting at the segment selected by
	// searchIndex for snap.index, and prepares a decoder over those segments.
	//
	// Params:
	//   dirpath = directory holding the wal files
	//   snap    = snapshot from which reading should start
	//   write   = true to open the segments for appending as well as reading,
	//             false to open them read-only
	this(string dirpath, WalSnapshot snap, bool write)
	{
		auto names = readWalNames(dirpath);

		long idx = searchIndex(names, snap.index);
		if (idx < 0)
		{
			logError("search index ", snap.index);
			return;
		}

		// open the wal files, from the selected segment up to the newest one
		for (size_t i = cast(size_t) idx; i < names.length; i++)
		{
			// index with i (not idx) so that every segment is opened once
			auto filepath = dirpath ~ "/" ~ names[i];
			if (write)
			{
				auto fw = File(filepath, "ab+");
				if (fw.isOpen())
				{
					// the same handle serves as reader and writer
					_fr ~= fw;
					_fw ~= fw;
				}
			}
			else
			{
				auto fr = File(filepath, "rb");
				if (fr.isOpen)
				{
					_fr ~= fr;
				}
			}
		}

		_dir = dirpath;
		_start = snap;
		_decoder = new Decoder(_fr);
	}

	// ReadAll reads out records of the current WAL.
	//
	// Params:
	//   metadata = receives the data of the last metadata record read
	//   state    = receives the last hard state read (reset to its default
	//              value when reading is aborted)
	//   ents     = receives every entry whose Index is greater than the
	//              index of the start snapshot
	//
	// Problems are logged rather than returned:
	//   - a snapshot record with the start index but a different term logs
	//     "ErrSnapshotMismatch " and aborts reading;
	//   - an unknown record type logs "unexpected block type " and aborts;
	//   - not finding the start snapshot logs "ErrSnapshotNotFound " and
	//     continues.
	// TODO: detect not-last-snap error.
	// TODO: maybe loose the checking of match.
	// After a complete ReadAll the decoder is released and, when write files
	// are open, an encoder is attached to the newest one.
	void ReadAll(out byte[] metadata, out HardState state, out Entry[] ents)
	{
		// No decoder exists on a WAL made by the create constructor, after a
		// failed open, or once ReadAll has already completed.
		if (_decoder is null)
		{
			logError("wal: decoder not created, nothing to read");
			return;
		}

		Record rec;
		bool match;

		while (_decoder.decode(rec) != -1)
		{
			switch (rec.type)
			{
			case WalType.entryType:
				Entry e = unserialize!Entry(cast(byte[]) rec.data);
				if (e.Index > _start.index)
				{
					ents ~= e;
				}
				_enti = e.Index;
				break;
			case WalType.stateType:
				state = unserialize!HardState(cast(byte[]) rec.data);
				break;
			case WalType.metadataType:
				metadata = cast(byte[]) rec.data;
				break;
			case WalType.crcType:
				// crc records are skipped here
				break;
			case WalType.snapshotType:
				WalSnapshot snap = unserialize!WalSnapshot(cast(byte[]) rec.data);
				if (snap.index == _start.index)
				{
					if (snap.term != _start.term)
					{
						HardState st;
						state = st;
						logError("ErrSnapshotMismatch ");
						return;
					}
					match = true;
				}
				break;
			default:
				HardState st;
				state = st;
				logError("unexpected block type ", rec.type);
				return;
			}
		}

		if (!match)
		{
			logError("ErrSnapshotNotFound ");
		}

		// Disable reading. In write mode each reader shares its handle with
		// the matching entry of _fw, and File.close closes the handle for
		// every copy, so closing here would also close the writers. Only
		// close when there are no writers; otherwise just drop the references.
		if (_fw.length == 0)
		{
			foreach (fp; _fr)
				fp.close();
		}
		_fr.length = 0;

		WalSnapshot wsp;
		_start = wsp;

		_metadata = metadata;

		if (_fw.length > 0)
		{
			logInfo("make encoder ..before ");
			_encoder = theAllocator.make!Encoder(_fw[$ - 1]);
			logInfo("make encoder ..after ");
		}

		// NOTE(review): Decoder.Close is defined elsewhere; confirm it does not
		// close the File handles it was given, or write mode loses its writers.
		_decoder.Close();
		_decoder = null;
	}

	// Serializes e and appends it as an entry record (skipped when there is
	// no encoder), then records e.Index as the last saved index.
	void saveEntry(Entry e)
	{
		// TODO: add MustMarshalTo to reduce one allocation.
		auto b = serialize(e);
		Record rec = {type: WalType.entryType, data: cast(string) b};
		if (_encoder !is null)
		{
			_encoder.encode(rec);
		}
		_enti = e.Index;
	}

	// Remembers s as the current hard state and appends it as a state record
	// (skipped when there is no encoder). An empty hard state is ignored.
	void saveState(HardState s)
	{
		if (IsEmptyHardState(s))
		{
			return;
		}
		_hdstate = s;
		auto b = serialize(s);
		Record rec = {type: WalType.stateType, data: cast(string) b};
		if (_encoder !is null)
		{
			_encoder.encode(rec);
		}
	}

	// Appends the entries and the hard state, then either syncs (when
	// Ready.MustSync says so) or, once the encoder offset has reached
	// SegmentSizeBytes, cuts over to a new segment file.
	//
	// Params:
	//   st   = hard state to persist; may be empty
	//   ents = entries to persist; may be empty
	void Save(HardState st, Entry[] ents)
	{
		// short cut, do not call sync
		if (IsEmptyHardState(st) && ents.length == 0)
		{
			return;
		}

		// Without an encoder nothing can be written and the offset check
		// below would dereference null.
		if (_encoder is null)
		{
			logError("wal: no encoder, the WAL is not ready for appending");
			return;
		}

		// must be evaluated before saveState overwrites _hdstate
		bool mustSync = Ready.MustSync(st, _hdstate, cast(int) ents.length);

		// TODO(xiangli): no more reference operator
		foreach (e; ents)
		{
			saveEntry(e);
		}

		saveState(st);

		auto curOff = _encoder.curOff();
		if (curOff < SegmentSizeBytes)
		{
			if (mustSync)
				sync();
			return;
		}

		cut();
	}

	// cut flushes the file currently written and creates the next segment
	// file (sequence + 1, starting at index _enti + 1), writes the metadata
	// and current hard state at its head, and makes it the active writer.
	void cut()
	{
		logInfo("---- cut new file .");
		if (_fw.length > 0)
		{
			if (_fw[$ - 1].isOpen)
				_fw[$ - 1].flush();
		}

		auto fpath = _dir ~ "/" ~ walName(seq() + 1, _enti + 1);

		// create the new wal file, or truncate an existing one of that name
		auto newtail = File(fpath, "wb+");
		if (!newtail.isOpen)
		{
			return;
		}

		// update writer
		_fw ~= newtail;

		_encoder = theAllocator.make!Encoder(_fw[$ - 1]);
		if (_encoder is null)
		{
			return;
		}
		Record rec = {type: WalType.metadataType, data: cast(string) _metadata};

		_encoder.encode(rec);

		saveState(_hdstate);

		sync();

		// NOTE(review): rewinding to offset 0 on a "wb+" file and attaching a
		// fresh encoder looks like it could overwrite the header written just
		// above; this depends on how Encoder positions its writes - confirm.
		_fw[$ - 1].seek(0, SEEK_SET);

		_encoder = theAllocator.make!Encoder(_fw[$ - 1]);
		if (_encoder is null)
		{
			return;
		}
	}

	// Returns the sequence number parsed from the name of the newest write
	// file, or 0 when no write file is open.
	ulong seq()
	{
		if (_fw.length == 0)
			return 0;
		auto basename = std.path.baseName(_fw[$ - 1].name());
		ulong seq, indx;
		parseWalName(basename, seq, indx);
		return seq;
	}

	// Appends sp as a snapshot record, advances the last saved index to
	// sp.index when the snapshot is ahead of it, and syncs.
	void SaveSnapshot(WalSnapshot sp)
	{
		// Same guard the other save methods use: no encoder, nothing to write.
		if (_encoder is null)
		{
			logError("wal: no encoder, snapshot record not saved");
			return;
		}

		auto spdata = serialize(sp);

		Record rec = {type: WalType.snapshotType, data: cast(string) spdata};
		_encoder.encode(rec);

		// update enti only when snapshot is ahead of last index
		if (_enti < sp.index)
		{
			_enti = sp.index;
		}
		sync();
	}

	// Flushes pending output of the encoder and of the newest write file.
	void sync()
	{
		// Encoder first, so that anything it may still hold reaches the File
		// before the File itself is flushed.
		if (_encoder !is null)
			_encoder.flush();
		if (_fw.length > 0)
		{
			if (_fw[$ - 1].isOpen)
				_fw[$ - 1].flush();
		}
	}

	// Flushes and closes every write file, closes every read file and
	// forgets both lists.
	void Close()
	{
		foreach (fp; _fw)
		{
			logInfo("-----------close file name--------", fp.name);
			if (fp.isOpen)
				fp.flush();
			fp.close();
		}
		_fw.length = 0;

		foreach (fp; _fr)
		{
			logInfo("-----------close file name--------", fp.name);
			// read handles have nothing to flush; handles shared with _fw
			// were already closed above, for which close() is a no-op
			fp.close();
		}
		_fr.length = 0;
	}
}
373