proio
writer.cc
#include <fcntl.h>

#include <cstring>  // std::memset, std::memcpy used below
#include <functional>

#include "google/protobuf/io/gzip_stream.h"
#include "lz4frame.h"

#include "writer.h"

using namespace proio;
using namespace google::protobuf;

Writer::Writer(int fd) {
    fileStream = new io::FileOutputStream(fd);
    fileStream->SetCloseOnDelete(false);

    initBucket();
}

Writer::Writer(std::string filename) {
    int fd = open(filename.c_str(), O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
    if (fd == -1) throw fileCreationError;
    fileStream = new io::FileOutputStream(fd);
    fileStream->SetCloseOnDelete(true);

    initBucket();
}

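// The destructor flushes any remaining events, signals the stream-write
// worker to exit by handing it an invalid job, and joins the thread before
// releasing the buffers and the file stream.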
Writer::~Writer() {
    Flush();

    streamWriteJob.workerReadyCond.wait(workerReadyLock);
    streamWriteJob.isValid = false;
    streamWriteJob.doJobMutex.lock();
    streamWriteJob.doJobCond.notify_all();
    streamWriteJob.doJobMutex.unlock();
    workerReadyLock.unlock();

    streamWriteThread.join();

    delete bucket;
    delete fileStream;
    delete compBucket;
    delete header;
}

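// Flush compresses the current bucket according to the selected compression
// scheme (LZ4 frame, gzip, or none), hands the compressed bucket and its
// header to the stream-write worker thread, and resets the bucket for the
// next batch of events.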
void Writer::Flush() {
    if (bucketEvents == 0) return;

    streamWriteJob.workerReadyCond.wait(workerReadyLock);
    compBucket->Reset();
    switch (compression) {
        case LZ4: {
            LZ4F_frameInfo_t info;
            std::memset(&info, 0, sizeof(info));
            LZ4F_preferences_t prefs;
            std::memset(&prefs, 0, sizeof(prefs));
            prefs.frameInfo = info;
            prefs.compressionLevel = 0;
            if (complevel > 0) prefs.compressionLevel = complevel;
            size_t compBound = LZ4F_compressFrameBound(bucket->ByteCount(), &prefs);
            compBucket->Reset(compBound);
            size_t nWritten = LZ4F_compressFrame(compBucket->Bytes(), compBound, bucket->Bytes(),
                                                 bucket->ByteCount(), &prefs);
            if (LZ4F_isError(nWritten)) throw lz4FrameCreationError;
            compBucket->SetOffset(nWritten);
        } break;
        case GZIP: {
            io::GzipOutputStream::Options options;
            if (complevel >= 0) options.compression_level = complevel;
            io::GzipOutputStream *gzipStream = new io::GzipOutputStream(compBucket, options);
            bucket->WriteTo(gzipStream);
            delete gzipStream;
        } break;
        default:
            // No compression: swap the buffers so compBucket refers to the data.
            BucketOutputStream *tmpBucket = bucket;
            bucket = compBucket;
            compBucket = tmpBucket;
    }

    header->set_nevents(bucketEvents);
    header->set_bucketsize(compBucket->ByteCount());
    header->set_compression(compression);

    // Hand the bucket and header off to the worker and allocate a fresh
    // header for the next bucket; the worker frees the old header.
    streamWriteJob.compBucket = compBucket;
    streamWriteJob.header = header;
    header = new proto::BucketHeader();
    streamWriteJob.isValid = true;
    streamWriteJob.doJobMutex.lock();
    streamWriteJob.doJobCond.notify_all();
    streamWriteJob.doJobMutex.unlock();

    bucket->Reset();
    bucketEvents = 0;
}

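// Push serializes an event into the in-memory bucket, first writing out any
// changed metadata and any protobuf FileDescriptors that the event's types
// depend on; the bucket is flushed once it exceeds the dump threshold.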
void Writer::Push(Event *event) {
    // Add metadata entries that have changed
    for (auto keyValuePair : event->metadata)
        if (metadata[keyValuePair.first] != keyValuePair.second) {
            PushMetadata(keyValuePair.first, *keyValuePair.second);
            metadata[keyValuePair.first] = keyValuePair.second;
        }

    event->FlushCache();
    proto::Event *proto = event->getProto();

    // Add new protobuf FileDescriptors to the stream that are required to
    // describe event data
    std::set<const FileDescriptor *> newFDs;
    std::function<void(const FileDescriptor *)> addFDsToSet;
    addFDsToSet = [&](const FileDescriptor *fd) {
        for (int i = 0; i < fd->dependency_count(); i++) addFDsToSet(fd->dependency(i));
        if (writtenFDs.count(fd) == 0) {
            newFDs.insert(fd);
            this->writtenFDs.insert(fd);
        }
    };
    for (auto keyValuePair : proto->type()) addFDsToSet(event->getDescriptor(keyValuePair.first)->file());
    if (newFDs.size() > 0) Flush();
    for (auto fd : newFDs) {
        FileDescriptorProto fdProto;
        fd->CopyTo(&fdProto);
        fdProto.SerializeToString(header->add_filedescriptor());
    }

    // Length-prefix the serialized event with a little-endian 32-bit size.
    auto stream = new io::CodedOutputStream(bucket);
#if GOOGLE_PROTOBUF_VERSION >= 3004000
    stream->WriteLittleEndian32((uint32_t)proto->ByteSizeLong());
#else
    stream->WriteLittleEndian32((uint32_t)proto->ByteSize());
#endif
    if (!proto->SerializeToCodedStream(stream)) throw serializationError;
    delete stream;

    bucketEvents++;

    if (uint64_t(bucket->ByteCount()) > bucketDumpThres) Flush();
}

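// Metadata updates take effect at bucket boundaries: flush the current
// bucket, then record the key/value pair in the next bucket's header.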
void Writer::PushMetadata(std::string name, const std::string &data) {
    Flush();
    (*header->mutable_metadata())[name] = data;
}

void Writer::PushMetadata(std::string name, const char *data) {
    Flush();
    (*header->mutable_metadata())[name] = data;
}

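// initBucket allocates the double buffers and the bucket header, applies the
// default compression and dump-threshold settings, and spawns the
// stream-write worker thread.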
void Writer::initBucket() {
    bucket = new BucketOutputStream();
    bucketEvents = 0;
    SetCompression();
    compBucket = new BucketOutputStream();
    SetBucketDumpThreshold();
    header = new proto::BucketHeader;

    streamWriteJob.fileStream = fileStream;
    streamWriteJob.isValid = false;

    workerReadyLock = std::unique_lock<std::mutex>(streamWriteJob.workerReadyMutex);
    streamWriteThread = std::thread(Writer::streamWrite, &streamWriteJob);
}

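// streamWrite runs in the worker thread: it announces readiness, waits for a
// job, and writes the magic bytes, bucket header, and compressed bucket to
// the file stream. An invalid job is the signal to exit.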
void Writer::streamWrite(WriteJob *job) {
    std::unique_lock<std::mutex> doJobLock(job->doJobMutex);
    while (true) {
        job->workerReadyMutex.lock();
        job->workerReadyCond.notify_all();
        job->workerReadyMutex.unlock();
        job->doJobCond.wait(doJobLock);

        if (job->isValid) {
            auto stream = new io::CodedOutputStream(job->fileStream);
            stream->WriteRaw(magicBytes, 16);
#if GOOGLE_PROTOBUF_VERSION >= 3004000
            stream->WriteLittleEndian32((uint32_t)job->header->ByteSizeLong());
#else
            stream->WriteLittleEndian32((uint32_t)job->header->ByteSize());
#endif
            if (!job->header->SerializeToCodedStream(stream)) throw serializationError;
            stream->WriteRaw(job->compBucket->Bytes(), job->compBucket->ByteCount());
            delete stream;

            delete job->header;
            job->isValid = false;
        } else
            break;
    }
}

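// BucketOutputStream implements protobuf's ZeroCopyOutputStream interface on
// top of a growable in-memory byte buffer; offset tracks how many bytes of
// the buffer are considered written.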
BucketOutputStream::BucketOutputStream() { offset = 0; }

BucketOutputStream::~BucketOutputStream() {}

inline bool BucketOutputStream::Next(void **data, int *size) {
    if (bytes.size() - offset < minBucketWriteWindow) bytes.resize(offset + minBucketWriteWindow);
    *data = &bytes[offset];
    *size = bytes.size() - offset;
    offset = bytes.size();
    return true;
}

inline void BucketOutputStream::BackUp(int count) { offset -= count; }

inline int64 BucketOutputStream::ByteCount() const { return offset; }

inline bool BucketOutputStream::AllowsAliasing() const { return false; }

uint8_t *BucketOutputStream::Bytes() { return &bytes[0]; }

void BucketOutputStream::Reset() { offset = 0; }

void BucketOutputStream::Reset(uint64_t size) {
    offset = 0;
    if (bytes.size() < size) bytes.resize(size);
}

void BucketOutputStream::WriteTo(io::ZeroCopyOutputStream *stream) {
    uint8_t *data;
    int size;
    uint64_t bytesWritten = 0;
    while (stream->Next((void **)&data, &size)) {
        uint64_t bytesLeft = offset - bytesWritten;
        uint64_t bytesToCopy = (bytesLeft < uint64_t(size)) ? bytesLeft : size;
        std::memcpy(data, Bytes() + bytesWritten, bytesToCopy);
        bytesLeft -= bytesToCopy;
        bytesWritten += bytesToCopy;
        if (bytesToCopy < uint64_t(size)) stream->BackUp(size - bytesToCopy);
        if (bytesLeft == 0) break;
    }
}

void BucketOutputStream::SetOffset(uint64_t offset) { this->offset = offset; }
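
For orientation, here is a minimal usage sketch (not part of writer.cc). It assumes the public API visible above plus a default-constructible proio::Event from event.h and a proio::LZ4 compression value as referenced in Flush(); the event-filling step is elided.

// usage_sketch.cc -- illustrative only, written against the assumed proio API.
#include "event.h"
#include "writer.h"

int main() {
    proio::Writer writer("events.proio");  // opens the file and spawns the write thread
    writer.SetCompression(proio::LZ4);     // assumption: LZ4 enumerator as used in Flush()

    proio::Event event;  // assumption: Event is default-constructible
    // ... add entries and metadata to the event here ...
    writer.Push(&event);  // length-prefixed serialization into the current bucket

    return 0;  // ~Writer flushes the final bucket and joins the write thread
}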