Group Files Planner#
在大数据处理中, 经常会用到分儿治之的策略. 举例来说, 在百万级文件数量的数据处理中, 会有如下应用场景:
将文件分成小组, 例如每组 1000 个文件, 交给许多分布式的 worker 并行处理, 提高处理速度.
对 datalake 中的数据文件做 compaction, 将小文件合并使得文件的 IO 次数减少, 数据更有序, 查询性能更高.
这种分而治之的应用场景的核心技术是将文件按照大小 (或是里面的数据量, 通常是行数) 聚合成固定大小 (不是严格等于, 大致就可以) 的 Group. 在数据量很大的时候, 选择一个高性能的分组算法就变得很重要了.
Implementation and Performance Benchmark#
下面这个脚本我们分别用纯 Python 和用 polars 库实现了这个算法, 并测试了性能.
group_files_planner_poc.py
1# -*- coding: utf-8 -*-
2
3"""
4Benchmark::
5
6 ========== V1 ==========
7 V1: from 2024-08-09 00:15:16.634320 to 2024-08-09 00:15:17.065350 elapsed 0.431030 second.
8 n_group = 313702, n_file = 1000000
9 186 [30, 2, 84, 70]
10 207 [77, 37, 93]
11 137 [51, 86]
12 135 [87, 48]
13 178 [41, 68, 69]
14 ========== V2 ==========
15 V2: from 2024-08-09 00:15:17.074063 to 2024-08-09 00:15:18.011648 elapsed 0.937585 second.
16 n_group = 394647, n_file = 1000000
17 116 [30, 2, 84]
18 70 [70]
19 114 [77, 37]
20 144 [93, 51]
21 173 [86, 87]
22 {'elapse1': 0.43103, 'elapse2': 0.937585}
23"""
24
25import typing as T
26from collections import deque
27
28import numpy as np
29import polars as pl
30from rich import print as rprint
31
32from s3manifesto.vendor.timer import DateTimeTimer
33
34T_FILE_SPEC = T.Tuple[T.Union[str, int], int]
35
36
37def calculate_group_file_plan_v1(
38 files: T.List[T_FILE_SPEC],
39 target: int,
40) -> T.List[T.List[T_FILE_SPEC]]:
41 """
42 Given a list of :class:`File` and a target size, put them into groups,
43 so that each group has approximately the same size as the target size.
44
45 Pure Python implementation.
46 """
47 half_target_size = target // 2
48
49 files = deque(sorted(files, key=lambda x: [1]))
50 file_groups = list()
51 file_group = list()
52 file_group_size = 0
53
54 while 1:
55 # if no files left
56 if len(files) == 0:
57 if len(file_group):
58 file_groups.append(file_group)
59 break
60
61 remaining_size = half_target_size - file_group_size
62 # take the largest file
63 if remaining_size <= half_target_size:
64 file = files.popleft()
65 # take the smallest file
66 else:
67 file = files.pop()
68
69 file_group.append(file)
70 file_group_size += file[1]
71
72 if file_group_size >= target:
73 file_groups.append(file_group)
74 file_group = list()
75 file_group_size = 0
76
77 return file_groups
78
79
80def calculate_group_file_plan_v2(
81 df: pl.DataFrame,
82 size_col: str,
83 target: int,
84) -> T.List[T.List[T_FILE_SPEC]]:
85 """
86 Polars implementation.
87 """
88 df = df.with_columns(pl.col(size_col).cum_sum().alias("cum_sum"))
89 df = df.with_columns(
90 ((pl.col("cum_sum") - pl.lit(1)) // target).alias("batch_group")
91 )
92 df = df.drop("cum_sum")
93 file_group_list = list()
94 for sub_df in df.partition_by("batch_group", include_key=False):
95 sub_df["size"].sum()
96 file_group_list.append(sub_df.rows())
97 return file_group_list
98
99
100if __name__ == "__main__":
101 # n_row = 1000
102 # n_row = 10_000
103 # n_row = 100_000
104 n_row = 1_000_000
105 # n_row = 10_000_000
106 lower = 1
107 upper = 100
108 max_sum = 128
109
110 df = pl.DataFrame(
111 {
112 "id": np.arange(1, 1 + n_row),
113 "size": np.random.randint(lower, upper + 1, n_row),
114 }
115 )
116 files = df.rows()
117 print("========== V1 ==========")
118 with DateTimeTimer("V1") as timer:
119 file_group_list1 = calculate_group_file_plan_v1(
120 files=files,
121 target=max_sum,
122 )
123 elapse1 = timer.elapsed
124
125 n_group = len(file_group_list1)
126 n_file = sum(len(group) for group in file_group_list1)
127 print(f"n_group = {n_group}, n_file = {n_file}")
128 for file_group in file_group_list1[:5]:
129 size_list = [file[1] for file in file_group]
130 total_size = sum(size_list)
131 print(total_size, size_list)
132
133 print("========== V2 ==========")
134 with DateTimeTimer("V2") as timer:
135 file_group_list2 = calculate_group_file_plan_v2(
136 df=df,
137 size_col="size",
138 target=max_sum,
139 )
140 elapse2 = timer.elapsed
141
142 n_group = len(file_group_list2)
143 n_file = sum(len(group) for group in file_group_list2)
144 print(f"n_group = {n_group}, n_file = {n_file}")
145 for file_group in file_group_list2[:5]:
146 size_list = [file[1] for file in file_group]
147 total_size = sum(size_list)
148 print(total_size, size_list)
149
150 res = {"elapse1": elapse1, "elapse2": elapse2}
151 rprint(res)
结论:
对于 1M 个文件进行测试, 纯 Python 实现大约是 0.5 秒, 而 Polars 实现大约是 1 秒. 随着文件的数量增加耗时也 1:1 线性增加. 在 100M 个文件时纯 Python 实现大约是 50 秒. 而 Polars 实现大约是 100 秒.
所以我的项目中主要采用纯 Python 实现的算法.
Dispatcher Use Case#
在大数据处理时, 我们往往会 Orchestrator 扫描全部文件的 metadata, 然后将其分成小组分发给 Worker 来处理. 那么根据以上的 benchmark, 我们以 1M 个文件为基准, 0.5 秒处理完任务分配工作. 我们按照 Amazon Order 数据集大约 100K 条数据压缩后是 20MB, 未压缩大约是 150MB 为参照物. 也就是我们在 0.5 秒内完成了一个 1M * 100K = 100B 条数据, 1M * 20MB = 20TB 压缩后的数据 (未压缩则是 150TB) 的数据集的编排任务. 只要分配工作这一个必须由单点处理的工作抗住了压力, 后续就都是分而治之分布式处理的工作了. 可见即使数据量再大一百倍, 也就是 10 Trillion 条数据, 2PB 压缩后的数据该算法也能胜任. 证明了这一算法的可扩展性.