4 #include <condition_variable> 10 #include "google/protobuf/descriptor.h" 11 #include "google/protobuf/descriptor.pb.h" 12 #include "google/protobuf/io/zero_copy_stream_impl.h" 15 #include "proio/proto/proio.pb.h" 18 typedef proto::BucketHeader_CompType Compression;
19 const Compression LZ4 = proto::BucketHeader::LZ4;
20 const Compression GZIP = proto::BucketHeader::GZIP;
21 const Compression UNCOMPRESSED = proto::BucketHeader::NONE;
23 const std::size_t minBucketWriteWindow = 0x100000;
30 bool Next(
void **data,
int *size)
override;
31 void BackUp(
int count)
override;
32 google::protobuf::int64 ByteCount()
const override;
33 bool AllowsAliasing()
const override;
37 void Reset(uint64_t size);
38 void WriteTo(google::protobuf::io::ZeroCopyOutputStream *stream);
39 void SetOffset(uint64_t offset);
42 std::vector<uint8_t> bytes;
56 Writer(std::string filename);
68 void Push(
Event *event);
73 void PushMetadata(std::string name,
const std::string &data);
78 void PushMetadata(std::string name,
const char *data);
96 google::protobuf::io::FileOutputStream *fileStream;
97 uint64_t bucketEvents;
98 Compression compression;
101 uint64_t bucketDumpThres;
102 proto::BucketHeader *header;
103 std::map<std::string, std::shared_ptr<const std::string>> metadata;
104 std::set<const google::protobuf::FileDescriptor *> writtenFDs;
108 std::thread streamWriteThread;
113 proto::BucketHeader *header;
114 google::protobuf::io::FileOutputStream *fileStream;
116 std::mutex doJobMutex;
117 std::condition_variable doJobCond;
118 std::mutex workerReadyMutex;
119 std::condition_variable workerReadyCond;
121 WriteJob streamWriteJob;
122 std::unique_lock<std::mutex> workerReadyLock;
124 static void streamWrite(WriteJob *job);
127 const uint8_t magicBytes[] = {0xe1, 0xc1, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
128 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00};
131 virtual const char *what()
const throw() {
return "Failed to serialize message"; }
132 } serializationError;
135 virtual const char *what()
const throw() {
return "Failed to creating file for writing"; }
139 virtual const char *what()
const throw() {
return "Failed to create LZ4 frame"; }
140 } lz4FrameCreationError;
143 #endif // PROIO_WRITER_H
void SetBucketDumpThreshold(uint64_t thres=0x1000000)
void SetCompression(Compression alg=GZIP, int level=-1)