如何在pyspark/databricks中执行作业列表?

kt06eoxx  于 2021-05-27  发布在  Spark
关注(0)|答案(0)|浏览(187)

环境:
databricks 6.4(包括apache spark 2.4.5、scala 2.11)
具有自动缩放和终止功能的高并发服务器
配置:

spark.dynamicAllocation.enabled true
spark.scheduler.mode FAIR
spark.databricks.delta.preview.enabled true
spark.shuffle.service.enabled true
spark.databricks.service.server.enabled true

如何在一个sparkcontext中从pyspark中的不同线程运行多个作业?
我没有得到任何错误,我没有得到任何计数
我想要实现什么:我想要提交一个作业列表(python方法/函数),我想要databricks并行运行这些作业。
下面是我使用的示例代码。我想传递一个包含并行执行的函数名的列表,即job1、job2和job3,它们都同时运行。

import datetime 
from pyspark.sql.types import StructType, IntegerType, DateType, StringType, StructField
current_time = datetime.datetime.now()
current_time = str(current_time)
print(current_time)

localPath = '/mnt'

def job1(inputText):
  path = localPath + '/Table{0}/'.format(inputText)
  print(path)
  mySchema = StructType([StructField("textValue1", StringType())])

  rddDF = sc.parallelize((
  {"textValue": current_time },\
  { "textValue": current_time  },\
  { "textValue": current_time}))
  new_df1 = sqlContext.createDataFrame(rddDF,mySchema1)
  new_df1 = new_df1.fillna(current_time +'1' )
  new_df1.repartition(1).write.format("parquet").mode("overwrite").save(path)

job1('1')

def job2(inputText):
  path = localPath + '/Table{0}/'.format(inputText)
  print(path)
  mySchema = StructType([StructField("textValue1", StringType())])

  rddDF = sc.parallelize((
  {"textValue": current_time },\
  { "textValue": current_time  },\
  { "textValue": current_time}))
  new_df1 = sqlContext.createDataFrame(rddDF,mySchema1)
  new_df1 = new_df1.fillna(current_time +'1' )
  new_df1.repartition(1).write.format("parquet").mode("overwrite").save(path)

job2('2')

def job3(inputText):
  path = localPath + '/Table{0}/'.format(inputText)
  print(path)
  mySchema = StructType([StructField("textValue1", StringType())])

  rddDF = sc.parallelize((
  {"textValue": current_time },\
  { "textValue": current_time  },\
  { "textValue": current_time}))
  new_df1 = sqlContext.createDataFrame(rddDF,mySchema1)
  new_df1 = new_df1.fillna(current_time +'1' )
  new_df1.repartition(1).write.format("parquet").mode("overwrite").save(path)

job3('3')

listOfJobs = ['job1', 'job2', 'job3']
print(listOfJobs)

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题