# Source code for rdflib.tools.chunk_serializer
"""
This file provides a single function `serialize_in_chunks()` which can serialize a
Graph into a number of NT files with a maximum number of triples or maximum file size.
There is an option to preserve any prefixes declared for the original graph in the first
file, which will be a Turtle file.
"""
from contextlib import ExitStack, contextmanager
from pathlib import Path
from typing import TYPE_CHECKING, BinaryIO, Generator, Optional, Tuple
from rdflib.graph import Graph
from rdflib.plugins.serializers.nt import _nt_row
# from rdflib.term import Literal
# if TYPE_CHECKING:
# from rdflib.graph import _TriplePatternType
__all__ = ["serialize_in_chunks"]
def serialize_in_chunks(
    g: Graph,
    max_triples: int = 10000,
    max_file_size_kb: Optional[int] = None,
    file_name_stem: str = "chunk",
    output_dir: Optional[Path] = None,
    write_prefixes: bool = False,
) -> None:
    """
    Serializes a given Graph into a series of n-triples with a given length.

    Two chunking modes are supported:

    * size-based (``max_file_size_kb`` is set): triples are packed into
      ``{file_name_stem}_NNNNNN.nt`` files, starting a new file whenever the
      next triple would push the current one past the size limit. An empty
      graph produces no NT file in this mode.
    * count-based (otherwise): if the graph holds at most ``max_triples``
      triples it is written whole to ``{file_name_stem}_all.nt``; otherwise it
      is split into ``{file_name_stem}_NNNNNN.nt`` files of at most
      ``max_triples`` triples each.

    Chunk numbering starts at ``000001`` when ``write_prefixes`` is set (the
    prefix file takes ``000000``) and at ``000000`` otherwise. Only one chunk
    file is held open at a time.

    :param g:
        The graph to serialize.

    :param max_file_size_kb:
        Maximum size per NT file in kB (1,000 bytes). How many triples fit in
        a file depends on their serialized size (e.g. long Literals).

    :param max_triples:
        Maximum size per NT file in triples
        Equivalent to lines in file.

        If both this parameter and max_file_size_kb are set, max_file_size_kb will be used.

    :param file_name_stem:
        Prefix of each file name.
        e.g. "chunk" = chunk_000001.nt, chunk_000002.nt...

    :param output_dir:
        The directory you want the files to be written to. Defaults to the
        current working directory; a ``str`` path is also accepted.

    :param write_prefixes:
        The first file created is a Turtle file,
        ``{file_name_stem}_000000.ttl``, containing the original graph's
        prefixes as ``PREFIX`` declarations.

    :raises ValueError:
        If ``output_dir`` is not an existing directory; if ``max_triples`` is
        less than 1 while it is the active limit; or if a single triple's
        serialized size exceeds ``max_file_size_kb``.

    See ``../test/test_tools/test_chunk_serializer.py`` for examples of this in use.
    """
    out_dir = Path.cwd() if output_dir is None else Path(output_dir)
    if not out_dir.is_dir():
        raise ValueError(
            "If you specify an output_dir, it must actually be a directory!"
        )
    if max_file_size_kb is None and max_triples < 1:
        # Validate before any file is written: a non-positive chunk size is
        # meaningless (and 0 used to fail mid-write with ZeroDivisionError).
        raise ValueError(f"max_triples must be at least 1, got {max_triples}")

    def _chunk_path(file_no: int) -> Path:
        # Zero-padded so the chunk files sort in the order they were written.
        return out_dir / f"{file_name_stem}_{str(file_no).zfill(6)}.nt"

    def _serialize_prefixes(graph: Graph) -> str:
        # One PREFIX line per bound namespace, sorted for deterministic output.
        pres = [
            f"PREFIX {k}: <{v}>" for k, v in graph.namespace_manager.namespaces()
        ]
        return "\n".join(sorted(pres)) + "\n"

    def _write_by_size(max_file_size: int, first_file_no: int) -> None:
        # Pack triples into files of at most ``max_file_size`` bytes each.
        with ExitStack() as xstack:
            fhb: Optional[BinaryIO] = None
            file_no = first_file_no - 1
            bytes_written = 0
            for t in g.triples((None, None, None)):
                row_bytes = _nt_row(t).encode("utf-8")
                row_size = len(row_bytes)
                if row_size > max_file_size:
                    raise ValueError(
                        f"cannot write triple {t!r} as its serialized size of "
                        f"{row_size / 1000} kB exceeds "
                        f"max_file_size_kb = {max_file_size_kb}"
                    )
                # Start a new file if none is open yet, or if this row would
                # push the current one past the limit (reaching it is fine).
                if fhb is None or bytes_written + row_size > max_file_size:
                    file_no += 1
                    # Closing the (reusable) stack closes the previous chunk,
                    # so at most one chunk file handle is open at a time.
                    xstack.close()
                    fhb = xstack.enter_context(open(_chunk_path(file_no), "wb"))
                    bytes_written = 0
                bytes_written += fhb.write(row_bytes)

    def _write_by_count(first_file_no: int) -> None:
        # Split triples into files of at most ``max_triples`` rows each.
        with ExitStack() as xstack:
            fhb: Optional[BinaryIO] = None
            file_no = first_file_no - 1
            rows_in_file = 0
            for t in g.triples((None, None, None)):
                if fhb is None or rows_in_file == max_triples:
                    file_no += 1
                    # Close the previous chunk before opening the next one.
                    xstack.close()
                    fhb = xstack.enter_context(open(_chunk_path(file_no), "wb"))
                    rows_in_file = 0
                fhb.write(_nt_row(t).encode("utf-8"))
                rows_in_file += 1

    if write_prefixes:
        with open(
            out_dir / f"{file_name_stem}_000000.ttl", "w", encoding="utf-8"
        ) as fh:
            fh.write(_serialize_prefixes(g))

    # Number 000000 belongs to the prefix file when one was written.
    first_file_no = 1 if write_prefixes else 0

    if max_file_size_kb is not None:
        _write_by_size(max_file_size_kb * 1000, first_file_no)
    elif len(g) <= max_triples:
        # Small enough for a single file: let rdflib serialize it whole.
        g.serialize(destination=out_dir / f"{file_name_stem}_all.nt", format="nt")
    else:
        _write_by_count(first_file_no)