Source code for proio.reader

import gzip
import io
import lz4.frame
import struct
import sys

import google.protobuf.descriptor_pool as descriptor_pool

from .event import Event
import proio.proto as proto
from .writer import magic_bytes

class Reader(object):
    """
    Reader for proio files

    This class can be used with the `with` statement, and it also may be used
    as an iterator that sequentially iterates all events.  A filename may be
    omitted in favor of specifying `fileobj`.

    :param string filename: name of input file to read
    :param fileobj: file object to read from

    :example:

    .. code-block:: python

        with proio.Reader('input.proio') as reader:
            for event in reader:
                ...

    """

    def __init__(self, filename = None, fileobj = None):
        # Only a file opened here is owned (and therefore closed) by the
        # Reader; a caller-supplied fileobj is left open for the caller.
        self._close_file = False
        if filename is None:
            if fileobj is not None:
                self._stream_reader = fileobj
            else:
                self._stream_reader = io.BytesIO(b'')
        else:
            self._stream_reader = open(filename, 'rb')
            self._close_file = True

        # index of the next event to return, relative to the current bucket
        self._bucket_index = 0
        # number of events already decoded from the in-memory bucket
        self._bucket_evts_read = 0
        self._bucket_header = None
        self._bucket_reader = None
        # metadata accumulated from every bucket header seen so far; it is
        # attached to each event returned afterwards
        self._metadata = {}

    def __enter__(self):
        return self

    def __exit__(self, exception_type, exception_value, traceback):
        self.close()

    def __iter__(self):
        return self

    def __next__(self):
        """
        :return: the next event
        :rtype: Event
        :raises StopIteration: when no more events are available
        """
        event = None
        # use skip() to ensure that we land on a non-empty bucket
        self.skip(0)
        if self._bucket_header is not None:
            if self._bucket_reader is None:
                self._read_bucket()
            event = self._read_from_bucket()

        if event is None:
            raise StopIteration
        return event

    if sys.version_info[0] == 2:
        def next(self):
            return self.__next__()

    def close(self):
        """
        closes the underlying input file object if it was opened by this
        Reader (i.e. a filename was given).  A `fileobj` passed by the caller
        is left open.
        """
        if self._close_file:
            self._stream_reader.close()

    def skip(self, n_events):
        """
        skips the next `n_events` events.

        :param int n_events: number of events to skip
        :return: number of events skipped (fewer than `n_events` if the end
            of the stream is reached)
        :rtype: int
        """
        n_skipped = 0

        start_index = self._bucket_index
        self._bucket_index += n_events
        # Walk forward bucket by bucket until the target index falls inside
        # the current (non-empty) bucket, or the stream runs out.
        while (self._bucket_header is None
               or self._bucket_index >= self._bucket_header.nEvents):
            if self._bucket_header is not None:
                n_bucket_events = self._bucket_header.nEvents
                self._bucket_index -= n_bucket_events
                n_skipped += n_bucket_events - start_index
                # skip the bucket bytes on the stream if they haven't been
                # read into memory already (including payloads of zero-event
                # buckets, rather than leaving them for magic resync)
                if self._bucket_reader is None:
                    self._skip_stream_bytes(self._bucket_header.bucketSize)

            self._read_header()
            if self._bucket_header is None:
                return n_skipped
            start_index = 0

        n_skipped += self._bucket_index - start_index
        return n_skipped

    def seek_to_start(self):
        """
        seeks, if possible, to the start of the input file object.  This can
        be used along with :func:`skip` to directly access events.

        :return: success
        :rtype: boolean
        """
        try:
            seekable = self._stream_reader.seekable()
        except AttributeError:
            # Python 2 `file` objects have no seekable(); let seek() decide
            seekable = True
        if not seekable:
            return False
        try:
            self._stream_reader.seek(0, 0)
        except (IOError, OSError):
            return False

        self._metadata = {}
        self._bucket_index = 0
        self._read_header()
        return True

    def _skip_stream_bytes(self, n_bytes):
        """
        advances the input stream by `n_bytes`, seeking when the stream
        supports it and reading (and discarding) the bytes otherwise.
        """
        if n_bytes <= 0:
            return
        try:
            self._stream_reader.seek(n_bytes, 1)
        except (IOError, OSError):
            # non-seekable stream (e.g. a pipe); IOError is distinct from
            # OSError on Python 2, and io.UnsupportedOperation is an OSError
            # on Python 3
            self._stream_reader.read(n_bytes)

    def _read_header(self):
        """
        reads the next bucket header, first resynchronizing on the magic
        bytes.

        On EOF or a truncated header, `_bucket_header` is left as None.
        Header metadata is merged into `_metadata`.  Serialized file
        descriptors carried by the header are added to the default protobuf
        descriptor pool.
        """
        self._bucket_evts_read = 0
        self._bucket_reader = None
        self._bucket_header = None

        # _sync_to_magic returns -1 when the stream ends before a full magic
        # sequence has been read
        if self._sync_to_magic() < 0:
            return

        size_buf = self._stream_reader.read(4)
        if len(size_buf) != 4:
            # truncated stream: treat like EOF instead of raising struct.error
            return
        # NOTE(review): "I" uses native byte order/size; the proio format is
        # presumably little-endian ("<I") - confirm against the writer.
        header_size = struct.unpack("I", size_buf)[0]
        header_string = self._stream_reader.read(header_size)
        if len(header_string) != header_size:
            return
        self._bucket_header = proto.BucketHeader.FromString(header_string)

        # set metadata for future events
        for key, value in self._bucket_header.metadata.items():
            self._metadata[key] = value

        # add descriptors to pool
        for fd_bytes in self._bucket_header.fileDescriptor:
            try:
                descriptor_pool.Default().AddSerializedFile(fd_bytes)
            except TypeError:
                # ignore cases where types were already defined by another
                # FileDescriptorProto
                pass

    def _read_bucket(self):
        """
        reads the current bucket's payload into memory and wraps it in a
        reader that yields the uncompressed bytes.

        :raises EOFError: if the stream ends before `bucketSize` bytes
        :raises UnknownBucketCompError: for an unrecognized compression type
        """
        header = self._bucket_header
        bucket = self._stream_reader.read(header.bucketSize)
        if len(bucket) != header.bucketSize:
            raise EOFError('truncated bucket: expected %d bytes, got %d'
                           % (header.bucketSize, len(bucket)))

        if header.compression == proto.BucketHeader.GZIP:
            self._bucket_reader = gzip.GzipFile(fileobj = io.BytesIO(bucket), mode = 'rb')
        elif header.compression == proto.BucketHeader.LZ4:
            uncomp_bytes = lz4.frame.decompress(bucket)
            # some python-lz4 versions may return a (data, bytes_read) tuple;
            # check the type instead of trial-unpacking, which decompressed
            # every bucket twice on versions that return plain bytes
            if isinstance(uncomp_bytes, tuple):
                uncomp_bytes = uncomp_bytes[0]
            self._bucket_reader = io.BytesIO(uncomp_bytes)
        elif header.compression == proto.BucketHeader.NONE:
            self._bucket_reader = io.BytesIO(bucket)
        else:
            raise UnknownBucketCompError(
                'unknown bucket compression type: %r' % (header.compression,))

    def _read_from_bucket(self):
        """
        decodes events from the in-memory bucket up to and including the one
        at `_bucket_index`, then advances `_bucket_index`.

        :return: the event at the original `_bucket_index` with accumulated
            metadata attached, or None if it had already been read
        :raises CorruptBucketError: if an event's size prefix or payload is
            truncated
        """
        event = None
        while self._bucket_evts_read <= self._bucket_index:
            proto_size_buf = self._bucket_reader.read(4)
            if len(proto_size_buf) != 4:
                raise CorruptBucketError('Unable to read event size')
            proto_size = struct.unpack("I", proto_size_buf)[0]
            proto_buf = self._bucket_reader.read(proto_size)
            if len(proto_buf) != proto_size:
                raise CorruptBucketError('Unable to read event data')

            # earlier events are read only to advance the bucket stream
            if self._bucket_evts_read == self._bucket_index:
                event_proto = proto.Event.FromString(proto_buf)
                event = Event(proto_obj = event_proto)
                for key, value in self._metadata.items():
                    event.metadata[key] = value

            self._bucket_evts_read += 1

        self._bucket_index += 1
        return event

    def _sync_to_magic(self):
        """
        consumes stream bytes until a complete magic-byte sequence has been
        read.

        A sliding window of the most recent bytes is compared against
        `magic_bytes`, so a match attempt that fails partway cannot swallow
        the first byte of a genuine magic sequence starting inside it.

        :return: total number of bytes consumed (including the magic bytes),
            or -1 if the stream ended first
        """
        window = []
        n_read = 0
        while True:
            byte = self._stream_reader.read(1)
            if len(byte) != 1:
                return -1
            n_read += 1

            window.append(byte)
            if len(window) > len(magic_bytes):
                del window[0]
            if len(window) == len(magic_bytes) and all(
                    have == want for have, want in zip(window, magic_bytes)):
                return n_read
class UnknownBucketCompError(Exception):
    """
    raised when a bucket is compressed with an unknown type
    """


class CorruptBucketError(Exception):
    """
    raised when there is a problem reading from a bucket
    """