1 module neton.wal.Decoder;
2 
3 import std.stdio;
4 import neton.wal.Record;
5 import hunt.logging;
6 import hunt.util.Serialize;
7 
8 // const minSectorSize = 512;
9 
10 // frameSizeBytes is frame size in bytes, including record size and padding size.
11 const int frameSizeBytes = 8;
12 
13 class Decoder
14 {
15 
16     // lastValidOff file offset following the last valid decoded record
17     long _lastValidOff;
18     byte[] _uint64buf;
19     // crc          hash.Hash32
20     File[] _fr;
21 
22     this(File[] fp)
23     {
24         _fr = fp;
25         _lastValidOff = 0;
26         // if(_fr.length > 0)
27         //     _fr[0].seek(0,SEEK_SET);
28     }
29 
30     int decode(out Record rec)
31     {
32         //logInfo("decode read fd length is ",_fr.length);
33         int res = 0;
34         if (_fr.length == 0)
35         {
36             logError("decode read fd length is 0 ..");
37             return -1;
38         }
39 
40         ulong dataLen = readUint64();
41         if (dataLen == 0)
42         {
43             _fr[0].close;
44             _fr = _fr[1 .. $];
45             if (_fr.length == 0)
46             {
47                 logWarning("next decode read fd length is 0 ..");
48                 return -1;
49             }
50             //_fr[0].seek(0,SEEK_SET);
51             _lastValidOff = 0;
52             return decode(rec);
53         }
54 
55         auto type = cast(WalType)(_uint64buf[0]);
56         if (type != WalType.recordType)
57             return -1;
58         auto buf = new byte[dataLen];
59         auto data = _fr[0].rawRead(buf);
60         if (data.length != dataLen)
61         {
62             logError("decode error : read data ");
63             return -1;
64         }
65         //logInfo("----unserialize Record : ",data.length," ",data);
66         rec = unserialize!Record(data);
67         _lastValidOff += frameSizeBytes + dataLen;
68         return res;
69     }
70 
71     ulong readUint64()
72     {
73         _uint64buf.length = 0;
74         _uint64buf = new byte[8];
75         byte[] result;
76         try
77         {
78             //logInfo("read fd open : ",_fr[0].isOpen," filename : ",_fr[0].name);
79             result = _fr[0].rawRead(_uint64buf);
80         }
81         catch (Exception e)
82         {
83             logError("read file exception  ", e.msg, " fd open : ",
84                     _fr[0].isOpen, " filename : ", _fr[0].name);
85             return 0;
86         }
87         if (result.length != _uint64buf.length)
88         {
89             logWarning("read file is eof ..");
90             return 0;
91         }
92 
93         ulong len = 0;
94         byte[] src = _uint64buf.dup;
95         src[0] = 0;
96         int offset = 0;
97         len = cast(ulong)((cast(ulong)(src[offset] & 0xFF) << 56) | (
98                 cast(ulong)(src[offset + 1] & 0xFF) << 48) | (cast(ulong)(
99                 src[offset + 2] & 0xFF) << 40) | (cast(ulong)(
100                 src[offset + 3] & 0xFF) << 32) | (cast(ulong)(
101                 src[offset + 4] & 0xFF) << 24) | (cast(ulong)(
102                 src[offset + 5] & 0xFF) << 16) | (cast(ulong)(
103                 src[offset + 6] & 0xFF) << 8) | cast(ulong)(src[offset + 7] & 0xFF));
104 
105         //logInfo("------read head  :  ",_uint64buf, " len : ",len);
106         return len;
107     }
108 
109     long lastOffset()
110     {
111         return _lastValidOff;
112     }
113 
114     void Close()
115     {
116         foreach (fp; _fr)
117         {
118             fp.close();
119         }
120     }
121 }