Environment:
Databricks 6.4 (includes Apache Spark 2.4.5, Scala 2.11)
High Concurrency cluster with autoscaling and auto-termination
Configuration:
spark.dynamicAllocation.enabled true
spark.scheduler.mode FAIR
spark.databricks.delta.preview.enabled true
spark.shuffle.service.enabled true
spark.databricks.service.server.enabled true
How do I run multiple jobs from different threads in PySpark, within a single SparkContext?
I am not getting any errors, but I am also not getting any output counts.
What I want to achieve: I want to submit a list of jobs (Python methods/functions) and have Databricks run them in parallel.
Below is the sample code I am using. I want to pass in a list of the functions to execute in parallel, i.e. job1, job2, and job3, so that they all run at the same time.
import datetime
from pyspark.sql.types import StructType, StructField, StringType

current_time = str(datetime.datetime.now())
print(current_time)

localPath = '/mnt'

def job1(inputText):
    path = localPath + '/Table{0}/'.format(inputText)
    print(path)
    mySchema = StructType([StructField("textValue", StringType())])
    # tuples rather than dicts: dict rows are deprecated in Spark 2.x
    rddDF = sc.parallelize([
        (current_time,),
        (current_time,),
        (current_time,)])
    new_df1 = sqlContext.createDataFrame(rddDF, mySchema)
    new_df1 = new_df1.fillna(current_time + '1')
    new_df1.repartition(1).write.format("parquet").mode("overwrite").save(path)

job1('1')

def job2(inputText):
    path = localPath + '/Table{0}/'.format(inputText)
    print(path)
    mySchema = StructType([StructField("textValue", StringType())])
    rddDF = sc.parallelize([
        (current_time,),
        (current_time,),
        (current_time,)])
    new_df1 = sqlContext.createDataFrame(rddDF, mySchema)
    new_df1 = new_df1.fillna(current_time + '1')
    new_df1.repartition(1).write.format("parquet").mode("overwrite").save(path)

job2('2')

def job3(inputText):
    path = localPath + '/Table{0}/'.format(inputText)
    print(path)
    mySchema = StructType([StructField("textValue", StringType())])
    rddDF = sc.parallelize([
        (current_time,),
        (current_time,),
        (current_time,)])
    new_df1 = sqlContext.createDataFrame(rddDF, mySchema)
    new_df1 = new_df1.fillna(current_time + '1')
    new_df1.repartition(1).write.format("parquet").mode("overwrite").save(path)

job3('3')

listOfJobs = ['job1', 'job2', 'job3']
print(listOfJobs)
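One common approach (a minimal sketch, not Databricks-specific; `run_all` and the stand-in `job` function below are illustrative names, not part of the original code) is to submit the functions to a `concurrent.futures.ThreadPoolExecutor`. All threads share the single SparkContext, which is thread-safe, and with `spark.scheduler.mode FAIR` the resulting Spark jobs can share cluster resources instead of running strictly one after another. Note that the list must hold the function objects themselves (job1, job2, job3), not the strings `'job1'`, `'job2'`, `'job3'`:

```python
from concurrent.futures import ThreadPoolExecutor

def run_all(jobs):
    """Run each (function, argument) pair on its own thread and
    return the results in submission order."""
    with ThreadPoolExecutor(max_workers=len(jobs)) as pool:
        futures = [pool.submit(fn, arg) for fn, arg in jobs]
        # .result() blocks until each job finishes and re-raises any
        # exception the job thread hit, so failures are not silent.
        return [f.result() for f in futures]

# Illustrative stand-in for job1/job2/job3; in the notebook you would
# pass the real functions, each of which triggers Spark actions.
def job(tag):
    return 'Table' + tag

results = run_all([(job, '1'), (job, '2'), (job, '3')])
print(results)
```

In the notebook this would be `run_all([(job1, '1'), (job2, '2'), (job3, '3')])`. Inside each function you can additionally call `sc.setLocalProperty("spark.scheduler.pool", inputText)` so the FAIR scheduler places each thread's jobs in its own pool.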