proio
reader.h
1 #ifndef PROIO_READER_H
2 #define PROIO_READER_H
3 
4 #include <cstring>
5 #include <mutex>
6 #include <string>
7 
8 #include "google/protobuf/io/zero_copy_stream_impl.h"
9 #include "lz4frame.h"
10 
11 #include "event.h"
12 
13 namespace proio {
14 class BucketInputStream : public google::protobuf::io::ZeroCopyInputStream {
15  public:
16  BucketInputStream(uint64_t size);
17  virtual ~BucketInputStream();
18 
19  bool Next(const void **data, int *size) override;
20  void BackUp(int count) override;
21  bool Skip(int count) override;
22  google::protobuf::int64 ByteCount() const override;
23 
24  uint8_t *Bytes();
25  uint64_t BytesRemaining();
26  void Reset(uint64_t size);
27  uint64_t Reset(google::protobuf::io::ZeroCopyInputStream &stream);
28  uint64_t Reset(LZ4F_dctx *dctxPtr, BucketInputStream *compBucket);
29 
30  private:
31  uint64_t offset;
32  std::vector<uint8_t> bytes;
33  uint64_t size;
34 };
35 
38 class Reader : public std::mutex {
39  public:
42  Reader(int fd);
46  Reader(std::string filename);
47  virtual ~Reader();
48 
56  Event *Next(Event *recycledEvent = NULL, bool metadataOnly = false);
60  bool Next(std::string *data);
63  uint64_t Skip(uint64_t nEvents);
67  void SeekToStart();
73  const google::protobuf::DescriptorPool *DescriptorPool() { return &descriptorPool; }
74 
75  private:
76  void initBucket();
77  void readFromBucket(Event *event);
78  void readFromBucket(std::string *data);
79  void readHeader();
80  void readBucket();
81  uint64_t syncToMagic(google::protobuf::io::CodedInputStream *stream);
82 
83  BucketInputStream *compBucket;
84  google::protobuf::io::FileInputStream *fileStream;
85  int fd;
86  bool closeFDOnDelete;
87  proto::BucketHeader *bucketHeader;
88  uint64_t bucketEventsRead;
89  uint64_t bucketIndex;
90  LZ4F_dctx *dctxPtr;
91  BucketInputStream *bucket;
92  std::map<std::string, std::shared_ptr<const std::string>> metadata;
93  google::protobuf::DescriptorPool descriptorPool;
94 };
95 
/**
 * Exception type reporting "Failed to open file for reading", together with
 * its single pre-built instance `fileOpenError`.
 *
 * what() is public so the message can be read through the concrete type as
 * well as through std::exception, and it uses noexcept in place of the
 * deprecated throw() specification.
 */
const class FileOpenError : public std::exception {
   public:
    const char *what() const noexcept override { return "Failed to open file for reading"; }
} fileOpenError;
99 
/**
 * Exception type reporting "Failed to deserialize message", together with its
 * single pre-built instance `deserializationError`.
 *
 * what() is public so the message can be read through the concrete type as
 * well as through std::exception, and it uses noexcept in place of the
 * deprecated throw() specification.
 */
const class DeserializationError : public std::exception {
   public:
    const char *what() const noexcept override { return "Failed to deserialize message"; }
} deserializationError;
103 
/**
 * Exception type reporting "Bucket is corrupt", together with its single
 * pre-built instance `corruptBucketError`.
 *
 * what() is public so the message can be read through the concrete type as
 * well as through std::exception, and it uses noexcept in place of the
 * deprecated throw() specification.
 */
const class CorruptBucketError : public std::exception {
   public:
    const char *what() const noexcept override { return "Bucket is corrupt"; }
} corruptBucketError;
107 
/**
 * Exception type reporting "Bad LZ4 frame", together with its single pre-built
 * instance `badLZ4FrameError`.
 *
 * what() is public so the message can be read through the concrete type as
 * well as through std::exception, and it uses noexcept in place of the
 * deprecated throw() specification.
 */
const class BadLZ4FrameError : public std::exception {
   public:
    const char *what() const noexcept override { return "Bad LZ4 frame"; }
} badLZ4FrameError;
111 
/**
 * Exception type reporting "Failed to seek file", together with its single
 * pre-built instance `seekError`.
 *
 * what() is public so the message can be read through the concrete type as
 * well as through std::exception, and it uses noexcept in place of the
 * deprecated throw() specification.
 */
const class SeekError : public std::exception {
   public:
    const char *what() const noexcept override { return "Failed to seek file"; }
} seekError;
115 
/**
 * Exception type reporting "Unexpected IO Error", together with its single
 * pre-built instance `ioError`.
 *
 * what() is public so the message can be read through the concrete type as
 * well as through std::exception, and it uses noexcept in place of the
 * deprecated throw() specification.
 */
const class IOError : public std::exception {
   public:
    const char *what() const noexcept override { return "Unexpected IO Error"; }
} ioError;
119 } // namespace proio
120 
121 #endif // PROIO_READER_H
Definition: event.h:11
const google::protobuf::DescriptorPool * DescriptorPool()
Definition: reader.h:73