// Wrap a file descriptor in a protobuf FileOutputStream. The descriptor is not opened in the
// visible lines, so it is presumably supplied by the caller -- TODO confirm.
4 #include "google/protobuf/io/gzip_stream.h" 13 fileStream =
new io::FileOutputStream(fd);
// Do not close the descriptor when fileStream is deleted.
14 fileStream->SetCloseOnDelete(
false);
// Open the output file write-only, creating it if needed and truncating any existing content;
// a newly created file gets mode rw-r--r--.
20 int fd = open(filename.c_str(), O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
// open() signals failure by returning -1.
21 if (fd == -1)
throw fileCreationError;
22 fileStream =
new io::FileOutputStream(fd);
// The descriptor was opened here, so the stream is asked to close it when it is deleted.
23 fileStream->SetCloseOnDelete(
true);
// Block on workerReadyCond until notified.
// NOTE(review): wait() is called without a predicate, so a spurious wakeup would let this
// proceed before the worker has actually signalled.
31 streamWriteJob.workerReadyCond.wait(workerReadyLock);
// Mark the job as not valid and wake the worker. Presumably the worker treats an invalid job
// as its signal to exit (that check is not visible in this view) -- TODO confirm.
32 streamWriteJob.isValid =
false;
33 streamWriteJob.doJobMutex.lock();
34 streamWriteJob.doJobCond.notify_all();
35 streamWriteJob.doJobMutex.unlock();
36 workerReadyLock.unlock();
// Wait for the worker thread to finish.
38 streamWriteThread.join();
// Nothing to do when no events have been counted for the current bucket.
47 if (bucketEvents == 0)
return;
// Block until workerReadyCond is notified.
// NOTE(review): wait() has no predicate, so a spurious wakeup is not filtered out.
49 streamWriteJob.workerReadyCond.wait(workerReadyLock);
// Compress the bucket according to the selected algorithm (the case labels are not visible here).
51 switch (compression) {
// LZ4 frame path: start from zeroed frame info and preferences.
53 LZ4F_frameInfo_t info;
54 std::memset(&info, 0,
sizeof(info));
55 LZ4F_preferences_t prefs;
56 std::memset(&prefs, 0,
sizeof(prefs));
57 prefs.frameInfo = info;
// The compression level stays 0 unless complevel is positive.
58 prefs.compressionLevel = 0;
59 if (complevel > 0) prefs.compressionLevel = complevel;
// Size the destination using the frame bound, then compress the whole bucket as a single frame.
60 size_t compBound = LZ4F_compressFrameBound(bucket->ByteCount(), &prefs);
61 compBucket->Reset(compBound);
62 size_t nWritten = LZ4F_compressFrame(compBucket->Bytes(), compBound, bucket->Bytes(),
63 bucket->ByteCount(), &prefs);
64 if (LZ4F_isError(nWritten))
throw lz4FrameCreationError;
// Record the number of compressed bytes as the compressed bucket's write position.
65 compBucket->SetOffset(nWritten);
// Gzip path: a non-negative complevel overrides the default compression level.
68 io::GzipOutputStream::Options options;
69 if (complevel >= 0) options.compression_level = complevel;
// NOTE(review): gzipStream is allocated with new; its deletion is not visible in this view.
70 io::GzipOutputStream *gzipStream =
new io::GzipOutputStream(compBucket, options);
71 bucket->WriteTo(gzipStream);
// tmpBucket is assigned on lines that are not visible here.
77 compBucket = tmpBucket;
// Fill in the bucket header: event count, (compressed) payload size and compression type.
80 header->set_nevents(bucketEvents);
81 header->set_bucketsize(compBucket->ByteCount());
82 header->set_compression(compression);
// Hand the payload and its header to the worker job, then start a fresh header for the next bucket.
84 streamWriteJob.compBucket = compBucket;
85 streamWriteJob.header = header;
86 header =
new proto::BucketHeader();
// Mark the job valid and wake whoever is waiting on doJobCond.
87 streamWriteJob.isValid =
true;
88 streamWriteJob.doJobMutex.lock();
89 streamWriteJob.doJobCond.notify_all();
90 streamWriteJob.doJobMutex.unlock();
// Push every event metadata entry whose stored value differs from the writer's current value for
// that key, then remember the new value. The values are dereferenced when pushed, so they are
// pointer-like; presumably the comparison is on the pointers, not the contents -- TODO confirm.
// Each iteration copies the key/value pair (iteration is by value).
98 for (
auto keyValuePair : event->metadata)
99 if (metadata[keyValuePair.first] != keyValuePair.second) {
100 PushMetadata(keyValuePair.first, *keyValuePair.second);
101 metadata[keyValuePair.first] = keyValuePair.second;
105 proto::Event *proto =
event->getProto();
// addFDsToSet recurses into a file's dependencies before handling the file itself; files that
// are not yet in writtenFDs are recorded there. (Part of the lambda body is not visible.)
109 std::set<const FileDescriptor *> newFDs;
110 std::function<void(const FileDescriptor *)> addFDsToSet;
111 addFDsToSet = [&](
const FileDescriptor *fd) {
112 for (
int i = 0; i < fd->dependency_count(); i++) addFDsToSet(fd->dependency(i));
113 if (writtenFDs.count(fd) == 0) {
115 this->writtenFDs.insert(fd);
// Visit the defining file of every type listed in the event proto.
118 for (
auto keyValuePair : proto->type()) addFDsToSet(event->getDescriptor(keyValuePair.first)->file());
// If new descriptors were found, flush the current bucket before adding them to the header.
119 if (newFDs.size() > 0) Flush();
// Serialize each new file descriptor into a new filedescriptor entry of the header.
120 for (
auto fd : newFDs) {
121 FileDescriptorProto fdProto;
122 fd->CopyTo(&fdProto);
123 fdProto.SerializeToString(header->add_filedescriptor());
// Append the event to the bucket as a 32-bit little-endian size followed by the serialized
// message. ByteSizeLong() is used for protobuf >= 3.4.0; the ByteSize() line is presumably the
// alternative preprocessor branch (its directive is not visible).
// NOTE(review): stream is allocated with new; its deletion is not visible in this view.
126 auto stream =
new io::CodedOutputStream(bucket);
127 #if GOOGLE_PROTOBUF_VERSION >= 3004000 128 stream->WriteLittleEndian32((uint32_t)proto->ByteSizeLong());
130 stream->WriteLittleEndian32((uint32_t)proto->ByteSize());
132 if (!proto->SerializeToCodedStream(stream))
throw serializationError;
// Flush once the bucket has grown past the dump threshold.
137 if (uint64_t(bucket->ByteCount()) > bucketDumpThres) Flush();
// Store the value under `name` in the current header's metadata map.
142 (*header->mutable_metadata())[name] = data;
// Same assignment from what appears to be a second overload (its signature is not visible here).
147 (*header->mutable_metadata())[name] = data;
// Sets up per-bucket state and starts the background writer thread.
// Only part of the body is visible in this view.
150 void Writer::initBucket() {
155 SetBucketDumpThreshold();
156 header =
new proto::BucketHeader;
// Give the worker job the output stream; the job starts out marked as not valid.
158 streamWriteJob.fileStream = fileStream;
159 streamWriteJob.isValid =
false;
// Take workerReadyMutex before launching the thread, which runs streamWrite on the job.
161 workerReadyLock = std::unique_lock<std::mutex>(streamWriteJob.workerReadyMutex);
162 streamWriteThread = std::thread(Writer::streamWrite, &streamWriteJob);
// Thread entry point that writes a job's header and bucket payload to job->fileStream.
// Only part of the body is visible in this view (loop and exit handling are not shown).
165 void Writer::streamWrite(WriteJob *job) {
166 std::unique_lock<std::mutex> doJobLock(job->doJobMutex);
// Notify workerReadyCond under workerReadyMutex, then wait on doJobCond.
// NOTE(review): wait() has no predicate, so a spurious wakeup is not filtered out here.
168 job->workerReadyMutex.lock();
169 job->workerReadyCond.notify_all();
170 job->workerReadyMutex.unlock();
171 job->doJobCond.wait(doJobLock);
// Output layout: 16 magic bytes, a 32-bit little-endian header size, the serialized header,
// then the raw bucket bytes.
// NOTE(review): stream is allocated with new; its deletion is not visible in this view.
174 auto stream =
new io::CodedOutputStream(job->fileStream);
175 stream->WriteRaw(magicBytes, 16);
176 #if GOOGLE_PROTOBUF_VERSION >= 3004000 177 stream->WriteLittleEndian32((uint32_t)job->header->ByteSizeLong());
179 stream->WriteLittleEndian32((uint32_t)job->header->ByteSize());
181 if (!job->header->SerializeToCodedStream(stream))
throw serializationError;
182 stream->WriteRaw(job->compBucket->Bytes(), job->compBucket->ByteCount());
// Mark the job as no longer valid once it has been written.
186 job->isValid =
false;
192 BucketOutputStream::BucketOutputStream() { offset = 0; }
194 BucketOutputStream::~BucketOutputStream() { ; }
// Hands out the unused tail of the internal buffer through *data / *size.
// The end of the body (return and closing brace) is not visible in this view.
196 inline bool BucketOutputStream::Next(
void **data,
int *size) {
// Grow the buffer so that at least minBucketWriteWindow bytes lie past the current offset.
197 if (bytes.size() - offset < minBucketWriteWindow) bytes.resize(offset + minBucketWriteWindow);
198 *data = &bytes[offset];
199 *size = bytes.size() - offset;
// The whole window is counted as written; BackUp() lowers the offset again for unused bytes.
200 offset = bytes.size();
204 inline void BucketOutputStream::BackUp(
int count) { offset -= count; }
206 inline int64 BucketOutputStream::ByteCount()
const {
return offset; }
208 inline bool BucketOutputStream::AllowsAliasing()
const {
return false; }
210 uint8_t *BucketOutputStream::Bytes() {
return &bytes[0]; }
212 void BucketOutputStream::Reset() { offset = 0; }
// Ensures the internal buffer holds at least `size` bytes.
// The rest of the body is not visible in this view.
214 void BucketOutputStream::Reset(uint64_t size) {
// Grow only: a buffer that is already large enough is left as it is.
216 if (bytes.size() < size) bytes.resize(size);
// Copies the bytes held in this bucket (up to the current offset) into another
// zero-copy output stream. The declarations of data/size and the closing lines
// are not visible in this view.
219 void BucketOutputStream::WriteTo(io::ZeroCopyOutputStream *stream) {
222 uint64_t bytesWritten = 0;
// Keep requesting output buffers until everything is copied or Next() fails.
223 while (stream->Next((
void **)&data, &size)) {
224 uint64_t bytesLeft = offset - bytesWritten;
// Copy as much of the remainder as fits into the buffer just obtained.
225 uint64_t bytesToCopy = (bytesLeft < uint64_t(size)) ? bytesLeft : size;
226 std::memcpy(data, Bytes() + bytesWritten, bytesToCopy);
227 bytesLeft -= bytesToCopy;
228 bytesWritten += bytesToCopy;
// Return the unused tail of the buffer to the destination stream.
229 if (bytesToCopy < uint64_t(size)) stream->BackUp(size - bytesToCopy);
230 if (bytesLeft == 0)
break;
234 void BucketOutputStream::SetOffset(uint64_t offset) { this->offset = offset; }
void PushMetadata(std::string name, const std::string &data)