Spark code to read properties, define a logger, and run queries in multiple threads

wz3gfoph posted on 2021-05-22 in Spark

Below is code that runs Spark work inside threads so that several Spark tasks execute in parallel. Each thread builds a DataFrame from a query passed to the JDBC reader, which should help run the queries concurrently.
A log4j logger is defined, and a properties file is read so that the connection properties are not hard-coded.

package Test

import java.util.concurrent.{Executors, TimeUnit}

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

import scala.io.Source.fromFile

import org.apache.log4j.Logger

object ParallerlExecution {
  val log = Logger.getLogger(getClass.getName)

  def main(args: Array[String]): Unit = {
    log.info("Start of program!!")
    val queryList = loadFile()
    parallerlExecution(queryList)
    log.info("End of program!!!")
  }

  def loadFile(): List[String] = {
    // Path of the local file that holds one query per line (left blank in the original post)
    fromFile("").getLines().toList
  }
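  /* A minimal example of what the query file might contain, assuming each line is a
     parenthesised subquery the JDBC reader can use as its "dbtable" option
     (the table names and aliases below are hypothetical, not from the original post):

       (select * from customers where region = 'EU') as q1
       (select id, amount from orders where amount > 100) as q2
  */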

  def parallerlExecution(queryList: List[String]): Unit = {
    val spark = SparkSession.builder().appName("test").master("local[*]").getOrCreate()

    /*
    --properties-file  parallelprop.conf

    create file and put info like below
    spark.jdbc.url <jdbc url >
    spark.jdbc.username <user name for db>
    spark.jdbc.password <password for db>

     */
    val url=spark.sparkContext.getConf.get("spark.jdbc.url")
    val username=spark.sparkContext.getConf.get("spark.jdbc.username")
    val password=spark.sparkContext.getConf.get("spark.jdbc.password")

    val pool= Executors.newFixedThreadPool(3)

    for (query <- queryList) {

      val r = new Runnable {
        override def run(): Unit = {

          val st = System.currentTimeMillis()

          // Note: the JDBC "dbtable" option expects either a table name or a
          // parenthesised subquery with an alias, e.g. "(select ...) as q".
          val df = spark.read
            .format("jdbc")
            .option("url", url)
            .option("dbtable", query)
            .option("user", username)
            .option("password", password)
            .load()
          val count = df.count
          val et = System.currentTimeMillis()

          val resIntoHdfs = spark.sparkContext.parallelize(Seq(url, count.toString))
          resIntoHdfs.coalesce(1).saveAsTextFile("hdfs path to write result example /user/abc/" + et)
          val rddOfDataframe = df.rdd.map(_.toString())
          val size = calcRDDSize(rddOfDataframe)
          val logInput = "Thread" + Thread.currentThread().getId() + "  Record Count " + count + " StartTime " + st + " Endtime " + et + " Size: " + size + " Query: " + query
          // Build the one-row log DataFrame from a plain Seq; a case class declared
          // inside the method would break the toDF encoder (no TypeTag available).
          import spark.implicits._
          val logDF = Seq(logInput).toDF("value")
          logDF.coalesce(1).write.mode("append").save("hdfs path to save result example /home/abc/logsqlresult")
          println(logInput)
          log.info(logInput)
        }
      }
      pool.execute(r)

    }

    pool.shutdown()
    // Block until all submitted queries finish, otherwise main logs "End of program" immediately
    pool.awaitTermination(Long.MaxValue, TimeUnit.MILLISECONDS)
  }
  def calcRDDSize(rdd: RDD[String]): Long = {
    rdd.map(_.getBytes("UTF-8").length.toLong)
      .reduce(_+_) //add the sizes together
  }
}
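For the properties read through spark.sparkContext.getConf.get to exist, the job would be launched with --properties-file pointing at the file described in the comment above. A minimal sketch of that setup (file contents, jar name, and credential values are placeholders, not from the original post):

  # parallelprop.conf
  spark.jdbc.url       jdbc:postgresql://dbhost:5432/mydb
  spark.jdbc.username  dbuser
  spark.jdbc.password  secret

  # submit the job with the properties file
  spark-submit --class Test.ParallerlExecution --properties-file parallelprop.conf parallel-execution.jar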

So we can run Spark inside a thread pool defined with multiple threads: each iteration of the loop submits one query, and the Spark program effectively runs in parallel mode.
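The same idea can also be written with Scala Futures backed by a fixed-size pool, which makes it easy to wait for all queries before stopping the session. A minimal, self-contained sketch (the ParallelWithFutures object, the placeholder spark.range call, and the example query strings are illustrative, not from the original post):

import java.util.concurrent.Executors

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

import org.apache.spark.sql.SparkSession

object ParallelWithFutures {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("test").master("local[*]").getOrCreate()

    // Back the Futures with the same kind of fixed 3-thread pool used above
    implicit val ec: ExecutionContext =
      ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(3))

    // Hypothetical queries; in the real job these would come from the query file
    val queries = List("(select 1) as q1", "(select 2) as q2")

    // Each Future triggers one Spark action, so the driver schedules the jobs concurrently
    val results: Future[List[(String, Long)]] = Future.sequence(
      queries.map { q =>
        Future {
          // Placeholder for the per-query spark.read.format("jdbc")... pipeline
          val df = spark.range(1)
          (q, df.count())
        }
      }
    )

    Await.result(results, Duration.Inf).foreach { case (q, c) => println(s"$q -> $c") }
    spark.stop()
  }
}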

Answer (f3temu5u): Same approach, but the connection properties are read from a properties file on HDFS and the database password is pulled from a JCEKS credential provider instead of being kept in plain text.

package Test

import java.io.{BufferedReader, InputStreamReader}
import java.util.concurrent.{Executors, TimeUnit}

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

import scala.io.Source.fromFile
import org.apache.log4j.Logger
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.alias.CredentialProviderFactory
import java.util.Properties

import org.apache.hadoop.fs.FSInputStream
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path

object ParallerlExecution {
  val log = Logger.getLogger(getClass.getName)

  def main(args: Array[String]): Unit = {

    val hdfsFilePath = args(0)
    log.info("Start of program!!")
    // HDFS path of the query file (left blank in the original post)
    val queryList = load_file("")
    parallerlExecution(queryList, hdfsFilePath)
    log.info("End of program!!!")
  }

  /* def loadFile(): List[String] = {
    fromFile("").getLines().toList
  }
  */

  def load_file(path: String): List[String] = {
    val pt = new Path(path)
    val fs = FileSystem.get(new Configuration())
    val br = new BufferedReader(new InputStreamReader(fs.open(pt)))
    var res: List[String] = List()
    try {
      var line = br.readLine()
      while (line != null) {
        System.out.println(line)
        res = res :+ line
        line = br.readLine()
      }
    } finally {
      // always close the BufferedReader (it also closes the underlying HDFS stream)
      br.close()
    }
    res
  }
  def parallerlExecution(queryList: List[String], hdfsFilePath: String): Unit = {
    val spark = SparkSession.builder().appName("test").master("local[*]").getOrCreate()

    /*
    --properties-file  parallelprop.conf

    create file and put info like below
    spark.jdbc.url <jdbc url >
    spark.jdbc.username <user name for db>
    spark.jdbc.password <password for db>

     */

    /*val url=spark.sparkContext.getConf.get("spark.jdbc.url")
    val username=spark.sparkContext.getConf.get("spark.jdbc.username")
    val jecksProvider=spark.sparkContext.getConf.get("spark.jecks.provider")
    val passwordAlial=spark.sparkContext.getConf.get("spark.password.alias")*/

    val prop = readHdfsFile(hdfsFilePath)
    val jceksProvider = prop.getProperty("jeck-provider")
    val passwordAlias = prop.getProperty("password-alias")
    val url = prop.getProperty("url")
    val username = prop.getProperty("username")

    val password = extractPwdFromJceks(jceksProvider, passwordAlias)
    val pool = Executors.newFixedThreadPool(3)

    for (query <- queryList) {

      val r = new Runnable {
        override def run(): Unit = {

          val st = System.currentTimeMillis()
          // Target table name for this query (left blank in the original post)
          val tableName = ""
          // Note: the JDBC "dbtable" option expects either a table name or a
          // parenthesised subquery with an alias, e.g. "(select ...) as q".
          val df = spark.read
            .format("jdbc")
            .option("url", url)
            .option("dbtable", query)
            .option("user", username)
            .option("password", password)
            .load()
          val count = df.count
          val et = System.currentTimeMillis()

          val resIntoHdfs = spark.sparkContext.parallelize(Seq(url, count.toString))
          resIntoHdfs.coalesce(1).saveAsTextFile("hdfs path to write result example /user/abc/" + et)
          val rddOfDataframe = df.rdd.map(_.toString())
          val size = calcRDDSize(rddOfDataframe)
          val logInput = "Thread" + Thread.currentThread().getId() + "  Record Count " + count + " StartTime " + st + " Endtime " + et + " Size: " + size + " Query: " + query

          import spark.implicits._
          df.write.mode("overwrite").save("<path hdfs>" + tableName)

          val sizeOfData = getFileSizeByPath("<path hdfs>" + tableName)

          // Build the one-row summary DataFrame from a tuple; a case class declared
          // inside the method would break the toDF encoder (no TypeTag available).
          val outDF = Seq((tableName, sizeOfData.toString)).toDF("tableName", "sizeOfData")

          outDF.coalesce(1).write.mode("append").csv("hdfs path to save result ")

          println(logInput)
          log.info(logInput)
        }
      }
      pool.execute(r)

    }

    pool.shutdown()
    // Block until all submitted queries finish before main logs "End of program"
    pool.awaitTermination(Long.MaxValue, TimeUnit.MILLISECONDS)
  }
  def calcRDDSize(rdd: RDD[String]): Long = {
    rdd.map(_.getBytes("UTF-8").length.toLong)
      .reduce(_+_) //add the sizes together
  }

  def extractPwdFromJceks(jceksfile:String, password_alias:String):String = {
    val conf:Configuration = new Configuration()
    conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH, jceksfile)

    conf.getPassword(password_alias).mkString("")

  }

  def readHdfsFile(path: String): Properties = {
    val prop = new Properties()
    val fis = FileSystem.get(new Configuration()).open(new Path(path))
    try {
      prop.load(fis)
    } finally {
      fis.close()
    }
    prop
  }

  private def getFileSizeByPath(filePath : String): Long = {
    val path = new Path(filePath)
    val hdfs = path.getFileSystem(new Configuration())
    val cSummary = hdfs.getContentSummary(path)
    val length = cSummary.getLength
    length
  }

}
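This version expects a properties file on HDFS (its path is passed as args(0)) plus a JCEKS credential store that holds the database password. A minimal sketch of how those could be prepared, assuming hypothetical paths and alias names:

  # contents of /user/abc/parallel.properties on HDFS (keys match the prop.getProperty calls above)
  url=jdbc:postgresql://dbhost:5432/mydb
  username=dbuser
  jeck-provider=jceks://hdfs/user/abc/db.jceks
  password-alias=db.password.alias

  # create the credential once; hadoop prompts for the password value
  hadoop credential create db.password.alias -provider jceks://hdfs/user/abc/db.jceks

  # run the job, passing the HDFS path of the properties file
  spark-submit --class Test.ParallerlExecution parallel-execution.jar /user/abc/parallel.properties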
