How to put data from Spark into an Ignite cache

yws3nbqq · posted 2021-05-27 in Spark

I have several Spark jobs that compute RDDs, and at the end I want to put some of that data into an Ignite cache. Unfortunately, I get the following error:

java.lang.ClassCastException: org.apache.ignite.internal.processors.cache.IgniteCacheProxyImpl cannot be cast to org.apache.ignite.internal.processors.cache.GatewayProtectedCacheProxy
[info]  at org.apache.ignite.internal.processors.cache.GatewayProtectedCacheProxy.equals(GatewayProtectedCacheProxy.java:1715)
[info]  at scala.collection.mutable.FlatHashTable$class.findElemImpl(FlatHashTable.scala:131)
[info]  at scala.collection.mutable.FlatHashTable$class.containsElem(FlatHashTable.scala:124)
[info]  at scala.collection.mutable.HashSet.containsElem(HashSet.scala:40)
[info]  at scala.collection.mutable.HashSet.contains(HashSet.scala:57)
[info]  at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:87)
[info]  at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitExternalizable(SerializationDebugger.scala:142)
[info]  at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:104)
[info]  at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:206)
[info]  at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:108)
[info]  at org.apache.spark.serializer.SerializationDebugger$.find(SerializationDebugger.scala:67)
[info]  at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
[info]  at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
[info]  at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
[info]  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
[info]  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
[info]  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
[info]  at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
[info]  at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:371)
[info]  at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370)
[info]  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
[info]  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
[info]  at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
[info]  at org.apache.spark.rdd.RDD.map(RDD.scala:370).........
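The stack trace comes from Spark's ClosureCleaner/SerializationDebugger checking the closure passed to RDD.map for serializability, and the cast failure inside GatewayProtectedCacheProxy.equals typically shows up when a driver-side Ignite cache proxy has been captured in that closure. A minimal sketch of the kind of pattern that produces this, with hypothetical names ("myCache", saveBroken):

import org.apache.ignite.{Ignite, IgniteCache, Ignition}
import org.apache.spark.rdd.RDD

// Hypothetical driver-side setup; "myCache" stands in for the Postgres-backed cache.
val ignite: Ignite = Ignition.start()
val cache: IgniteCache[Long, String] = ignite.getOrCreateCache[Long, String]("myCache")

def saveBroken(rdd: RDD[(Long, String)]): Unit =
  rdd.map { case (k, v) =>
    // `cache` is a driver-side proxy captured in the task closure, so Spark
    // has to serialize it when shipping the task, and that is where it fails.
    cache.put(k, v)
    (k, v)
  }.count()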

So my question is: how do I put data from a Spark RDD into a specific Ignite cache, in our case an Ignite cache whose third-party persistence (cache store) is implemented on top of Postgres?
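For reference, a minimal sketch of one way to do this with Ignite's Spark integration (the ignite-spark module), assuming that module is on the classpath and that the target cache (called "myCache" here as a placeholder) is already configured with its Postgres-backed CacheStore and write-through enabled. IgniteContext starts or connects an Ignite node on each executor, so the cache is obtained on the worker side instead of serializing a driver-side proxy:

import org.apache.ignite.configuration.IgniteConfiguration
import org.apache.ignite.spark.IgniteContext
import org.apache.spark.{SparkConf, SparkContext}

object RddToIgnite {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("rdd-to-ignite"))

    // Placeholder for the pair RDD produced by the existing Spark jobs.
    val pairs = sc.parallelize(1 to 1000).map(i => (i.toLong, s"value-$i"))

    // IgniteContext creates/connects an Ignite node on each executor, so no
    // driver-side IgniteCache proxy ends up inside a task closure.
    val ic = new IgniteContext(sc, () => new IgniteConfiguration())

    // "myCache" is a placeholder for the cache whose CacheStore writes through to Postgres.
    ic.fromCache[Long, String]("myCache").savePairs(pairs)

    ic.close()
    sc.stop()
  }
}

An alternative without the extra module is to write inside rdd.foreachPartition and obtain the cache there (for example via Ignition.getOrStart with a client configuration), so the proxy is created on the executor rather than captured from the driver.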
