For loop over a list

zpqajqem · posted 2021-05-27 in Spark

I have two DataFrames, df1 and df2, which have the same column names. I want to run a for loop over the unique dates from df1 and apply the same date filter to df2. I created a list of the unique dates and then tried to iterate over it, but what I have throws an error.
Here is what I have:

val unique_weeks = df1.select(df1("date")).distinct

for( week <- unique_weeks) {
  val df1_filtered = df1.filter($"date" === week)
  val df2_filtered = df2.filter($"date" === week)
  // will run a join here and more code

}

I think the <- part may be what's wrong, but I'm not sure how else to filter the DataFrames.
The error is below:

[error] (run-main-0) org.apache.spark.SparkException: Job aborted due to stage failure: Task 35 in stage 3.0 failed 1 times, most recent failure: Lost task 35.0 in stage 3.0 (TID 399, localhost, executor driver): java.lang.RuntimeException: Unsupported literal type class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema [1591772400000]
[error]     at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:75)
[error]     at org.apache.spark.sql.functions$.lit(functions.scala:101)
[error]     at org.apache.spark.sql.Column.$eq$eq$eq(Column.scala:267)
[error]     at spark_pkg.SparkMain$$anonfun$main$1.apply(SparkMain.scala:880)
[error]     at spark_pkg.SparkMain$$anonfun$main$1.apply(SparkMain.scala:878)
[error]     at scala.collection.Iterator$class.foreach(Iterator.scala:893)
[error]     at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
[error]     at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:917)
[error]     at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:917)
[error]     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
[error]     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
[error]     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
[error]     at org.apache.spark.scheduler.Task.run(Task.scala:99)
[error]     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
[error]     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[error]     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[error]     at java.lang.Thread.run(Thread.java:748)
[error] 
[error] Driver stacktrace:
[error]     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
[error]     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
[error]     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
[error]     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
[error]     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
[error]     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
[error]     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
[error]     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
[error]     at scala.Option.foreach(Option.scala:257)
[error]     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
[error]     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
[error]     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
[error]     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
[error]     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
[error]     at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
[error]     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
[error]     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
[error]     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
[error]     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1958)
[error]     at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:917)
[error]     at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:915)
[error]     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
[error]     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
[error]     at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
[error]     at org.apache.spark.rdd.RDD.foreach(RDD.scala:915)
[error]     at org.apache.spark.sql.Dataset$$anonfun$foreach$1.apply$mcV$sp(Dataset.scala:2286)
[error]     at org.apache.spark.sql.Dataset$$anonfun$foreach$1.apply(Dataset.scala:2286)
[error]     at org.apache.spark.sql.Dataset$$anonfun$foreach$1.apply(Dataset.scala:2286)
[error]     at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
[error]     at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2765)
[error]     at org.apache.spark.sql.Dataset.foreach(Dataset.scala:2285)
[error]     at spark_pkg.SparkMain$.main(SparkMain.scala:878)
[error]     at spark_pkg.SparkMain.main(SparkMain.scala)
[error]     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
[error]     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
[error]     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[error]     at java.lang.reflect.Method.invoke(Method.java:498)
[error] Caused by: java.lang.RuntimeException: Unsupported literal type class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema [1591772400000]
[error]     at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:75)
[error]     at org.apache.spark.sql.functions$.lit(functions.scala:101)
[error]     at org.apache.spark.sql.Column.$eq$eq$eq(Column.scala:267)
[error]     at spark_pkg.SparkMain$$anonfun$main$1.apply(SparkMain.scala:880)
[error]     at spark_pkg.SparkMain$$anonfun$main$1.apply(SparkMain.scala:878)
[error]     at scala.collection.Iterator$class.foreach(Iterator.scala:893)
[error]     at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
[error]     at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:917)
[error]     at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:917)
[error]     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
[error]     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
[error]     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
[error]     at org.apache.spark.scheduler.Task.run(Task.scala:99)
[error]     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
[error]     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[error]     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[error]     at java.lang.Thread.run(Thread.java:748)
[error] stack trace is suppressed; run 'last Compile / bgRun' for the full output
[error] Nonzero exit code: 1
[error] (Compile / run) Nonzero exit code: 1
[error] Total time: 137 s (02:17), completed Aug 20, 2020 1:16:02 PM

w51jfk4q #1

A DataFrame is not an iterator, so you can't run a for loop over it. You could run something like this instead, but I don't think it will accomplish what you're hoping for, based on the rest of your code.

import org.apache.spark.sql.Row

unique_weeks.foreachPartition { weeks: Iterator[Row] =>
  // each element of a DataFrame partition is a Row
  for (week <- weeks) {
    // per-row work goes here
  }
}
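
Note that the body of foreachPartition runs on the executors, not the driver, so you can't reference df1 or df2 (or run joins) inside it: DataFrame operations can only be issued from the driver. That's why this pattern won't let you do the per-week join you have in mind.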

Your question suggests that your mental model of what a DataFrame is and how Spark works isn't quite complete. Think of a DataFrame as a List[List[YourData]], except that each inner List[YourData] sits on a separate part of a separate machine and doesn't necessarily know about or interact with any of the other Lists until you collect them back to the driver.
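
The specific error comes from $"date" === week: iterating a DataFrame yields Row objects (the GenericRowWithSchema in your stack trace), and Spark cannot turn a Row into a column literal. A minimal sketch of the usual fix, assuming the set of distinct dates is small enough to collect to the driver, is to pull them into a local Scala collection first and then loop over plain values:

import org.apache.spark.sql.functions.lit

// Collect the distinct dates to the driver as plain values,
// not Rows, so they can be used as literals in a filter.
val weeks = df1.select("date").distinct.collect().map(_.get(0))

for (week <- weeks) {
  val df1_filtered = df1.filter(df1("date") === lit(week))
  val df2_filtered = df2.filter(df2("date") === lit(week))
  // run the join and the rest of the per-week logic here
}

If the per-week work really is just a join, you usually don't need the loop at all: joining df1 and df2 on the date column does the same thing in a single Spark job.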
