"""
IO for binary files (:mod:`fluiddyn.io.binary`)
===============================================
.. currentmodule:: fluiddyn.io.binary
Provides the class :class:`BinFile`.
.. autoclass:: BinFile
:members:
"""
import io as _io
import struct
import zlib
def chunks(l, n):
    """Generate consecutive slices of ``l`` holding at most ``n`` items each.

    Parameters
    ----------
    l : sequence
        Object supporting ``len()`` and slicing (list, tuple, str, ...).
    n : int
        Slice length; the last slice is shorter when ``len(l)`` is not a
        multiple of ``n``.

    Yields
    ------
    sequence
        ``l[start : start + n]`` for ``start`` = 0, n, 2n, ...
    """
    yield from (l[start : start + n] for start in range(0, len(l), n))
def _code_byte_order_from_str(byteorder=None):
if byteorder is None:
return "=" # native
elif byteorder.startswith("little"):
return "<" # little-endian
elif byteorder.startswith("big"):
return ">" # big-endian
else:
raise ValueError("byteorder should start with little or big.")
class BinFile(_io.FileIO):
    """Raw binary file with helpers to read and write typed values.

    The class extends :class:`io.FileIO` with methods that convert between
    Python values and their binary representation through :mod:`struct`.

    Parameters
    ----------
    file_path : str
        Path of the file to open.
    mode : str
        Opening mode; ``"b"`` is appended when it is missing.
    byteorder : str or None
        Default byte order of the file: ``None`` (native) or a string
        starting with ``"little"`` or ``"big"``.  It can be overridden per
        call in :meth:`readt` and :meth:`write_as`.

    Attributes
    ----------
    code_byte_order : str
        ``struct`` byte-order prefix (``"="``, ``"<"`` or ``">"``) used when
        a call does not give its own ``byteorder``.
    """

    # Accepted ``codetype`` names mapped to their ``struct`` format character.
    dcodetypes = {
        "s": "s",
        "B": "B",
        "uint8": "B",
        "H": "H",
        "uint16": "H",
        "I": "I",
        "uint32": "I",
        "f": "f",
        "float32": "f",
        "d": "d",
        "float64": "d",
        "q": "q",
        "int64": "q",
    }
    keys_types = list(dcodetypes.keys())

    def __init__(self, file_path, mode="rb", byteorder=None):
        if "b" not in mode:
            mode += "b"
        super().__init__(file_path, mode)
        self.code_byte_order = _code_byte_order_from_str(byteorder)

    def _resolve_byte_order(self, byteorder):
        """Return the ``struct`` prefix for ``byteorder`` (file default if None)."""
        if byteorder is None:
            return self.code_byte_order
        return _code_byte_order_from_str(byteorder)

    @staticmethod
    def _unpack_or_eof(packer, raw):
        """Unpack ``raw`` with ``packer`` or return ``"eof"`` on a size mismatch.

        A single decoded value is returned bare, several values as a tuple.
        """
        if len(raw) != packer.size:
            return "eof"  # end of file
        values = packer.unpack(raw)
        return values[0] if len(values) == 1 else values

    def readt(self, nb_values, codetype, byteorder=None):
        """Read some values coded in a particular type.

        Parameters
        ----------
        nb_values : int
            Number of values to read (number of bytes for ``codetype="s"``).
        codetype : str
            One of the keys of :attr:`dcodetypes`.
        byteorder : str or None
            Byte order for this call; ``None`` uses the file default.  It is
            not used for ``codetype="s"``.

        Returns
        -------
        bytes, int, float, tuple or str
            For ``"s"``, the bytes read with trailing ASCII whitespace
            stripped.  Otherwise a single value when ``nb_values == 1`` and a
            tuple of values in the other cases.  The string ``"eof"`` is
            returned when fewer bytes than required could be read.

        Raises
        ------
        ValueError
            If ``codetype`` is not supported.
        """
        if codetype == "s":
            fmt = f"{nb_values:d}s"
            raw = self.read(nb_values)
            # A short read would make struct.unpack raise; report it with the
            # same "eof" sentinel as the numeric branch instead.
            if len(raw) != nb_values:
                return "eof"  # end of file
            return struct.unpack(fmt, raw)[0].rstrip()

        if codetype not in self.keys_types:
            raise ValueError("Value of codetype not yet implemented")

        packer = struct.Struct(
            self._resolve_byte_order(byteorder)
            + f"{nb_values:d}"
            + self.dcodetypes[codetype]
        )
        return self._unpack_or_eof(packer, self.read(packer.size))

    def readt_zlib(self, nb_bytes, nb_values, codetype):
        """Read some value encoded with zlib.

        Parameters
        ----------
        nb_bytes : int
            Number of compressed bytes to read from the file.
        nb_values : int
            Number of values expected after decompression.
        codetype : str
            One of the keys of :attr:`dcodetypes`.

        Returns
        -------
        int, float, bytes, tuple or str
            A single value when exactly one value is decoded, otherwise a
            tuple.  The string ``"eof"`` is returned when the decompressed
            size does not match ``nb_values`` values of type ``codetype``.

        Raises
        ------
        ValueError
            If ``codetype`` is not supported.

        Notes
        -----
        The values are always decoded with the ``"="`` prefix (native byte
        order); the byte order given to the constructor is not used here.
        """
        if codetype not in self.keys_types:
            raise ValueError("Value of codetype not yet implemented")

        packer = struct.Struct(f"={nb_values:d}" + self.dcodetypes[codetype])
        raw_decompressed = zlib.decompress(self.read(nb_bytes))
        return self._unpack_or_eof(packer, raw_decompressed)

    def write_as(
        self, to_be_saved, codetype="s", byteorder=None, buffersize=1000
    ):
        """Write a value or a sequence of values coded in a particular type.

        Parameters
        ----------
        to_be_saved : str, bytes, number or sequence
            For ``codetype="s"``, bytes written as they are or a ``str``
            that is encoded with ``str.encode()`` first.  Otherwise a scalar
            or an object with ``__len__`` whose items are packed one after
            the other.
        codetype : str
            One of the keys of :attr:`dcodetypes`.
        byteorder : str or None
            Byte order for this call; ``None`` uses the file default.  It is
            not used for ``codetype="s"``.
        buffersize : int
            Maximum number of values packed and written at once for
            sequences.

        Raises
        ------
        ValueError
            If ``codetype`` is not supported (nothing is written).
        """
        if codetype == "s":
            if not isinstance(to_be_saved, bytes):
                to_be_saved = to_be_saved.encode()
            self.write(to_be_saved)
            return

        if codetype not in self.keys_types:
            # Fail loudly, like readt: silently writing nothing loses data.
            raise ValueError("Value of codetype not yet implemented")

        code_byte_order = self._resolve_byte_order(byteorder)
        type_char = self.dcodetypes[codetype]

        if not hasattr(to_be_saved, "__len__"):
            # Scalar: a single value, no repeat count needed.
            self.write(struct.pack(code_byte_order + type_char, to_be_saved))
            return

        # The repeat count is filled in for each buffer-sized chunk.
        fmt_to_be_formated = code_byte_order + "{0:d}" + type_char
        self._write_ndarray_with_buffer(
            to_be_saved, fmt_to_be_formated, buffersize=buffersize
        )

    def _write_ndarray_with_buffer(
        self, to_be_saved, fmt_to_be_formated, buffersize=1000
    ):
        """Pack and write a sequence, at most ``buffersize`` values at a time.

        ``fmt_to_be_formated`` is a ``struct`` format holding a ``{0:d}``
        placeholder that receives the number of values of each write.
        """
        nb_values = len(to_be_saved)
        if nb_values < buffersize:
            fmt = fmt_to_be_formated.format(nb_values)
            self.write(struct.pack(fmt, *to_be_saved))
            return

        for small_arr in chunks(to_be_saved, buffersize):
            fmt = fmt_to_be_formated.format(len(small_arr))
            self.write(struct.pack(fmt, *small_arr))
if __name__ == "__main__":
    # Small round-trip demo: write a string, integers and floats, then read
    # them back and print the result.
    import os
    import tempfile

    import numpy as np

    # A temporary directory keeps the demo portable (no hard-coded /tmp) and
    # removes the test file afterwards.
    with tempfile.TemporaryDirectory() as tmp_dir:
        path_test_file = os.path.join(tmp_dir, "test_file.bin")

        with BinFile(path_test_file, "w") as f:
            f.write_as("poum", buffersize=1)
            # Only integers can be packed as "I": a float such as 3.55 makes
            # struct.pack raise struct.error.
            f.write_as([1, 3], "I", buffersize=1)
            f.write_as(np.array([1.0, 1.5]), "float64", buffersize=1)

        with BinFile(path_test_file) as f:
            s = f.readt(4, "s")
            ints = f.readt(2, "I")
            a = f.readt(2, "float64")

    print(s, ints, a)