pyspark上的动态窗口

7uhlpewt  于 2021-07-14  发布在  Spark
关注(0)|答案(0)|浏览(250)

我目前在pyspark上面临以下问题。我需要在行上创建一个滚动窗口,在另一行的值上应用一个函数。这个窗口通常是动态的,如果找到某个值就“重启”。我们可以在示例中看到:
“组列”“应用程序列”“输出列”12223532712223533236284210
在本例中,我们使用值1作为重新启动值,我们在application列上应用sum(但它可以是任何函数)。主要的问题是我们不知道团队的规模。正如我们在示例中看到的,大小是不同的。
我使用pandas编写了一个python代码(但我大多数使用pyspark)。然而,它需要几分钟来执行,我需要一个代码,可以执行得更好。
我的代码:
df->pyspsarkDataframe
应用列->将应用该方法的列
分组列->将用于分组值的列
输出列->输出列的名称
方法->将应用于列的方法
开始值->计数将重新开始的值

def ave_like_method(df, apply_column, group_column, output_column, method, start_value=None):
  df = df.toPandas()

  if start_value is None:

    start_value = df[group_column].min()

  new_values = []

  index = 0

  iteration_values = []

  df[apply_column] = df[apply_column].astype(float)
  df[group_column] = df[group_column].astype(float)

  for apply, group in df[[apply_column, group_column]].values:

    if group == start_value:
      # in this case we can restart the "counting"
      if index != 0:
        # when the index is different of 0 then,
        # we can group the data and apply the method

        # applying the method over a list
        results = method(np.array(iteration_values))

        if isinstance(results, float) or isinstance(results, int):

          results = [results] * len(iteration_values)

        new_values.extend(results)

      iteration_values = []

    iteration_values.append(apply)
    index += 1

  # in the final aggregation the list does not return to it first value
  results = method(np.array(iteration_values))
  if isinstance(results, float) or isinstance(results, int):
    results = [results] * len(iteration_values)
  new_values.extend(results)

  print(len(new_values), len(df))
  df[output_column] = new_values

  # converting pandas dataframe to pyspawrk frame
  df = pandas_to_spark(df)

  return df

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题