"""Source code for proio.writer."""
import gzip
import io
import lz4.frame
import struct
import google.protobuf.descriptor_pool as descriptor_pool
import proio.proto as proto
# 16-byte magic sequence that prefixes every bucket header in the stream:
# 0xE1 0xC1 followed by fourteen zero bytes.
magic_bytes = [b'\xe1', b'\xc1'] + [b'\x00'] * 14
class Writer(object):
    """
    Writer for proio files

    This class can be used with the `with` statement.  A filename may be
    omitted in favor of specifying `fileobj`.

    :param string filename: name of output file to create or overwrite
    :param fileobj: file object to write to

    :example:

    .. code-block:: python

        with proio.Writer('output.proio') as writer:
            ...
    """

    def __init__(self, filename=None, fileobj=None):
        # Close the underlying stream in close() only when this Writer opened
        # it itself.  Initialize the flag unconditionally so close() never has
        # to guard against a missing attribute.
        self._close_file = False
        if filename is None:
            if fileobj is not None:
                self._stream_writer = fileobj
            else:
                self._stream_writer = io.BytesIO()
        else:
            self._stream_writer = open(filename, 'wb')
            self._close_file = True

        # Uncompressed bucket size (in bytes) that triggers an automatic
        # flush from push().
        self.bucket_dump_size = 0x1000000
        self._bucket_events = 0
        self._header = proto.BucketHeader()
        self._bucket = io.BytesIO()
        # FileDescriptors already serialized into the stream, so each one is
        # written at most once.
        self._written_fds = set()
        # Last-seen metadata values; push() emits only new or changed entries.
        self._metadata = {}
        self.set_compression(proto.BucketHeader.GZIP)

    def __enter__(self):
        return self

    def __exit__(self, exception_type, exception_value, traceback):
        self.close()

    def close(self):
        """
        closes the file object assigned to the Writer.  This is automatically
        called at the end of a `with` statement.
        """
        self.flush()
        if self._close_file:
            self._stream_writer.close()

    def flush(self):
        """
        flushes all buffered data to the output file object.  This is
        automatically called at the end of a `with` statement.
        """
        if self._bucket_events == 0:
            return

        # Compress the buffered bucket according to the current setting.
        if self._comp == proto.BucketHeader.LZ4:
            bucket_bytes = lz4.frame.compress(self._bucket.getvalue())
        elif self._comp == proto.BucketHeader.GZIP:
            bucket_compressed = io.BytesIO()
            with gzip.GzipFile(fileobj=bucket_compressed, mode='wb') as writer:
                writer.write(self._bucket.getvalue())
            bucket_bytes = bucket_compressed.getvalue()
        else:
            bucket_bytes = self._bucket.getvalue()
        self._bucket.seek(0, 0)
        self._bucket.truncate(0)

        self._header.nEvents = self._bucket_events
        self._header.bucketSize = len(bucket_bytes)
        self._header.compression = self._comp
        header_buf = self._header.SerializeToString()
        # Sizes are fixed little-endian uint32 so the file format is
        # portable; native-order "I" would produce incompatible files on
        # big-endian hosts (identical bytes on little-endian hosts).
        header_size = struct.pack("<I", len(header_buf))

        # Bucket layout: magic bytes, header size, header, compressed bucket.
        self._stream_writer.write(b''.join(magic_bytes))
        self._stream_writer.write(header_size)
        self._stream_writer.write(header_buf)
        self._stream_writer.write(bucket_bytes)

        self._bucket_events = 0
        self._header = proto.BucketHeader()

    def set_compression(self, comp):
        """
        sets the compression type to use for future output buckets.

        :param comp: can be one of :attr:`proio.LZ4`, :attr:`proio.GZIP`, or
            :attr:`proio.UNCOMPRESSED`
        """
        self._comp = comp

    def push(self, event):
        """
        takes an event and serializes it into the output bucket.

        :param Event event: event to serialize to output
        """
        # Emit only metadata entries that are new or changed since the
        # previous push.
        for key, value in event.metadata.items():
            if key not in self._metadata or self._metadata[key] != value:
                self.push_metadata(key, value)
                self._metadata[key] = value

        event._flush_cache()
        proto_buf = event._proto.SerializeToString()

        # Collect the transitive closure of protobuf FileDescriptors needed
        # to describe the event data that have not yet been written to the
        # stream.
        new_fds = set()

        def add_fds_to_set(fd):
            for dep in fd.dependencies:
                add_fds_to_set(dep)
            if fd not in self._written_fds:
                new_fds.add(fd)
                self._written_fds.add(fd)

        for _, type_string in event._proto.type.items():
            add_fds_to_set(
                descriptor_pool.Default().FindMessageTypeByName(type_string).file)
        if len(new_fds) > 0:
            # Descriptors ride on a bucket header, so close out the current
            # bucket and attach them to the next header.
            self.flush()
            for fd in new_fds:
                self._header.fileDescriptor.append(fd.serialized_pb)

        # Length-prefixed event record (fixed little-endian uint32 prefix).
        proto_size = struct.pack("<I", len(proto_buf))
        self._bucket.write(proto_size)
        self._bucket.write(proto_buf)
        self._bucket_events += 1

        if len(self._bucket.getvalue()) > self.bucket_dump_size:
            self.flush()