How to submit a Spark job through containerized Livy

niknxzdl, posted 2021-07-09 in Spark

I am running Spark (2.4.7) and Livy (0.7) using the repo below.
The curl commands shown in the repo work fine, and everything appears to be in order.
I wrote a simple word-count Maven Spark Java program, and I use the Livy client to submit it through Livy as a Spark job.
My Java word count:

package spark;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.regex.Pattern;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;
import org.apache.livy.Job;
import org.apache.livy.JobContext;
import scala.Tuple2;

public final class JavaWordCount implements Job<Double> {

    private static final long serialVersionUID = 4870271814150948504L;
    private static final Pattern SPACE = Pattern.compile(" ");

    @Override
    public Double call(JobContext ctx) throws Exception {
        count(ctx);
        return 0.7;
    }

    public void count(JobContext ctx){

        //JavaRDD<String> lines = ctx.textFile(args[0], 1);
        // Parallelize the sentence as a single element; flatMap below splits it into words.
        JavaRDD<String> lines = ctx.sc().parallelize(Arrays.asList(
                "It is close to midnight and something evil is lurking in the dark"));

        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) {
                return Arrays.asList(SPACE.split(s)).iterator();
            }
        });

        JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<>(s, 1);
            }
        });

        JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        });

        List<Tuple2<String, Integer>> output = counts.collect();
        for (Tuple2<?, ?> tuple : output) {
            System.out.println(tuple._1() + ": " + tuple._2());
        }
    }
}
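As a sanity check of the counting logic itself, independent of Spark and Livy, the same split/map/reduce can be expressed with plain Java streams (a minimal sketch; `WordCountCheck` is just an illustrative class name):

```java
import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class WordCountCheck {
    // Splits the sentence on spaces and counts each word,
    // mirroring the flatMap + mapToPair + reduceByKey pipeline above.
    static Map<String, Long> count(String sentence) {
        return Arrays.stream(sentence.split(" "))
                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
    }

    public static void main(String[] args) {
        Map<String, Long> counts =
                count("It is close to midnight and something evil is lurking in the dark");
        counts.forEach((word, n) -> System.out.println(word + ": " + n));
    }
}
```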

My Livy client:

package spark;

import org.apache.livy.Job;
import org.apache.livy.LivyClient;
import org.apache.livy.LivyClientBuilder;

import java.io.File;
import java.net.URI;

public final class SubmitJob {

    private static final String livyURI = "http://localhost:8998/";
    private static final String JAR_PATH = "/Users/.../spark-word-count/target/word-count-0.1-SNAPSHOT.jar";

    public static void main(String[] args) throws Exception {
        LivyClient livyClient = new LivyClientBuilder()
        .setURI(new URI(livyURI)).build();

        try {
            System.err.printf("Uploading %s to the Spark context...\n", JAR_PATH);
            livyClient.uploadJar(new File(JAR_PATH));

            System.err.printf("Running JavaWordCount...%n");
            double result = livyClient.submit(new JavaWordCount()).get();

            System.out.println("Job result: " + result);
        } finally {
            livyClient.stop(true);
        }

    }
}
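For reference, the client and the job class both need the Livy client API on the classpath at build time. A minimal sketch of the relevant Maven dependency (the artifact is `livy-client-http`; the version shown is an assumption chosen to match the Livy 0.7 server):

```xml
<dependency>
  <groupId>org.apache.livy</groupId>
  <artifactId>livy-client-http</artifactId>
  <version>0.7.0</version>
</dependency>
```

It is also worth verifying with `jar tf` that the built jar actually contains `spark/JavaWordCount.class`, since the Kryo error below means the driver cannot resolve that class.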

When I run the client, I get the following error:
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: org.apache.livy.shaded.kryo.kryo.KryoException: Unable to find class: spark.JavaWordCount
    at org.apache.livy.shaded.kryo.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
    at org.apache.livy.shaded.kryo.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
    at org.apache.livy.shaded.kryo.kryo.Kryo.readClass(Kryo.java:656)
    at org.apache.livy.shaded.kryo.kryo.Kryo.readClassAndObject(Kryo.java:767)
    at org.apache.livy.client.common.Serializer.deserialize(Serializer.java:63)
    at org.apache.livy.rsc.driver.BypassJob.call(BypassJob.java:39)
    at org.apache.livy.rsc.driver.BypassJob.call(BypassJob.java:27)
    at org.apache.livy.rsc.driver.JobWrapper.call(JobWrapper.java:64)
    at org.apache.livy.rsc.driver.BypassJobWrapper.call(BypassJobWrapper.java:45)
    at org.apache.livy.rsc.driver.BypassJobWrapper.call(BypassJobWrapper.java:27)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: spark.JavaWordCount
How can I resolve this error?
I read that it might be caused by livy.file.local-dir-whitelist. My Livy whitelist configuration is:
livy.file.local-dir-whitelist = /
I also tried copying the jar into the Livy container, placing it under "/", and changing JAR_PATH in my client to "/word-count-0.1-SNAPSHOT.jar". I get the same error...
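For reference, the whitelist is normally pointed at a dedicated directory rather than "/". A sketch of the setting (the path is an assumption; it should be a directory inside the Livy container that holds the jar):

```
# livy.conf — hypothetical example path
# Local directories from which users may reference files when adding them to sessions
livy.file.local-dir-whitelist = /opt/livy/uploads/
```

As far as I understand, this whitelist governs local file references passed to the server (e.g. `addJar` with a `file:` path); `uploadJar` instead streams the jar from the client machine, so the whitelist may not be the relevant knob here.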
How do I submit my jar?
