scipy: Optimizing the pandas DataFrame concat method for very large sparse data

Asked by wvt8vs2t, 8 months ago

Given:

import pandas as pd
import numpy as np

def get_rnd_df(row=10, col=7): # generate random pandas dataframe
    np.random.seed(0)
    d=np.random.randint(low=0, high=10, size=(row,col)).astype("float32")
    d[d < 3] = np.nan
    df=pd.DataFrame(data=d,
                    index=[f"ip{i}" for i in np.random.choice(range(max(row, 10)), row, replace=False) ],
                    columns=[f"col_{c}" for c in np.random.choice(range(max(col, 10)), col, replace=False) ],
                    dtype=pd.SparseDtype("float32", fill_value=np.nan), # sparse: memory efficient xxx but makes concatenation SUPER SLOW xxx
                    # dtype="float32", # dense: memory inefficient | time efficient
                )
    df.index.name = 'usr'
    return df

def get_df_concat(dfs): # concatenate [vstack & compute sum of group values (users)] + sort index + sort column
    df_concat = pd.concat(dfs, axis=0) # dfs=[df1, df2, df3, ..., dfN]
    df_concat = df_concat.groupby("usr").sum().sort_index(key=lambda x: ( x.to_series().str[2:].astype(int) ))
    df_concat = df_concat.reindex(columns=sorted(df_concat.columns, key=lambda x: ( int(x[4:]) ) ) )
    return df_concat

I apply the concatenation to several [Sparse] pandas DataFrames with my helper function get_df_concat(dfs), which also sorts the columns and the index. For two small sample DataFrames it works fine:

df1=get_rnd_df(row=8, col=7) # small sample random dataframe
        col_0   col_6   col_4   col_2   col_5   col_3   col_8
usr                             
ip3     5.0     NaN     3.0     3.0     7.0     9.0     3.0
ip5     5.0     NaN     4.0     7.0     6.0     8.0     8.0
ip0     NaN     6.0     7.0     7.0     8.0     NaN     5.0
ip6     9.0     8.0     9.0     4.0     3.0     NaN     3.0
ip2     5.0     NaN     NaN     3.0     8.0     NaN     3.0
ip8     3.0     3.0     7.0     NaN     NaN     9.0     9.0
ip9     NaN     4.0     7.0     3.0     NaN     7.0     NaN
ip7     NaN     NaN     4.0     5.0     5.0     6.0     8.0

df2=get_rnd_df(row=5, col=11) # small sample random dataframe
        col_4   col_10  col_5   col_0   col_9   col_2   col_8   col_6   col_3   col_7   col_1
usr                                             
ip3     5.0     NaN     3.0     3.0     7.0     9.0     3.0     5.0     NaN     4.0     7.0
ip5     6.0     8.0     8.0     NaN     6.0     7.0     7.0     8.0     NaN     5.0     9.0
ip0     8.0     9.0     4.0     3.0     NaN     3.0     5.0     NaN     NaN     3.0     8.0
ip6     NaN     3.0     3.0     3.0     7.0     NaN     NaN     9.0     9.0     NaN     4.0
ip2     7.0     3.0     NaN     7.0     NaN     NaN     NaN     4.0     5.0     5.0     6.0

%%time
df_concat=get_df_concat(dfs=[df1, df2])
CPU times: user 27.1 ms, sys: 0 ns, total: 27.1 ms
Wall time: 30.5 ms
        col_0   col_1   col_2   col_3   col_4   col_5   col_6   col_7   col_8   col_9   col_10
usr                                             
ip0     3.0     8.0     10.0    0.0     15.0    12.0    6.0     3.0     10.0    0.0     9.0
ip2     12.0    6.0     3.0     5.0     7.0     8.0     4.0     5.0     3.0     0.0     3.0
ip3     8.0     7.0     12.0    9.0     8.0     10.0    5.0     4.0     6.0     7.0     0.0
ip5     5.0     9.0     14.0    8.0     10.0    14.0    8.0     5.0     15.0    6.0     8.0
ip6     12.0    4.0     4.0     9.0     9.0     6.0     17.0    0.0     3.0     7.0     3.0
ip7     0.0     0.0     5.0     6.0     4.0     5.0     0.0     0.0     8.0     0.0     0.0
ip8     3.0     0.0     0.0     9.0     7.0     0.0     3.0     0.0     9.0     0.0     0.0
ip9     0.0     0.0     3.0     7.0     7.0     0.0     4.0     0.0     0.0     0.0     0.0

The problem is that my real data is so large that the concatenation takes many hours to finish, even with enough available memory:

df1=get_rnd_df(row=int(7e+5), col=int(2e+8)) # resembles my real data
df2=get_rnd_df(row=int(9e+6), col=int(1e+9)) # resembles my real data

%%time
df_concat=get_df_concat(dfs=[df1, df2]) # SUPER SLOW & time-consuming!!!

Is there a better alternative to carry out this concatenation more efficiently? I wonder whether a SciPy csr_matrix could help me do it faster? vstack/hstack alone do not help me, because I also sum the values of identical columns for identical users, as written in my helper function.
I rely on pd.SparseDtype("float32", fill_value=np.nan) to make sure the data fits in my available memory.
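
For illustration only, here is a rough, untested sketch of the kind of scipy.sparse route I have in mind (sparse_concat_sum is a hypothetical helper that assumes the 'ipN'/'col_N' label scheme above and has not been benchmarked at my data scale). It maps the labels to global integer positions, collects (row, col, value) triplets column by column, and relies on the fact that converting COO to CSR sums duplicate coordinates, which plays the same role as the per-user groupby sum:

import numpy as np
from scipy.sparse import coo_matrix

def sparse_concat_sum(frames):  # hypothetical helper, illustrative only
    # global, numerically sorted label spaces across all frames
    users = sorted(set().union(*(f.index for f in frames)), key=lambda s: int(s[2:]))
    cols = sorted(set().union(*(f.columns for f in frames)), key=lambda s: int(s[4:]))
    u_pos = {u: i for i, u in enumerate(users)}
    c_pos = {c: j for j, c in enumerate(cols)}

    rows, col_idx, vals = [], [], []
    for f in frames:
        for c in f.columns:
            s = f[c].dropna()  # keep only the stored (non-NaN) entries of this sparse column
            if s.empty:
                continue
            rows.append(np.fromiter((u_pos[u] for u in s.index), dtype=np.int64))
            col_idx.append(np.full(len(s), c_pos[c], dtype=np.int64))
            vals.append(np.asarray(s, dtype="float32"))

    m = coo_matrix((np.concatenate(vals),
                    (np.concatenate(rows), np.concatenate(col_idx))),
                   shape=(len(users), len(cols)), dtype="float32")
    return m.tocsr(), users, cols  # tocsr() sums duplicate (row, col) pairs; missing values stay implicit zeros

# usage: csr, user_labels, col_labels = sparse_concat_sum([df1, df2])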

Answer 1 (by chhqkbe1):

Optimization points:

  • A minor one: I prefer numpy.random.Generator, which is the recommended interface; in general I have noticed it works a bit faster than np.random.<func>.
  • Work with numeric index/column labels, and add the required string prefixes only once at the very end (vectorized). This speeds up the whole pipeline.
  • Following on from the previous point, drop the custom column sorting, since the columns can be sorted during concatenation by setting the sort parameter: pd.concat(dfs, axis=0, sort=True)
  • Drop the index sorting via sort_index, since pd.DataFrame.groupby sorts the group keys by default.
  • Speed up groupby.DataFrameGroupBy.sum by choosing numba as the engine, i.e. JIT-compiled code with parallel computation ({'nopython': True, 'parallel': True}); a short note on the numba dependency follows this list.
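
A practical caveat for the last point: engine="numba" requires the optional numba package to be installed, otherwise pandas raises an ImportError. A minimal fallback sketch, assuming you simply want the same logic without that dependency (get_df_concat_no_numba is a hypothetical name), is to drop the engine arguments and accept the default Cython path:

def get_df_concat_no_numba(dfs):  # hypothetical fallback, same logic as the full version below minus numba
    df = pd.concat(dfs, axis=0, sort=True)
    df = df.groupby(level=0).sum()  # default Cython engine: slower, but no extra dependency
    df.index = 'ip' + df.index.astype(str)
    df.columns = 'col_' + df.columns.astype(str)
    return df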

I tested this on 2 frames of size row=int(1e+3), col=int(2e+3); the previous/initial solution took 144 seconds to complete on my machine,
while the optimized approach finished in roughly 8 seconds (Numba incurs some one-time function-compilation overhead). All subsequent calls run in about 2.6 seconds, which is roughly a 55x speedup.
The full optimized version:

import time

def get_rnd_df(row=10, col=7):
    rng = np.random.default_rng(0)
    a = rng.integers(low=0, high=10, size=(row, col)).astype("float32")
    a[a < 3] = np.nan

    df = pd.DataFrame(data=a,
                      index=rng.choice(range(max(row, 10)), row, replace=False),
                      columns=rng.choice(range(max(col, 10)), col, replace=False),
                      dtype=pd.SparseDtype("float32", fill_value=np.nan),
                      )

    df.index.name = 'usr'
    return df

def get_df_concat(dfs):
    df = pd.concat(dfs, axis=0, sort=True)  # dfs=[df1, df2, df3, ..., dfN]
    df = df.groupby(level=0).sum(engine="numba",
                                 engine_kwargs={'nopython': True,
                                                'parallel': True})

    df.index = 'ip' + df.index.astype(str)
    df.columns = 'col_' + df.columns.astype(str)

    return df

t0 = time.time()

df1=get_rnd_df(row=int(1e+3), col=int(2e+3)) # resembles my real data
df2=get_rnd_df(row=int(1e+3), col=int(2e+3)) # resembles my real data
df_concat=get_df_concat(dfs=[df1, df2])

print('time spent: ', time.time() - t0)
time spent:  2.636758327484131
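
A quick follow-up check (illustrative; the exact numbers will differ per machine): the ~8 s first run quoted above includes Numba's one-time JIT compilation, so re-timing a second call in the same session shows the steady-state cost:

t0 = time.time()
df_concat = get_df_concat(dfs=[df1, df2])  # Numba kernels are already compiled at this point
print('time spent (warm call): ', time.time() - t0)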
