NDJson or Parquet#
Manifest 文件本质上是一堆 Data File 的 metadata. 而一个 Data File 的 metadata 可以被抽象视为一个 schema 不变的 Struct 对象. 那么我们应该选择哪种格式用来存储这个 Struct 对象呢? 一般来说, 我们有两种选择: NDJson 和 Parquet. 下面这个脚本测试了两种格式的读写性能.
ndjson_or_parquet.py
1# -*- coding: utf-8 -*-
2
3"""
4Benchmark result::
5
6.. code-block:: python
7
8 n_records = 1000000
9 {
10 "Write ndjson": 1.875375,
11 "Read ndjson": 0.356473,
12 "Write parquet": 0.25004,
13 "Read parquet": 0.288895
14 }
15"""
16
17import typing as T
18import io
19import gzip
20import json
21import random
22
23import polars as pl
24
25from s3manifesto.vendor.timer import DateTimeTimer
26
27
28T_RECORD = T.Dict[str, T.Any]
29
30
def write_ndjson(records: T.List[T_RECORD]) -> bytes:
    """
    Serialize records as gzip-compressed newline-delimited JSON.

    :param records: list of dict records; they are loaded into a polars
        DataFrame before being written out.
    :return: the gzip-compressed NDJSON payload.
    """
    sink = io.BytesIO()
    pl.DataFrame(records).write_ndjson(sink)
    # Compression happens after serialization, on the whole in-memory payload.
    raw = sink.getvalue()
    return gzip.compress(raw)
36
37
def read_ndjson(b: bytes) -> T.List[T_RECORD]:
    """
    Decode a gzip-compressed NDJSON payload back into a list of dict records.

    :param b: bytes as produced by :func:`write_ndjson`.
    :return: one dict per row of the decoded DataFrame.
    """
    raw = gzip.decompress(b)
    return pl.read_ndjson(raw).to_dicts()
41
42
def write_parquet(records: T.List[T_RECORD]) -> bytes:
    """
    Serialize records as a zstd-compressed Parquet file held in memory.

    :param records: list of dict records; they are loaded into a polars
        DataFrame before being written out.
    :return: the Parquet file content as bytes.
    """
    sink = io.BytesIO()
    # Alternative codec that was tried here: compression="snappy".
    # NOTE(review): pyarrow_options looks like it is only honored when
    # use_pyarrow=True is also passed -- confirm against the polars docs.
    pl.DataFrame(records).write_parquet(
        sink,
        compression="zstd",
        pyarrow_options={"use_dictionary": True},
    )
    return sink.getvalue()
49
50
def read_parquet(b: bytes) -> T.List[T_RECORD]:
    """
    Decode an in-memory Parquet payload back into a list of dict records.

    :param b: bytes as produced by :func:`write_parquet`.
    :return: one dict per row of the decoded DataFrame.
    """
    frame = pl.read_parquet(b)
    rows = frame.to_dicts()
    return rows
54
55
def test_performance(
    n_records: int = 1_000_000,
    display: bool = True,
    include_ndjson: bool = False,
):
    """
    Benchmark writing and reading manifest-like records.

    Builds ``n_records`` synthetic records (``prefix`` / ``file`` / ``size``),
    round-trips them through the Parquet helpers (and optionally the NDJSON
    helpers), and prints the elapsed time of every step as JSON.

    :param n_records: number of synthetic records to generate. Useful sizes
        are 1_000, 10_000, 100_000, 1_000_000 and 10_000_000.
    :param display: forwarded to ``DateTimeTimer``; when true the summary
        (``n_records`` and the timing dict) is also printed at the end.
    :param include_ndjson: when true, also benchmark the NDJSON round trip
        before the Parquet one. Off by default, matching the previous
        behavior where that half of the benchmark was commented out.
    """
    prefix = "s3://mybucket/data"
    records = [
        {
            "prefix": prefix,
            "file": f"{ith}.parquet",
            "size": random.randint(1000 * 1000, 10 * 1000 * 1000),
        }
        for ith in range(1, 1 + n_records)
    ]

    result = {}

    # NOTE(review): ``timer.elapsed`` is read after each ``with`` block exits,
    # assuming the vendored timer finalizes it on exit -- confirm.
    if include_ndjson:
        with DateTimeTimer("Write ndjson", display=display) as timer:
            b1 = write_ndjson(records)
        result["Write ndjson"] = timer.elapsed

        with DateTimeTimer("Read ndjson", display=display) as timer:
            records1 = read_ndjson(b1)
        result["Read ndjson"] = timer.elapsed
        assert len(records1) == n_records
        # Full equality is only checked for small inputs to keep the run cheap.
        if n_records <= 1000:
            assert records1 == records

    with DateTimeTimer("Write parquet", display=display) as timer:
        b2 = write_parquet(records)
    result["Write parquet"] = timer.elapsed

    with DateTimeTimer("Read parquet", display=display) as timer:
        records2 = read_parquet(b2)
    result["Read parquet"] = timer.elapsed
    assert len(records2) == n_records
    # Full equality is only checked for small inputs to keep the run cheap.
    if n_records <= 1000:
        assert records2 == records

    if display:
        print(f"{n_records = }")
        print(json.dumps(result, indent=4))
105
106
if __name__ == "__main__":
    # Run the benchmark only when executed as a script, so that importing
    # this module (e.g. by a test collector) does not trigger it.
    test_performance()
结论:
Parquet 是最优选择. 它不仅有最好的 IO 性能, 并且由于其列式存储的特性, 还能选择性地读部分字段, 例如只读 URI 字段用来定位数据文件. 并且将它读到 DataFrame 中做后续处理也更方便.