如何重写SparkScala代码以在ApacheLivy中使用它

ngynwnxp  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(371)

我重写了这段代码:

import org.apache.spark.sql.SparkSession

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "file:///root/spark/README.md"
    val spark = SparkSession.builder.appName("Simple Application").getOrCreate()
    val logData = spark.read.textFile(logFile).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println(s"Lines with a: $numAs, Lines with b: $numBs")
    spark.stop()
  }
}

对此:

import org.apache.livy._
import org.apache.spark.sql.SparkSession

class Test extends Job[Int]{

  override def call(jc: JobContext): Int = {

    val spark = jc.sparkSession()

    val logFile = "file:///root/spark/README.md"
    val logData = spark.read.textFile(logFile).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println(s"Lines with a: $numAs, Lines with b: $numBs")

    1 //Return value
  }
}

但是当用sbt编译它时,val spark没有正确识别,我收到错误“value read is not a member of nothing”
另外,在注解spark相关代码之后,当我尝试使用/batches运行结果jar文件时,我收到错误“java.lang.nosuchmethodexception:test.main([ljava.lang.string;)”
请问任何人都可以用正确的spark scala代码重写方式吗?

disho6za

disho6za1#

使用livy不需要重写spark应用程序。相反,您可以使用它的rest接口在具有运行livy服务器的集群上提交作业、检索日志、获取作业状态等。
作为一个实际的例子,下面是在aws上运行应用程序的说明。
设置:
使用aws emr创建一个spark集群,该集群包含spark、livy和您的应用程序所需的任何其他预装应用程序。
将jar上传到awss3。
确保连接到集群的安全组有一个入站规则,该规则将端口8998(livy的端口)上的ip列为白名单。
确保集群可以访问s3以获取jar。
现在,您可以使用curl(或任何等效工具)发出post请求来提交您的申请:

curl -H "Content-Type: application/json" -X POST --data '{"className":"<your-package-name>.SimpleApp","file":"s3://<path-to-your-jar>"}' http://<cluster-domain-name>:8998/batches

相关问题