class Dump:
"""Create a new instance of the :py:class:`~pgdumplib.dump.Dump` class
Once created, the instance of :py:class:`~pgdumplib.dump.Dump` can
be used to read existing dumps or to create new ones.
:param str dbname: The database name for the dump (Default: ``pgdumplib``)
:param str encoding: The data encoding (Default: ``UTF8``)
:param converter: The data converter class to use
(Default: :py:class:`pgdumplib.converters.DataConverter`)
:param str appear_as: The PostgreSQL version to emulate in the archive
header (Default: ``12.0``)
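Example (an illustrative sketch; the database name and output path are
arbitrary)::

    dump = Dump(dbname='example')
    dump.save('example.dump')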
"""
def __init__(
self,
dbname: str = 'pgdumplib',
encoding: str = 'UTF8',
converter: typing.Any = None,
appear_as: str = '12.0',
):
self.compression_algorithm = constants.COMPRESSION_NONE
self.dbname = dbname
self.dump_version = VERSION_INFO.format(appear_as, version)
self.encoding = encoding
self.entries = [
models.Entry(
dump_id=1,
tag=constants.ENCODING,
desc=constants.ENCODING,
defn=f"SET client_encoding = '{self.encoding}';\n",
),
models.Entry(
dump_id=2,
tag='STDSTRINGS',
desc='STDSTRINGS',
defn="SET standard_conforming_strings = 'on';\n",
),
models.Entry(
dump_id=3,
tag='SEARCHPATH',
desc='SEARCHPATH',
defn='SELECT pg_catalog.set_config('
"'search_path', '', false);\n",
),
]
self.server_version = self.dump_version
self.timestamp = datetime.datetime.now(tz=datetime.UTC)
converter = converter or converters.DataConverter
self._converter: converters.DataConverter = converter()
self._format: str = 'Custom'
self._handle: io.BufferedReader | io.BufferedWriter | None = None
self._intsize: int = 4
self._offsize: int = 8
self._temp_dir = tempfile.TemporaryDirectory()
parts = tuple(int(v) for v in appear_as.split('.'))
if len(parts) < 2:
raise ValueError(f'Invalid appear_as version: {appear_as}')
k_version = self._get_k_version(parts)
self._vmaj: int = k_version[0]
self._vmin: int = k_version[1]
self._vrev: int = k_version[2]
self._writers: dict[int, TableData] = {}
def __repr__(self) -> str:
return (
f'<Dump format={self._format!r} '
f'timestamp={self.timestamp.isoformat()!r} '
f'entry_count={len(self.entries)!r}>'
)
def add_entry(
self,
desc: str,
namespace: str | None = None,
tag: str | None = None,
owner: str | None = None,
defn: str | None = None,
drop_stmt: str | None = None,
copy_stmt: str | None = None,
dependencies: list[int] | None = None,
tablespace: str | None = None,
tableam: str | None = None,
dump_id: int | None = None,
) -> models.Entry:
"""Add an entry to the dump
A :py:exc:`ValueError` will be raised if ``desc`` is not a value that
is known in :py:mod:`pgdumplib.constants`.
The section is determined automatically from the ``desc`` value.
When adding data, use :py:meth:`~Dump.table_data_writer` instead of
invoking :py:meth:`~Dump.add_entry` directly.
If ``dependencies`` are specified, they will be validated; if any
referenced ``dump_id`` is not found in the dump, a :py:exc:`ValueError`
will be raised. A :py:exc:`ValueError` is also raised if an explicit
``dump_id`` is already assigned to another entry.
Other omitted values will be set to the defaults specified in the
:py:class:`pgdumplib.dump.Entry` class.
The ``dump_id`` will be auto-calculated based upon the existing entries
if it is not specified.
.. note:: The creation of ad-hoc blobs is not supported.
:param str desc: The entry description
:param str namespace: The namespace of the entry
:param str tag: The name/table/relation/etc of the entry
:param str owner: The owner of the object in Postgres
:param str defn: The DDL definition for the entry
:param drop_stmt: A drop statement used to drop the entry before
creating it
:param copy_stmt: A copy statement used when there is a corresponding
data section.
:param list dependencies: A list of dump_ids of objects that the entry
is dependent upon.
:param str tablespace: The tablespace to use
:param str tableam: The table access method
:param int dump_id: The dump id, will be auto-calculated if left empty
:raises: :py:exc:`ValueError`
:rtype: pgdumplib.dump.Entry
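Example (a minimal sketch; the schema, table, and owner names are
hypothetical, and ``constants.TABLE`` is assumed to be a valid ``desc``
value)::

    entry = dump.add_entry(
        desc=constants.TABLE,
        namespace='public',
        tag='example',
        owner='postgres',
        defn='CREATE TABLE public.example (id integer);',
        drop_stmt='DROP TABLE public.example;')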
"""
if desc not in constants.SECTION_MAPPING:
raise ValueError(f'Invalid desc: {desc}')
if dump_id is not None and dump_id < 1:
raise ValueError('dump_id must be greater than 0')
dump_ids = [e.dump_id for e in self.entries]
if dump_id and dump_id in dump_ids:
raise ValueError(f'dump_id {dump_id!r} is already assigned')
for dependency in dependencies or []:
if dependency not in dump_ids:
raise ValueError(
f'Dependency dump_id {dependency!r} not found'
)
self.entries.append(
models.Entry(
dump_id=dump_id or self._next_dump_id(),
had_dumper=False,
table_oid='',
oid='',
tag=tag or '',
desc=desc,
defn=defn or '',
drop_stmt=drop_stmt or '',
copy_stmt=copy_stmt or '',
namespace=namespace or '',
tablespace=tablespace or None,
tableam=tableam or None,
relkind=None,
owner=owner or '',
with_oids=False,
dependencies=dependencies or [],
)
)
return self.entries[-1]
def blobs(self) -> typing.Generator[tuple[int, bytes], None, None]:
"""Iterator that returns each blob in the dump
:rtype: tuple(int, bytes)
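Example (illustrative)::

    for oid, blob in dump.blobs():
        print(oid, len(blob))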
"""
def read_oid(fd: io.BufferedReader) -> int | None:
"""Small helper function to deduplicate code"""
try:
return struct.unpack('I', fd.read(4))[0]
except struct.error:
return None
for entry in self._data_entries:
if entry.desc == constants.BLOBS:
with self._tempfile(entry.dump_id, 'rb') as handle:
oid: int | None = read_oid(handle)
while oid:
length: int = struct.unpack('I', handle.read(4))[0]
yield oid, handle.read(length)
oid = read_oid(handle)
def get_entry(self, dump_id: int) -> models.Entry | None:
"""Return the entry for the given `dump_id`
:param int dump_id: The dump ID of the entry to return.
"""
for entry in self.entries:
if entry.dump_id == dump_id:
return entry
return None
def load(self, path: str | os.PathLike) -> typing.Self:
"""Load the Dumpfile, including extracting all data into a temporary
directory
:param os.PathLike path: The path of the dump to load
:raises: :py:exc:`RuntimeError`
:raises: :py:exc:`ValueError`
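Example (illustrative; ``example.dump`` is a hypothetical custom-format
archive created with ``pg_dump -Fc``)::

    dump = Dump().load('example.dump')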
"""
if not pathlib.Path(path).exists():
raise ValueError(f'Path {path!r} does not exist')
LOGGER.debug('Loading dump file from %s', path)
self.entries = [] # Wipe out pre-existing entries
self._handle = open(path, 'rb')
self._read_header()
if not constants.MIN_VER <= self.version <= constants.MAX_VER:
raise ValueError(
'Unsupported backup version: {}.{}.{}'.format(*self.version)
)
if self.version >= (1, 15, 0):
self.compression_algorithm = constants.COMPRESSION_ALGORITHMS[
self._compression_algorithm
]
if (
self.compression_algorithm
not in constants.SUPPORTED_COMPRESSION_ALGORITHMS
):
raise ValueError(
'Unsupported compression algorithm: '
f'{self.compression_algorithm}'
)
else:
self.compression_algorithm = (
constants.COMPRESSION_GZIP
if self._read_int() != 0
else constants.COMPRESSION_NONE
)
self.timestamp = self._read_timestamp()
self.dbname = self._read_bytes().decode(self.encoding)
self.server_version = self._read_bytes().decode(self.encoding)
self.dump_version = self._read_bytes().decode(self.encoding)
self._read_entries()
self._set_encoding()
# Cache table data and blobs
_last_pos = self._handle.tell()
for entry in self._data_entries:
if entry.data_state == constants.K_OFFSET_NO_DATA:
continue
elif entry.data_state != constants.K_OFFSET_POS_SET:
raise RuntimeError('Unsupported data format')
self._handle.seek(entry.offset, io.SEEK_SET)
block_type, dump_id = self._read_block_header()
if not dump_id or dump_id != entry.dump_id:
raise RuntimeError(
f'Dump IDs do not match ({dump_id} != {entry.dump_id})'
)
if block_type == constants.BLK_DATA:
self._cache_table_data(dump_id)
elif block_type == constants.BLK_BLOBS:
self._cache_blobs(dump_id)
else:
raise RuntimeError(f'Unknown block type: {block_type!r}')
return self
def lookup_entry(
self, desc: str, namespace: str, tag: str
) -> models.Entry | None:
"""Return the entry for the given namespace and tag
:param str desc: The desc / object type of the entry
:param str namespace: The namespace of the entry
:param str tag: The tag/relation/table name
:raises: :py:exc:`ValueError`
:rtype: pgdumplib.dump.Entry or None
"""
if desc not in constants.SECTION_MAPPING:
raise ValueError(f'Invalid desc: {desc}')
for entry in [e for e in self.entries if e.desc == desc]:
if entry.namespace == namespace and entry.tag == tag:
return entry
return None
def save(self, path: str | os.PathLike) -> None:
"""Save the Dump file to the specified path
:param path: The path to save the dump to
:type path: str or os.PathLike
"""
if self._handle is not None and not self._handle.closed:
self._handle.close()
self.compression_algorithm = constants.COMPRESSION_NONE
self._handle = open(path, 'wb')
self._save()
self._handle.close()
def table_data(
self, namespace: str, table: str
) -> typing.Generator[str | tuple[typing.Any, ...], None, None]:
"""Iterator that returns data for the given namespace and table
:param str namespace: The namespace/schema for the table
:param str table: The table name
:raises: :py:exc:`pgdumplib.exceptions.EntityNotFoundError`
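Example (illustrative; the schema and table names are hypothetical)::

    for row in dump.table_data('public', 'example'):
        print(row)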
"""
for entry in self._data_entries:
if entry.namespace == namespace and entry.tag == table:
for row in self._read_table_data(entry.dump_id):
yield self._converter.convert(row)
return
raise exceptions.EntityNotFoundError(namespace=namespace, table=table)
@contextlib.contextmanager
def table_data_writer(
self, entry: models.Entry, columns: abc.Sequence
) -> typing.Generator[TableData, None, None]:
"""A context manager that is used to return a
:py:class:`~pgdumplib.dump.TableData` instance, which can be used
to add table data to the dump.
When invoked for a given entry containing the table definition, a
corresponding ``TABLE DATA`` entry is created if one does not already
exist; repeated invocations for the same entry return the same writer.
:param Entry entry: The entry for the table to add data for
:param columns: The ordered list of table columns
:type columns: list or tuple
:rtype: TableData
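Example (a minimal sketch; ``entry`` is assumed to be the table's
definition entry created via :py:meth:`~Dump.add_entry`, and
:py:meth:`TableData.append` is assumed to take one positional value per
column)::

    with dump.table_data_writer(entry, ['id', 'name']) as writer:
        writer.append(1, 'foo')
        writer.append(2, 'bar')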
"""
if entry.dump_id not in self._writers.keys():
dump_id = self._next_dump_id()
self.entries.append(
models.Entry(
dump_id=dump_id,
had_dumper=True,
tag=entry.tag,
desc=constants.TABLE_DATA,
copy_stmt='COPY {}.{} ({}) FROM stdin;'.format(
entry.namespace, entry.tag, ', '.join(columns)
),
namespace=entry.namespace,
owner=entry.owner,
dependencies=[entry.dump_id],
data_state=constants.K_OFFSET_POS_NOT_SET,
)
)
self._writers[entry.dump_id] = TableData(
dump_id, self._temp_dir.name, self.encoding
)
yield self._writers[entry.dump_id]
@property
def version(self) -> tuple[int, int, int]:
"""Return the version as a tuple to make version comparisons easier.
:rtype: tuple
"""
return self._vmaj, self._vmin, self._vrev
def _cache_blobs(self, dump_id: int) -> None:
"""Create a temp cache file for blob data
:param int dump_id: The dump ID for the filename
"""
count = 0
with self._tempfile(dump_id, 'wb') as handle:
for oid, blob in self._read_blobs():
handle.write(struct.pack('I', oid))
handle.write(struct.pack('I', len(blob)))
handle.write(blob)
count += 1
def _cache_table_data(self, dump_id: int) -> None:
"""Create a temp cache file for the table data
:param int dump_id: The dump ID for the filename
"""
with self._tempfile(dump_id, 'wb') as handle:
handle.write(self._read_data())
@property
def _data_entries(self) -> list[models.Entry]:
"""Return the list of entries that are in the data section
:rtype: list
"""
return [e for e in self.entries if e.section == constants.SECTION_DATA]
@staticmethod
def _get_k_version(appear_as: tuple[int, ...]) -> tuple[int, int, int]:
for (min_ver, max_ver), value in constants.K_VERSION_MAP.items():
if min_ver <= appear_as <= max_ver:
return value
raise RuntimeError(f'Unsupported PostgreSQL version: {appear_as}')
def _next_dump_id(self) -> int:
"""Get the next ``dump_id`` that is available for adding an entry
:rtype: int
"""
return max(e.dump_id for e in self.entries) + 1
def _read_blobs(self) -> typing.Generator[tuple[int, bytes], None, None]:
"""Read blobs, returning a tuple of the blob ID and the blob data
:rtype: (int, bytes)
:raises: :exc:`RuntimeError`
"""
oid = self._read_int()
while oid is not None and oid > 0:
data = self._read_data()
yield oid, data
oid = self._read_int()
if oid == 0:
oid = self._read_int()
def _read_block_header(self) -> tuple[bytes, int | None]:
"""Read the block header in
:rtype: bytes, int
"""
if self._handle is None:
raise ValueError('File handle is not initialized')
return self._handle.read(1), self._read_int()
def _read_byte(self) -> int | None:
"""Read in an individual byte
:rtype: int
"""
if self._handle is None:
raise ValueError('File handle is not initialized')
try:
return struct.unpack('B', self._handle.read(1))[0]
except struct.error:
return None
def _read_bytes(self) -> bytes:
"""Read in a byte stream
:rtype: bytes
"""
if self._handle is None:
raise ValueError('File handle is not initialized')
length = self._read_int()
if length and length > 0:
value = self._handle.read(length)
return value
return b''
def _read_data(self) -> bytes:
"""Read a data block, returning the bytes.
:rtype: bytes
"""
if self.compression_algorithm != constants.COMPRESSION_NONE:
return self._read_data_compressed()
return self._read_data_uncompressed()
def _read_data_compressed(self) -> bytes:
"""Read a compressed data block
:rtype: bytes
"""
if self._handle is None:
raise ValueError('File handle is not initialized')
buffer = io.BytesIO()
chunk = b''
decompress = zlib.decompressobj()
while True:
chunk_size = self._read_int()
if not chunk_size: # pragma: nocover
break
chunk += self._handle.read(chunk_size)
buffer.write(decompress.decompress(chunk))
chunk = decompress.unconsumed_tail
if chunk_size < constants.ZLIB_IN_SIZE:
break
return buffer.getvalue()
def _read_data_uncompressed(self) -> bytes:
"""Read an uncompressed data block
:rtype: bytes
"""
if self._handle is None:
raise ValueError('File handle is not initialized')
buffer = io.BytesIO()
while True:
block_length = self._read_int()
if not block_length or block_length <= 0:
break
buffer.write(self._handle.read(block_length))
return buffer.getvalue()
def _read_dependencies(self) -> list[int]:
"""Read in the dependencies for an entry.
:rtype: list
"""
values = set({})
while True:
value = self._read_bytes()
if not value:
break
values.add(int(value))
return sorted(values)
def _read_entries(self) -> None:
"""Read in all of the entries"""
for _i in range(0, self._read_int() or 0):
self._read_entry()
def _read_entry(self) -> None:
"""Read in an individual entry and append it to the entries stack"""
dump_id = self._read_int()
if dump_id is None:
raise ValueError('dump_id cannot be None')
had_dumper = bool(self._read_int())
table_oid = self._read_bytes().decode(self.encoding)
oid = self._read_bytes().decode(self.encoding)
tag = self._read_bytes().decode(self.encoding)
desc = self._read_bytes().decode(self.encoding)
self._read_int() # Section is mapped, no need to assign
defn = self._read_bytes().decode(self.encoding)
drop_stmt = self._read_bytes().decode(self.encoding)
copy_stmt = self._read_bytes().decode(self.encoding)
namespace = self._read_bytes().decode(self.encoding)
tablespace = self._read_bytes().decode(self.encoding)
# Normalize empty strings to None for consistency
tablespace = tablespace if tablespace else None
if self.version >= (1, 14, 0):
tableam = self._read_bytes().decode(self.encoding)
# Normalize empty strings to None to prevent invalid SQL
# generation (e.g., SET default_table_access_method = "";)
tableam = tableam if tableam else None
else:
tableam = None
if self.version >= (1, 16, 0):
relkind_val = self._read_int()
relkind = chr(relkind_val) if relkind_val else None
else:
relkind = None
owner = self._read_bytes().decode(self.encoding)
with_oids = self._read_bytes() == b'true'
dependencies = self._read_dependencies()
data_state, offset = self._read_offset()
self.entries.append(
models.Entry(
dump_id=dump_id,
had_dumper=had_dumper,
table_oid=table_oid,
oid=oid,
tag=tag,
desc=desc,
defn=defn,
drop_stmt=drop_stmt,
copy_stmt=copy_stmt,
namespace=namespace,
tablespace=tablespace,
tableam=tableam,
relkind=relkind,
owner=owner,
with_oids=with_oids,
dependencies=dependencies,
data_state=data_state or 0,
offset=offset or 0,
)
)
def _read_header(self) -> None:
"""Read in the dump header
:raises: ValueError
"""
if self._handle is None:
raise ValueError('File handle is not initialized')
magic_bytes = self._handle.read(5)
if magic_bytes != constants.MAGIC:
# Provide helpful error messages based on file content
error_msg = (
'Invalid archive header. '
'pgdumplib only supports custom format dumps '
'created with pg_dump -Fc'
)
try:
# Try to detect plain SQL files
file_start = magic_bytes.decode('ascii', errors='ignore')
if file_start.startswith(('--', '/*', 'SE', 'CR', 'IN', 'DR')):
error_msg = (
'This appears to be a plain SQL text file. '
'pgdumplib only supports custom format dumps '
'created with pg_dump -Fc'
)
elif len(file_start) == 0 or not file_start.isprintable():
error_msg = (
'Invalid archive format. '
'pgdumplib only supports custom format dumps '
'created with pg_dump -Fc'
)
except (UnicodeDecodeError, AttributeError):
# Ignore errors from decode or isprintable on invalid data
pass
raise ValueError(error_msg)
self._vmaj = struct.unpack('B', self._handle.read(1))[0]
self._vmin = struct.unpack('B', self._handle.read(1))[0]
self._vrev = struct.unpack('B', self._handle.read(1))[0]
self._intsize = struct.unpack('B', self._handle.read(1))[0]
self._offsize = struct.unpack('B', self._handle.read(1))[0]
self._format = constants.FORMATS[
struct.unpack('B', self._handle.read(1))[0]
]
LOGGER.debug(
'Archive version %i.%i.%i', self._vmaj, self._vmin, self._vrev
)
# v1.15+ has compression_spec.algorithm byte
if (self._vmaj, self._vmin, self._vrev) >= (1, 15, 0):
self._compression_algorithm = struct.unpack(
'B', self._handle.read(1)
)[0]
def _read_int(self) -> int | None:
"""Read in a signed integer
:rtype: int or None
"""
sign = self._read_byte()
if sign is None:
return None
bs, bv, value = 0, 0, 0
for _offset in range(0, self._intsize):
bv = (self._read_byte() or 0) & 0xFF
if bv != 0:
value += bv << bs
bs += 8
return -value if sign else value
def _read_offset(self) -> tuple[int, int]:
"""Read in the value for the length of the data stored in the file
:rtype: int, int
"""
data_state = self._read_byte() or 0
value = 0
for offset in range(0, self._offsize):
bv = self._read_byte() or 0
value |= bv << (offset * 8)
return data_state, value
def _read_table_data(
self, dump_id: int
) -> typing.Generator[str, None, None]:
"""Iterate through the data returning on row at a time
:rtype: str
"""
try:
with self._tempfile(dump_id, 'rb') as handle:
for line in handle:
out = (line or b'').decode(self.encoding).strip()
if out.startswith('\\.') or not out:
break
yield out
except exceptions.NoDataError:
pass
def _read_timestamp(self) -> datetime.datetime:
"""Read in the timestamp from handle.
:rtype: datetime.datetime
"""
second, minute, hour, day, month, year = (
self._read_int() or 0,
self._read_int() or 0,
self._read_int() or 0,
self._read_int() or 0,
(self._read_int() or 0) + 1,
(self._read_int() or 0) + 1900,
)
self._read_int() # DST flag
return datetime.datetime(
year, month, day, hour, minute, second, 0, tzinfo=datetime.UTC
)
def _save(self) -> None:
"""Save the dump file to disk"""
self._write_toc()
self._write_entries()
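# Entry offsets are only known after the data blocks have been written,
# so the ToC and entries are rewritten with the final offsets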
if self._write_data():
self._write_toc() # Overwrite ToC and entries
self._write_entries()
def _set_encoding(self) -> None:
"""If the encoding is found in the dump entries, set the encoding
to `self.encoding`.
"""
for entry in self.entries:
if entry.desc == constants.ENCODING and entry.defn:
match = ENCODING_PATTERN.match(entry.defn)
if match:
self.encoding = match.group(1)
return
@contextlib.contextmanager
def _tempfile(
self, dump_id: int, mode: str
) -> typing.Generator[typing.Any, None, None]:
"""Open the temp file for the specified dump_id in the specified mode
:param int dump_id: The dump_id for the temp file
:param str mode: The mode (rb, wb)
"""
path = pathlib.Path(self._temp_dir.name) / f'{dump_id}.gz'
if not path.exists() and mode.startswith('r'):
raise exceptions.NoDataError()
with gzip.open(path, mode) as handle:
yield handle
def _write_blobs(self, dump_id: int) -> int:
"""Write the blobs for the entry.
:param int dump_id: The entry dump ID for the blobs
:rtype: int
"""
if self._handle is None:
raise ValueError('File handle is not initialized')
length = 0
with self._tempfile(dump_id, 'rb') as handle:
self._handle.write(constants.BLK_BLOBS)
self._write_int(dump_id)
while True:
try:
oid = struct.unpack('I', handle.read(4))[0]
except struct.error:
break
length = struct.unpack('I', handle.read(4))[0]
self._write_int(oid)
self._write_int(length)
self._handle.write(handle.read(length))
self._write_int(0)
self._write_int(0)
return length
def _write_byte(self, value: int) -> None:
"""Write a byte to the handle
:param int value: The byte value
"""
if self._handle is None:
raise ValueError('File handle is not initialized')
self._handle.write(struct.pack('B', value))
def _write_data(self) -> set[int]:
"""Write the data blocks, returning a set of IDs that were written"""
if self._handle is None:
raise ValueError('File handle is not initialized')
saved = set({})
for offset, entry in enumerate(self.entries):
if entry.section != constants.SECTION_DATA:
continue
self.entries[offset].offset = self._handle.tell()
size = 0
if entry.desc == constants.TABLE_DATA:
size = self._write_table_data(entry.dump_id)
saved.add(entry.dump_id)
elif entry.desc == constants.BLOBS:
size = self._write_blobs(entry.dump_id)
saved.add(entry.dump_id)
if size:
self.entries[offset].data_state = constants.K_OFFSET_POS_SET
return saved
def _write_entries(self) -> None:
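"""Write the entry count followed by every entry, starting with the
first three entries (encoding, stdstrings, search_path) and then each
section in order.
"""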
self._write_int(len(self.entries))
saved = set({})
# Always add these entries first
for entry in self.entries[0:3]:
self._write_entry(entry)
saved.add(entry.dump_id)
saved = self._write_section(
constants.SECTION_PRE_DATA,
[
constants.GROUP,
constants.ROLE,
constants.USER,
constants.SCHEMA,
constants.EXTENSION,
constants.AGGREGATE,
constants.OPERATOR,
constants.OPERATOR_CLASS,
constants.CAST,
constants.COLLATION,
constants.CONVERSION,
constants.PROCEDURAL_LANGUAGE,
constants.FOREIGN_DATA_WRAPPER,
constants.FOREIGN_SERVER,
constants.SERVER,
constants.DOMAIN,
constants.TYPE,
constants.SHELL_TYPE,
],
saved,
)
saved = self._write_section(constants.SECTION_DATA, [], saved)
saved = self._write_section(
constants.SECTION_POST_DATA,
[
constants.CHECK_CONSTRAINT,
constants.CONSTRAINT,
constants.INDEX,
],
saved,
)
saved = self._write_section(constants.SECTION_NONE, [], saved)
LOGGER.debug('Wrote %i of %i entries', len(saved), len(self.entries))
def _write_entry(self, entry: models.Entry) -> None:
"""Write the entry
:param pgdumplib.dump.Entry entry: The entry to write
"""
LOGGER.debug('Writing %r', entry)
self._write_int(entry.dump_id)
self._write_int(int(entry.had_dumper))
self._write_str(entry.table_oid or '0')
self._write_str(entry.oid or '0')
self._write_str(entry.tag)
self._write_str(entry.desc)
self._write_int(constants.SECTIONS.index(entry.section) + 1)
self._write_str(entry.defn)
self._write_str(entry.drop_stmt)
self._write_str(entry.copy_stmt)
self._write_str(entry.namespace)
self._write_str(entry.tablespace)
if self.version >= (1, 14, 0):
LOGGER.debug('Adding tableam')
self._write_str(entry.tableam)
if self.version >= (1, 16, 0):
LOGGER.debug('Adding relkind')
# Write relkind as an int (character code)
relkind_val = ord(entry.relkind) if entry.relkind else 0
self._write_int(relkind_val)
self._write_str(entry.owner)
self._write_str('true' if entry.with_oids else 'false')
for dependency in entry.dependencies or []:
self._write_str(str(dependency))
self._write_int(-1)
self._write_offset(entry.offset, entry.data_state)
def _write_header(self) -> None:
"""Write the file header"""
if self._handle is None:
raise ValueError('File handle is not initialized')
LOGGER.debug(
'Writing archive version %i.%i.%i',
self._vmaj,
self._vmin,
self._vrev,
)
self._handle.write(constants.MAGIC)
self._write_byte(self._vmaj)
self._write_byte(self._vmin)
self._write_byte(self._vrev)
self._write_byte(self._intsize)
self._write_byte(self._offsize)
self._write_byte(constants.FORMATS.index(self._format))
# v1.15+ has compression algorithm in header
if self.version >= (1, 15, 0):
# Write compression algorithm: 0=none, 1=gzip, 2=lz4, 3=zstd
comp_alg = constants.COMPRESSION_ALGORITHMS.index(
self.compression_algorithm
)
self._write_byte(comp_alg)
def _write_int(self, value: int) -> None:
"""Write an integer value
:param int value:
"""
if self._handle is None:
raise ValueError('File handle is not initialized')
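# Mirrors _read_int: a sign byte (1 for negative) followed by _intsize
# little-endian magnitude bytes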
self._write_byte(1 if value < 0 else 0)
if value < 0:
value = -value
for _offset in range(0, self._intsize):
self._write_byte(value & 0xFF)
value >>= 8
def _write_offset(self, value: int, data_state: int) -> None:
"""Write the offset value.
:param int value: The value to write
:param int data_state: The data state flag
"""
self._write_byte(data_state)
for _offset in range(0, self._offsize):
self._write_byte(value & 0xFF)
value >>= 8
def _write_section(
self, section: str, obj_types: list[str], saved: set[int]
) -> set[int]:
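"""Write the entries for the given section, first for the explicitly
listed object types and then for the remaining entries in topological
dependency order, returning the updated set of written dump_ids.
:param str section: The section to write
:param list obj_types: The ``desc`` values to write first, in order
:param set saved: The dump_ids that have already been written
:rtype: set
"""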
for obj_type in obj_types:
for entry in [e for e in self.entries if e.desc == obj_type]:
self._write_entry(entry)
saved.add(entry.dump_id)
for dump_id in toposort.toposort_flatten(
{
e.dump_id: set(e.dependencies)
for e in self.entries
if e.section == section
},
True,
):
if dump_id not in saved:
found_entry: models.Entry | None = self.get_entry(dump_id)
if found_entry:
self._write_entry(found_entry)
saved.add(dump_id)
else:
LOGGER.warning('Entry %d not found, skipping', dump_id)
return saved
def _write_str(self, value: str | None) -> None:
"""Write a string or NULL marker
:param value: The string to write, or None to write -1 length
(indicating an unset/NULL field in the archive format)
"""
if self._handle is None:
raise ValueError('File handle is not initialized')
if value is None:
# Write -1 length to indicate "not set" rather than "empty string"
self._write_int(-1)
else:
out = value.encode(self.encoding)
self._write_int(len(out))
if out:
LOGGER.debug('Writing %r', out)
self._handle.write(out)
def _write_table_data(self, dump_id: int) -> int:
"""Write the blobs for the entry, returning the # of bytes written
:param int dump_id: The entry dump ID for the blobs
:rtype: int
"""
if self._handle is None:
raise ValueError('File handle is not initialized')
self._handle.write(constants.BLK_DATA)
self._write_int(dump_id)
writer = [w for w in self._writers.values() if w.dump_id == dump_id]
if writer: # Data was added ad-hoc, read from TableData writer
writer[0].finish()
# writer.read() returns decompressed data (auto-decompressed)
data = writer[0].read()
if self.compression_algorithm != constants.COMPRESSION_NONE:
# Re-compress with zlib and write in chunks
# Compress all data as a continuous stream
compressed_data = zlib.compress(data)
# Write compressed data in ZLIB_IN_SIZE chunks
total_size = 0
offset = 0
while offset < len(compressed_data):
chunk_size = min(
constants.ZLIB_IN_SIZE, len(compressed_data) - offset
)
self._write_int(chunk_size)
self._handle.write(
compressed_data[offset : offset + chunk_size]
)
total_size += chunk_size
offset += chunk_size
else:
# Write uncompressed in chunks
total_size = 0
offset = 0
while offset < len(data):
chunk_size = min(
constants.ZLIB_IN_SIZE, len(data) - offset
)
self._write_int(chunk_size)
self._handle.write(data[offset : offset + chunk_size])
total_size += chunk_size
offset += chunk_size
self._write_int(0) # End of data indicator
return total_size
# Data was cached on load - read from tempfile and write
with self._tempfile(dump_id, 'rb') as handle:
# Read all decompressed data from the gzip temp file
data = handle.read()
if self.compression_algorithm != constants.COMPRESSION_NONE:
# Compress and write in chunks
# Compress all data as a continuous stream
compressed_data = zlib.compress(data)
# Write compressed data in ZLIB_IN_SIZE chunks
total_size = 0
offset = 0
while offset < len(compressed_data):
chunk_size = min(
constants.ZLIB_IN_SIZE, len(compressed_data) - offset
)
self._write_int(chunk_size)
self._handle.write(
compressed_data[offset : offset + chunk_size]
)
total_size += chunk_size
offset += chunk_size
else:
# Write uncompressed in chunks
total_size = 0
offset = 0
while offset < len(data):
chunk_size = min(constants.ZLIB_IN_SIZE, len(data) - offset)
self._write_int(chunk_size)
self._handle.write(data[offset : offset + chunk_size])
total_size += chunk_size
offset += chunk_size
self._write_int(0) # End of data indicator
return total_size
def _write_timestamp(self, value: datetime.datetime) -> None:
"""Write a datetime.datetime value
:param datetime.datetime value: The value to write
"""
if self._handle is None:
raise ValueError('File handle is not initialized')
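# Values are stored struct tm style: months 0-11, years relative to 1900
# (see _read_timestamp for the inverse)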
self._write_int(value.second)
self._write_int(value.minute)
self._write_int(value.hour)
self._write_int(value.day)
self._write_int(value.month - 1)
self._write_int(value.year - 1900)
self._write_int(1 if value.dst() else 0)
def _write_toc(self) -> None:
"""Write the ToC for the file"""
if self._handle is None:
raise ValueError('File handle is not initialized')
self._handle.seek(0)
self._write_header()
# v1.15+ has compression in header, older versions have it here
if self.version < (1, 15, 0):
self._write_int(
int(self.compression_algorithm != constants.COMPRESSION_NONE)
)
self._write_timestamp(self.timestamp)
self._write_str(self.dbname)
self._write_str(self.server_version)
self._write_str(self.dump_version)