module neton.wal.Wal;

// project-local WAL building blocks
import neton.wal.Decoder;
import neton.wal.Encoder;
import neton.wal.Record;
import neton.wal.Util;

// hunt framework
import hunt.logging;
import hunt.raft;
import hunt.util.Serialize;

// phobos
import std.experimental.allocator;
import std.file;
import std.path;
import std.stdio;

/*
 * Size threshold, in bytes, for a single WAL segment file: 64 MB, i.e.
 * 64,000,000 bytes (decimal megabytes, not 64 MiB). A segment can grow past
 * this value because the size check happens only after records are written.
 * Unlike the etcd original, where this is a variable that tests may override,
 * it is declared `const` here and is therefore fixed at compile time.
 */
const long SegmentSizeBytes = 64_000_000;

// Error messages carried over from etcd's wal package. etcd's package logger
// and crc32 (Castagnoli) table were not ported to this module.
enum ErrMetadataConflict = "wal: conflicting metadata found";
enum ErrFileNotFound = "wal: file not found";
enum ErrCRCMismatch = "wal: crc mismatch";
enum ErrSnapshotMismatch = "wal: snapshot mismatch";
enum ErrSnapshotNotFound = "wal: snapshot not found";


// WAL is the logical view of stable storage. It is in either read mode or
// append mode, never both: a newly created WAL starts in append mode and can
// take records right away, while a freshly opened WAL starts in read mode and
// becomes appendable only after every existing record has been read out.
class WAL {
    string _dir; // the living directory of the underlay files

    // dirFile is a fd for the wal directory for syncing on Rename
    //dirFile *os.File

    byte[] _metadata;   // metadata recorded at the head of each WAL
    HardState _hdstate; // hardstate recorded at the head of WAL

    WalSnapshot _start; // snapshot to start reading
    Decoder _decoder;   // decoder to decode records (null once ReadAll finished)

    ulong _enti;        // index of the last entry saved to the wal
    Encoder _encoder;   // encoder to encode records into the tail of _fw

    // Segment handles, ordered by increasing file name. In write mode the same
    // File handles are stored in both arrays (they share one C stream).
    File[] _fw; // writer handles; the last one is the current tail
    File[] _fr; // reader handles

    /**
     * Creates a WAL ready for appending records. The given metadata is
     * recorded at the head of the first segment and can be read back with ReadAll.
     *
     * The first segment is written inside `dirpath ~ ".tmp"` and that directory is
     * then moved to `dirpath` with renameDir, so that initialization appears atomic.
     * Failures are logged and leave the object only partially initialized.
     */
    this(string dirpath, byte[] metadata)
    {
        if (!isEmptyDir(dirpath))
        {
            logError("wal dir not is empty...");
            return;
        }

        // keep temporary wal directory so WAL initialization appears atomic
        auto tmpdirpath = dirpath ~ ".tmp";
        if (Exist(tmpdirpath))
        {
            // An interrupted earlier initialization may have left segment files in
            // the temp dir; plain rmdir throws on a non-empty directory.
            rmdirRecurse(tmpdirpath);
        }
        mkdir(tmpdirpath);

        auto filepath = tmpdirpath ~ "/" ~ walName(0, 0);
        auto fw = File(filepath, "wb+");
        if (!fw.isOpen())
            return;
        _fw ~= fw;
        _dir = dirpath;
        _metadata = metadata;

        _encoder = theAllocator.make!Encoder(_fw[$ - 1]);
        if (_encoder is null)
        {
            logError("theAllocator make encoder object error...");
            return;
        }

        Record rec = {type: WalType.metadataType, data: cast(string) metadata};
        _encoder.encode(rec);

        WalSnapshot wsp;
        SaveSnapshot(wsp);

        renameDir(tmpdirpath, _dir);
    }

    /**
     * Opens the existing WAL in `dirpath`, starting at the segment that
     * searchIndex selects for `snap.index`, and prepares it for ReadAll.
     *
     * Params:
     *   dirpath = directory holding the wal segment files.
     *   snap    = snapshot to start from; ReadAll only returns entries whose
     *             index is greater than snap.index.
     *   write   = when true every segment is opened "ab+" and its handle is kept
     *             in both _fr and _fw so the WAL can append after ReadAll;
     *             otherwise segments are opened read-only ("rb").
     */
    this(string dirpath, WalSnapshot snap, bool write)
    {
        auto names = readWalNames(dirpath);

        long idx = searchIndex(names, snap.index);
        if (idx < 0)
        {
            logError("search index ", snap.index);
            return;
        }

        // open every wal file from the selected segment onwards
        for (size_t i = cast(size_t) idx; i < names.length; ++i)
        {
            // each iteration opens its own segment, names[i]
            auto filepath = dirpath ~ "/" ~ names[i];
            if (write)
            {
                auto fw = File(filepath, "ab+");
                if (fw.isOpen())
                {
                    _fr ~= fw;
                    _fw ~= fw;
                }
            }
            else
            {
                auto fr = File(filepath, "rb");
                if (fr.isOpen)
                    _fr ~= fr;
            }
        }

        _dir = dirpath;
        _start = snap;
        _decoder = new Decoder(_fr);
    }

    // ReadAll reads out records of the current WAL.
    // If opened in write mode, it must read out all records until EOF. Or an error
    // will be returned.
    // If opened in read mode, it will try to read all records if possible.
    // If it cannot read out the expected snap, it logs ErrSnapshotNotFound.
    // If loaded snap doesn't match with the expected one, it logs
    // ErrSnapshotMismatch, resets `state` and returns what was read so far.
    // TODO: detect not-last-snap error.
    // TODO: maybe loose the checking of match.
    // After ReadAll, the WAL will be ready for appending new records.
    void ReadAll(out byte[] metadata, out HardState state, out Entry[] ents)
    {
        if (_decoder is null)
        {
            // Not opened for reading: created with the metadata constructor,
            // opening failed, or ReadAll already completed.
            logError("wal: ReadAll called without an open decoder");
            return;
        }

        Record rec;
        bool match;
        while (_decoder.decode(rec) != -1)
        {
            switch (rec.type)
            {
            case WalType.entryType:
            {
                Entry e = unserialize!Entry(cast(byte[]) rec.data);
                if (e.Index > _start.index)
                    ents ~= e;
                _enti = e.Index;
                break;
            }
            case WalType.stateType:
                state = unserialize!HardState(cast(byte[]) rec.data);
                break;
            case WalType.metadataType:
                metadata = cast(byte[]) rec.data;
                break;
            case WalType.crcType:
                // crc records are not verified by this port
                break;
            case WalType.snapshotType:
            {
                WalSnapshot snap = unserialize!WalSnapshot(cast(byte[]) rec.data);
                if (snap.index == _start.index)
                {
                    if (snap.term != _start.term)
                    {
                        state = HardState.init;
                        logError("ErrSnapshotMismatch ");
                        return;
                    }
                    match = true;
                }
                break;
            }
            default:
                state = HardState.init;
                logError("unexpected block type ", rec.type);
                return;
            }
        }

        if (!match)
            logError("ErrSnapshotNotFound ");

        // Disable reading. In write mode the readers are the very same File
        // handles as the writers, and File.close() closes the shared C stream for
        // every copy -- closing them here would close the tail we append to.
        foreach (fp; _fr)
        {
            if (!isWriterHandle(fp))
                fp.close();
        }
        _fr.length = 0;

        _start = WalSnapshot.init;
        _metadata = metadata;

        if (_fw.length > 0)
        {
            logInfo("make encoder ..before ");
            _encoder = theAllocator.make!Encoder(_fw[$ - 1]);
            logInfo("make encoder ..after ");
        }
        // NOTE(review): Decoder.Close is not visible here; if it closes the Files
        // it was constructed with, write mode would still lose its tail handle.
        _decoder.Close();
        _decoder = null;
    }

    // Reports whether `f` wraps the same C stream as one of the writer handles.
    private bool isWriterHandle(File f)
    {
        if (!f.isOpen)
            return false;
        auto handle = f.getFP();
        foreach (w; _fw)
        {
            if (w.isOpen && w.getFP() == handle)
                return true;
        }
        return false;
    }

    // Encodes one entry record and remembers its index as the last saved entry.
    // The record is dropped when no encoder exists (read-only WAL).
    void saveEntry(Entry e)
    {
        // TODO: add MustMarshalTo to reduce one allocation.
        auto b = serialize(e);
        Record rec = {type: WalType.entryType, data: cast(string) b};
        if (_encoder !is null)
            _encoder.encode(rec);
        _enti = e.Index;
    }

    // Encodes a non-empty hard state and caches it in _hdstate so cut() can
    // repeat it at the head of the next segment. Empty hard states are ignored.
    void saveState(HardState s)
    {
        if (IsEmptyHardState(s))
            return;
        _hdstate = s;
        auto b = serialize(s);
        Record rec = {type: WalType.stateType, data: cast(string) b};
        if (_encoder !is null)
            _encoder.encode(rec);
    }

    /**
     * Appends `ents` followed by `st`. Syncs when Ready.MustSync requires it, and
     * rotates to a new segment (cut) once the encoder offset reaches
     * SegmentSizeBytes.
     */
    void Save(HardState st, Entry[] ents)
    {
        // short cut, do not call sync
        if (IsEmptyHardState(st) && ents.length == 0)
            return;

        // must be evaluated before saveState overwrites _hdstate
        bool mustSync = Ready.MustSync(st, _hdstate, cast(int) ents.length);

        foreach (e; ents)
            saveEntry(e);
        saveState(st);

        // Without an encoder nothing was written, so there is nothing to sync or rotate.
        if (_encoder is null)
            return;

        if (_encoder.curOff() < SegmentSizeBytes)
        {
            if (mustSync)
                sync();
            return;
        }

        cut();
    }

    // cut finishes the current segment and starts a new one ready to append.
    // The new file is named with sequence seq()+1 and first index _enti+1, and
    // gets the metadata and the last hard state written at its head.
    void cut()
    {
        logInfo("---- cut new file .");

        // Drain and persist the current tail (including anything still buffered
        // in the old encoder) before the encoder is replaced.
        sync();

        auto fpath = _dir ~ "/" ~ walName(seq() + 1, _enti + 1);

        // create the wal file with name sequence + 1, or truncate the existing one
        auto newtail = File(fpath, "wb+");
        if (!newtail.isOpen)
            return;

        _fw ~= newtail;

        _encoder = theAllocator.make!Encoder(_fw[$ - 1]);
        if (_encoder is null)
            return;

        Record rec = {type: WalType.metadataType, data: cast(string) _metadata};
        _encoder.encode(rec);
        saveState(_hdstate);

        // Keep appending after the header with the same encoder: rewinding to
        // offset 0 (as before) made later records overwrite the header.
        sync();
    }

    // Returns the sequence number parsed from the tail segment's file name,
    // or 0 when there is no writer segment.
    ulong seq()
    {
        if (_fw.length == 0)
            return 0;
        auto basename = std.path.baseName(_fw[$ - 1].name());
        ulong segSeq, indx;
        parseWalName(basename, segSeq, indx);
        return segSeq;
    }

    // Appends a snapshot record, advances _enti if the snapshot is ahead of it,
    // and syncs. The record is dropped when no encoder exists (read-only WAL).
    void SaveSnapshot(WalSnapshot sp)
    {
        auto spdata = serialize(sp);
        Record rec = {type: WalType.snapshotType, data: cast(string) spdata};
        if (_encoder !is null)
            _encoder.encode(rec);

        // update enti only when snapshot is ahead of last index
        if (_enti < sp.index)
            _enti = sp.index;
        sync();
    }

    // Pushes buffered records to disk: encoder buffer -> C stream -> OS -> disk.
    void sync()
    {
        // The encoder must be drained first, otherwise its bytes would only reach
        // the C FILE buffer after that buffer was already flushed.
        if (_encoder !is null)
            _encoder.flush();
        if (_fw.length > 0)
        {
            auto tail = _fw[$ - 1];
            if (tail.isOpen)
            {
                tail.flush(); // C FILE buffer -> OS
                tail.sync();  // OS cache -> disk (fsync / FlushFileBuffers)
            }
        }
    }

    // Syncs the tail segment, then flushes and closes every writer and reader handle.
    void Close()
    {
        if (_fw.length > 0 && _fw[$ - 1].isOpen)
            sync();

        foreach (fp; _fw)
        {
            logInfo("-----------close file name--------", fp.name);
            if (fp.isOpen)
                fp.flush();
            fp.close();
        }
        _fw.length = 0;
        foreach (fp; _fr)
        {
            logInfo("-----------close file name--------", fp.name);
            if (fp.isOpen)
                fp.flush();
            fp.close();
        }
        _fr.length = 0;
    }
}