proio
writer.h
1 #ifndef PROIO_WRITER_H
2 #define PROIO_WRITER_H
3 
4 #include <condition_variable>
5 #include <cstring>
6 #include <mutex>
7 #include <string>
8 #include <thread>
9 
10 #include "google/protobuf/descriptor.h"
11 #include "google/protobuf/descriptor.pb.h"
12 #include "google/protobuf/io/zero_copy_stream_impl.h"
13 
14 #include "event.h"
15 #include "proio/proto/proio.pb.h"
16 
17 namespace proio {
18 typedef proto::BucketHeader_CompType Compression;
19 const Compression LZ4 = proto::BucketHeader::LZ4;
20 const Compression GZIP = proto::BucketHeader::GZIP;
21 const Compression UNCOMPRESSED = proto::BucketHeader::NONE;
22 
23 const std::size_t minBucketWriteWindow = 0x100000;
24 
25 class BucketOutputStream : public google::protobuf::io::ZeroCopyOutputStream {
26  public:
28  virtual ~BucketOutputStream();
29 
30  bool Next(void **data, int *size) override;
31  void BackUp(int count) override;
32  google::protobuf::int64 ByteCount() const override;
33  bool AllowsAliasing() const override;
34 
35  uint8_t *Bytes();
36  void Reset();
37  void Reset(uint64_t size);
38  void WriteTo(google::protobuf::io::ZeroCopyOutputStream *stream);
39  void SetOffset(uint64_t offset);
40 
41  private:
42  std::vector<uint8_t> bytes;
43  uint64_t offset;
44 };
45 
48 class Writer : public std::mutex {
49  public:
52  Writer(int fd);
56  Writer(std::string filename);
57  virtual ~Writer();
58 
65  void Flush();
68  void Push(Event *event);
73  void PushMetadata(std::string name, const std::string &data);
78  void PushMetadata(std::string name, const char *data);
83  void SetCompression(Compression alg = GZIP, int level = -1) {
84  compression = alg;
85  complevel = level;
86  }
92  void SetBucketDumpThreshold(uint64_t thres = 0x1000000) { bucketDumpThres = thres; }
93 
94  private:
95  BucketOutputStream *bucket;
96  google::protobuf::io::FileOutputStream *fileStream;
97  uint64_t bucketEvents;
98  Compression compression;
99  int complevel;
100  BucketOutputStream *compBucket;
101  uint64_t bucketDumpThres;
102  proto::BucketHeader *header;
103  std::map<std::string, std::shared_ptr<const std::string>> metadata;
104  std::set<const google::protobuf::FileDescriptor *> writtenFDs;
105 
106  void initBucket();
107 
108  std::thread streamWriteThread;
109  typedef struct {
110  bool isValid;
111 
112  BucketOutputStream *compBucket;
113  proto::BucketHeader *header;
114  google::protobuf::io::FileOutputStream *fileStream;
115 
116  std::mutex doJobMutex;
117  std::condition_variable doJobCond;
118  std::mutex workerReadyMutex;
119  std::condition_variable workerReadyCond;
120  } WriteJob;
121  WriteJob streamWriteJob;
122  std::unique_lock<std::mutex> workerReadyLock;
123 
124  static void streamWrite(WriteJob *job);
125 };
126 
127 const uint8_t magicBytes[] = {0xe1, 0xc1, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
128  0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00};
129 
130 const class SerializationError : public std::exception {
131  virtual const char *what() const throw() { return "Failed to serialize message"; }
132 } serializationError;
133 
134 const class FileCreationError : public std::exception {
135  virtual const char *what() const throw() { return "Failed to creating file for writing"; }
136 } fileCreationError;
137 
138 const class LZ4FrameCreationError : public std::exception {
139  virtual const char *what() const throw() { return "Failed to create LZ4 frame"; }
140 } lz4FrameCreationError;
141 } // namespace proio
142 
143 #endif // PROIO_WRITER_H
Definition: event.h:11
void SetBucketDumpThreshold(uint64_t thres=0x1000000)
Definition: writer.h:92
void SetCompression(Compression alg=GZIP, int level=-1)
Definition: writer.h:83