Airflow dbt via KubernetesPodOperator (Cloud Composer 2)

gcxthw6b  asked 5 months ago  in Kubernetes

I am trying to run a dbt job through Cloud Composer. The main idea is to use the KubernetesPodOperator to execute dbt run.
I am familiar with Workload Identity, but for some reason I cannot get my dbt workload to run because of this error: "Unable to generate access token".

from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

with DAG(
        'airflow_k8_dbt_demo',
        # These args will get passed on to each operator
        # You can override them on a per-task basis during operator initialization
        default_args={
            'depends_on_past': False,
            'email': ['[email protected]'],
            'email_on_failure': False,
            'email_on_retry': False,
            'retries': 0,
            'retry_delay': timedelta(minutes=5)
        },
        description='A simple tutorial DAG',
        schedule_interval=timedelta(days=1),
        start_date=datetime(2022, 1, 1),
        catchup=False,
        tags=['GA4']
) as dag:
    dbt_run = KubernetesPodOperator(
        namespace="k8-executor3",  # a new namespace I created
        service_account_name="composer3",  # a new Kubernetes service account I created
        config_file="/home/airflow/composer_kube_config",
        image="europe-west9-docker.pkg.dev/dataeng-sandbox-datapilot/composer-images-europe-west9-ga4k8podv4-4edb9f24-gke/airflow-k8-dbt-demo:1.0.1",
        cmds=["bash", "-cx"],
        arguments=["dbt run --project-dir dbt_k8_demo"],
        labels={"foo": "bar"},
        name="dbt-run-k8",
        task_id="run_dbt_job_on_k8_demo",
        image_pull_policy="Always",
        get_logs=True,
        dag=dag
    )

    dbt_run.dry_run()

I originally thought I was running into a Workload Identity problem, but I followed the steps in the Workload Identity documentation to let the pod authenticate to Google Cloud APIs with Workload Identity. When that did not work, I switched the namespace to the dedicated "composer-user-workloads" namespace, which is supposed to have access to Google Cloud resources.
I have also verified that authentication is enabled on my Composer environment.
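For reference, a minimal way to double-check the binding from the cluster side, using the namespace and Kubernetes service account from the DAG above, looks roughly like this:

# Confirm the Kubernetes service account carries the Workload Identity annotation
kubectl get serviceaccount composer3 --namespace k8-executor3 -o yaml

# From a pod running under that service account, ask the metadata server which identity it exposes;
# with a working binding it should return the Google service account's email
curl -s -H "Metadata-Flavor: Google" \
    "http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/email"
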
Here is the full error message:

*** Reading remote log from gs://europe-west9-ga4k8podv4-4edb9f24-bucket/logs/dag_id=airflow_k8_dbt_demo/run_id=manual__2023-12-11T18:08:33.155924+00:00/task_id=run_dbt_job_on_k8_demo/attempt=1.log.
[2023-12-11, 18:08:46 UTC] {taskinstance.py:1104} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: airflow_k8_dbt_demo.run_dbt_job_on_k8_demo manual__2023-12-11T18:08:33.155924+00:00 [queued]>
[2023-12-11, 18:08:46 UTC] {taskinstance.py:1104} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: airflow_k8_dbt_demo.run_dbt_job_on_k8_demo manual__2023-12-11T18:08:33.155924+00:00 [queued]>
[2023-12-11, 18:08:46 UTC] {taskinstance.py:1309} INFO - Starting attempt 1 of 1
[2023-12-11, 18:08:47 UTC] {taskinstance.py:1328} INFO - Executing <Task(KubernetesPodOperator): run_dbt_job_on_k8_demo> on 2023-12-11 18:08:33.155924+00:00
[2023-12-11, 18:08:47 UTC] {standard_task_runner.py:57} INFO - Started process 172376 to run task
[2023-12-11, 18:08:47 UTC] {standard_task_runner.py:84} INFO - Running: ['airflow', 'tasks', 'run', 'airflow_k8_dbt_demo', 'run_dbt_job_on_k8_demo', 'manual__2023-12-11T18:08:33.155924+00:00', '--job-id', '898', '--raw', '--subdir', 'DAGS_FOLDER/airflow_k8_dbt_demo.py', '--cfg-path', '/tmp/tmpg5fagstz']
[2023-12-11, 18:08:47 UTC] {standard_task_runner.py:85} INFO - Job 898: Subtask run_dbt_job_on_k8_demo
[2023-12-11, 18:08:47 UTC] {task_command.py:414} INFO - Running <TaskInstance: airflow_k8_dbt_demo.run_dbt_job_on_k8_demo manual__2023-12-11T18:08:33.155924+00:00 [running]> on host airflow-worker-r8zh5
[2023-12-11, 18:08:47 UTC] {taskinstance.py:1547} INFO - Exporting env vars: AIRFLOW_CTX_DAG_EMAIL='[email protected]' AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='airflow_k8_dbt_demo' AIRFLOW_CTX_TASK_ID='run_dbt_job_on_k8_demo' AIRFLOW_CTX_EXECUTION_DATE='2023-12-11T18:08:33.155924+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-12-11T18:08:33.155924+00:00'
[2023-12-11, 18:08:47 UTC] {pod.py:973} INFO - Building pod dbt-run-k8-mms8vkif with labels: {'dag_id': 'airflow_k8_dbt_demo', 'task_id': 'run_dbt_job_on_k8_demo', 'run_id': 'manual__2023-12-11T180833.1559240000-b8aec9c3b', 'kubernetes_pod_operator': 'True', 'try_number': '1'}
[2023-12-11, 18:08:48 UTC] {pod.py:548} INFO - Found matching pod dbt-run-k8-mms8vkif with labels {'airflow_kpo_in_cluster': 'False', 'airflow_version': '2.6.3-composer', 'dag_id': 'airflow_k8_dbt_demo', 'foo': 'bar', 'kubernetes_pod_operator': 'True', 'run_id': 'manual__2023-12-11T180833.1559240000-b8aec9c3b', 'task_id': 'run_dbt_job_on_k8_demo', 'try_number': '1'}
[2023-12-11, 18:08:48 UTC] {pod.py:549} INFO - `try_number` of task_instance: 1
[2023-12-11, 18:08:48 UTC] {pod.py:550} INFO - `try_number` of pod: 1
[2023-12-11, 18:08:48 UTC] {pod_manager.py:349} WARNING - Pod not yet started: dbt-run-k8-mms8vkif
[2023-12-11, 18:08:49 UTC] {pod_manager.py:349} WARNING - Pod not yet started: dbt-run-k8-mms8vkif
[2023-12-11, 18:08:50 UTC] {pod_manager.py:349} WARNING - Pod not yet started: dbt-run-k8-mms8vkif
[2023-12-11, 18:08:51 UTC] {pod_manager.py:349} WARNING - Pod not yet started: dbt-run-k8-mms8vkif
[2023-12-11, 18:08:52 UTC] {pod_manager.py:349} WARNING - Pod not yet started: dbt-run-k8-mms8vkif
[2023-12-11, 18:08:53 UTC] {pod_manager.py:349} WARNING - Pod not yet started: dbt-run-k8-mms8vkif
[2023-12-11, 18:09:01 UTC] {pod_manager.py:431} INFO - [base] + dbt run --project-dir dbt_k8_demo
[2023-12-11, 18:09:02 UTC] {pod_manager.py:431} INFO - [base] 18:09:01  target not specified in profile 'dbt_k8_demo', using 'default'
[2023-12-11, 18:09:03 UTC] {pod_manager.py:431} INFO - [base] 18:09:02  Running with dbt=1.0.4
[2023-12-11, 18:09:06 UTC] {pod_manager.py:431} INFO - [base] 18:09:03  Partial parse save file not found. Starting full parse.
[2023-12-11, 18:09:06 UTC] {pod_manager.py:431} INFO - [base] 18:09:06  Found 2 models, 4 tests, 0 snapshots, 0 analyses, 188 macros, 0 operations, 0 seed files, 0 sources, 0 exposures, 0 metrics
[2023-12-11, 18:09:06 UTC] {pod_manager.py:431} INFO - [base] 18:09:06
[2023-12-11, 18:09:06 UTC] {pod_manager.py:431} INFO - [base] 18:09:06  Encountered an error:
[2023-12-11, 18:09:06 UTC] {pod_manager.py:431} INFO - [base] Runtime Error
[2023-12-11, 18:09:06 UTC] {pod_manager.py:431} INFO - [base]   Unable to generate access token, if you're using impersonate_service_account, make sure your initial account has the "roles/iam.serviceAccountTokenCreator" role on the account you are trying to impersonate.
2023-12-11T18:09:06.569109820Z   

[2023-12-11, 18:09:07 UTC] {pod_manager.py:444} INFO - [base]   ("Failed to retrieve http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/[email protected]/token?scopes=https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fiam from the Google Compute Engine metadata service. Status: 404 Response:\nb'Unable to generate access token; IAM returned 404 Not Found: Not found; Gaia id not found for email [email protected]\\n'", <google.auth.transport.requests._Response object at 0x7ff528226d00>)

[2023-12-11, 18:09:07 UTC] {pod_manager.py:468} WARNING - Follow requested but pod log read interrupted and container base still running
[2023-12-11, 18:09:08 UTC] {pod_manager.py:431} INFO - [base] 18:09:06  Found 2 models, 4 tests, 0 snapshots, 0 analyses, 188 macros, 0 operations, 0 seed files, 0 sources, 0 exposures, 0 metrics
[2023-12-11, 18:09:08 UTC] {pod_manager.py:431} INFO - [base] 18:09:06
[2023-12-11, 18:09:08 UTC] {pod_manager.py:431} INFO - [base] 18:09:06  Encountered an error:
[2023-12-11, 18:09:08 UTC] {pod_manager.py:431} INFO - [base] Runtime Error
[2023-12-11, 18:09:08 UTC] {pod_manager.py:431} INFO - [base]   Unable to generate access token, if you're using impersonate_service_account, make sure your initial account has the "roles/iam.serviceAccountTokenCreator" role on the account you are trying to impersonate.
2023-12-11T18:09:06.569109820Z   

[2023-12-11, 18:09:08 UTC] {pod_manager.py:444} INFO - [base]   ("Failed to retrieve http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/[email protected]/token?scopes=https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fiam from the Google Compute Engine metadata service. Status: 404 Response:\nb'Unable to generate access token; IAM returned 404 Not Found: Not found; Gaia id not found for email [email protected]\\n'", <google.auth.transport.requests._Response object at 0x7ff528226d00>)

[2023-12-11, 18:09:08 UTC] {pod_manager.py:571} INFO - Pod dbt-run-k8-mms8vkif has phase Running
[2023-12-11, 18:09:10 UTC] {pod.py:837} INFO - Deleting pod: dbt-run-k8-mms8vkif
[2023-12-11, 18:09:11 UTC] {taskinstance.py:1826} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 592, in execute
    return self.execute_sync(context)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 632, in execute_sync
    self.cleanup(
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 765, in cleanup
    raise AirflowException(
airflow.exceptions.AirflowException: Pod dbt-run-k8-mms8vkif returned a failure.

tcbh2hod  1#

As with most GCP/Cloud Composer projects of this kind, the problem usually comes down to Workload Identity. Following this page, https://cloud.google.com/kubernetes-engine/docs/how-to/workload-identity, I had annotated the Kubernetes service account with the Google service account incorrectly; specifically, I had not used this command correctly:

kubectl annotate serviceaccount KSA_NAME \
    --namespace NAMESPACE \
    iam.gke.io/gcp-service-account=GSA_NAME@GSA_PROJECT.iam.gserviceaccount.com
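
For completeness, the annotation above only takes effect together with a matching IAM binding on the Google service account that allows the Kubernetes service account to impersonate it; roughly (PROJECT_ID being the project that hosts the GKE cluster's Workload Identity pool):

# Allow the KSA in NAMESPACE to act as the Google service account
gcloud iam service-accounts add-iam-policy-binding GSA_NAME@GSA_PROJECT.iam.gserviceaccount.com \
    --role roles/iam.workloadIdentityUser \
    --member "serviceAccount:PROJECT_ID.svc.id.goog[NAMESPACE/KSA_NAME]"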

This meant the Kubernetes service account did not have the permissions needed to run my BigQuery transformations.
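
And since the workload itself runs BigQuery transformations, the Google service account also needs BigQuery permissions; a minimal sketch (the roles shown are typical choices for dbt, adjust to your project):

# Let the Google service account run BigQuery jobs and write to datasets
gcloud projects add-iam-policy-binding PROJECT_ID \
    --member "serviceAccount:GSA_NAME@GSA_PROJECT.iam.gserviceaccount.com" \
    --role roles/bigquery.jobUser

gcloud projects add-iam-policy-binding PROJECT_ID \
    --member "serviceAccount:GSA_NAME@GSA_PROJECT.iam.gserviceaccount.com" \
    --role roles/bigquery.dataEditor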
