4 #include "google/protobuf/io/gzip_stream.h" 10 using namespace proio;
// Wrap the descriptor `fd` in an io::FileInputStream used for all reads.
15 fileStream =
new io::FileInputStream(fd);
// Do not close(fd) later: this flag gates the close(fd) call elsewhere in
// this file. Presumably the descriptor belongs to the caller here — confirm.
16 closeFDOnDelete =
false;
// Open `filename` read-only; open() reports failure by returning -1.
22 fd = open(filename.c_str(), O_RDONLY);
23 if (fd == -1)
throw fileOpenError;
// Wrap the freshly opened descriptor in an io::FileInputStream.
24 fileStream =
new io::FileInputStream(fd);
// The descriptor was opened here, so mark it for close(fd) later (this flag
// gates the close(fd) call elsewhere in this file).
25 closeFDOnDelete =
true;
// Release the current bucket header, if one is held.
31 if (bucketHeader)
delete bucketHeader;
// Free the LZ4 frame decompression context.
33 LZ4F_freeDecompressionContext(dctxPtr);
// Close the descriptor only when closeFDOnDelete was set.
36 if (closeFDOnDelete) close(fd);
// A caller-supplied event is cleared before being refilled.
40 if (event)
event->
Clear();
// Loop while there is no bucket header, or bucketIndex lies at/after the end
// of the current bucket's event count.
42 while (!bucketHeader || bucketIndex >= bucketHeader->nevents()) {
// Carry the index over into the following bucket.
43 if (bucketHeader) bucketIndex -= bucketHeader->nevents();
// No header available: return NULL (presumably end of input — confirm).
45 if (!bucketHeader)
return NULL;
// Allocate an Event when the caller did not supply one.
48 if (!event)
event =
new Event();
// Attach the reader's current metadata to the event.
49 event->metadata = metadata;
// Load the bucket payload if no bytes remain in `bucket`.
51 if (bucket->BytesRemaining() == 0) readBucket();
// Decode the record at bucketIndex into the event.
52 readFromBucket(event);
// A null destination is rejected up front.
60 if (!data)
return false;
// Loop while there is no bucket header, or bucketIndex lies at/after the end
// of the current bucket's event count.
62 while (!bucketHeader || bucketIndex >= bucketHeader->nevents()) {
// Carry the index over into the following bucket.
63 if (bucketHeader) bucketIndex -= bucketHeader->nevents();
// No header available: report failure (presumably end of input — confirm).
65 if (!bucketHeader)
return false;
// Load the bucket payload if no bytes remain in `bucket`.
68 if (bucket->BytesRemaining() == 0) readBucket();
// Running count of events skipped; this is the value returned.
75 uint64_t nSkipped = 0;
// Remember the position inside the current bucket before advancing.
77 uint64_t startIndex = bucketIndex;
78 bucketIndex += nEvents;
// Loop while there is no bucket header, or the advanced index overshoots the
// current bucket.
79 while (!bucketHeader || bucketIndex >= bucketHeader->nevents()) {
81 uint64_t nBucketEvents = bucketHeader->nevents();
// Rebase the index and count the events from startIndex to the bucket's end.
82 bucketIndex -= nBucketEvents;
83 nSkipped += nBucketEvents - startIndex;
// The bucket has events but `bucket` holds no remaining bytes: skip
// bucketsize() bytes on the file stream (presumably the payload was never
// loaded — confirm).
84 if (nBucketEvents > 0 && bucket->BytesRemaining() == 0)
85 if (!fileStream->Skip(bucketHeader->bucketsize()))
throw ioError;
// No header available: return the count accumulated so far.
88 if (!bucketHeader)
return nSkipped;
// Count the events skipped within the bucket where the index now rests.
91 nSkipped += bucketIndex - startIndex;
// Rewind the descriptor to offset 0; lseek() reports failure by returning -1.
98 if (lseek(fd, 0, SEEK_SET) == -1)
throw seekError;
// Build a fresh io::FileInputStream over the rewound descriptor.
// NOTE(review): disposal of the previous fileStream is not shown on these
// lines — confirm it is released.
99 fileStream =
new io::FileInputStream(fd);
// Initializes the reader's bucket state and its LZ4 decompression context.
105 void Reader::initBucket() {
// No records have been consumed from a bucket yet.
108 bucketEventsRead = 0;
// Create the LZ4 frame decompression context stored in dctxPtr.
// NOTE(review): the call's return value is not examined here.
110 LZ4F_createDecompressionContext(&dctxPtr, LZ4F_VERSION);
// Walks the length-prefixed records in `bucket` and, when `event` is non-null,
// parses the record at position bucketIndex into it; other records are
// skipped.
// Throws corruptBucketError when a length prefix or a skipped payload cannot
// be read, and deserializationError when the target record fails to parse.
114 void Reader::readFromBucket(
Event *event) {
// Coded-stream view over `bucket`.
115 auto stream =
new io::CodedInputStream(bucket);
// Continue while the number of records read has not passed bucketIndex.
117 while (bucketEventsRead <= bucketIndex) {
// Each record starts with a little-endian 32-bit payload size.
119 if (!stream->ReadLittleEndian32(&protoSize)) {
121 throw corruptBucketError;
// Target record with a destination supplied: parse it.
124 if (event && bucketEventsRead == bucketIndex) {
// Restrict parsing to exactly protoSize bytes.
125 auto eventLimit = stream->PushLimit(protoSize);
126 auto eventProto =
event->getProto();
// The merge must succeed and must end exactly at the pushed limit.
127 if (!eventProto->MergeFromCodedStream(stream) || !stream->ConsumedEntireMessage()) {
129 throw deserializationError;
// Hand the event the result of DescriptorPool() (presumably the pool filled
// while reading headers — confirm).
131 event->SetDescriptorPool(DescriptorPool());
132 stream->PopLimit(eventLimit);
133 }
// Any other record: step over its payload.
else if (!stream->Skip(protoSize)) {
135 throw corruptBucketError;
// Walks the length-prefixed records in `bucket` and, when `data` is non-null,
// copies the raw bytes of the record at position bucketIndex into it; other
// records are skipped.
// Throws corruptBucketError when a length prefix or payload cannot be read.
145 void Reader::readFromBucket(std::string *data) {
// Coded-stream view over `bucket`.
146 auto stream =
new io::CodedInputStream(bucket);
// Continue while the number of records read has not passed bucketIndex.
148 while (bucketEventsRead <= bucketIndex) {
// Each record starts with a little-endian 32-bit payload size.
150 if (!stream->ReadLittleEndian32(&protoSize)) {
152 throw corruptBucketError;
// Target record with a destination supplied: copy protoSize raw bytes.
155 if (data && bucketEventsRead == bucketIndex) {
156 data->resize(protoSize);
157 if (!stream->ReadString(data, protoSize)) {
159 throw corruptBucketError;
161 }
// Any other record: step over its payload.
else if (!stream->Skip(protoSize)) {
163 throw corruptBucketError;
// Reads a size-prefixed proto::BucketHeader from the file stream, then copies
// its metadata entries and file descriptors into the reader's state.
// Throws deserializationError when the header fails to parse.
173 void Reader::readHeader() {
// Reset per-bucket progress and the compressed-bucket buffer.
178 bucketEventsRead = 0;
179 compBucket->Reset(0);
// Coded-stream view over the underlying file stream.
182 auto stream =
new io::CodedInputStream(fileStream);
// The header is preceded by a little-endian 32-bit size.
185 if (!stream->ReadLittleEndian32(&headerSize)) {
// Restrict parsing to exactly headerSize bytes.
190 auto headerLimit = stream->PushLimit(headerSize);
191 bucketHeader =
new proto::BucketHeader;
// The merge must succeed and must end exactly at the pushed limit.
192 if (!bucketHeader->MergeFromCodedStream(stream) || !stream->ConsumedEntireMessage()) {
194 throw deserializationError;
196 stream->PopLimit(headerLimit);
// Store each metadata value under its key as a newly allocated shared string.
// The loop variable is taken by value, so every entry is copied.
199 for (
auto keyValuePair : bucketHeader->metadata())
200 metadata[keyValuePair.first] = std::make_shared<std::string>(keyValuePair.second);
// Parse each serialized file descriptor and add it to descriptorPool.
// NOTE(review): the results of ParseFromString and BuildFile are not checked
// on these lines.
203 for (
const std::string &fdString : bucketHeader->filedescriptor()) {
204 FileDescriptorProto fdProto;
205 fdProto.ParseFromString(fdString);
206 descriptorPool.BuildFile(fdProto);
// Reads the current bucket's payload from the file stream into compBucket and
// fills `bucket` from it according to the header's compression() value.
// Throws corruptBucketError when the payload cannot be read in full.
212 void Reader::readBucket() {
// Coded-stream view over the underlying file stream.
213 auto stream =
new io::CodedInputStream(fileStream);
// Size compBucket to the payload length declared in the header, then read
// that many raw bytes straight into its buffer.
// NOTE(review): bucketSize is uint64_t; check it against the size parameter
// type that ReadRaw accepts.
215 uint64_t bucketSize = bucketHeader->bucketsize();
216 compBucket->Reset(bucketSize);
217 if (!stream->ReadRaw(compBucket->Bytes(), bucketSize)) {
219 throw corruptBucketError;
// Dispatch on the compression value recorded in the header.
224 switch (bucketHeader->compression()) {
// Decompress through the LZ4 frame context.
226 bucket->Reset(dctxPtr, compBucket);
// Decompress through a gzip stream layered over compBucket.
230 io::GzipInputStream *gzipStream =
new io::GzipInputStream(compBucket);
231 bucket->Reset(*gzipStream);
// Presumably one step of exchanging `bucket` and `compBucket` when no
// decompression is needed — confirm against the surrounding lines.
238 compBucket = tmpBucket;
// Scans `stream` one byte at a time looking for the magicBytes sequence.
242 uint64_t Reader::syncToMagic(io::CodedInputStream *stream) {
// Stop scanning when the stream can supply no further byte.
246 while (stream->ReadRaw(&num, 1)) {
// Candidate start: the byte equals the first magic byte.
249 if (num == magicBytes[0]) {
// Compare the following bytes against magicBytes[1] .. magicBytes[15].
252 for (
int i = 1; i < 16; i++) {
253 if (!stream->ReadRaw(&num, 1))
break;
// Mismatch at position i.
256 if (num != magicBytes[i]) {
267 BucketInputStream::BucketInputStream(uint64_t size) {
273 BucketInputStream::~BucketInputStream() { ; }
// Exposes the bytes from the current offset up to this->size.
// Writes the start address to *data and the byte count to *size, and returns
// false when that count is zero.
275 inline bool BucketInputStream::Next(
const void **data,
int *size) {
// NOTE(review): the address is formed before the emptiness check below.
276 *data = &bytes[offset];
// `this->size` is the member; the bare `size` is the out-parameter.
277 *size = this->size - offset;
279 if (*size == 0)
return false;
283 inline void BucketInputStream::BackUp(
int count) { offset -= count; }
285 inline bool BucketInputStream::Skip(
int count) {
294 inline int64 BucketInputStream::ByteCount()
const {
return offset; }
296 uint8_t *BucketInputStream::Bytes() {
return &bytes[0]; }
298 uint64_t BucketInputStream::BytesRemaining() {
return size - offset; }
// Resets the stream for a payload of `size` bytes.
300 void BucketInputStream::Reset(uint64_t size) {
// Grow the backing buffer when it is smaller than `size`; never shrink it.
302 if (bytes.size() < size) bytes.resize(size);
// Refills this bucket by copying the chunks that `stream` yields.
306 uint64_t BucketInputStream::Reset(io::ZeroCopyInputStream &stream) {
// Pull chunks until the source reports that no more are available.
310 while (stream.Next((
const void **)&data, &size)) {
// Grow the backing buffer when the logical size exceeds it.
313 if (this->size > bytes.size()) bytes.resize(this->size);
// Copy the chunk into the buffer at the current offset.
314 std::memcpy(&bytes[offset], data, size);
// Refills this bucket by LZ4-frame-decompressing the bytes held in compBucket
// with the context `dctxPtr`.
// Throws badLZ4FrameError when LZ4F_decompress reports an error.
320 uint64_t BucketInputStream::Reset(LZ4F_dctx *dctxPtr,
BucketInputStream *compBucket) {
// Make sure there is an initial output window to write into.
323 if (size == 0) Reset(minBucketWriteWindow);
// Fetch the compressed source span.
326 compBucket->Next((
const void **)&srcBuffer, &srcSize);
327 int srcBytesRemaining = srcSize;
// Decompress until the source span is used up.
331 while (srcBytesRemaining > 0) {
// Fetch the writable destination span from this bucket.
332 Next((
const void **)&dstBuffer, &dstSize);
// Both sizes are passed by pointer; the code below reads them back as the
// byte counts actually consumed and produced (see the LZ4 frame API).
333 size_t srcSizeTmp = srcSize;
334 size_t dstSizeTmp = dstSize;
335 hint = LZ4F_decompress(dctxPtr, dstBuffer, &dstSizeTmp, srcBuffer, &srcSizeTmp, NULL);
336 if (LZ4F_isError(hint))
throw badLZ4FrameError;
337 srcBytesRemaining -= srcSizeTmp;
// Hand back the part of the destination span that was not written.
338 BackUp(dstSize - dstSizeTmp);
// Output window is full: extend the logical size by another window.
339 if (offset == size) {
340 size += minBucketWriteWindow;
// Rewind compBucket by the unconsumed bytes and fetch them again as the
// next source span.
343 compBucket->BackUp(srcBytesRemaining);
344 compBucket->Next((
const void **)&srcBuffer, &srcSize);
// Failure path: reset the context before throwing (the condition leading
// here is not on these lines).
349 LZ4F_resetDecompressionContext(dctxPtr);
350 throw badLZ4FrameError;
Event * Next(Event *recycledEvent=NULL, bool metadataOnly=false)
uint64_t Skip(uint64_t nEvents)