Group Files Planner
In big data processing, divide-and-conquer strategies are commonly employed. When processing millions of files, typical use cases include:
Dividing files into smaller groups (e.g., 1,000 files per group) and distributing them across multiple parallel workers to improve processing speed
Performing data compaction in data lakes by merging small files to reduce I/O operations, improve data organization, and enhance query performance
The core challenge in these divide-and-conquer scenarios is efficiently grouping files by size (or data volume, typically record count) into groups of approximately equal size. When dealing with large data volumes, selecting a high-performance grouping algorithm becomes critical.
Implementation and Performance Benchmark
The following script implements the grouping in both pure Python and Polars. Its module docstring records the output of one benchmark run on 1M files.
group_files_planner_poc.py
# -*- coding: utf-8 -*-

"""
Benchmark::

    ========== V1 ==========
    V1: from 2024-08-09 00:15:16.634320 to 2024-08-09 00:15:17.065350 elapsed 0.431030 second.
    n_group = 313702, n_file = 1000000
    186 [30, 2, 84, 70]
    207 [77, 37, 93]
    137 [51, 86]
    135 [87, 48]
    178 [41, 68, 69]
    ========== V2 ==========
    V2: from 2024-08-09 00:15:17.074063 to 2024-08-09 00:15:18.011648 elapsed 0.937585 second.
    n_group = 394647, n_file = 1000000
    116 [30, 2, 84]
    70 [70]
    114 [77, 37]
    144 [93, 51]
    173 [86, 87]
    {'elapse1': 0.43103, 'elapse2': 0.937585}
"""

import typing as T
from collections import deque

import numpy as np
import polars as pl
from rich import print as rprint

from s3manifesto.vendor.timer import DateTimeTimer

# (file identifier, file size)
T_FILE_SPEC = T.Tuple[T.Union[str, int], int]


def calculate_group_file_plan_v1(
    files: T.List[T_FILE_SPEC],
    target: int,
) -> T.List[T.List[T_FILE_SPEC]]:
    """
    Given a list of :class:`File` and a target size, put them into groups,
    so that each group has approximately the same size as the target size.

    Pure Python implementation.
    """
    half_target_size = target // 2

    # sort ascending by size: the left end of the deque holds the smallest
    # file, the right end holds the largest
    files = deque(sorted(files, key=lambda x: x[1]))
    file_groups = list()
    file_group = list()
    file_group_size = 0

    while True:
        # if no files left, flush the last partial group and stop
        if len(files) == 0:
            if len(file_group):
                file_groups.append(file_group)
            break

        remaining_size = target - file_group_size
        # plenty of room left in the group: take the largest file
        if remaining_size >= half_target_size:
            file = files.pop()
        # little room left in the group: take the smallest file
        else:
            file = files.popleft()

        file_group.append(file)
        file_group_size += file[1]

        # the group reached the target, close it and start a new one
        if file_group_size >= target:
            file_groups.append(file_group)
            file_group = list()
            file_group_size = 0

    return file_groups


def calculate_group_file_plan_v2(
    df: pl.DataFrame,
    size_col: str,
    target: int,
) -> T.List[T.List[T_FILE_SPEC]]:
    """
    Polars implementation.
    """
    # the running total of sizes decides which bucket each file falls into
    df = df.with_columns(pl.col(size_col).cum_sum().alias("cum_sum"))
    df = df.with_columns(
        ((pl.col("cum_sum") - pl.lit(1)) // target).alias("batch_group")
    )
    df = df.drop("cum_sum")
    file_group_list = list()
    for sub_df in df.partition_by("batch_group", include_key=False):
        file_group_list.append(sub_df.rows())
    return file_group_list


if __name__ == "__main__":
    # n_row = 1000
    # n_row = 10_000
    # n_row = 100_000
    n_row = 1_000_000
    # n_row = 10_000_000
    lower = 1
    upper = 100
    max_sum = 128

    # random file sizes in [lower, upper]
    df = pl.DataFrame(
        {
            "id": np.arange(1, 1 + n_row),
            "size": np.random.randint(lower, upper + 1, n_row),
        }
    )
    files = df.rows()
    print("========== V1 ==========")
    with DateTimeTimer("V1") as timer:
        file_group_list1 = calculate_group_file_plan_v1(
            files=files,
            target=max_sum,
        )
    elapse1 = timer.elapsed

    n_group = len(file_group_list1)
    n_file = sum(len(group) for group in file_group_list1)
    print(f"n_group = {n_group}, n_file = {n_file}")
    for file_group in file_group_list1[:5]:
        size_list = [file[1] for file in file_group]
        total_size = sum(size_list)
        print(total_size, size_list)

    print("========== V2 ==========")
    with DateTimeTimer("V2") as timer:
        file_group_list2 = calculate_group_file_plan_v2(
            df=df,
            size_col="size",
            target=max_sum,
        )
    elapse2 = timer.elapsed

    n_group = len(file_group_list2)
    n_file = sum(len(group) for group in file_group_list2)
    print(f"n_group = {n_group}, n_file = {n_file}")
    for file_group in file_group_list2[:5]:
        size_list = [file[1] for file in file_group]
        total_size = sum(size_list)
        print(total_size, size_list)

    res = {"elapse1": elapse1, "elapse2": elapse2}
    rprint(res)
Performance Results:
Testing with 1M files shows that the pure Python implementation completes in about 0.43 seconds (roughly 0.5 seconds), while the Polars implementation requires about 0.94 seconds (roughly 1 second). Processing time scales roughly linearly with file count. At that rate, 100M files take approximately 50 seconds with the pure Python implementation compared to 100 seconds with the Polars implementation.
Based on these results, our project primarily uses the pure Python algorithm implementation.
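To make the grouping rule of the Polars variant concrete, here is a minimal pure Python sketch of the same bucket formula, `(running_total - 1) // target`. The function name `group_by_cumulative_sum` is hypothetical and only used for illustration. The input is the first ten sizes shown in the benchmark docstring.

```python
from itertools import accumulate
from typing import Dict, List


def group_by_cumulative_sum(sizes: List[int], target: int) -> List[List[int]]:
    """Put each size into bucket (running_total - 1) // target, keeping input order."""
    buckets: Dict[int, List[int]] = {}
    for size, running_total in zip(sizes, accumulate(sizes)):
        bucket_id = (running_total - 1) // target
        buckets.setdefault(bucket_id, []).append(size)
    # dicts preserve insertion order, so buckets come out in input order
    return list(buckets.values())


sizes = [30, 2, 84, 70, 77, 37, 93, 51, 86, 87]
for group in group_by_cumulative_sum(sizes, target=128):
    print(sum(group), group)
# 116 [30, 2, 84]
# 70 [70]
# 114 [77, 37]
# 144 [93, 51]
# 173 [86, 87]
```

Because the bucket boundary is drawn on the running total rather than on each group's own total, a group can end up well below the target (70) or above it (173).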
Note
The Polars implementation uses cumulative sum, but many different algorithms exist for this grouping problem, each with distinct complexity and space utilization characteristics. This benchmark serves as a reference point only. In production, we use the Best Fit Decreasing (BFD) algorithm. For the specific implementation, refer to group_files(). We won’t elaborate further on alternative approaches here.
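For readers unfamiliar with Best Fit Decreasing, the following is a minimal textbook sketch, not the project's `group_files()` implementation. It assumes the target acts as a hard capacity per group, which may differ from how the project treats it. The names `FileSpec` and `best_fit_decreasing` are hypothetical.

```python
from typing import List, Tuple

FileSpec = Tuple[str, int]  # (file id, size); hypothetical type for this sketch


def best_fit_decreasing(files: List[FileSpec], capacity: int) -> List[List[FileSpec]]:
    """Textbook BFD: largest files first, each placed in the tightest group that fits."""
    groups: List[List[FileSpec]] = []
    free: List[int] = []  # free capacity left in each group
    for file in sorted(files, key=lambda f: f[1], reverse=True):
        size = file[1]
        best = None
        # find the group with the least free capacity that still fits the file
        for i, room in enumerate(free):
            if size <= room and (best is None or room < free[best]):
                best = i
        if best is None:
            # nothing fits: open a new group (an oversized file gets its own group)
            groups.append([file])
            free.append(capacity - size)
        else:
            groups[best].append(file)
            free[best] -= size
    return groups


files = [("a", 70), ("b", 60), ("c", 50), ("d", 40), ("e", 30), ("f", 20), ("g", 10)]
for group in best_fit_decreasing(files, capacity=100):
    print(sum(size for _, size in group), [size for _, size in group])
# 100 [70, 30]
# 100 [60, 40]
# 80 [50, 20, 10]
```

This sketch scans every open group for each file, so its cost grows with the number of groups. Faster variants keep the free capacities in a sorted structure.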
Note
Our algorithm benchmark used plain numeric tuples, while the real application works with dataclasses, so real-world usage carries additional overhead. In our business scenarios, we typically process between 100 and 1,000 files, with a maximum of 10,000 files. Our testing shows that processing 10,000 files takes approximately 1 second. Processing time grows super-linearly with data volume: the expected complexity is O(n*log n), but in the results below each tenfold increase in file count takes roughly 35 to 50 times longer. For datasets exceeding 10,000 files, we can use random sampling to partition the dataset into smaller chunks, then apply this algorithm within each chunk.
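The chunking idea for very large inputs can be sketched as follows. This is an illustration under assumptions: `group_in_chunks` and `sequential_grouper` are hypothetical names, and the simple sequential grouper only stands in for whatever grouping function is actually applied within each chunk.

```python
import random
from typing import Callable, List, Tuple

FileSpec = Tuple[int, int]  # (file id, size); hypothetical type for this sketch
Grouper = Callable[[List[FileSpec], int], List[List[FileSpec]]]


def sequential_grouper(files: List[FileSpec], target: int) -> List[List[FileSpec]]:
    """Stand-in grouper: close a group as soon as its total size reaches the target."""
    groups, current, current_size = [], [], 0
    for file in files:
        current.append(file)
        current_size += file[1]
        if current_size >= target:
            groups.append(current)
            current, current_size = [], 0
    if current:
        groups.append(current)
    return groups


def group_in_chunks(
    files: List[FileSpec],
    target: int,
    chunk_size: int,
    grouper: Grouper,
    seed: int = 0,
) -> List[List[FileSpec]]:
    """Shuffle the files, cut them into chunks, and group each chunk on its own."""
    shuffled = list(files)
    # shuffling gives every chunk a similar size distribution
    random.Random(seed).shuffle(shuffled)
    groups: List[List[FileSpec]] = []
    for start in range(0, len(shuffled), chunk_size):
        chunk = shuffled[start : start + chunk_size]
        groups.extend(grouper(chunk, target))
    return groups


files = [(i, 1 + i % 100) for i in range(25_000)]
groups = group_in_chunks(files, target=128, chunk_size=10_000, grouper=sequential_grouper)
print(len(groups), sum(len(group) for group in groups))
```

Each chunk may leave one undersized trailing group, so the result is slightly less balanced than grouping everything at once. That is the price of keeping each call small.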
Below are our benchmark test results:
process 1k item in 0.02 sec
process 5k item in 0.30 sec
process 10k item in 1 sec
process 50k item in 12 sec
process 100k item in 37 sec
process 500k item in 7.5 min
process 1000k item in 24 min
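The scaling behavior of these figures can be checked with a short calculation. The sketch below estimates the growth exponent k in time ∝ n^k between consecutive measurements; the function name `growth_exponents` is hypothetical, and the data points are the ones listed above, converted to seconds.

```python
import math
from typing import List, Tuple

# (number of items, seconds), taken from the benchmark results above
benchmark: List[Tuple[int, float]] = [
    (1_000, 0.02),
    (5_000, 0.30),
    (10_000, 1.0),
    (50_000, 12.0),
    (100_000, 37.0),
    (500_000, 7.5 * 60),
    (1_000_000, 24 * 60),
]


def growth_exponents(points: List[Tuple[int, float]]) -> List[float]:
    """Log-log slope between each pair of consecutive (n, seconds) points."""
    return [
        math.log(t2 / t1) / math.log(n2 / n1)
        for (n1, t1), (n2, t2) in zip(points, points[1:])
    ]


for (n, _), k in zip(benchmark[1:], growth_exponents(benchmark)):
    print(f"up to {n:>9,} items: exponent ~ {k:.2f}")
```

For these measurements every exponent lands between about 1.5 and 1.8. A purely linear algorithm would give 1.0, and O(n*log n) would give values only slightly above 1.0.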
Dispatcher Use Case
In big data processing workflows, an Orchestrator typically scans all file metadata and divides it into smaller groups for distribution to Workers. Using the benchmark figure above (about 0.02 seconds to process 1,000 files), let’s estimate this algorithm’s performance in a big data divide-and-conquer scenario, with the Amazon Orders open-source dataset as a baseline (approximately 100,000 records per file, about 20MB compressed and 150MB uncompressed).
Assume we have 1PB of uncompressed data: 1PB / 150MB = 6.67M files
Each distributed worker processes 1GB of data (approximately 6-7 files)
We split the 6.67M files into 6,670 chunks of ~1,000 files each and plan each chunk independently
Planning each chunk takes approximately 0.02 seconds
Total planning time: 6,670 × 0.02 ≈ 133 seconds (approximately 2.2 minutes)
Planning the file metadata for 1PB of data in approximately 2 minutes shows that the algorithm is fast enough for large-scale big data scenarios.
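The estimate above can be reproduced with a few lines of arithmetic. This is a sketch assuming decimal units (1PB = 10^15 bytes, 1MB = 10^6 bytes); the function name `estimate_planning_seconds` is hypothetical.

```python
import math


def estimate_planning_seconds(
    total_bytes: int,
    file_bytes: int,
    chunk_size: int,
    seconds_per_chunk: float,
) -> float:
    """Estimate total planning time when files are planned chunk by chunk."""
    n_files = math.ceil(total_bytes / file_bytes)
    n_chunks = math.ceil(n_files / chunk_size)
    return n_chunks * seconds_per_chunk


PB = 10**15
GB = 10**9
MB = 10**6

n_files = math.ceil(PB / (150 * MB))
files_per_worker = GB / (150 * MB)
total_seconds = estimate_planning_seconds(
    total_bytes=PB,
    file_bytes=150 * MB,
    chunk_size=1_000,
    seconds_per_chunk=0.02,
)
print(f"{n_files:,} files, {files_per_worker:.1f} files per worker")
print(f"{total_seconds:.0f} seconds ({total_seconds / 60:.1f} minutes)")
```

With these inputs the result matches the hand calculation: about 6.67M files, 6 to 7 files per worker, and roughly 133 seconds of planning time.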