Example DAGs

This section contains a few DAGs showing off some dbt pipelines to get you going.

Warning

All example DAGs are tested against a subset of Airflow versions. Some changes, like modifying import statements or changing types, may be required for them to work in environments running other versions of Airflow.

Basic DAG

This basic DAG shows off a single DbtRunOperator that executes hourly:

basic_dag.py
 1"""Sample basic DAG which dbt runs a project."""
 2import datetime as dt
 3
 4from airflow import DAG
 5from airflow.utils.dates import days_ago
 6from airflow_dbt_python.operators.dbt import DbtRunOperator
 7
 8with DAG(
 9    dag_id="example_basic_dbt_run",
10    schedule_interval="0 * * * *",
11    start_date=days_ago(1),
12    catchup=False,
13    dagrun_timeout=dt.timedelta(minutes=60),
14) as dag:
15    dbt_run = DbtRunOperator(
16        task_id="dbt_run_hourly",
17        project_dir="/path/to/my/dbt/project/",
18        profiles_dir="~/.dbt/",
19        select=["+tag:hourly"],
20        exclude=["tag:deprecated"],
21        target="production",
22        profile="my-project",
23        full_refresh=False,
24    )

Run and Docs from S3

This DAG shows off a DbtRunOperator followed by a DbtDocsGenerateOperator. Both execute hourly, and run from dbt project files available at an S3 URL:

dbt_project_in_s3_dag.py
 1"""Sample basic DAG which showcases a dbt project being pulled from S3."""
 2import datetime as dt
 3
 4from airflow import DAG
 5from airflow.utils.dates import days_ago
 6from airflow_dbt_python.operators.dbt import DbtDocsGenerateOperator, DbtRunOperator
 7
 8with DAG(
 9    dag_id="example_basic_dbt_run_with_s3",
10    schedule_interval="0 * * * *",
11    start_date=days_ago(1),
12    catchup=False,
13    dagrun_timeout=dt.timedelta(minutes=60),
14) as dag:
15    # Project files will be pulled from "s3://my-bucket/dbt/profiles/key/prefix/"
16    dbt_run = DbtRunOperator(
17        task_id="dbt_run_hourly",
18        project_dir="s3://my-bucket/dbt/project/key/prefix/",
19        profiles_dir="s3://my-bucket/dbt/profiles/key/prefix/",
20        select=["+tag:hourly"],
21        exclude=["tag:deprecated"],
22        target="production",
23        profile="my-project",
24        full_refresh=False,
25    )
26
27    # Documentation files (target/manifest.json, target/index.html, and
28    # target/catalog.json) will be pushed back to S3 after compilation is done.
29    dbt_docs = DbtDocsGenerateOperator(
30        task_id="dbt_run_hourly",
31        project_dir="s3://my-bucket/dbt/project/key/prefix/",
32        profiles_dir="s3://my-bucket/dbt/profiles/key/prefix/",
33    )
34
35    dbt_run >> dbt_docs

Complete dbt workflow

This DAG shows off an (almost) complete dbt workflow as it would be run from the CLI: we begin by running DbtSourceOperator to test the freshness of our source tables, and DbtSeedOperator follows to load up any static data. Then, two instances of DbtRunOperator are created: one to handle incremental models, and the other one to run any non-incremental models. Finally, we run our tests to ensure our models remain correct.

complete_dbt_workflow_dag.py
 1"""Sample DAG showcasing a complete dbt workflow.
 2
 3The complete workflow includes a sequence of source, seed, and several run commands.
 4"""
 5import datetime as dt
 6
 7from airflow import DAG
 8from airflow.utils.dates import days_ago
 9from airflow_dbt_python.operators.dbt import (
10    DbtRunOperator,
11    DbtSeedOperator,
12    DbtSourceOperator,
13    DbtTestOperator,
14)
15
with DAG(
    dag_id="example_complete_dbt_workflow",
    schedule_interval="0 * * * *",  # Minute 0 of every hour.
    start_date=days_ago(1),
    catchup=False,
    dagrun_timeout=dt.timedelta(minutes=60),
) as dag:
    # Step 1: check source freshness. The resulting sources.json artifact is
    # pushed to XCom for any downstream consumer.
    dbt_source = DbtSourceOperator(
        # Task ids must be unique within a DAG, so this task cannot share
        # "dbt_run_incremental_hourly" with the incremental run below.
        task_id="dbt_source",
        project_dir="/path/to/my/dbt/project/",
        profiles_dir="~/.dbt/",
        target="production",
        profile="my-project",
        do_xcom_push_artifacts=["sources.json"],
    )

    # Step 2: load static data.
    dbt_seed = DbtSeedOperator(
        task_id="dbt_seed",
        project_dir="/path/to/my/dbt/project/",
        profiles_dir="~/.dbt/",
        target="production",
        profile="my-project",
    )

    # Step 3: incremental models only. Within a single selector string, a
    # comma means intersection: tagged "hourly" AND materialized incremental.
    dbt_run_incremental = DbtRunOperator(
        task_id="dbt_run_incremental_hourly",
        project_dir="/path/to/my/dbt/project/",
        profiles_dir="~/.dbt/",
        select=["tag:hourly,config.materialized:incremental"],
        exclude=["tag:deprecated"],
        target="production",
        profile="my-project",
        full_refresh=False,
    )

    # Step 4: everything else. The two exclusions are separate list items
    # (a union) so that deprecated models OR incremental models are skipped.
    # Joining them with a comma would only exclude models matching both, and
    # the remaining incremental models would be rebuilt by full_refresh=True.
    dbt_run = DbtRunOperator(
        task_id="dbt_run_hourly",
        project_dir="/path/to/my/dbt/project/",
        profiles_dir="~/.dbt/",
        select=["+tag:hourly"],
        exclude=["tag:deprecated", "config.materialized:incremental"],
        target="production",
        profile="my-project",
        full_refresh=True,
    )

    # Step 5: test the freshly built models.
    dbt_test = DbtTestOperator(
        task_id="dbt_test",
        project_dir="/path/to/my/dbt/project/",
        profiles_dir="~/.dbt/",
        target="production",
        profile="my-project",
    )

    dbt_source >> dbt_seed >> dbt_run_incremental >> dbt_run >> dbt_test

Using dbt artifacts

The following DAG showcases how to use dbt artifacts that are made available via XCom by airflow-dbt-python. A sample function calculates the longest running dbt model by pulling the artifacts that were generated after DbtRunOperator executes. We specify which dbt artifacts to push via the do_xcom_push_artifacts parameter.

use_dbt_artifacts_dag.py
 1"""Sample DAG to showcase pulling dbt artifacts from XCOM."""
 2import datetime as dt
 3
 4from airflow import DAG
 5from airflow.operators.python_operator import PythonOperator
 6from airflow.utils.dates import days_ago
 7from airflow_dbt_python.operators.dbt import DbtRunOperator
 8
 9
def process_dbt_artifacts(**context):
    """Report which model or models took the longest to compile and execute.

    Pulls the "run_results.json" artifact that the "dbt_run_daily" task pushed
    to XCom and, for each of the "execute" and "compile" timing steps, prints
    the successful model with the longest duration.

    Args:
        **context: Airflow task context. Only ``context["ti"]`` is used, to
            pull the artifact from XCom.
    """
    timestamp_format = "%Y-%m-%dT%H:%M:%S.%fZ"

    run_results = context["ti"].xcom_pull(
        key="run_results.json", task_ids="dbt_run_daily"
    )

    # Longest (model_id, seconds) seen so far for each timing step of interest.
    longest = {"execute": None, "compile": None}

    # Tolerate a missing artifact (nothing pulled) instead of crashing.
    results = (run_results or {}).get("results", [])

    for result in results:
        if result["status"] != "success":
            continue

        model_id = result["unique_id"]
        for timing in result["timing"]:
            step = timing["name"]
            if step not in longest:
                continue

            # The format is passed positionally: strptime may reject it as a
            # keyword argument.
            started_at = dt.datetime.strptime(
                timing["started_at"], timestamp_format
            )
            completed_at = dt.datetime.strptime(
                timing["completed_at"], timestamp_format
            )
            # End minus start, so that durations are positive.
            duration = (completed_at - started_at).total_seconds()

            if longest[step] is None or duration > longest[step][1]:
                longest[step] = (model_id, duration)

    for step in ("execute", "compile"):
        if longest[step] is None:
            print(f"No successful model reported a {step} timing.")
            continue

        model_id, duration = longest[step]
        print(
            f"{model_id} took the longest to {step} with a time of "
            f"{duration} seconds!"
        )
49
with DAG(
    dag_id="example_dbt_artifacts",
    schedule_interval="0 0 * * *",  # Once a day, at midnight.
    start_date=days_ago(1),
    catchup=False,
    dagrun_timeout=dt.timedelta(minutes=60),
) as dag:
    dbt_run = DbtRunOperator(
        task_id="dbt_run_daily",
        project_dir="/path/to/my/dbt/project/",
        profiles_dir="~/.dbt/",
        select=["+tag:daily"],
        exclude=["tag:deprecated"],
        target="production",
        profile="my-project",
        full_refresh=True,
        # These artifacts are pushed to XCom; process_dbt_artifacts pulls
        # "run_results.json" from the "dbt_run_daily" task.
        do_xcom_push_artifacts=["manifest.json", "run_results.json"],
    )

    process_artifacts = PythonOperator(
        task_id="process_artifacts",
        python_callable=process_dbt_artifacts,
        # NOTE(review): presumably only needed on Airflow 1.10; confirm
        # against the Airflow version in use.
        provide_context=True,
    )

    # Must sit at the same 4-space indentation as the rest of the with body.
    dbt_run >> process_artifacts