Group Files Planner

In big data processing, divide-and-conquer strategies are commonly employed. When processing millions of files, typical use cases include:

  • Dividing files into smaller groups (e.g., 1,000 files per group) and distributing them across multiple parallel workers to improve processing speed

  • Performing data compaction in data lakes by merging small files to reduce I/O operations, improve data organization, and enhance query performance

The core challenge in these divide-and-conquer scenarios is efficiently grouping files by size (or data volume, typically record count) into groups of approximately equal size. When dealing with large data volumes, selecting a high-performance grouping algorithm becomes critical.

Implementation and Performance Benchmark

The following script implements the grouping in both pure Python and the Polars library. Its module docstring records the benchmark output from one run.

group_files_planner_poc.py
# -*- coding: utf-8 -*-

"""
Benchmark (output recorded from one run on random input; exact numbers vary)::

    ========== V1 ==========
    V1: from 2024-08-09 00:15:16.634320 to 2024-08-09 00:15:17.065350 elapsed 0.431030 second.
    n_group = 313702, n_file = 1000000
    186 [30, 2, 84, 70]
    207 [77, 37, 93]
    137 [51, 86]
    135 [87, 48]
    178 [41, 68, 69]
    ========== V2 ==========
    V2: from 2024-08-09 00:15:17.074063 to 2024-08-09 00:15:18.011648 elapsed 0.937585 second.
    n_group = 394647, n_file = 1000000
    116 [30, 2, 84]
    70 [70]
    114 [77, 37]
    144 [93, 51]
    173 [86, 87]
    {'elapse1': 0.43103, 'elapse2': 0.937585}
"""

import typing as T
from collections import deque

import numpy as np
import polars as pl
from rich import print as rprint

from s3manifesto.vendor.timer import DateTimeTimer

# A file spec is an ``(id, size)`` tuple.
T_FILE_SPEC = T.Tuple[T.Union[str, int], int]


def calculate_group_file_plan_v1(
    files: T.List[T_FILE_SPEC],
    target: int,
) -> T.List[T.List[T_FILE_SPEC]]:
    """
    Given a list of :class:`File` and a target size, put them into groups,
    so that each group has approximately the same size as the target size.

    Pure Python implementation.
    """
    half_target_size = target // 2

    # Sort ascending by size: the left end of the deque holds the smallest
    # file and the right end holds the largest.
    files = deque(sorted(files, key=lambda x: x[1]))
    file_groups = []
    file_group = []
    file_group_size = 0

    while True:
        # No files left: flush the last, possibly undersized, group.
        if len(files) == 0:
            if len(file_group):
                file_groups.append(file_group)
            break

        remaining_size = target - file_group_size
        # At least half of the target is still free: take the largest file.
        if remaining_size >= half_target_size:
            file = files.pop()
        # The group is more than half full: take the smallest file.
        else:
            file = files.popleft()

        file_group.append(file)
        file_group_size += file[1]

        # Close the group once it reaches the target size.
        if file_group_size >= target:
            file_groups.append(file_group)
            file_group = []
            file_group_size = 0

    return file_groups


def calculate_group_file_plan_v2(
    df: pl.DataFrame,
    size_col: str,
    target: int,
) -> T.List[T.List[T_FILE_SPEC]]:
    """
    Polars implementation.

    Buckets rows by the running total of ``size_col``: a row belongs to
    group ``(cum_sum - 1) // target``.
    """
    df = df.with_columns(pl.col(size_col).cum_sum().alias("cum_sum"))
    df = df.with_columns(
        ((pl.col("cum_sum") - pl.lit(1)) // target).alias("batch_group")
    )
    df = df.drop("cum_sum")
    file_group_list = []
    # One sub-DataFrame per group; the key column is dropped from the rows.
    for sub_df in df.partition_by("batch_group", include_key=False):
        file_group_list.append(sub_df.rows())
    return file_group_list


if __name__ == "__main__":
    # n_row = 1000
    # n_row = 10_000
    # n_row = 100_000
    n_row = 1_000_000
    # n_row = 10_000_000
    lower = 1
    upper = 100
    max_sum = 128

    # Random file sizes in [lower, upper]; ids are 1..n_row.
    df = pl.DataFrame(
        {
            "id": np.arange(1, 1 + n_row),
            "size": np.random.randint(lower, upper + 1, n_row),
        }
    )
    files = df.rows()
    print("========== V1 ==========")
    with DateTimeTimer("V1") as timer:
        file_group_list1 = calculate_group_file_plan_v1(
            files=files,
            target=max_sum,
        )
    elapse1 = timer.elapsed

    # Print the group count and the first five groups as a sanity check.
    n_group = len(file_group_list1)
    n_file = sum(len(group) for group in file_group_list1)
    print(f"n_group = {n_group}, n_file = {n_file}")
    for file_group in file_group_list1[:5]:
        size_list = [file[1] for file in file_group]
        total_size = sum(size_list)
        print(total_size, size_list)

    print("========== V2 ==========")
    with DateTimeTimer("V2") as timer:
        file_group_list2 = calculate_group_file_plan_v2(
            df=df,
            size_col="size",
            target=max_sum,
        )
    elapse2 = timer.elapsed

    n_group = len(file_group_list2)
    n_file = sum(len(group) for group in file_group_list2)
    print(f"n_group = {n_group}, n_file = {n_file}")
    for file_group in file_group_list2[:5]:
        size_list = [file[1] for file in file_group]
        total_size = sum(size_list)
        print(total_size, size_list)

    res = {"elapse1": elapse1, "elapse2": elapse2}
    rprint(res)

Performance Results:

Testing with 1M files shows that the pure Python implementation completes in about 0.43 seconds, while the Polars implementation requires about 0.94 seconds, roughly twice as long. Processing time scales approximately linearly with file count, so for 100M files the pure Python implementation takes approximately 50 seconds compared to 100 seconds for the Polars implementation.

Based on these results, our project primarily uses the pure Python algorithm implementation.

Note

The Polars implementation uses cumulative sum, but many different algorithms exist for this grouping problem, each with distinct complexity and space utilization characteristics. This benchmark serves as a reference point only. In production, we use the Best Fit Decreasing (BFD) algorithm. For the specific implementation, refer to group_files(). We won’t elaborate further on alternative approaches here.
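For orientation, here is a minimal sketch of Best Fit Decreasing. It is not the project's group_files() implementation, whose details may differ. The function name best_fit_decreasing is hypothetical, and the sketch assumes that target acts as a hard per-group capacity, with any file larger than target placed in a group of its own.

```python
import bisect
import typing as T

T_FILE_SPEC = T.Tuple[T.Union[str, int], int]


def best_fit_decreasing(
    files: T.List[T_FILE_SPEC],
    target: int,
) -> T.List[T.List[T_FILE_SPEC]]:
    """Hypothetical BFD sketch; ``target`` is an assumed hard capacity."""
    groups: T.List[T.List[T_FILE_SPEC]] = []
    # Sorted list of (remaining_capacity, group_index) for groups with room left.
    open_groups: T.List[T.Tuple[int, int]] = []
    # Visit files from largest to smallest.
    for file in sorted(files, key=lambda f: f[1], reverse=True):
        size = file[1]
        # Best fit: the open group with the smallest remaining capacity >= size.
        pos = bisect.bisect_left(open_groups, (size, -1))
        if pos < len(open_groups):
            remaining, idx = open_groups.pop(pos)
            groups[idx].append(file)
        else:
            # Nothing fits: open a new group (an oversized file fills it alone).
            idx = len(groups)
            groups.append([file])
            remaining = max(target, size)
        left = remaining - size
        if left > 0:
            bisect.insort(open_groups, (left, idx))
    return groups


demo = [("a", 60), ("b", 50), ("c", 40), ("d", 30), ("e", 20)]
for group in best_fit_decreasing(demo, target=100):
    print(sum(size for _, size in group), group)
```

This sketch keeps the open groups in a plain sorted list, so each removal or insertion costs linear time in the number of open groups. A balanced tree or similar structure would be needed to reach O(n log n) overall.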

Note

Our algorithm benchmark used plain numeric values, whereas practical applications use dataclasses, so real-world usage carries additional performance overhead. In our business scenarios, we typically process between 100 and 1,000 files, with a maximum of 10,000 files. Our testing shows that processing 10,000 files takes approximately 1 second. Processing time grows super-linearly with data volume. The stated complexity is O(n log n), although the measurements below grow faster than that: each tenfold increase in input size raises the run time by roughly 37 to 50 times. For datasets exceeding 10,000 files, we can use random sampling to partition large datasets into smaller chunks, then apply this algorithm within each chunk.
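The random-sampling partition mentioned above could look like the following sketch. The helper name split_into_random_chunks, the chunk_size parameter, and the optional seed are assumptions made for illustration. The grouping algorithm would then run on each chunk independently.

```python
import random
import typing as T

T_FILE_SPEC = T.Tuple[T.Union[str, int], int]


def split_into_random_chunks(
    files: T.List[T_FILE_SPEC],
    chunk_size: int = 10_000,
    seed: T.Optional[int] = None,
) -> T.List[T.List[T_FILE_SPEC]]:
    """Shuffle a copy of ``files`` and cut it into chunks of at most ``chunk_size``."""
    shuffled = list(files)  # copy, so the caller's list is left untouched
    random.Random(seed).shuffle(shuffled)
    return [
        shuffled[i : i + chunk_size]
        for i in range(0, len(shuffled), chunk_size)
    ]


files = [(i, 1 + i % 100) for i in range(25_000)]
chunks = split_into_random_chunks(files, chunk_size=10_000, seed=0)
print([len(chunk) for chunk in chunks])  # [10000, 10000, 5000]
```

Shuffling before cutting gives each chunk a similar mix of file sizes, so every chunk can be grouped on its own without a few chunks receiving all the large files.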

Below are our benchmark test results:

  • 1k items: 0.02 sec

  • 5k items: 0.30 sec

  • 10k items: 1 sec

  • 50k items: 12 sec

  • 100k items: 37 sec

  • 500k items: 7.5 min

  • 1,000k items: 24 min
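To quantify how fast these timings grow, one can estimate the empirical exponent k in "time proportional to n^k" from consecutive measurements. The sketch below uses only the figures listed above, with minutes converted to seconds. The function name local_exponents is illustrative.

```python
import math

# (number of items, seconds), taken from the benchmark list above.
measurements = [
    (1_000, 0.02),
    (5_000, 0.30),
    (10_000, 1.0),
    (50_000, 12.0),
    (100_000, 37.0),
    (500_000, 7.5 * 60),
    (1_000_000, 24 * 60),
]


def local_exponents(points):
    """Return the log-log slope between each pair of consecutive measurements."""
    return [
        math.log(t2 / t1) / math.log(n2 / n1)
        for (n1, t1), (n2, t2) in zip(points, points[1:])
    ]


for (n, _), k in zip(measurements[1:], local_exponents(measurements)):
    print(f"up to {n:>9,} items: exponent ~ {k:.2f}")
```

On these figures every slope falls between roughly 1.5 and 1.8, which is steeper than linear growth (slope 1) and flatter than quadratic growth (slope 2).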

Dispatcher Use Case

In big data processing workflows, an Orchestrator typically scans all file metadata and divides it into smaller groups for distribution to Workers. Based on our benchmark results (1 second to process 10,000 files), let’s evaluate this algorithm’s performance in big data divide-and-conquer scenarios using the Amazon Orders open-source dataset as a baseline (approximately 100,000 records compressed to 20MB, uncompressed to about 150MB).

  • Assume we have 1PB of uncompressed data: 1PB / 150MB ≈ 6.67M files

  • Each distributed worker processes 1GB of data (approximately 6-7 files)

  • We divide the 6.67M files into about 6,670 groups of ~1,000 files each and run the grouping algorithm on each group separately

  • Running the algorithm on one 1,000-file group takes approximately 0.02 seconds (the 1k benchmark result above)

  • Total processing time: 6,670 × 0.02 ≈ 133 seconds (approximately 2.2 minutes)
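The back-of-the-envelope estimate above can be reproduced with a few lines of arithmetic. The variable names are illustrative, decimal units are assumed (1 PB = 10^15 bytes, 1 MB = 10^6 bytes), and the 0.02-second figure is the 1k-item benchmark result.

```python
total_bytes = 10**15        # 1 PB in decimal units (assumption)
file_bytes = 150 * 10**6    # ~150 MB per uncompressed file
files_per_group = 1_000     # files handed to one run of the grouping algorithm
seconds_per_group = 0.02    # benchmark figure for 1k items

n_files = total_bytes / file_bytes
n_groups = n_files / files_per_group
total_seconds = n_groups * seconds_per_group

print(f"files:  {n_files / 1e6:.2f}M")
print(f"groups: {n_groups:,.0f}")
print(f"total:  {total_seconds:.0f} s ({total_seconds / 60:.1f} min)")
```

The estimate assumes the 0.02-second cost holds for every 1,000-file group and ignores the time needed to scan the metadata and to split it into groups.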

Planning the grouping for 1PB of data from its metadata in approximately 2 minutes suggests that the algorithm is well suited to large-scale big data scenarios.