下面的代码示例演示了如何在线程中运行 Spark 代码，从而并行执行多个 Spark 任务。
每个查询都交给 Spark DataFrame 的 JDBC 读取器执行；把这些任务提交到线程池（以 Future 的形式）即可让它们并行运行。
代码中还加入了日志记录，并通过属性文件读取数据库连接等配置属性。
package Test
import java.util.concurrent.Executors
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import scala.io.Source.fromFile
import org.apache.log4j.Logger
/** Runs a list of JDBC queries concurrently against one SparkSession.
  *
  * Each query is read into a DataFrame on its own thread from a fixed-size pool.
  * Its row count is written to HDFS, and a one-line summary is appended to a log table.
  */
object ParallerlExecution {
  val log: Logger = Logger.getLogger(getClass.getName)

  // Placeholder output locations from the original example; replace with real HDFS paths.
  private val ResultPathPrefix = "hdfs path to write result example /user/abc/"
  private val LogResultPath = "hdfs path to save result example /home/abc/logsqlresult"

  /** Entry point.
    *
    * @param args args(0) is the path of a text file holding one query (or table name) per line
    */
  def main(args: Array[String]): Unit = {
    require(args.nonEmpty, "usage: ParallerlExecution <query-file>")
    log.info("Start of program!!")
    val queryList = loadFile(args(0))
    parallerlExecution(queryList)
    log.info("End of program!!!")
  }

  /** Reads the non-blank lines of `path`, trimmed, one query per line.
    *
    * The underlying file handle is always closed, even if reading fails.
    *
    * @param path local file to read; the default "" only keeps the old no-argument call compiling
    * @return the queries in file order
    */
  def loadFile(path: String = ""): List[String] = {
    val source = fromFile(path)
    try source.getLines().map(_.trim).filter(_.nonEmpty).toList
    finally source.close()
  }

  /** Runs every query in `queryList` on a pool of `parallelism` threads and waits for all of them.
    *
    * JDBC settings come from the Spark conf, e.g. via `--properties-file parallelprop.conf`
    * containing:
    * {{{
    * spark.jdbc.url      <jdbc url>
    * spark.jdbc.username <user name for db>
    * spark.jdbc.password <password for db>
    * }}}
    * A missing key makes `SparkConf.get` throw NoSuchElementException before any query runs.
    * The SparkSession is stopped once all tasks have finished (or one has failed).
    *
    * @param queryList   queries (or table names) passed as the JDBC `dbtable` option
    * @param parallelism number of queries run at the same time (default 3)
    * @throws java.util.concurrent.ExecutionException if a query task fails
    */
  def parallerlExecution(queryList: List[String], parallelism: Int = 3): Unit = {
    require(parallelism > 0, s"parallelism must be positive, got $parallelism")
    // A master set in code takes precedence over spark-submit's --master flag.
    val spark = SparkSession.builder().appName("test").master("local[*]").getOrCreate()
    val conf = spark.sparkContext.getConf
    val url = conf.get("spark.jdbc.url")
    val username = conf.get("spark.jdbc.username")
    val password = conf.get("spark.jdbc.password")
    val pool = Executors.newFixedThreadPool(parallelism)
    try {
      val tasks = queryList.zipWithIndex.map { case (query, index) =>
        pool.submit(new Runnable {
          override def run(): Unit = runQuery(spark, url, username, password, query, index)
        })
      }
      // get() blocks until the task is done and rethrows its failure (wrapped in
      // ExecutionException), so errors are no longer lost inside pool threads.
      tasks.foreach(_.get())
    } finally {
      pool.shutdown()
      spark.stop()
    }
  }

  /** Executes one query and records its results.
    *
    * Steps:
    *  - read the query over JDBC;
    *  - write the url and row count to a per-query HDFS directory;
    *  - append a summary line to the log table, print it and log it.
    */
  private def runQuery(
      spark: SparkSession,
      url: String,
      username: String,
      password: String,
      query: String,
      index: Int): Unit = {
    val st = System.currentTimeMillis()
    // `dbtable` accepts a table name or a parenthesised subquery with an alias, e.g. "(select ...) t".
    // A bare SELECT must be wrapped that way, or passed via the `query` option on Spark 2.4+.
    val df = spark.read
      .format("jdbc")
      .option("url", url)
      .option("dbtable", query)
      .option("user", username)
      .option("password", password)
      .load()
    // df is evaluated twice below (count and size); cache it so the database is read only once.
    df.persist()
    try {
      val count = df.count()
      val et = System.currentTimeMillis()
      // The index keeps paths unique when two queries finish in the same millisecond.
      spark.sparkContext
        .parallelize(Seq(url, count.toString))
        .coalesce(1)
        .saveAsTextFile(s"$ResultPathPrefix$et-$index")
      val size = calcRDDSize(df.rdd.map(_.toString()))
      val logInput =
        s"Thread${Thread.currentThread().getId()} Record Count $count StartTime $st " +
          s"Endtime $et Size: $size Query: $query"
      import spark.implicits._
      // Single string column named "value", same schema the old LogOut case class produced.
      Seq(logInput).toDF("value").coalesce(1).write.mode("append").save(LogResultPath)
      println(logInput)
      log.info(logInput)
    } finally {
      df.unpersist()
    }
  }

  /** Total UTF-8 byte length of all strings in `rdd`; 0 for an empty RDD.
    *
    * `fold` with a neutral zero is used instead of `reduce`, which throws on an empty RDD.
    */
  def calcRDDSize(rdd: RDD[String]): Long =
    rdd.map(_.getBytes("UTF-8").length.toLong).fold(0L)(_ + _)
}
我们可以在自定义的多线程线程池中运行 Spark：在循环中把每个查询作为一个任务提交到线程池，即可以并行模式运行 Spark 程序。
1条答案
按热度按时间f3temu5u1#