Unable to access Jinja template {{ dag_run.conf["spark_binary"] }} in a SparkSubmitOperator DAG

pexxcrt2 posted on 2021-05-27 in Spark
from datetime import timedelta, datetime
from airflow import DAG
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from airflow.operators.bash_operator import BashOperator
from airflow.models import Variable
from airflow.utils.dates import days_ago

args1 = {
    'owner': 'Airflow',
    'start_date': days_ago(2),
}

dag = DAG(
    dag_id='anantha-dag',
    default_args=args1,
    schedule_interval='0 0 * * *',
    dagrun_timeout=timedelta(minutes=60),
    tags=['ananthaPOC']
)

spark_submit_operator = SparkSubmitOperator(
    task_id='spark_submit_job',
    dag=dag,
    application='/opt/cloudera/parcels/SPARK2/lib/spark2/examples/jars/spark-examples_2.11-2.4.0.cloudera2.jar',
    application_args=['10'],
    # Single-quoted so Python treats the template as a plain string.
    # Jinja renders it only if spark_binary is listed in the operator's
    # template_fields, which may not be the case in this Airflow version.
    spark_binary='{{ dag_run.conf["spark_binary_1"] if dag_run else "" }}',
    java_class="org.apache.spark.examples.SparkPi",
)

bash_task = BashOperator(
    task_id="bash_task",
    bash_command='echo "Here is the message: \'{{ dag_run.conf["spark_binary_1"] if dag_run else "" }}\'"',
    dag=dag,
)
bash_task >> spark_submit_operator

I have a sample DAG in which the Jinja template works in the BashOperator but not in the SparkSubmitOperator. Here is the error:

spark_binary={{ dag_run.conf["spark_binary_1"] if dag_run else "" }},
NameError: name 'dag_run' is not defined
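
One possible reading of this traceback, offered as an assumption rather than a confirmed diagnosis: a NameError raised while the DAG file is being parsed is what Python produces when the `{{ ... }}` expression reaches the interpreter without surrounding quotes. Unquoted, the braces are read as a nested set literal and `dag_run` is looked up as an ordinary Python name. The sketch below reproduces that with the standard library only. No Airflow is involved, and the helper name `evaluate_unquoted` is hypothetical.

```python
# Illustration only: shows how Python treats the template text when it is
# NOT wrapped in quotes. No Airflow objects are involved.

TEMPLATE_TEXT = '{{ dag_run.conf["spark_binary_1"] if dag_run else "" }}'


def evaluate_unquoted(source):
    """Evaluate `source` as a Python expression and report the outcome.

    Returns (exception class name, message) if evaluation fails,
    otherwise ("ok", repr of the resulting value).
    """
    try:
        value = eval(source, {})  # empty globals, so `dag_run` is undefined
    except Exception as exc:  # broad on purpose: we only report the failure
        return type(exc).__name__, str(exc)
    return "ok", repr(value)


# Unquoted: Python sees a set literal and tries to resolve `dag_run`.
unquoted_result = evaluate_unquoted(TEMPLATE_TEXT)

# Quoted: the same characters are just a string, left for Jinja to render later.
quoted_result = evaluate_unquoted(repr(TEMPLATE_TEXT))

print(unquoted_result)
print(quoted_result)
```

Quoting the expression removes the NameError but does not by itself guarantee rendering: Airflow renders Jinja only in attributes listed in an operator's `template_fields`, so whether `spark_binary` is rendered depends on the installed operator version.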

I am triggering the DAG through the CLI command: airflow trigger_dag anantha-dag --conf '{"spark_binary_1": "spark2-submit"}'
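
The `--conf` value is parsed as JSON before it becomes `dag_run.conf`, so keys and string values need double quotes inside the payload. A minimal standard-library sketch of that parsing step follows. `parse_conf` is a hypothetical helper standing in for what the CLI does, not Airflow's own function, and the value `spark2-submit` is assumed from the command above.

```python
import json


def parse_conf(raw):
    """Hypothetical stand-in for the CLI's --conf handling: parse JSON into a dict."""
    return json.loads(raw)


# Valid JSON: double-quoted key and value.
conf = parse_conf('{"spark_binary_1": "spark2-submit"}')

# The template expression dag_run.conf["spark_binary_1"] amounts to this lookup.
spark_binary = conf["spark_binary_1"]

# Python-style single quotes are not valid JSON and are rejected.
try:
    parse_conf("{'spark_binary_1': 'spark2-submit'}")
    single_quotes_accepted = True
except json.JSONDecodeError:
    single_quotes_accepted = False

print(spark_binary)
print(single_quotes_accepted)
```

On a shell, wrapping the whole payload in single quotes keeps the inner double quotes intact.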
