我是 Scala 新手,最近开始使用 Spark 和 Scala。我有一段代码,只是读取并逐行处理 CSV 文件,全部在我的笔记本电脑上以本地模式运行。这段代码之前一直正常工作,最近突然开始报错。代码如下所示:
import java.util.UUID
import scala.util.{Failure, Try}
import scala.util.control.NonFatal

// Build a local SparkSession.
// NOTE(review): the NoSuchMethodError in the stack trace below is a classpath
// problem, not a code problem — an older net.jpountz lz4 jar (often pulled in
// transitively, e.g. by kafka-clients or elasticsearch) shadows the
// lz4-java-1.4.0 that Spark 2.4.5 needs. It only surfaces once a shuffle
// happens (repartition), because shuffle blocks are LZ4-compressed by default.
// Fix the dependency conflict in the build, or as a temporary workaround:
//   .config("spark.io.compression.codec", "snappy")
val spark = SparkSession.builder()
  .appName("testApp")
  .config("spark.master", "local")
  .getOrCreate()
spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sc = spark.sparkContext

// Sentinel UUID (all zero bits) marking rows whose first column failed to parse.
val ERROR_UUID = new UUID(0L, 0L)

// Accumulator counting malformed rows.
// BUG FIX: `malformedRows` was used below but never defined in the original.
val malformedRows = sc.longAccumulator("malformedRows")

// Read data from the input path as an RDD of Rows.
val data = spark.read.format("csv")
  .schema(inputSchema)
  .load(inputPath)
  .rdd
data.take(2).foreach(println)

val headerlessRDD = data
  .map {
    case Row(colval: String, colval2: String) =>
      // Parse the first column as a UUID; on failure, count it, log it,
      // and substitute the sentinel so the filter below drops the row.
      val colvalUuid: UUID = Try(UUID.fromString(colval)).recoverWith {
        // Catch only non-fatal errors — never OutOfMemoryError etc.
        case NonFatal(ex) =>
          malformedRows.add(1)
          ex.printStackTrace()
          Failure(ex)
      }.getOrElse(ERROR_UUID)
      // BUG FIX: the original referenced an undefined `colval2_uuid`;
      // the second column is carried through unchanged.
      (colvalUuid, colval2, 45)
    case _ =>
      // BUG FIX: rows with null or non-string columns previously threw
      // MatchError; tag them with the sentinel so they are filtered out.
      malformedRows.add(1)
      (ERROR_UUID, "", 45)
  }
  .filter(_._1 != ERROR_UUID) // FILTER OUT MALFORMED UUID ROWS

// repartition forces a shuffle — this is where the lz4 classpath conflict
// described above manifests at runtime.
val aggregatedRDD = headerlessRDD.repartition(100)
aggregatedRDD.top(5)
抛出的异常堆栈如下:
Caused by: java.lang.NoSuchMethodError: net.jpountz.lz4.LZ4BlockInputStream.<init>(Ljava/io/InputStream;Z)V
at org.apache.spark.io.LZ4CompressionCodec.compressedInputStream(CompressionCodec.scala:122)
at org.apache.spark.serializer.SerializerManager.wrapForCompression(SerializerManager.scala:163)
at org.apache.spark.serializer.SerializerManager.wrapStream(SerializerManager.scala:124)
at org.apache.spark.shuffle.BlockStoreShuffleReader$$anonfun$3.apply(BlockStoreShuffleReader.scala:50)
at org.apache.spark.shuffle.BlockStoreShuffleReader$$anonfun$3.apply(BlockStoreShuffleReader.scala:50)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:453)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:64)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:30)
at org.spark_project.guava.collect.Ordering.leastOf(Ordering.java:628)
at org.apache.spark.util.collection.Utils$.takeOrdered(Utils.scala:37)
at org.apache.spark.rdd.RDD$$anonfun$takeOrdered$1$$anonfun$32.apply(RDD.scala:1478)
at org.apache.spark.rdd.RDD$$anonfun$takeOrdered$1$$anonfun$32.apply(RDD.scala:1475)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:823)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:823)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
我使用的是 Spark 2.4.5,并且可以在 jars 目录下看到 lz4-java-1.4.0.jar。我在 Stack Overflow 上读到过一些类似的问题,有人遇到了同样的错误,但都是在使用 Kafka 的场景下,而我根本没有使用 Kafka。另外,我是通过 IntelliJ IDEA 运行这段代码的。正如前面所说,代码之前一直正常工作,不确定从什么时候开始抛出这个异常。值得一提的是,如果我把 repartition 这一步去掉,问题就会消失(但我并不打算这么做)。
暂无答案,快来回答吧!