NDJson or Parquet
Manifest files are essentially metadata about a set of Data Files. Each Data File's metadata can be viewed abstractly as a Struct object with a fixed schema. So which format should we use to store these Struct objects? There are two main options: NDJson and Parquet. The following script compares the read/write performance of both formats.
ndjson_or_parquet.py
1# -*- coding: utf-8 -*-
2
3"""
4Benchmark result:
5
6.. code-block:: python
7
8 n_records = 1_000_000
9
10 # in seconds
11 {
12 "Write ndjson": 1.875375,
13 "Read ndjson": 0.356473,
14 "Write parquet": 0.25004,
15 "Read parquet": 0.288895
16 }
17"""
18
19import typing as T
20import io
21import gzip
22import json
23import random
24
25import polars as pl
26
27from s3manifesto.vendor.timer import DateTimeTimer
28
29
# One manifest record: a flat mapping of column name -> value
# (e.g. the "prefix", "file" and "size" keys built in test_performance).
T_RECORD = T.Dict[str, T.Any]
31
32
def write_ndjson(records: T.List[T_RECORD], *, compresslevel: int = 9) -> bytes:
    """
    Serialize ``records`` to gzip-compressed newline-delimited JSON.

    :param records: rows to write; they are loaded into a polars DataFrame first.
    :param compresslevel: gzip compression level (0-9). The default of 9 matches
        :func:`gzip.compress`'s own default, so output is unchanged unless a
        lower level is requested. Lower levels trade payload size for write
        speed, which matters when comparing against zstd-compressed Parquet.
    :return: the compressed NDJson payload.
    """
    df = pl.DataFrame(records)
    buffer = io.BytesIO()
    df.write_ndjson(buffer)
    return gzip.compress(buffer.getvalue(), compresslevel=compresslevel)
38
39
def read_ndjson(b: bytes) -> T.List[T_RECORD]:
    """
    Decode a gzip-compressed NDJson payload back into row dicts.

    :param b: payload produced by :func:`write_ndjson`.
    :return: one dict per NDJson line, as returned by ``DataFrame.to_dicts``.
    """
    raw = gzip.decompress(b)
    return pl.read_ndjson(raw).to_dicts()
43
44
def write_parquet(records: T.List[T_RECORD], *, compression: str = "zstd") -> bytes:
    """
    Serialize ``records`` to an in-memory Parquet file.

    :param records: rows to write; they are loaded into a polars DataFrame first.
    :param compression: Parquet compression codec forwarded to
        ``polars.DataFrame.write_parquet`` (e.g. ``"zstd"``, ``"snappy"``).
    :return: the Parquet file content.
    """
    df = pl.DataFrame(records)
    buffer = io.BytesIO()
    # ``pyarrow_options`` is deliberately not passed: polars forwards it only
    # when ``use_pyarrow=True``, so with the native writer used here it would
    # have no effect.
    df.write_parquet(buffer, compression=compression)
    return buffer.getvalue()
51
52
def read_parquet(b: bytes) -> T.List[T_RECORD]:
    """
    Decode an in-memory Parquet payload back into row dicts.

    :param b: payload produced by :func:`write_parquet`.
    :return: one dict per row, as returned by ``DataFrame.to_dicts``.
    """
    return pl.read_parquet(b).to_dicts()
56
57
def test_performance(n_records: int = 1_000_000, display: bool = True):
    """
    Benchmark NDJson (gzip) vs. Parquet (zstd) serialization of manifest records.

    Builds ``n_records`` synthetic records and round-trips them through both
    formats. Each write/read step is timed with ``DateTimeTimer``, and the
    function checks that every record comes back. Record-by-record equality is
    only asserted when ``n_records <= 1000``, to keep the check cheap.

    :param n_records: number of synthetic records to benchmark with.
    :param display: print per-step timings, the timing summary and the
        serialized payload size of each format.
    """
    prefix = "s3://mybucket/data"
    records = [
        {
            "prefix": prefix,
            "file": f"{ith}.parquet",
            "size": random.randint(1000 * 1000, 10 * 1000 * 1000),
        }
        for ith in range(1, 1 + n_records)
    ]

    result = {}  # step name -> elapsed time (the module docstring reports seconds)
    sizes = {}  # format name -> serialized payload size in bytes

    with DateTimeTimer("Write ndjson", display=display) as timer:
        b1 = write_ndjson(records)
    result["Write ndjson"] = timer.elapsed
    sizes["ndjson"] = len(b1)

    with DateTimeTimer("Read ndjson", display=display) as timer:
        records1 = read_ndjson(b1)
    result["Read ndjson"] = timer.elapsed
    assert len(records1) == n_records
    if n_records <= 1000:
        assert records1 == records

    with DateTimeTimer("Write parquet", display=display) as timer:
        b2 = write_parquet(records)
    result["Write parquet"] = timer.elapsed
    sizes["parquet"] = len(b2)

    with DateTimeTimer("Read parquet", display=display) as timer:
        records2 = read_parquet(b2)
    result["Read parquet"] = timer.elapsed
    assert len(records2) == n_records
    if n_records <= 1000:
        assert records2 == records

    if display:
        print(f"{n_records = }")
        print(json.dumps(result, indent=4))
        # Sizes are printed separately so the timing dict stays seconds-only.
        print(json.dumps(sizes, indent=4))
107
108
# Run the benchmark only when executed as a script. This stops it from running
# as a side effect of importing the module, e.g. during pytest collection.
if __name__ == "__main__":
    test_performance()
Conclusion:
Parquet is the better choice. It not only delivers the best I/O performance in this benchmark, but its columnar storage also lets you read only a subset of fields — for example, just the URI field needed to locate data files. In addition, it loads directly into a DataFrame, which makes subsequent processing more convenient.