4 #include <condition_variable>    10 #include "google/protobuf/descriptor.h"    11 #include "google/protobuf/descriptor.pb.h"    12 #include "google/protobuf/io/zero_copy_stream_impl.h"    15 #include "proio/proto/proio.pb.h"    18 typedef proto::BucketHeader_CompType Compression;
    // Short names for the compression types declared on proto::BucketHeader.
    19 const Compression LZ4 = proto::BucketHeader::LZ4;
    20 const Compression GZIP = proto::BucketHeader::GZIP;
    // UNCOMPRESSED is the enumerator that proto::BucketHeader calls NONE.
    21 const Compression UNCOMPRESSED = proto::BucketHeader::NONE;
    // 0x100000 == 1048576 (1 MiB if the unit is bytes).
    // NOTE(review): presumably the minimum write window of a bucket, in
    // bytes - confirm against the implementation.
    23 const std::size_t minBucketWriteWindow = 0x100000;
    // Overrides of a stream interface declared in a base class.
    // NOTE(review): the base class is outside this excerpt; the signatures
    // match google::protobuf::io::ZeroCopyOutputStream - confirm. The
    // per-method notes below restate that interface's contract and are
    // only valid if that assumption holds.
    //
    // Next: hands out a writable buffer through *data and *size; the bool
    // result reports success.
    30     bool Next(
void **data, 
int *size) 
override;
    // BackUp: returns the last `count` bytes of the most recent buffer
    // obtained from Next() as unused.
    31     void BackUp(
int count) 
override;
    // ByteCount: total number of bytes written so far.
    32     google::protobuf::int64 ByteCount() 
const override;
    // AllowsAliasing: whether the stream supports aliased raw writes.
    33     bool AllowsAliasing() 
const override;
    // Helpers declared next to the stream overrides; their definitions are
    // not part of this excerpt.
    // NOTE(review): Reset presumably re-initialises the stream for `size`
    // bytes - confirm against the implementation.
    37     void Reset(uint64_t size);
    // Takes the destination stream by raw pointer.
    // NOTE(review): presumably copies the buffered bytes into `stream` -
    // confirm.
    38     void WriteTo(google::protobuf::io::ZeroCopyOutputStream *stream);
    // NOTE(review): presumably sets the current write position - confirm.
    39     void SetOffset(uint64_t offset);
    // Byte storage owned by this object.
    // NOTE(review): presumably the buffer handed out by Next() - confirm.
    42     std::vector<uint8_t> bytes;
    // Constructs a Writer from a file name, taken by value.
    // The constructor is not marked `explicit` in this excerpt, so a
    // std::string converts implicitly to a Writer.
    56     Writer(std::string filename);
    // Takes the event by raw pointer.
    // NOTE(review): ownership and lifetime requirements for `event` are not
    // visible here - confirm against the implementation.
    68     void Push(
Event *event);
    // Metadata overload taking the payload as a std::string reference.
    // `name` is passed by value in both overloads.
    73     void PushMetadata(std::string name, 
const std::string &data);
    // Metadata overload taking the payload as a C string.
    // NOTE(review): presumably `data` must be NUL-terminated - confirm.
    78     void PushMetadata(std::string name, 
const char *data);
    // Data members. Raw pointers below carry no ownership information in
    // this excerpt; who allocates and frees them must be checked in the
    // implementation.
    //
    // protobuf file output stream, held by raw pointer.
    96     google::protobuf::io::FileOutputStream *fileStream;
    // NOTE(review): presumably the number of events in the current bucket -
    // confirm.
    97     uint64_t bucketEvents;
    // Compression type currently selected (see the Compression typedef).
    98     Compression compression;
    // NOTE(review): presumably the size threshold at which a bucket is
    // dumped - confirm unit and semantics.
   101     uint64_t bucketDumpThres;
    // Bucket header message, held by raw pointer.
   102     proto::BucketHeader *header;
    // Maps a string key to a shared, immutable string value.
    // NOTE(review): presumably keyed by the `name` given to PushMetadata -
    // confirm.
   103     std::map<std::string, std::shared_ptr<const std::string>> metadata;
    // Set of protobuf file descriptors, stored as non-owning pointers.
    // NOTE(review): the name suggests descriptors already written out -
    // confirm.
   104     std::set<const google::protobuf::FileDescriptor *> writtenFDs;
    // Thread object. NOTE(review): presumably runs streamWrite - confirm.
   108     std::thread streamWriteThread;
   // Members of the job record passed to streamWrite (the enclosing
   // struct header is outside this excerpt).
   //
   // Bucket header and output stream for the job, both raw pointers.
   113         proto::BucketHeader *header;
   114         google::protobuf::io::FileOutputStream *fileStream;
   // Mutex / condition-variable pair.
   // NOTE(review): presumably used to signal that a job is available -
   // confirm.
   116         std::mutex doJobMutex;
   117         std::condition_variable doJobCond;
   // Second mutex / condition-variable pair.
   // NOTE(review): presumably used to signal that the worker is ready for
   // the next job - confirm.
   118         std::mutex workerReadyMutex;
   119         std::condition_variable workerReadyCond;
   // The job record instance held by value.
   121     WriteJob streamWriteJob;
   // Lock handle over a std::mutex, kept as a member so it can outlive a
   // single call. NOTE(review): presumably bound to
   // streamWriteJob.workerReadyMutex - confirm.
   122     std::unique_lock<std::mutex> workerReadyLock;
   // Static function taking the job record by pointer.
   // NOTE(review): presumably the entry point of streamWriteThread -
   // confirm.
   124     static void streamWrite(WriteJob *job);
   // 16-byte marker: 0xe1, 0xc1, then fourteen zero bytes. Only the two
   // non-zero leading bytes are spelled out; aggregate initialization
   // zero-fills every element that has no explicit initializer.
   const uint8_t magicBytes[16] = {0xe1, 0xc1};
   131     virtual const char *what() 
const throw() { 
return "Failed to serialize message"; }
   132 } serializationError;
   135     virtual const char *what() 
const throw() { 
return "Failed to creating file for writing"; }
   139     virtual const char *what() 
const throw() { 
return "Failed to create LZ4 frame"; }
   140 } lz4FrameCreationError;
   143 #endif  // PROIO_WRITER_H 
void SetBucketDumpThreshold(uint64_t thres=0x1000000)
 
void SetCompression(Compression alg=GZIP, int level=-1)