# -*- coding: utf-8 -*-
"""
Manifest file system for efficient metadata management and file grouping in ETL pipelines.
Provides the :class:`ManifestFile` class for creating, storing, and retrieving file metadata
collections, enabling optimized batch processing and intelligent file partitioning.
"""
import json
import hashlib
import dataclasses
from .compact import T
from .constants import KeyEnum
from .model import FileSpec, DataFile, DataFileGroup, ManifestSummary
from .utils import write_parquet, read_parquet, split_s3_uri, human_size
from .grouper import group_files
if T.TYPE_CHECKING: # pragma: no cover
from mypy_boto3_s3.client import S3Client
[docs]
@dataclasses.dataclass
class ManifestFile:
"""
Core manifest file system consisting of two linked files for efficient metadata management.
**Manifest File Structure:**
A complete manifest consists of two files that work together:
1. **Manifest Summary File** (JSON): Contains aggregate metadata and references, Example::
{
"n_files": 50,
"total_size": 600_000_000, # 600 MB
"total_records": 100_000,
"uri": "s3://bucket/prefix/manifest.parquet",
"fingerprint": "2d0175ad9416dc5fd7138546471738ca"
}
2. **Manifest Data File** (Parquet): Contains detailed per-file metadata, example::
+-------------------------------+--------------+----------+----------------------------------+
| uri | size (Bytes) | n_record | Etag |
+-------------------------------+--------------+----------+----------------------------------+
| s3://bucket/prefix/file1.json | 1_000_000 | 1000 | 8a53247196e46b53699d065ba3cc8e0d |
+-------------------------------+--------------+----------+----------------------------------+
| s3://bucket/prefix/file2.json | 2_000_000 | 2000 | b3f20f3c7a8877c24504634edd067fcf |
+-------------------------------+--------------+----------+----------------------------------+
| s3://bucket/prefix/file3.json | 3_000_000 | 3000 | dd9b315f1d7ec573cb7305e6e238731f |
+-------------------------------+--------------+----------+----------------------------------+
| ... | ... | ... | ... |
+-------------------------------+--------------+----------+----------------------------------+
| ... | ... | ... | ... |
+-------------------------------+--------------+----------+----------------------------------+
| ... | ... | ... | ... |
+-------------------------------+--------------+----------+----------------------------------+
**Write Process:**
When creating a manifest, write the Manifest Summary File first, then the Manifest
Data File to S3, ensuring atomicity and consistency.
**Read Process:**
When reading a manifest, read the Manifest Summary File first to get aggregate
statistics and the URI reference, then read the Manifest Data File for detailed metadata.
**Simple Usage Examples:**
Creating and writing a manifest::
data_files = [
DataFile(uri="s3://bucket/file1.json", size=1000000, n_record=1000, etag="abc123"),
DataFile(uri="s3://bucket/file2.json", size=2000000, n_record=2000, etag="def456"),
DataFile(uri="s3://bucket/file3.json", size=3000000, n_record=3000, etag="ghi789")
]
manifest = ManifestFile.new(
uri="s3://bucket/manifest-data.parquet",
uri_summary="s3://bucket/manifest-summary.json",
data_file_list=data_files,
)
manifest.write(s3_client)
Reading a manifest::
manifest = ManifestFile.read(
uri_summary="s3://bucket/manifest-summary.json",
s3_client=s3_client,
)
print(f"Total files: {len(manifest.data_file_list)}")
print(f"Total size: {manifest.size} bytes")
**File Partitioning:**
Manifest files are essentially collections of file metadata that can be intelligently
partitioned for parallel processing. Use :meth:`partition_files_by_size` and
:meth:`partition_files_by_n_record` to efficiently split files into balanced groups.
You can use :class:`ManifestFile` in two ways:
- As a **file splitter calculator** (in-memory partitioning without S3 storage)
- As a **persistent manifest file storage** (with S3 read/write operations)
See the :ref:`Quick Start Guide <quick-start>` for complete examples.
:param uri: URI of the Manifest Data File (Parquet format)
:param uri_summary: URI of the Manifest Summary File (JSON format)
:param data_file_list: List of DataFile objects with metadata
:param size: Total aggregate size in bytes of all files
:param n_record: Total aggregate record count across all files
:param fingerprint: Unique hash for detecting data changes and cache invalidation
:param details: Additional workflow-specific metadata
"""
uri: str = dataclasses.field()
uri_summary: str = dataclasses.field()
data_file_list: T.List[DataFile] = dataclasses.field(default_factory=list)
size: T.Optional[int] = dataclasses.field(default=None)
n_record: T.Optional[int] = dataclasses.field(default=None)
fingerprint: T.Optional[str] = dataclasses.field(default=None)
details: T.Dict[str, T.Any] = dataclasses.field(default_factory=dict)
@property
def n_data_file(self) -> int:
"""
Get the number of data files in the manifest.
"""
return len(self.data_file_list)
@property
def size_for_human(self) -> str: # pragma: no cover
return human_size(self.size) if self.size is not None else "Unknown"
[docs]
def calculate(self):
"""
Calculate total size, n_record, and fingerprint of the data files in a single pass.
We use pre-calculated values stored as instance attributes rather than
lazy-loaded cached properties for performance optimization. Since calculating
size, n_record, and fingerprint all require iterating through the data_file_list,
using separate cached properties would result in multiple for-loops (one per
property access). This single calculate() method performs all computations
in one pass, significantly improving efficiency for large file collections.
"""
size_list = list()
n_record_list = list()
if (self.size is None) and (self.n_record is None):
for data_file in self.data_file_list:
size_list.append(data_file.size)
n_record_list.append(data_file.n_record)
elif self.size is None: # pragma: no cover
for data_file in self.data_file_list:
size_list.append(data_file.size)
elif self.n_record is None: # pragma: no cover
for data_file in self.data_file_list:
n_record_list.append(data_file.n_record)
else: # pragma: no cover
pass
try:
if size_list:
size = sum(size_list)
self.size = size
except: # pragma: no cover
pass
try:
if n_record_list:
n_record = sum(n_record_list)
self.n_record = n_record
except: # pragma: no cover
pass
try:
md5 = hashlib.md5()
for data_file in sorted(
self.data_file_list, key=lambda data_file: data_file.uri
):
md5.update(data_file.uri.encode("utf-8"))
md5.update(data_file.etag.encode("utf-8"))
self.fingerprint = md5.hexdigest()
except: # pragma: no cover
pass
[docs]
@classmethod
def new(
cls,
uri: str,
uri_summary: str,
data_file_list: T.List[DataFile],
size: T.Optional[int] = None,
n_record: T.Optional[int] = None,
fingerprint: T.Optional[str] = None,
details: T.Optional[T.Dict[str, T.Any]] = None,
calculate: bool = True,
) -> T.Self:
"""
Create a new manifest file object. To load manifest file data from S3,
use the :meth:`read` method.
:param uri: URI of the manifest data file.
:param uri_summary: URI of the manifest summary file.
:param data_file_list: List of data files.
:param size: Total size of the data files.
:param n_record: Total number of records in the data files.
:param calculate: If True, calculate the size and n_record using the data_file_list.
"""
if details is None:
details = dict()
manifest_file = cls(
uri=uri,
uri_summary=uri_summary,
data_file_list=data_file_list,
size=size,
n_record=n_record,
fingerprint=fingerprint,
details=details,
)
if calculate:
manifest_file.calculate()
return manifest_file
[docs]
def write(
self,
s3_client: "S3Client",
):
"""
Write the manifest file to S3.
:param s3_client: boto3.client("s3") object.
"""
manifest_summary = ManifestSummary(
manifest=self.uri,
size=self.size,
n_record=self.n_record,
fingerprint=self.fingerprint,
details=self.details,
)
bucket, key = split_s3_uri(self.uri_summary)
s3_client.put_object(
Bucket=bucket,
Key=key,
Body=json.dumps(manifest_summary.to_dict(), indent=4),
ContentType="application/json",
)
bucket, key = split_s3_uri(self.uri)
s3_client.put_object(
Bucket=bucket,
Key=key,
Body=write_parquet(
[data_file.to_dict() for data_file in self.data_file_list]
),
ContentType="application/octet-stream",
ContentEncoding="gzip",
)
[docs]
@classmethod
def read(
cls,
uri_summary: str,
s3_client: "S3Client",
) -> T.Self:
"""
Read the manifest file from S3.
:param uri_summary: URI of the manifest summary file. (NOT THE MANIFEST DATA FILE)
:param s3_client: boto3.client("s3") object.
"""
bucket, key = split_s3_uri(uri_summary)
res = s3_client.get_object(Bucket=bucket, Key=key)
dct = json.loads(res["Body"].read().decode("utf-8"))
manifest_summary = ManifestSummary(**dct)
bucket, key = split_s3_uri(dct[KeyEnum.MANIFEST])
res = s3_client.get_object(Bucket=bucket, Key=key)
data_file_list = [DataFile(**dct) for dct in read_parquet(res["Body"].read())]
manifest_file = cls.new(
uri=manifest_summary.manifest,
uri_summary=uri_summary,
size=manifest_summary.size,
n_record=manifest_summary.n_record,
data_file_list=data_file_list,
fingerprint=manifest_summary.fingerprint,
details=manifest_summary.details,
calculate=False,
)
return manifest_file
def _partition_files_by_value(
self,
attr_name: str,
target_value: int,
) -> T.List[DataFileGroup]:
"""
Group the snapshot data files into tasks.
"""
mapping: dict[str, DataFile] = {
data_file.uri: data_file for data_file in self.data_file_list
}
file_specs = [
FileSpec(uri=data_file.uri, value=getattr(data_file, attr_name))
for data_file in self.data_file_list
]
group_specs = group_files(file_specs=file_specs, target_value=target_value)
groups = list()
for group_spec in group_specs:
group = DataFileGroup(
data_files=[
mapping[file_spec.uri] for file_spec in group_spec.file_specs
],
attr_name=attr_name,
value=group_spec.value,
)
groups.append(group)
return groups
[docs]
def partition_files_by_size(
self,
target_size: int = 100 * 1000 * 1000, ## 100 MB in size
) -> T.List[DataFileGroup]:
"""
Organize data files into balanced task groups, ensuring each group's
total file size approximates a specified target,
optimizing workload distribution.
:param target_size: Target size for each task group in bytes.
"""
return self._partition_files_by_value(
attr_name=KeyEnum.SIZE,
target_value=target_size,
)
[docs]
def partition_files_by_n_record(
self,
target_n_record: int = 10 * 1000 * 1000, ## 10M records
) -> T.List[DataFileGroup]:
"""
Organize data files into balanced task groups, ensuring each group's
total number of records approximates a specified target,
optimizing workload distribution.
:param target_n_record: Target number of records for each task group.
"""
return self._partition_files_by_value(
attr_name=KeyEnum.N_RECORD,
target_value=target_n_record,
)
T_MANIFEST_FILE = T.TypeVar("T_MANIFEST_FILE", bound=ManifestFile)