neo4j/apachespark任务不可序列化

xqk2d5yq  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(378)

我的目标是将apachespark中作为rdd加载的一些文档加载到neo4j图中。问题似乎出在创建这样的节点上,因为eclipse报告任务不可序列化 Exception in thread "main" org.apache.spark.SparkException: Task not serializable 我的代码如下:

public class Spark {

    public static void createRdd() {

        SparkSession spark = SparkSession.builder()
                .master("local")
                .appName("MongoSparkConnector")
                .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/TheFoodPlanner.join")           
                .getOrCreate();

            JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

            JavaMongoRDD<Document> rddRecipes = MongoSpark.load(jsc);

            Driver driver = GraphDatabase.driver("bolt://localhost:7687", AuthTokens.basic("neo4j", "estrella100"));
            try (Session session = driver.session()) {
                rddRecipes.foreach(f -> {
                    String id = f.getString("id");
                    session.run("CREATE (n: Recipe {id:'" + id + "'})");            
                });             
                session.close();
            }

            driver.close();
            jsc.close();

    }
}

错误出现在行中 session.run("CREATE (n: Recipe {id:'" + id + "'})"); 以并行方式加载节点的正确方法是什么?

btqmn9zl

btqmn9zl1#

问题是,您将来自mongo的基于spark的数据连接器与neo4j的“传统”java连接器混为一谈。在这种情况下,通常的问题是,如果没有额外的代码来处理在多个执行器上运行的代码,那么这些驱动程序就不能优化为与spark一起使用—这里的一个常见问题是 Driver 仅在驱动程序节点上打开连接,而不在执行器上打开连接。
对于您的情况,您有两种可能:
使用neo4j现有的spark连接器-您需要将mongo中的数据转换为neo4j连接器可以使用的格式-它可以比“手动”解决方案更优化,而且更易于使用
继续使用 foreach ,但您需要更改方法-驱动程序的初始化应该在每个执行器上进行-最简单的方法是使用类似的方法(伪代码,未测试)-这将使每个分区由持有该分区的执行器处理-在这种情况下,每个执行器都有自己到neo4j的连接:

rddRecipes.foreachPartition { partitionOfRecords =>
    Driver driver = GraphDatabase.driver("bolt://localhost:7687", 
           AuthTokens.basic("neo4j", "estrella100"));
    try (Session session = driver.session()) {
        partitionOfRecords.foreach(f -> {
          String id = f.getString("id");
          session.run("CREATE (n: Recipe {id:'" + id + "'})");            
          });             
        session.close();
    }
}

相关问题