Example DAGs
This section contains a few DAGs showing off some dbt pipelines to get you going.
Warning
All example DAGs are tested against a subset of Airflow versions. Some changes, like modifying import statements or changing types, may be required for them to work in environments running other versions of Airflow.
Basic DAG
This basic DAG shows off a single DbtRunOperator that executes hourly:
"""Sample basic DAG which runs a dbt project."""
import datetime as dt

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow_dbt_python.operators.dbt import DbtRunOperator

# The cron expression "0 * * * *" triggers one DAG run at minute 0 of every hour.
with DAG(
    dag_id="example_basic_dbt_run",
    schedule_interval="0 * * * *",
    start_date=days_ago(1),
    catchup=False,
    dagrun_timeout=dt.timedelta(minutes=60),
) as dag:
    # Single task: run the models matched by ``select``, minus those matched
    # by ``exclude``, against the "production" target of the "my-project" profile.
    dbt_run = DbtRunOperator(
        task_id="dbt_run_hourly",
        project_dir="/path/to/my/dbt/project/",
        profiles_dir="~/.dbt/",
        select=["+tag:hourly"],
        exclude=["tag:deprecated"],
        target="production",
        profile="my-project",
        full_refresh=False,
    )
Run and Docs from S3
This DAG shows off a DbtRunOperator followed by a DbtDocsGenerateOperator. Both execute hourly, and run from dbt project files available in an S3 URL:
"""Sample basic DAG which showcases a dbt project being pulled from S3."""
import datetime as dt

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow_dbt_python.operators.dbt import DbtDocsGenerateOperator, DbtRunOperator

# The cron expression "0 * * * *" triggers one DAG run at minute 0 of every hour.
with DAG(
    dag_id="example_basic_dbt_run_with_s3",
    schedule_interval="0 * * * *",
    start_date=days_ago(1),
    catchup=False,
    dagrun_timeout=dt.timedelta(minutes=60),
) as dag:
    # Project files will be pulled from "s3://my-bucket/dbt/project/key/prefix/"
    # and profiles from "s3://my-bucket/dbt/profiles/key/prefix/".
    dbt_run = DbtRunOperator(
        task_id="dbt_run_hourly",
        project_dir="s3://my-bucket/dbt/project/key/prefix/",
        profiles_dir="s3://my-bucket/dbt/profiles/key/prefix/",
        select=["+tag:hourly"],
        exclude=["tag:deprecated"],
        target="production",
        profile="my-project",
        full_refresh=False,
    )

    # Documentation files (target/manifest.json, target/index.html, and
    # target/catalog.json) will be pushed back to S3 after compilation is done.
    # The task_id must differ from dbt_run's: task ids are unique within a DAG.
    dbt_docs = DbtDocsGenerateOperator(
        task_id="dbt_docs",
        project_dir="s3://my-bucket/dbt/project/key/prefix/",
        profiles_dir="s3://my-bucket/dbt/profiles/key/prefix/",
    )

    dbt_run >> dbt_docs
Complete dbt workflow
This DAG shows off an (almost) complete dbt workflow as it would be run from the CLI. We begin by running DbtSourceOperator to test the freshness of our source tables; DbtSeedOperator follows to load up any static data. Then, two instances of DbtRunOperator are created: one to handle incremental models, and the other to run any non-incremental models. Finally, we run our tests with DbtTestOperator to ensure our models remain correct.
"""Sample DAG showcasing a complete dbt workflow.

The complete workflow includes a sequence of source, seed, several run, and
test commands.
"""
import datetime as dt

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow_dbt_python.operators.dbt import (
    DbtRunOperator,
    DbtSeedOperator,
    DbtSourceOperator,
    DbtTestOperator,
)

# The cron expression "0 * * * *" triggers one DAG run at minute 0 of every hour.
with DAG(
    dag_id="example_complete_dbt_workflow",
    schedule_interval="0 * * * *",
    start_date=days_ago(1),
    catchup=False,
    dagrun_timeout=dt.timedelta(minutes=60),
) as dag:
    # Each task needs its own task_id: task ids are unique within a DAG.
    dbt_source = DbtSourceOperator(
        task_id="dbt_source",
        project_dir="/path/to/my/dbt/project/",
        profiles_dir="~/.dbt/",
        target="production",
        profile="my-project",
        do_xcom_push_artifacts=["sources.json"],
    )

    dbt_seed = DbtSeedOperator(
        task_id="dbt_seed",
        project_dir="/path/to/my/dbt/project/",
        profiles_dir="~/.dbt/",
        target="production",
        profile="my-project",
    )

    # A comma inside a single selector is an intersection in dbt's node
    # selection syntax: only models that are tagged hourly AND incremental.
    dbt_run_incremental = DbtRunOperator(
        task_id="dbt_run_incremental_hourly",
        project_dir="/path/to/my/dbt/project/",
        profiles_dir="~/.dbt/",
        select=["tag:hourly,config.materialized:incremental"],
        exclude=["tag:deprecated"],
        target="production",
        profile="my-project",
        full_refresh=False,
    )

    # Separate selectors are a union: exclude deprecated models and also every
    # incremental model (those were handled by dbt_run_incremental above).
    dbt_run = DbtRunOperator(
        task_id="dbt_run_hourly",
        project_dir="/path/to/my/dbt/project/",
        profiles_dir="~/.dbt/",
        select=["+tag:hourly"],
        exclude=["tag:deprecated", "config.materialized:incremental"],
        target="production",
        profile="my-project",
        full_refresh=True,
    )

    dbt_test = DbtTestOperator(
        task_id="dbt_test",
        project_dir="/path/to/my/dbt/project/",
        profiles_dir="~/.dbt/",
        target="production",
        profile="my-project",
    )

    dbt_source >> dbt_seed >> dbt_run_incremental >> dbt_run >> dbt_test
Using dbt artifacts
The following DAG showcases how to use dbt artifacts that are made available via XCom by airflow-dbt-python. A sample function finds the longest-running dbt model by pulling the artifacts that were generated after DbtRunOperator executes. We specify which dbt artifacts are pushed to XCom via the do_xcom_push_artifacts parameter.
"""Sample DAG to showcase pulling dbt artifacts from XCOM."""
import datetime as dt

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from airflow_dbt_python.operators.dbt import DbtRunOperator


def process_dbt_artifacts(**context):
    """Report which model or models took the longest to compile and execute.

    Pulls the ``run_results.json`` artifact from XCom (task ``dbt_run_daily``)
    and prints, for the ``execute`` and ``compile`` timing steps, the
    successful model with the longest duration.

    Args:
        **context: Task context. Only ``context["ti"]`` is used; it must
            provide an ``xcom_pull(key=..., task_ids=...)`` method.
    """
    timestamp_format = "%Y-%m-%dT%H:%M:%S.%fZ"
    run_results = context["ti"].xcom_pull(
        key="run_results.json", task_ids="dbt_run_daily"
    )
    # Timing step name -> (model unique_id, duration in seconds), or None
    # while no successful model has reported that step.
    longest = {"execute": None, "compile": None}

    for result in run_results["results"]:
        if result["status"] != "success":
            continue

        model_id = result["unique_id"]
        for timing in result["timing"]:
            step = timing["name"]
            if step not in longest:
                continue

            # strptime only takes positional arguments; passing ``format=``
            # as a keyword raises TypeError.
            started_at = dt.datetime.strptime(timing["started_at"], timestamp_format)
            completed_at = dt.datetime.strptime(
                timing["completed_at"], timestamp_format
            )
            # Completed minus started, so durations are positive.
            duration = (completed_at - started_at).total_seconds()

            if longest[step] is None or duration > longest[step][1]:
                longest[step] = (model_id, duration)

    for step in ("execute", "compile"):
        if longest[step] is None:
            # Nothing to report, e.g. every model failed or was skipped.
            print(f"No successful model reported a {step} timing.")
            continue

        model_id, duration = longest[step]
        print(
            f"{model_id} took the longest to {step} with a time of "
            f"{duration} seconds!"
        )

# The cron expression "0 0 * * *" triggers one DAG run per day at midnight.
with DAG(
    dag_id="example_dbt_artifacts",
    schedule_interval="0 0 * * *",
    start_date=days_ago(1),
    catchup=False,
    dagrun_timeout=dt.timedelta(minutes=60),
) as dag:
    # The artifacts listed in do_xcom_push_artifacts are what the downstream
    # task pulls from XCom; process_dbt_artifacts reads "run_results.json".
    dbt_run = DbtRunOperator(
        task_id="dbt_run_daily",
        project_dir="/path/to/my/dbt/project/",
        profiles_dir="~/.dbt/",
        select=["+tag:daily"],
        exclude=["tag:deprecated"],
        target="production",
        profile="my-project",
        full_refresh=True,
        do_xcom_push_artifacts=["manifest.json", "run_results.json"],
    )

    # process_dbt_artifacts pulls from task_ids="dbt_run_daily", so it must
    # run after dbt_run (see the dependency below).
    process_artifacts = PythonOperator(
        task_id="process_artifacts",
        python_callable=process_dbt_artifacts,
        provide_context=True,
    )

    dbt_run >> process_artifacts