1 module neton.wal.Decoder; 2 3 import std.stdio; 4 import neton.wal.Record; 5 import hunt.logging; 6 import hunt.util.Serialize; 7 8 // const minSectorSize = 512; 9 10 // frameSizeBytes is frame size in bytes, including record size and padding size. 11 const int frameSizeBytes = 8; 12 13 class Decoder 14 { 15 16 // lastValidOff file offset following the last valid decoded record 17 long _lastValidOff; 18 byte[] _uint64buf; 19 // crc hash.Hash32 20 File[] _fr; 21 22 this(File[] fp) 23 { 24 _fr = fp; 25 _lastValidOff = 0; 26 // if(_fr.length > 0) 27 // _fr[0].seek(0,SEEK_SET); 28 } 29 30 int decode(out Record rec) 31 { 32 //logInfo("decode read fd length is ",_fr.length); 33 int res = 0; 34 if (_fr.length == 0) 35 { 36 logError("decode read fd length is 0 .."); 37 return -1; 38 } 39 40 ulong dataLen = readUint64(); 41 if (dataLen == 0) 42 { 43 _fr[0].close; 44 _fr = _fr[1 .. $]; 45 if (_fr.length == 0) 46 { 47 logWarning("next decode read fd length is 0 .."); 48 return -1; 49 } 50 //_fr[0].seek(0,SEEK_SET); 51 _lastValidOff = 0; 52 return decode(rec); 53 } 54 55 auto type = cast(WalType)(_uint64buf[0]); 56 if (type != WalType.recordType) 57 return -1; 58 auto buf = new byte[dataLen]; 59 auto data = _fr[0].rawRead(buf); 60 if (data.length != dataLen) 61 { 62 logError("decode error : read data "); 63 return -1; 64 } 65 //logInfo("----unserialize Record : ",data.length," ",data); 66 rec = unserialize!Record(data); 67 _lastValidOff += frameSizeBytes + dataLen; 68 return res; 69 } 70 71 ulong readUint64() 72 { 73 _uint64buf.length = 0; 74 _uint64buf = new byte[8]; 75 byte[] result; 76 try 77 { 78 //logInfo("read fd open : ",_fr[0].isOpen," filename : ",_fr[0].name); 79 result = _fr[0].rawRead(_uint64buf); 80 } 81 catch (Exception e) 82 { 83 logError("read file exception ", e.msg, " fd open : ", 84 _fr[0].isOpen, " filename : ", _fr[0].name); 85 return 0; 86 } 87 if (result.length != _uint64buf.length) 88 { 89 logWarning("read file is eof .."); 90 return 0; 91 } 92 93 ulong len = 0; 94 byte[] src = _uint64buf.dup; 95 src[0] = 0; 96 int offset = 0; 97 len = cast(ulong)((cast(ulong)(src[offset] & 0xFF) << 56) | ( 98 cast(ulong)(src[offset + 1] & 0xFF) << 48) | (cast(ulong)( 99 src[offset + 2] & 0xFF) << 40) | (cast(ulong)( 100 src[offset + 3] & 0xFF) << 32) | (cast(ulong)( 101 src[offset + 4] & 0xFF) << 24) | (cast(ulong)( 102 src[offset + 5] & 0xFF) << 16) | (cast(ulong)( 103 src[offset + 6] & 0xFF) << 8) | cast(ulong)(src[offset + 7] & 0xFF)); 104 105 //logInfo("------read head : ",_uint64buf, " len : ",len); 106 return len; 107 } 108 109 long lastOffset() 110 { 111 return _lastValidOff; 112 } 113 114 void Close() 115 { 116 foreach (fp; _fr) 117 { 118 fp.close(); 119 } 120 } 121 }