proio/reader.cc
#include <cstring>
#include <fcntl.h>
#include <sys/types.h>
#include <unistd.h>

#include "google/protobuf/io/gzip_stream.h"
#include "lz4.h"

#include "reader.h"
#include "writer.h"

using namespace proio;
using namespace google::protobuf;

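// Construct a reader from an already-open file descriptor; the descriptor is
// not closed when the reader is destroyed.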
Reader::Reader(int fd) {
    this->fd = fd;
    fileStream = new io::FileInputStream(fd);
    closeFDOnDelete = false;

    initBucket();
}

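// Construct a reader that opens the named file read-only, throwing
// fileOpenError on failure; the file descriptor is closed when the reader is
// destroyed.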
Reader::Reader(std::string filename) {
    fd = open(filename.c_str(), O_RDONLY);
    if (fd == -1) throw fileOpenError;
    fileStream = new io::FileInputStream(fd);
    closeFDOnDelete = true;

    initBucket();
}

Reader::~Reader() {
    if (bucketHeader) delete bucketHeader;
    delete compBucket;
    LZ4F_freeDecompressionContext(dctxPtr);
    delete bucket;
    delete fileStream;
    if (closeFDOnDelete) close(fd);
}

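// Return the next event in the stream, reusing the provided Event object if
// one is given; returns NULL at end of stream.  When metaOnly is true, the
// event payload is not deserialized and only the bucket metadata is attached.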
Event *Reader::Next(Event *event, bool metaOnly) {
    if (event) event->Clear();

    while (!bucketHeader || bucketIndex >= bucketHeader->nevents()) {
        if (bucketHeader) bucketIndex -= bucketHeader->nevents();
        readHeader();
        if (!bucketHeader) return NULL;
    }

    if (!event) event = new Event();
    event->metadata = metadata;
    if (!metaOnly) {
        if (bucket->BytesRemaining() == 0) readBucket();
        readFromBucket(event);
    } else
        bucketIndex++;

    return event;
}

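// Copy the next event's serialized protobuf bytes into *data without
// deserializing them.  Returns false if data is NULL or at end of stream.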
bool Reader::Next(std::string *data) {
    if (!data) return false;

    while (!bucketHeader || bucketIndex >= bucketHeader->nevents()) {
        if (bucketHeader) bucketIndex -= bucketHeader->nevents();
        readHeader();
        if (!bucketHeader) return false;
    }

    if (bucket->BytesRemaining() == 0) readBucket();
    readFromBucket(data);

    return true;
}

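// Skip forward by nEvents events, skipping whole unread buckets on disk where
// possible.  Returns the number of events actually skipped, which is smaller
// than nEvents if the end of the stream is reached.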
uint64_t Reader::Skip(uint64_t nEvents) {
    uint64_t nSkipped = 0;

    uint64_t startIndex = bucketIndex;
    bucketIndex += nEvents;
    while (!bucketHeader || bucketIndex >= bucketHeader->nevents()) {
        if (bucketHeader) {
            uint64_t nBucketEvents = bucketHeader->nevents();
            bucketIndex -= nBucketEvents;
            nSkipped += nBucketEvents - startIndex;
            if (nBucketEvents > 0 && bucket->BytesRemaining() == 0)
                if (!fileStream->Skip(bucketHeader->bucketsize())) throw ioError;
        }
        readHeader();
        if (!bucketHeader) return nSkipped;
        startIndex = 0;
    }
    nSkipped += bucketIndex - startIndex;

    return nSkipped;
}

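// Rewind to the beginning of the stream, clear accumulated metadata, and read
// the first bucket header.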
void Reader::SeekToStart() {
    delete fileStream;
    if (lseek(fd, 0, SEEK_SET) == -1) throw seekError;
    fileStream = new io::FileInputStream(fd);
    metadata.clear();
    bucketIndex = 0;
    readHeader();
}

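// Initialize bucket state shared by both constructors: allocate the input
// buffers and the LZ4 decompression context.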
void Reader::initBucket() {
    compBucket = new BucketInputStream(0);
    bucketHeader = NULL;
    bucketEventsRead = 0;
    bucketIndex = 0;
    LZ4F_createDecompressionContext(&dctxPtr, LZ4F_VERSION);
    bucket = new BucketInputStream(0);
}

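// Walk the uncompressed bucket up to the current event index, skipping earlier
// entries, and deserialize the target entry into the given event.  Each entry
// is a little-endian size prefix followed by the serialized event protobuf.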
void Reader::readFromBucket(Event *event) {
    auto stream = new io::CodedInputStream(bucket);

    while (bucketEventsRead <= bucketIndex) {
        uint32_t protoSize;
        if (!stream->ReadLittleEndian32(&protoSize)) {
            delete stream;
            throw corruptBucketError;
        }

        if (event && bucketEventsRead == bucketIndex) {
            auto eventLimit = stream->PushLimit(protoSize);
            auto eventProto = event->getProto();
            if (!eventProto->MergeFromCodedStream(stream) || !stream->ConsumedEntireMessage()) {
                delete stream;
                throw deserializationError;
            }
            event->SetDescriptorPool(DescriptorPool());
            stream->PopLimit(eventLimit);
        } else if (!stream->Skip(protoSize)) {
            delete stream;
            throw corruptBucketError;
        }

        bucketEventsRead++;
    }
    bucketIndex++;

    delete stream;
}

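// Same traversal as above, but copy the target entry's raw serialized bytes
// into *data instead of deserializing them.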
void Reader::readFromBucket(std::string *data) {
    auto stream = new io::CodedInputStream(bucket);

    while (bucketEventsRead <= bucketIndex) {
        uint32_t protoSize;
        if (!stream->ReadLittleEndian32(&protoSize)) {
            delete stream;
            throw corruptBucketError;
        }

        if (data && bucketEventsRead == bucketIndex) {
            data->resize(protoSize);
            if (!stream->ReadString(data, protoSize)) {
                delete stream;
                throw corruptBucketError;
            }
        } else if (!stream->Skip(protoSize)) {
            delete stream;
            throw corruptBucketError;
        }

        bucketEventsRead++;
    }
    bucketIndex++;

    delete stream;
}

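// Synchronize to the next magic-byte marker and parse the BucketHeader that
// follows it; on end of stream, bucketHeader is left NULL.  The header's
// metadata and file descriptors are folded into the reader's state for use by
// subsequent events.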
void Reader::readHeader() {
    if (bucketHeader) {
        delete bucketHeader;
        bucketHeader = NULL;
    }
    bucketEventsRead = 0;
    compBucket->Reset(0);
    bucket->Reset(0);

    auto stream = new io::CodedInputStream(fileStream);
    syncToMagic(stream);
    uint32_t headerSize;
    if (!stream->ReadLittleEndian32(&headerSize)) {
        delete stream;
        return;
    }

    auto headerLimit = stream->PushLimit(headerSize);
    bucketHeader = new proto::BucketHeader;
    if (!bucketHeader->MergeFromCodedStream(stream) || !stream->ConsumedEntireMessage()) {
        delete stream;
        throw deserializationError;
    }
    stream->PopLimit(headerLimit);

    // Set metadata for future events
    for (auto keyValuePair : bucketHeader->metadata())
        metadata[keyValuePair.first] = std::make_shared<std::string>(keyValuePair.second);

    // Add descriptors to pool owned by reader
    for (const std::string &fdString : bucketHeader->filedescriptor()) {
        FileDescriptorProto fdProto;
        fdProto.ParseFromString(fdString);
        descriptorPool.BuildFile(fdProto);
    }

    delete stream;
}

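// Read the compressed bucket payload described by the current header and
// decompress it into the bucket stream according to the header's compression
// setting (LZ4, GZIP, or uncompressed).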
void Reader::readBucket() {
    auto stream = new io::CodedInputStream(fileStream);

    uint64_t bucketSize = bucketHeader->bucketsize();
    compBucket->Reset(bucketSize);
    if (!stream->ReadRaw(compBucket->Bytes(), bucketSize)) {
        delete stream;
        throw corruptBucketError;
    }

    delete stream;

    switch (bucketHeader->compression()) {
        case LZ4: {
            bucket->Reset(dctxPtr, compBucket);
            break;
        }
        case GZIP: {
            io::GzipInputStream *gzipStream = new io::GzipInputStream(compBucket);
            bucket->Reset(*gzipStream);
            delete gzipStream;
            break;
        }
        default:
            BucketInputStream *tmpBucket = bucket;
            bucket = compBucket;
            compBucket = tmpBucket;
    }
}

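// Advance the stream byte by byte until the 16-byte magic sequence is found,
// returning the number of bytes consumed.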
uint64_t Reader::syncToMagic(io::CodedInputStream *stream) {
    uint8_t num;
    uint64_t nRead = 0;

    while (stream->ReadRaw(&num, 1)) {
        nRead++;

        if (num == magicBytes[0]) {
            bool goodSeq = true;

            for (int i = 1; i < 16; i++) {
                if (!stream->ReadRaw(&num, 1)) break;
                nRead++;

                if (num != magicBytes[i]) {
                    goodSeq = false;
                    break;
                }
            }
            if (goodSeq) break;
        }
    }
    return nRead;
}

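// BucketInputStream is a resizable in-memory buffer that implements protobuf's
// ZeroCopyInputStream interface over the (decompressed) bucket bytes.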
BucketInputStream::BucketInputStream(uint64_t size) {
    offset = 0;
    bytes.resize(size);
    this->size = size;
}

BucketInputStream::~BucketInputStream() { ; }

inline bool BucketInputStream::Next(const void **data, int *size) {
    *data = &bytes[offset];
    *size = this->size - offset;
    offset = this->size;
    if (*size == 0) return false;
    return true;
}

inline void BucketInputStream::BackUp(int count) { offset -= count; }

inline bool BucketInputStream::Skip(int count) {
    offset += count;
    if (offset > size) {
        offset = size;
        return false;
    }
    return true;
}

inline int64 BucketInputStream::ByteCount() const { return offset; }

uint8_t *BucketInputStream::Bytes() { return &bytes[0]; }

uint64_t BucketInputStream::BytesRemaining() { return size - offset; }

void BucketInputStream::Reset(uint64_t size) {
    offset = 0;
    if (bytes.size() < size) bytes.resize(size);
    this->size = size;
}

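// Refill the buffer by copying everything remaining in another
// ZeroCopyInputStream; returns the number of bytes copied.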
uint64_t BucketInputStream::Reset(io::ZeroCopyInputStream &stream) {
    Reset(0);
    uint8_t *data;
    int size;
    while (stream.Next((const void **)&data, &size)) {
        offset = this->size;
        this->size += size;
        if (this->size > bytes.size()) bytes.resize(this->size);
        std::memcpy(&bytes[offset], data, size);
    }
    offset = 0;
    return this->size;
}

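// Refill the buffer by decompressing an LZ4 frame from compBucket, growing the
// buffer in minBucketWriteWindow steps as needed; returns the decompressed
// size.  Throws badLZ4FrameError if the frame is malformed or incomplete.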
uint64_t BucketInputStream::Reset(LZ4F_dctx *dctxPtr, BucketInputStream *compBucket) {
    offset = 0;
    size = bytes.size();
    if (size == 0) Reset(minBucketWriteWindow);
    int srcSize;
    uint8_t *srcBuffer;
    compBucket->Next((const void **)&srcBuffer, &srcSize);
    int srcBytesRemaining = srcSize;
    int dstSize;
    uint8_t *dstBuffer;
    size_t hint = -1;
    while (srcBytesRemaining > 0) {
        Next((const void **)&dstBuffer, &dstSize);
        size_t srcSizeTmp = srcSize;
        size_t dstSizeTmp = dstSize;
        hint = LZ4F_decompress(dctxPtr, dstBuffer, &dstSizeTmp, srcBuffer, &srcSizeTmp, NULL);
        if (LZ4F_isError(hint)) throw badLZ4FrameError;
        srcBytesRemaining -= srcSizeTmp;
        BackUp(dstSize - dstSizeTmp);
        if (offset == size) {
            size += minBucketWriteWindow;
            bytes.resize(size);
        }
        compBucket->BackUp(srcBytesRemaining);
        compBucket->Next((const void **)&srcBuffer, &srcSize);
    }
    size = offset;
    offset = 0;
    if (hint != 0) {
        LZ4F_resetDecompressionContext(dctxPtr);
        throw badLZ4FrameError;
    }
    return size;
}