NDJSON or Parquet?

A manifest file is essentially a collection of metadata records, one per Data File. A Data File’s metadata can be viewed abstractly as a Struct object with a fixed schema. Which format should we use to store these Struct objects? There are two main candidates: NDJSON and Parquet. The following script benchmarks the read and write performance of both formats.

ndjson_or_parquet.py
  1# -*- coding: utf-8 -*-
  2
  3"""
  4Benchmark result:
  5
  6.. code-block:: python
  7
  8    n_records = 1_000_000
  9
 10    # in seconds
 11    {
 12        "Write ndjson": 1.875375,
 13        "Read ndjson": 0.356473,
 14        "Write parquet": 0.25004,
 15        "Read parquet": 0.288895
 16    }
 17"""
 18
 19import typing as T
 20import io
 21import gzip
 22import json
 23import random
 24
 25import polars as pl
 26
 27from s3manifesto.vendor.timer import DateTimeTimer
 28
 29
# A single manifest record: a mapping from field name to value
# (the benchmark below uses the keys "prefix", "file" and "size").
T_RECORD = T.Dict[str, T.Any]
 31
 32
 33def write_ndjson(records: T.List[T_RECORD]) -> bytes:
 34    df = pl.DataFrame(records)
 35    buffer = io.BytesIO()
 36    df.write_ndjson(buffer)
 37    return gzip.compress(buffer.getvalue())
 38
 39
 40def read_ndjson(b: bytes) -> T.List[T_RECORD]:
 41    df = pl.read_ndjson(gzip.decompress(b))
 42    return df.to_dicts()
 43
 44
 45def write_parquet(records: T.List[T_RECORD]) -> bytes:
 46    df = pl.DataFrame(records)
 47    buffer = io.BytesIO()
 48    # df.write_parquet(buffer, compression="snappy")
 49    df.write_parquet(buffer, compression="zstd", pyarrow_options={"use_dictionary": True})
 50    return buffer.getvalue()
 51
 52
 53def read_parquet(b: bytes) -> T.List[T_RECORD]:
 54    df = pl.read_parquet(b)
 55    return df.to_dicts()
 56
 57
 58def test_performance():
 59    # n_records = 1_000
 60    # n_records = 10_000
 61    # n_records = 100_000
 62    n_records = 1_000_000
 63    # n_records = 10_000_000
 64
 65    prefix = "s3://mybucket/data"
 66    records = [
 67        {
 68            "prefix": prefix,
 69            "file": f"{ith}.parquet",
 70            "size": random.randint(1000 * 1000, 10 * 1000 * 1000),
 71        }
 72        for ith in range(1, 1 + n_records)
 73    ]
 74
 75    display = True
 76    # display = False
 77
 78    result = {}
 79
 80    # with DateTimeTimer("Write ndjson", display=display) as timer:
 81    #     b1 = write_ndjson(records)
 82    # result["Write ndjson"] = timer.elapsed
 83    # size1 = len(b1)
 84    #
 85    # with DateTimeTimer("Read ndjson", display=display) as timer:
 86    #     records1 = read_ndjson(b1)
 87    # result["Read ndjson"] = timer.elapsed
 88    # assert len(records1) == n_records
 89    # if n_records <= 1000:
 90    #     assert records1 == records
 91
 92    with DateTimeTimer("Write parquet", display=display) as timer:
 93        b2 = write_parquet(records)
 94    result["Write parquet"] = timer.elapsed
 95    size2 = len(b2)
 96
 97    with DateTimeTimer("Read parquet", display=display) as timer:
 98        records2 = read_parquet(b2)
 99    result["Read parquet"] = timer.elapsed
100    assert len(records2) == n_records
101    if n_records <= 1000:
102        assert records2 == records
103
104    if display:
105        print(f"{n_records = }")
106        print(json.dumps(result, indent=4))
107
108
if __name__ == "__main__":
    # Guard the benchmark so that importing this module (for example when
    # pytest collects ``test_performance``) does not also run it at import time.
    test_performance()

Conclusion:

Parquet is the better choice. It has the best read and write performance in this benchmark. Its columnar storage also allows reading only a subset of fields, for example just the fields that identify a data file (`prefix` and `file`, which together form its URI), when all we need is to locate the data files. It is also more convenient to load into a DataFrame for further processing.