Getting started
This section guides you on installing airflow-dbt-python and getting your first dbt workflow DAG running.
Requirements
Before using airflow-dbt-python, ensure you meet the following requirements: * A dbt project using dbt-core version 1.8 or later. * An Airflow environment using version 3.0 or later.
If using any managed Airflow service, like AWS MWAA or GCP Cloud Composer, ensure your environment is created with a supported version of Airflow.
If self-hosting, Airflow installation instructions can be found in their official documentation.
Python 3.10 or later.
Warning
New versions of Airflow and dbt may introduce breaking changes. We recommend testing any new versions of Airflow and dbt before upgrading production systems; Please report any issues that may arise during testing so they can be addressed.
Note
We only test airflow-dbt-python against a limited set of versions of Airflow and dbt, and try to keep up with the latest releases. For Airflow, our policy is to cover with tests the latest release of Airflow, the latest version available in GCP Cloud Composer, and the latest version available in AWS MWAA. For dbt, our policy is to cover the last two minor versions.
Installation
Your installation will vary according to your specific Airflow environment setup. These instructions cover installing from PyPI, building the GitHub repository directly, installing in AWS MWAA, and installing in GCP Cloud Composer. Other serviced offerings may require different steps, check the documentation of your managed service.
From PyPI
airflow-dbt-python is available in PyPI and can be installed with pip:
pip install airflow-dbt-python
As a convenience, some dbt adapters can be installed by specifying extras. For example, if requiring the dbt-redshift adapter:
pip install airflow-dbt-python[redshift]
Or dbt-snowflake:
pip install airflow-dbt-python[snowflake]
Note
These dbt adapter extras are provided as a convenience. Any required dbt adapters can also be installed separatedly. Refer to the dbt documentation for a list of supported adapters and how to install them.
Building from source
airflow-dbt-python can also be built from source by cloning the GitHub repository:
git clone https://github.com/tomasfarias/airflow-dbt-python.git
cd airflow-dbt-python
And build with uv:
uv build
In AWS MWAA
Add airflow-dbt-python to your requirements.txt file and edit your Airflow environment to use this new requirements.txt file, or upload it as a plugin.
To do so, include airflow-dbt-python in the requirements.txt file provided to AWS MWAA, for example:
airflow-dbt-python[redshift,s3]
Installs airflow-dbt-python, dbt-redshift adapter, and all required libraries to support dbt S3 remotes.
Alternatively, airflow-dbt-python can also be provided to AWS MWAA via a plugins.zip file. This can be achieved by adding an airflow-dbt-python wheel to your plugins.zip
For example, we can start by cloning the GitHub repository:
git clone https://github.com/tomasfarias/airflow-dbt-python.git
cd airflow-dbt-python
Then building an airflow-dbt-python wheel using uv:
uv build --wheel
The wheel file can now be added to your plugins.zip, and the requirements can be updated to point to this wheel file (note that the version placeholder ‘X.Y.Z’ has to be replaced by the actual version you have just built):
/usr/local/airflow/plugins/airflow_dbt_python-X.Y.Z-py3-none-any.whl
In GCP Cloud Composer
Add airflow-dbt-python to your PyPI packages list.
Refer to the GCP Cloud Composer documentation on how to do this.
In other managed services
airflow-dbt-python should be compatible with most or all Airflow managed services. Consult the documentation specific to your provider.
If you notice an issue when installing airflow-dbt-python in a specific managed service, please open an issue.
Accessing a dbt project
airflow-dbt-python needs a way to access your dbt project to run. The requirements to grant this access will depend on how your Airflow environment is setup:
Using a local executor with a single-machine installation means we can rely on the local machine’s filesystem to store a dbt project. This also applies to
DebugExecutorandSequentialExecutor, but these executors are generally only used for debugging/development so we will ignore them. If you are running a setup like this, then simply ensure your dbt project and profiles.yml exist somewhere in theLocalExecutor’s file system.Once your setup has evolved to a multi-machine/cloud installation with any remote executor, we must rely on a remote storage for dbt files. Currently, supported remote storages include AWS S3, Google Cloud Storage and Git repositories although more are in plans to be added. In this setup, your dbt project will need to be uploaded to a remote storage that Airflow can access. airflow-dbt-python can utilize Airflow connections to access these storages.
Single-machine installation
As we can rely on the local machine’s filesystem, simply copy or move your dbt project profiles.yml to a path in the instance executing Airflow.
Files may be laid out as:
.
|-- ~/.dbt/
| `-- profiles.yml
`-- /path/to/project/
|-- dbt_project.yml
|-- models/
| |-- model1.sql
| `-- model2.sql
|-- seeds/
| |-- seed1.csv
| `-- seed2.csv
|-- macros/
| |-- macro1.csv
| `-- macro2.csv
`-- tests/
|-- test1.sql
`-- test2.sql
Then we can simply set project_dir and profiles_dir to "/path/to/project/" and "~/.dbt/" respectively:
1import datetime as dt
2
3import pendulum
4from airflow import DAG
5from airflow_dbt_python.operators.dbt import DbtRunOperator
6
7with DAG(
8 dag_id="example_local_1",
9 schedule_interval="0 0 * * *",
10 start_date=pendulum.today("UTC").add(days=-1),
11 catchup=False,
12 dagrun_timeout=dt.timedelta(minutes=60),
13) as dag:
14 dbt_run = DbtRunOperator(
15 task_id="dbt_run_daily",
16 project_dir="/path/to/project",
17 profiles_dir="~/.dbt/",
18 select=["+tag:daily"],
19 exclude=["tag:deprecated"],
20 target="production",
21 profile="my-project",
22 )
Note
Setting profiles_dir to "~/.dbt/" can be omitted as this is the default value.
If we have multiple operators, we can also utilize default arguments and include other parameters like the profile and target to use:
1import datetime as dt
2
3import pendulum
4from airflow import DAG
5from airflow_dbt_python.operators.dbt import DbtRunOperator, DbtSeedOperator
6
7default_args = {
8 "project_dir": "/path/to/project/",
9 "profiles_dir": "~/.dbt/",
10 "target": "production",
11 "profile": "my-project",
12}
13
14with DAG(
15 dag_id="example_local_2",
16 schedule_interval="0 0 * * *",
17 start_date=pendulum.today("UTC").add(days=-1),
18 catchup=False,
19 dagrun_timeout=dt.timedelta(minutes=60),
20 default_args=default_args,
21) as dag:
22 dbt_seed = DbtSeedOperator(
23 task_id="dbt_seed",
24 )
25
26 dbt_run = DbtRunOperator(
27 task_id="dbt_run_daily",
28 select=["+tag:daily"],
29 exclude=["tag:deprecated"],
30 )
31
32 dbt_seed >> dbt_run
Note
dbt supports configuration via environment variables, which may also be used. Additionally, profile and target may be omitted if already specified in dbt_project.yml and profiles.yml respectively.
Multi-machine/cloud installation
When Airflow is installed is running on a multi- machine or cloud installation, each individual worker does not have does not have access to a common filesystem that we can reliably use to store dbt project files (at least, assuming any deployment with more than one worker). This includes both self-hosted deployments as well as managed Airflow deployments like AWS MWAA or Astronomer.
For these deployments we must rely on a dbt remote to download and, eventually, upload all required dbt files. The remote dbt URL may be used in place of a local project_dir or profiles_dir to have airflow-dbt-python download the dbt files in the remote into a temporary directory for execution.
Interactions with storages are supported by subclasses of DbtFSHook. Read the documentation dbt filesystem hooks to learn more about these hooks.
As an example, let’s upload our dbt project to an AWS S3 bucket. The files may end up structured in the bucket as:
s3://my-bucket/
.
|-- profiles/
| `-- profiles.yml
`-- project/
|-- dbt_project.yml
|-- models/
| |-- model1.sql
| `-- model2.sql
|-- seeds/
| |-- seed1.csv
| `-- seed2.csv
|-- macros/
| |-- macro1.csv
| `-- macro2.csv
`-- tests/
|-- test1.sql
`-- test2.sql
Then, we can alter the previous example DAG to set project_dir and profiles_dir to "s3://my-bucket/project/" and "s3://my-bucket/profiles/" respectively:
1import datetime as dt
2
3import pendulum
4from airflow import DAG
5from airflow_dbt_python.operators.dbt import DbtRunOperator
6
7with DAG(
8 dag_id="example_s3_remote_1",
9 schedule_interval="0 0 * * *",
10 start_date=pendulum.today("UTC").add(days=-1),
11 catchup=False,
12 dagrun_timeout=dt.timedelta(minutes=60),
13) as dag:
14 dbt_run = DbtRunOperator(
15 task_id="dbt_run_daily",
16 project_dir="s3://my-bucket/project/",
17 profiles_dir="s3://my-bucket/profiles/",
18 select=["+tag:daily"],
19 exclude=["tag:deprecated"],
20 target="production",
21 profile="my-project",
22 )
airflow-dbt-python uses the URL scheme (in this example, "s3") to figure out the type of remote, and the corresponding DbtFSHook to download all required files. An exception would be raised if the scheme does not point to a supported remote.
airflow-dbt-python takes care of adjusting any path-like arguments so that they are pointing to files in a local temporary directory once all the dbt files are download from the remote storage.
Let’s do another example where we upload our dbt project to a GitHub repository. For this example, let’s use dbt-labs’ own jaffle_shop.
The DAG looks the same as the AWS S3 example, except that now we use the GitHub repository’s SSH URL as the project_dir argument:
1import datetime as dt
2
3import pendulum
4from airflow import DAG
5from airflow_dbt_python.operators.dbt import DbtRunOperator
6
7with DAG(
8 dag_id="example_git_remote_1",
9 schedule_interval="0 0 * * *",
10 start_date=pendulum.today("UTC").add(days=-1),
11 catchup=False,
12 dagrun_timeout=dt.timedelta(minutes=60),
13) as dag:
14 dbt_run = DbtRunOperator(
15 task_id="dbt_run_daily",
16 project_dir="git+ssh://github.com:dbt-labs/jaffle-shop-classic",
17 select=["+tag:daily"],
18 exclude=["tag:deprecated"],
19 dbt_conn_id="my_warehouse_connection",
20 profile="my-project",
21 )
airflow-dbt-python can determine this URL requires a DbtGitFSHook by looking at the URL’s scheme ("git+ssh"). As we are passing an SSH URL, DbtGitFSHook can utilize an Airflow SSH Connection as it subclasses Airflow’s SSHHook. This connection type allows us to setup the necessary SSH keys to access GitHub. Of course, as this is a public repository, we could have just used an HTTP URL, but for private repositories an SSH key may be required.
Note
airflow-dbt-python can utilize Airflow Connections to fetch connection details for dbt remotes as well as for dbt targets (e.g. for your data warehouse). The project_conn_id and profiles_conn_id arguments that all dbt operators have refer to Airflow Connections to used to fetch dbt projects and profiles.yml respectively, whereas the target argument can point to an Airflow Connection used to setup dbt to access your data warehouse.
Notice we are omitting the profiles_dir argument as the jaffle_shop repo doesn’t include a profiles.yml file we can use. When we omit profiles_dir, airflow-dbt-python will attempt to find dbt connection details in one of two places:
First, it will check if the
project_dirURL already includes aprofiles.yml. If so, we can use it.If it’s not included, airflow-dbt-python will try to find an Airflow Connection using the
targetargument.
Airflow Connections are generally created in the UI, but for illustration purposes we can create one also in our DAG with:
1from airflow import DAG, settings
2from airflow.models.connection import Connection
3
4session = settings.Session()
5my_conn = Connection(
6 conn_id="my_db_connection",
7 conn_type="postgres",
8 description="A test postgres connection",
9 host="localhost",
10 login="username",
11 port=5432,
12 schema="my_dbt_schema",
13 password="password", # pragma: allowlist secret
14 # Other dbt parameters can be added as extras
15 extra=json.dumps(dict(threads=4, sslmode="require")),
16)
17
18session.add(my_conn)
19session.commit()