Group Files Planner#

在大数据处理中, 经常会用到分儿治之的策略. 举例来说, 在百万级文件数量的数据处理中, 会有如下应用场景:

  • 将文件分成小组, 例如每组 1000 个文件, 交给许多分布式的 worker 并行处理, 提高处理速度.

  • 对 datalake 中的数据文件做 compaction, 将小文件合并使得文件的 IO 次数减少, 数据更有序, 查询性能更高.

这种分而治之的应用场景的核心技术是将文件按照大小 (或是里面的数据量, 通常是行数) 聚合成固定大小 (不是严格等于, 大致就可以) 的 Group. 在数据量很大的时候, 选择一个高性能的分组算法就变得很重要了.

Implementation and Performance Benchmark#

下面这个脚本我们分别用纯 Python 和用 polars 库实现了这个算法, 并测试了性能.

group_files_planner_poc.py
  1# -*- coding: utf-8 -*-
  2
  3"""
  4Benchmark::
  5
  6    ========== V1 ==========
  7    V1: from 2024-08-09 00:15:16.634320 to 2024-08-09 00:15:17.065350 elapsed 0.431030 second.
  8    n_group = 313702, n_file = 1000000
  9    186 [30, 2, 84, 70]
 10    207 [77, 37, 93]
 11    137 [51, 86]
 12    135 [87, 48]
 13    178 [41, 68, 69]
 14    ========== V2 ==========
 15    V2: from 2024-08-09 00:15:17.074063 to 2024-08-09 00:15:18.011648 elapsed 0.937585 second.
 16    n_group = 394647, n_file = 1000000
 17    116 [30, 2, 84]
 18    70 [70]
 19    114 [77, 37]
 20    144 [93, 51]
 21    173 [86, 87]
 22    {'elapse1': 0.43103, 'elapse2': 0.937585}
 23"""
 24
 25import typing as T
 26from collections import deque
 27
 28import numpy as np
 29import polars as pl
 30from rich import print as rprint
 31
 32from s3manifesto.vendor.timer import DateTimeTimer
 33
 34T_FILE_SPEC = T.Tuple[T.Union[str, int], int]
 35
 36
 37def calculate_group_file_plan_v1(
 38    files: T.List[T_FILE_SPEC],
 39    target: int,
 40) -> T.List[T.List[T_FILE_SPEC]]:
 41    """
 42    Given a list of :class:`File` and a target size, put them into groups,
 43    so that each group has approximately the same size as the target size.
 44
 45    Pure Python implementation.
 46    """
 47    half_target_size = target // 2
 48
 49    files = deque(sorted(files, key=lambda x: [1]))
 50    file_groups = list()
 51    file_group = list()
 52    file_group_size = 0
 53
 54    while 1:
 55        # if no files left
 56        if len(files) == 0:
 57            if len(file_group):
 58                file_groups.append(file_group)
 59            break
 60
 61        remaining_size = half_target_size - file_group_size
 62        # take the largest file
 63        if remaining_size <= half_target_size:
 64            file = files.popleft()
 65        # take the smallest file
 66        else:
 67            file = files.pop()
 68
 69        file_group.append(file)
 70        file_group_size += file[1]
 71
 72        if file_group_size >= target:
 73            file_groups.append(file_group)
 74            file_group = list()
 75            file_group_size = 0
 76
 77    return file_groups
 78
 79
 80def calculate_group_file_plan_v2(
 81    df: pl.DataFrame,
 82    size_col: str,
 83    target: int,
 84) -> T.List[T.List[T_FILE_SPEC]]:
 85    """
 86    Polars implementation.
 87    """
 88    df = df.with_columns(pl.col(size_col).cum_sum().alias("cum_sum"))
 89    df = df.with_columns(
 90        ((pl.col("cum_sum") - pl.lit(1)) // target).alias("batch_group")
 91    )
 92    df = df.drop("cum_sum")
 93    file_group_list = list()
 94    for sub_df in df.partition_by("batch_group", include_key=False):
 95        sub_df["size"].sum()
 96        file_group_list.append(sub_df.rows())
 97    return file_group_list
 98
 99
100if __name__ == "__main__":
101    # n_row = 1000
102    # n_row = 10_000
103    # n_row = 100_000
104    n_row = 1_000_000
105    # n_row = 10_000_000
106    lower = 1
107    upper = 100
108    max_sum = 128
109
110    df = pl.DataFrame(
111        {
112            "id": np.arange(1, 1 + n_row),
113            "size": np.random.randint(lower, upper + 1, n_row),
114        }
115    )
116    files = df.rows()
117    print("========== V1 ==========")
118    with DateTimeTimer("V1") as timer:
119        file_group_list1 = calculate_group_file_plan_v1(
120            files=files,
121            target=max_sum,
122        )
123    elapse1 = timer.elapsed
124
125    n_group = len(file_group_list1)
126    n_file = sum(len(group) for group in file_group_list1)
127    print(f"n_group = {n_group}, n_file = {n_file}")
128    for file_group in file_group_list1[:5]:
129        size_list = [file[1] for file in file_group]
130        total_size = sum(size_list)
131        print(total_size, size_list)
132
133    print("========== V2 ==========")
134    with DateTimeTimer("V2") as timer:
135        file_group_list2 = calculate_group_file_plan_v2(
136            df=df,
137            size_col="size",
138            target=max_sum,
139        )
140    elapse2 = timer.elapsed
141
142    n_group = len(file_group_list2)
143    n_file = sum(len(group) for group in file_group_list2)
144    print(f"n_group = {n_group}, n_file = {n_file}")
145    for file_group in file_group_list2[:5]:
146        size_list = [file[1] for file in file_group]
147        total_size = sum(size_list)
148        print(total_size, size_list)
149
150    res = {"elapse1": elapse1, "elapse2": elapse2}
151    rprint(res)

结论:

对于 1M 个文件进行测试, 纯 Python 实现大约是 0.5 秒, 而 Polars 实现大约是 1 秒. 随着文件的数量增加耗时也 1:1 线性增加. 在 100M 个文件时纯 Python 实现大约是 50 秒. 而 Polars 实现大约是 100 秒.

所以我的项目中主要采用纯 Python 实现的算法.

Dispatcher Use Case#

在大数据处理时, 我们往往会 Orchestrator 扫描全部文件的 metadata, 然后将其分成小组分发给 Worker 来处理. 那么根据以上的 benchmark, 我们以 1M 个文件为基准, 0.5 秒处理完任务分配工作. 我们按照 Amazon Order 数据集大约 100K 条数据压缩后是 20MB, 未压缩大约是 150MB 为参照物. 也就是我们在 0.5 秒内完成了一个 1M * 100K = 100B 条数据, 1M * 20MB = 20TB 压缩后的数据 (未压缩则是 150TB) 的数据集的编排任务. 只要分配工作这一个必须由单点处理的工作抗住了压力, 后续就都是分而治之分布式处理的工作了. 可见即使数据量再大一百倍, 也就是 10 Trillion 条数据, 2PB 压缩后的数据该算法也能胜任. 证明了这一算法的可扩展性.