NDJson or Parquet#

Manifest 文件本质上是一对 Data File 的 metadata. 而一个 Data File 的 metadata 可以被抽象视为一个 schema 不变的 Struct 对象. 那么我们应该选择哪种格式用来存储这个 Struct 对象呢? 一般来说, 我们有两种选择: NDJson 和 Parquet. 下面这个脚本测试了两种格式的读写性能.

ndjson_or_parquet.py
  1# -*- coding: utf-8 -*-
  2
  3"""
  4Benchmark result::
  5
  6.. code-block:: python
  7
  8    n_records = 1000000
  9    {
 10        "Write ndjson": 1.875375,
 11        "Read ndjson": 0.356473,
 12        "Write parquet": 0.25004,
 13        "Read parquet": 0.288895
 14    }
 15"""
 16
 17import typing as T
 18import io
 19import gzip
 20import json
 21import random
 22
 23import polars as pl
 24
 25from s3manifesto.vendor.timer import DateTimeTimer
 26
 27
 28T_RECORD = T.Dict[str, T.Any]
 29
 30
 31def write_ndjson(records: T.List[T_RECORD]) -> bytes:
 32    df = pl.DataFrame(records)
 33    buffer = io.BytesIO()
 34    df.write_ndjson(buffer)
 35    return gzip.compress(buffer.getvalue())
 36
 37
 38def read_ndjson(b: bytes) -> T.List[T_RECORD]:
 39    df = pl.read_ndjson(gzip.decompress(b))
 40    return df.to_dicts()
 41
 42
 43def write_parquet(records: T.List[T_RECORD]) -> bytes:
 44    df = pl.DataFrame(records)
 45    buffer = io.BytesIO()
 46    # df.write_parquet(buffer, compression="snappy")
 47    df.write_parquet(buffer, compression="zstd", pyarrow_options={"use_dictionary": True})
 48    return buffer.getvalue()
 49
 50
 51def read_parquet(b: bytes) -> T.List[T_RECORD]:
 52    df = pl.read_parquet(b)
 53    return df.to_dicts()
 54
 55
 56def test_performance():
 57    # n_records = 1_000
 58    # n_records = 10_000
 59    # n_records = 100_000
 60    n_records = 1_000_000
 61    # n_records = 10_000_000
 62
 63    prefix = "s3://mybucket/data"
 64    records = [
 65        {
 66            "prefix": prefix,
 67            "file": f"{ith}.parquet",
 68            "size": random.randint(1000 * 1000, 10 * 1000 * 1000),
 69        }
 70        for ith in range(1, 1 + n_records)
 71    ]
 72
 73    display = True
 74    # display = False
 75
 76    result = {}
 77
 78    # with DateTimeTimer("Write ndjson", display=display) as timer:
 79    #     b1 = write_ndjson(records)
 80    # result["Write ndjson"] = timer.elapsed
 81    # size1 = len(b1)
 82    #
 83    # with DateTimeTimer("Read ndjson", display=display) as timer:
 84    #     records1 = read_ndjson(b1)
 85    # result["Read ndjson"] = timer.elapsed
 86    # assert len(records1) == n_records
 87    # if n_records <= 1000:
 88    #     assert records1 == records
 89
 90    with DateTimeTimer("Write parquet", display=display) as timer:
 91        b2 = write_parquet(records)
 92    result["Write parquet"] = timer.elapsed
 93    size2 = len(b2)
 94
 95    with DateTimeTimer("Read parquet", display=display) as timer:
 96        records2 = read_parquet(b2)
 97    result["Read parquet"] = timer.elapsed
 98    assert len(records2) == n_records
 99    if n_records <= 1000:
100        assert records2 == records
101
102    if display:
103        print(f"{n_records = }")
104        print(json.dumps(result, indent=4))
105
106
107test_performance()

结论:

Parquet 是最优选择. 它不仅有最好的 IO 性能, 并且由于其列式存储的特性, 还能选择性地读部分字段, 例如只读 URI 字段用来定位数据文件. 并且将它读到 DataFrame 中做后续处理也更方便.