airflow_dbt_python.hooks

Core dbt hook

Provides a hook to interact with dbt.

Interacting with dbt mainly means setting up a dbt project and running a dbt task.

class airflow_dbt_python.hooks.dbt.DbtHook(*args, dbt_conn_id=None, project_conn_id=None, profiles_conn_id=None, **kwargs)[source]

A hook to interact with dbt.

Allows for running dbt tasks and provides required configurations for each task.

Parameters:
  • dbt_conn_id (Optional[str])

  • project_conn_id (Optional[str])

  • profiles_conn_id (Optional[str])

dbt_directory(config, upload_dbt_project=False, delete_before_upload=False, replace_on_upload=False, env_vars=None)[source]

Provides a temporary directory to execute dbt.

Creates a temporary directory for dbt to run in and prepares the dbt files if they need to be pulled from S3. If an S3 backend is being used and upload_dbt_project is True, the project is pushed back to S3 before leaving the temporary directory. Pushing back a project enables commands like deps or docs generate.

Yields:

The temporary directory’s name.

Parameters:
  • upload_dbt_project (bool)

  • delete_before_upload (bool)

  • replace_on_upload (bool)

  • env_vars (Dict[str, Any] | None)

Return type:

Iterator[str]
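
The pull, yield, push-back flow described above can be sketched with the standard library alone. This is a minimal sketch and not the hook's implementation: temporary_project_directory, pull and push are hypothetical names standing in for the download and upload steps.

```python
import tempfile
from contextlib import contextmanager
from pathlib import Path
from typing import Callable, Iterator


@contextmanager
def temporary_project_directory(
    pull: Callable[[str], None],
    push: Callable[[str], None],
    upload_dbt_project: bool = False,
) -> Iterator[str]:
    """Yield a temporary directory prepared by pull, optionally pushed back on exit."""
    with tempfile.TemporaryDirectory(prefix="dbt_sketch_") as tmp_dir:
        pull(tmp_dir)  # hypothetical: fetch project files into tmp_dir
        yield tmp_dir
        if upload_dbt_project:
            push(tmp_dir)  # hypothetical: push the (possibly modified) project back


pushed = []


def fake_pull(directory: str) -> None:
    Path(directory, "dbt_project.yml").write_text("name: demo\n")


def fake_push(directory: str) -> None:
    pushed.extend(sorted(p.name for p in Path(directory).iterdir()))


with temporary_project_directory(fake_pull, fake_push, upload_dbt_project=True) as d:
    # Files created while the task runs (e.g. by deps) are part of what gets pushed.
    Path(d, "packages.yml").write_text("packages: []\n")
```

The push happens before the temporary directory is removed, which is why files written during the task are still available to upload.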

download_dbt_profiles(profiles_dir, destination)[source]

Pull a dbt profiles.yml file from a given profiles_dir.

This operation is delegated to a DbtFSHook. An optional connection id is supported for remotes that require it.

Parameters:
  • profiles_dir (URLLike)

  • destination (URLLike)

Return type:

Path

download_dbt_project(project_dir, destination)[source]

Pull a dbt project from a given project_dir.

This operation is delegated to a DbtFSHook. An optional connection id is supported for remotes that require it.

Parameters:
  • project_dir (URLLike)

  • destination (URLLike)

Return type:

Path

static get_dbt_target_hook(conn_id)[source]

Get a hook to get a dbt profile based on the Airflow connection.

Parameters:

conn_id (str)

Return type:

DbtConnectionHook

get_dbt_task_config(command, **config_kwargs)[source]

Initialize a configuration for given dbt command with given kwargs.

Parameters:

command (str)

Return type:

BaseConfig

static get_fs_hook(scheme, conn_id)[source]

Get a fs_hook to interact with dbt files.

FSHooks are defined by the scheme we are looking for and an optional connection id if we are looking to interface with any Airflow hook that uses a connection.

Parameters:
  • scheme (str)

  • conn_id (Optional[str])

Return type:

DbtFSHook

prepare_directory(tmp_dir, project_dir, profiles_dir=None)[source]

Prepares a dbt directory for execution of a dbt task.

Preparation involves downloading the required dbt project files and profiles.yml.

Parameters:
  • tmp_dir (str)

  • project_dir (URLLike)

  • profiles_dir (Optional[URLLike])

Return type:

tuple[str, Optional[str]]

run_dbt_task(command, upload_dbt_project=False, delete_before_upload=False, replace_on_upload=False, artifacts=None, env_vars=None, write_perf_info=False, **kwargs)[source]

Run a dbt task with a given configuration and return the results.

The configuration used determines the task that will be run.

Returns:

A tuple containing a boolean indicating success and, optionally, the results of running the dbt command.

Parameters:
  • command (str)

  • upload_dbt_project (bool)

  • delete_before_upload (bool)

  • replace_on_upload (bool)

  • artifacts (Iterable[str] | None)

  • env_vars (Dict[str, Any] | None)

  • write_perf_info (bool)

Return type:

DbtTaskResult

setup_dbt_logging(debug)[source]

Set up dbt logging.

Starting with dbt v1, dbt initializes two loggers: default_file and default_stdout. As these are normally initialized by the dbt CLI app, we need to initialize them here ourselves.

Parameters:

debug (bool | None)

upload_dbt_project(project_dir, destination, replace=False, delete_before=False)[source]

Push a dbt project from a given project_dir.

This operation is delegated to a DbtFSHook. An optional connection id is supported for remotes that require it.

Parameters:
  • project_dir (URLLike)

  • destination (URLLike)

  • replace (bool)

  • delete_before (bool)

Return type:

None

class airflow_dbt_python.hooks.dbt.DbtTaskResult(success, run_results, artifacts)[source]

A tuple returned after a dbt task executes.

Parameters:
  • success (bool)

  • run_results (Optional[RunResult])

  • artifacts (dict[str, Any])

success

Whether the task succeeded or not.

Type:

bool

run_results

Results from the dbt task, if available.

Type:

Optional[RunResult]

artifacts

A dictionary of saved dbt artifacts. It may be empty.

Type:

dict[str, Any]

artifacts: dict[str, Any]

Alias for field number 2

run_results: RunResult | None

Alias for field number 1

success: bool

Alias for field number 0
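
Because the result is a tuple with named fields, it can be read by attribute or unpacked by position. The sketch below uses a hypothetical stand-in, TaskResultSketch, that only mirrors the documented field order. It is not the library class.

```python
from typing import Any, NamedTuple, Optional


class TaskResultSketch(NamedTuple):
    """Hypothetical stand-in mirroring the documented DbtTaskResult fields."""

    success: bool
    run_results: Optional[Any]
    artifacts: dict


result = TaskResultSketch(success=True, run_results=None, artifacts={})

# Fields can be read by name or by position (field numbers 0, 1 and 2).
success, run_results, artifacts = result
```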

class airflow_dbt_python.hooks.dbt.DbtTemporaryDirectory(suffix=None, prefix=None, dir=None, ignore_cleanup_errors=True)[source]

A wrapper on TemporaryDirectory for older versions of Python.

Support for ignore_cleanup_errors was added in Python 3.10. There is a very obscure error that can happen when cleaning up a directory, even though everything should be cleaned. We would like to use ignore_cleanup_errors to provide clean up on a best-effort basis. For the time being, we are addressing this only for Python>=3.10.
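
One way to handle the version difference is to pass ignore_cleanup_errors only when the running interpreter accepts it. The following is a minimal sketch under that assumption. make_temporary_directory is a hypothetical helper, not the wrapper class documented here.

```python
import os
import sys
import tempfile


def make_temporary_directory(prefix=None, ignore_cleanup_errors=True):
    """Create a TemporaryDirectory, using ignore_cleanup_errors only when supported."""
    kwargs = {"prefix": prefix}
    if sys.version_info >= (3, 10):
        # The keyword is only accepted by TemporaryDirectory from Python 3.10 on.
        kwargs["ignore_cleanup_errors"] = ignore_cleanup_errors
    return tempfile.TemporaryDirectory(**kwargs)


with make_temporary_directory(prefix="dbt_") as tmp_dir:
    created = tmp_dir
    existed_inside = os.path.isdir(tmp_dir)
```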

Target hooks

Provides hooks to establish a dbt connection with supported targets.

The connection details will be extracted from an Airflow connection, and each target can interpret connection information differently according to its specific connection requirements.

class airflow_dbt_python.hooks.target.DbtBigQueryHook(*args, conn, **kwargs)[source]

A hook to interact with dbt using a BigQuery connection.

Parameters:

conn (Connection)

class airflow_dbt_python.hooks.target.DbtConnectionConditionParam(name, resolver)[source]

Connection parameter with dynamic override name and default value.

Parameters:
  • name (str)

  • resolver (Callable[[Connection], ResolverResult])

name

The original name of the parameter.

Type:

str

resolver

A function that resolves the parameter name and default value based on the connection's extra_dejson.

Type:

Callable[[airflow.sdk.definitions.connection.Connection], airflow_dbt_python.hooks.target.ResolverResult]

name: str

Alias for field number 0

resolve(connection)[source]

Resolves the override name and default value for this parameter.

Parameters:

connection (Connection) – The Airflow connection object.

Returns:

The resolved override name and default value.

Return type:

ResolverResult

resolver: Callable[[Connection], ResolverResult]

Alias for field number 1

class airflow_dbt_python.hooks.target.DbtConnectionHook(*args, conn, **kwargs)[source]

A hook to get a dbt profile based on the Airflow connection.

Parameters:

conn (Connection)

classmethod get_db_conn_hook(conn_id)[source]

Get a dbt hook class depending on the Airflow connection type.

Parameters:

conn_id (str)

Return type:

DbtConnectionHook

get_dbt_details_from_connection(conn)[source]

Extract dbt connection details from Airflow Connection.

dbt connection details may be present as Airflow Connection attributes or in the Connection’s extras. This class’ conn_params and conn_extra_params will be used to fetch required attributes from attributes and extras respectively. If conn_extra_params is empty, we merge parameters with all extras.

Subclasses may override these class attributes to narrow down the connection details for a specific dbt target (like Postgres or Redshift).

Returns:

A dictionary of dbt connection details.

Parameters:

conn (Connection)

Return type:

dict[str, Any]
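
The extraction rule described above (attributes first, then extras, with all extras merged when conn_extra_params is empty) can be sketched as follows. FakeConnection and details_from_connection are hypothetical stand-ins, and skipping attributes that are None is an assumption of this sketch.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class FakeConnection:
    """Hypothetical stand-in for an Airflow Connection."""

    host: Optional[str] = None
    login: Optional[str] = None
    port: Optional[int] = None
    extra_dejson: Dict[str, Any] = field(default_factory=dict)


def details_from_connection(
    conn: FakeConnection,
    conn_params: List[str],
    conn_extra_params: List[str],
) -> Dict[str, Any]:
    """Collect attributes named in conn_params, then extras named in conn_extra_params."""
    details: Dict[str, Any] = {}
    for name in conn_params:
        value = getattr(conn, name, None)
        if value is not None:  # assumption: unset attributes are left out
            details[name] = value
    if conn_extra_params:
        extras = {k: v for k, v in conn.extra_dejson.items() if k in conn_extra_params}
    else:
        # With no explicit extra params, merge in every extra.
        extras = dict(conn.extra_dejson)
    details.update(extras)
    return details


conn = FakeConnection(
    host="db.example.com",
    login="analyst",
    extra_dejson={"sslmode": "require", "threads": 4},
)
narrow = details_from_connection(conn, ["host", "login", "port"], ["sslmode"])
merged = details_from_connection(conn, ["host", "login", "port"], [])
```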

get_dbt_target_from_connection()[source]

Return a dictionary of connection details to use as a dbt target.

The connection details are fetched from an Airflow connection identified by self.dbt_conn_id.

Returns:

A dictionary with a configuration for a dbt target, or None if a matching Airflow connection is not found for the given dbt target.

Return type:

dict[str, Any] | None

class airflow_dbt_python.hooks.target.DbtConnectionHookMeta(name, bases, attrs, **kwargs)[source]

A hook metaclass to collect all subclasses of DbtConnectionHook.

Return type:

DbtConnectionHookMeta
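
Collecting subclasses through a metaclass is a general Python technique. The sketch below shows one way it can work. RegistryMeta and the hook classes are hypothetical, and keying the registry by a conn_type attribute is an assumption, not a statement about how this metaclass stores its subclasses.

```python
class RegistryMeta(type):
    """Hypothetical metaclass that records every class it creates, keyed by conn_type."""

    registry = {}

    def __new__(mcs, name, bases, attrs, **kwargs):
        new_cls = super().__new__(mcs, name, bases, attrs, **kwargs)
        conn_type = attrs.get("conn_type")
        if conn_type is not None:
            mcs.registry[conn_type] = new_cls
        return new_cls


class BaseTargetHook(metaclass=RegistryMeta):
    conn_type = None  # the base class itself is not registered


class PostgresTargetHook(BaseTargetHook):
    conn_type = "postgres"


class SnowflakeTargetHook(BaseTargetHook):
    conn_type = "snowflake"
```

With a registry like this, a lookup by connection type needs no explicit list of supported targets: defining a new subclass is enough to make it discoverable.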

class airflow_dbt_python.hooks.target.DbtConnectionParam(name, store_override_name=None, default=None, converter=None)[source]

A tuple indicating connection parameters relevant to dbt.

Parameters:
  • name (str)

  • store_override_name (str | None)

  • default (Any | None)

  • converter (Callable[[Any], Any] | None)

name

The name of the connection parameter. This name will be used to get the parameter from an Airflow Connection or its extras.

Type:

str

store_override_name

A new name for the connection parameter. If not None, this is the name used in a dbt profile.

Type:

str | None

default

A default value if the parameter is not found.

Type:

Any | None

converter: Callable[[Any], Any] | None

Alias for field number 3

default: Any | None

Alias for field number 2

name: str

Alias for field number 0

property override_name

Returns the override_name if defined, otherwise defaults to name.

>>> DbtConnectionParam("login", "user").override_name
'user'
>>> DbtConnectionParam("port").override_name
'port'

store_override_name: str | None

Alias for field number 1
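
To illustrate how name, store_override_name and default could combine when building a dbt profile, here is a minimal sketch. ParamSketch and to_profile are hypothetical, the converter field is left out, and the lookup logic is an assumption rather than the library's code.

```python
from typing import Any, NamedTuple, Optional


class ParamSketch(NamedTuple):
    """Hypothetical stand-in with the documented name, override and default fields."""

    name: str
    store_override_name: Optional[str] = None
    default: Optional[Any] = None

    @property
    def override_name(self) -> str:
        if self.store_override_name is not None:
            return self.store_override_name
        return self.name


def to_profile(values: dict, params: list) -> dict:
    """Look up each parameter by name and store it under its override name."""
    profile = {}
    for param in params:
        value = values.get(param.name, param.default)
        if value is not None:
            profile[param.override_name] = value
    return profile


profile = to_profile(
    {"login": "analyst", "host": "db.example.com"},
    [ParamSketch("login", "user"), ParamSketch("host"), ParamSketch("port", default=5432)],
)
```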

class airflow_dbt_python.hooks.target.DbtPostgresHook(*args, conn, **kwargs)[source]

A hook to interact with dbt using a Postgres connection.

Parameters:

conn (Connection)

get_dbt_details_from_connection(conn)[source]

Extract dbt connection details from Airflow Connection.

dbt connection details may be present as Airflow Connection attributes or in the Connection’s extras. This class’ conn_params and conn_extra_params will be used to fetch required attributes from attributes and extras respectively. If conn_extra_params is empty, we merge parameters with all extras.

Subclasses may override these class attributes to narrow down the connection details for a specific dbt target (like Postgres or Redshift).

Returns:

A dictionary of dbt connection details.

Parameters:

conn (Connection)

Return type:

dict[str, Any]

class airflow_dbt_python.hooks.target.DbtRedshiftHook(*args, conn, **kwargs)[source]

A hook to interact with dbt using a Redshift connection.

Parameters:

conn (Connection)

class airflow_dbt_python.hooks.target.DbtSnowflakeHook(*args, conn, **kwargs)[source]

A hook to interact with dbt using a Snowflake connection.

Parameters:

conn (Connection)

class airflow_dbt_python.hooks.target.DbtSparkHook(*args, conn, **kwargs)[source]

A hook to interact with dbt using a Spark connection.

Parameters:

conn (Connection)

class airflow_dbt_python.hooks.target.DbtTrinoHook(*args, conn, **kwargs)[source]

A hook to interact with dbt using a Trino connection.

Parameters:

conn (Connection)

class airflow_dbt_python.hooks.target.ResolverCondition(condition_key, comparison_operator, expected)[source]

Condition for resolving connection parameters based on extra_dejson.

Parameters:
  • condition_key (str)

  • comparison_operator (Callable[[Any, Any], bool])

  • expected (Any)

condition_key

The key in extra_dejson to check.

Type:

str

comparison_operator

A function to compare the actual value with the expected value.

Type:

Callable[[Any, Any], bool]

expected

The expected value for the condition to be satisfied.

Type:

Any

comparison_operator: Callable[[Any, Any], bool]

Alias for field number 1

condition_key: str

Alias for field number 0

expected: Any

Alias for field number 2

class airflow_dbt_python.hooks.target.ResolverResult(override_name, default)[source]

Result of resolving a connection parameter.

Parameters:
  • override_name (str | None)

  • default (Any | None)

override_name

The name to override the parameter with, if applicable.

Type:

str | None

default

The default value to use if no value is found.

Type:

Any | None

default: Any | None

Alias for field number 1

override_name: str | None

Alias for field number 0

airflow_dbt_python.hooks.target.make_extra_dejson_resolver(*conditions, default=(None, None))[source]

Creates a resolver function for override names and defaults.

Parameters:
  • *conditions (tuple[ResolverCondition, ResolverResult]) – A sequence of conditions and their corresponding results.

  • default (ResolverResult) – The default result if no condition is met.

Returns:

A function that takes a Connection object and returns the appropriate ResolverResult.

Return type:

Callable[[Connection], ResolverResult]
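
The idea of pairing conditions with results can be sketched as below. Condition, Result and make_resolver are hypothetical stand-ins. The sketch takes the extras dictionary directly instead of a Connection, returns the first matching result, and uses made-up keys such as auth_type. All of these are assumptions.

```python
import operator
from typing import Any, Callable, NamedTuple, Optional, Tuple


class Condition(NamedTuple):
    condition_key: str
    comparison_operator: Callable[[Any, Any], bool]
    expected: Any


class Result(NamedTuple):
    override_name: Optional[str]
    default: Optional[Any]


def make_resolver(
    *conditions: Tuple[Condition, Result],
    default: Result = Result(None, None),
) -> Callable[[dict], Result]:
    """Return a function mapping an extras dict to the first matching Result."""

    def resolve(extra_dejson: dict) -> Result:
        for condition, result in conditions:
            actual = extra_dejson.get(condition.condition_key)
            if condition.comparison_operator(actual, condition.expected):
                return result
        return default

    return resolve


resolver = make_resolver(
    (Condition("auth_type", operator.eq, "key_pair"), Result("private_key", None)),
    default=Result("password", None),
)
```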

airflow_dbt_python.hooks.target.try_decode_base64(s)[source]

Attempt to decode a string from base64.

If the string is not valid base64, returns the original value.

Parameters:

s (str) – The string to decode.

Returns:

The decoded string, or the original value if decoding fails.

Return type:

str
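
A decode-or-return-original helper of this kind can be written with the standard library. This is a sketch of the documented behavior under the assumption that decoded bytes are interpreted as UTF-8. try_decode_base64_sketch is a hypothetical name and may differ from the library function in its details.

```python
import base64


def try_decode_base64_sketch(s: str) -> str:
    """Return the base64-decoded text, or s unchanged if it cannot be decoded."""
    try:
        # validate=True rejects characters outside the base64 alphabet.
        return base64.b64decode(s, validate=True).decode("utf-8")
    except ValueError:
        # binascii.Error and UnicodeDecodeError are both ValueError subclasses.
        return s


decoded = try_decode_base64_sketch("aGVsbG8=")
unchanged = try_decode_base64_sketch("not base64!")
```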

Filesystem hooks

The DbtFSHook interface

A DbtFSHook can download and upload dbt project files to a filesystem.

By filesystem we understand any storage that can hold dbt project files, like an S3 bucket or a git repository.

class airflow_dbt_python.hooks.fs.__init__.DbtFSHook(context=None)[source]

Represents a dbt project storing any dbt files.

A concrete backend class should implement the push and pull methods to fetch one or more dbt files. Backends can rely on an Airflow connection with a corresponding hook, but this is not enforced.

Delegating the responsibility of dealing with dbt files to backend subclasses allows us to support more backends without changing the DbtHook.

connection_id

An optional Airflow connection. If defined, will be used to instantiate a hook for this backend.

download_dbt_profiles(source, destination)[source]

Download a dbt profiles.yml file from a given source.

Parameters:
  • source (URL | str | Path) – URLLike pointing to a remote containing a profiles.yml file.

  • destination (URL | str | Path) – URLLike to a directory where the profiles.yml will be stored.

Returns:

The destination Path.

Return type:

Path

download_dbt_project(source, destination)[source]

Download all dbt project files from a given source.

Parameters:
  • source (URL | str | Path) – URLLike to a directory containing a dbt project.

  • destination (URL | str | Path) – URLLike to a directory where the dbt project will be stored.

Returns:

The destination Path.

Return type:

Path

upload_dbt_project(source, destination, replace=False, delete_before=False)[source]

Upload all dbt project files from a given source.

Parameters:
  • source (URL | str | Path) – URLLike to a directory containing a dbt project.

  • destination (URL | str | Path) – URLLike to a directory where the dbt project will be stored.

  • replace (bool) – Flag to indicate whether to replace existing files.

  • delete_before (bool) – Flag to indicate whether to clear any existing files before uploading the dbt project.

airflow_dbt_python.hooks.fs.__init__.get_fs_hook(scheme, conn_id=None)[source]

Get a DbtFSHook as long as the scheme is supported.

In the future we should make our hooks discoverable and package ourselves as a proper Airflow providers package.

Parameters:
  • scheme (str)

  • conn_id (str | None)

Return type:

DbtFSHook
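
Selecting a hook by URL scheme amounts to a lookup table. The sketch below is illustrative only: the classes, the scheme-to-class mapping and the NotImplementedError raised for unsupported schemes are all assumptions, not the library's behavior.

```python
from typing import Optional
from urllib.parse import urlparse


class FSHookSketch:
    """Hypothetical base class holding an optional connection id."""

    def __init__(self, conn_id: Optional[str] = None):
        self.conn_id = conn_id


class S3Sketch(FSHookSketch):
    pass


class LocalSketch(FSHookSketch):
    pass


# Assumed mapping; an empty scheme is treated as a local path.
HOOKS_BY_SCHEME = {"s3": S3Sketch, "": LocalSketch, "file": LocalSketch}


def get_fs_hook_sketch(scheme: str, conn_id: Optional[str] = None) -> FSHookSketch:
    try:
        hook_cls = HOOKS_BY_SCHEME[scheme]
    except KeyError:
        raise NotImplementedError(f"Scheme {scheme!r} is not supported") from None
    return hook_cls(conn_id)


hook = get_fs_hook_sketch(urlparse("s3://my-bucket/dbt/project").scheme, conn_id="aws_default")

try:
    get_fs_hook_sketch("ftp")
    unsupported_raised = False
except NotImplementedError:
    unsupported_raised = True
```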

Git

A concrete DbtFSHook for git repositories with dulwich.

class airflow_dbt_python.hooks.fs.git.DbtGitFSHook(git_conn_id=None, commit_author='Airflow dbt <>', commit_msg='Airflow dbt committed on {ts: Y%-%m-%d H%:%M:%S}', username='git', upload_branch=None, upload_filter=<function no_filter>, remote_host='localhost', **kwargs)[source]

A dbt remote implementation for git repositories.

This concrete remote class implements the DbtFSHook interface by using any git repository to upload and download dbt files.

The DbtGitFSHook subclasses Airflow’s SSHHook to reuse its methods for operating with SSH connections. However, SSH connections are not the only ones supported for interacting with git repositories: HTTP (http:// or https://) and plain TCP (git://) may be used.

Parameters:
  • git_conn_id (str | None)

  • commit_author (str)

  • commit_msg (str)

  • username (str)

  • upload_branch (str | None)

  • upload_filter (Callable[[URL], bool])

  • remote_host (str)

get_git_client_path(url)[source]

Initialize a dulwich git client according to given URL’s scheme.

Parameters:

url (URL)

Return type:

Tuple[Urllib3HttpGitClient | SSHGitClient | TCPGitClient, str, str | None]

airflow_dbt_python.hooks.fs.git.no_filter(_)[source]

A no-op filter.

Parameters:

_ (URL)

Return type:

bool

Local

A local filesystem remote for dbt.

Intended to be used only when running Airflow with a LocalExecutor.

class airflow_dbt_python.hooks.fs.local.DbtLocalFsHook(fs_conn_id='fs_default')[source]

A concrete dbt hook for a local filesystem.

This hook is intended to be used when running Airflow with a LocalExecutor, and it relies on shutil from the standard library to do all the file manipulation. For these reasons, running multiple concurrent tasks with this remote may lead to race conditions if attempting to push files to the remote.

Parameters:

fs_conn_id (str)

copy(source, destination, replace=False, delete_before=False)[source]

Push all dbt files under the source directory to another local path.

Pushing supports zipped projects: the destination will be used to determine if we are working with a zip file by looking at the file extension.

Parameters:
  • source (URL) – A local file path where to fetch the files to push.

  • destination (URL) – A local path where the file should be copied.

  • replace (bool) – Whether to replace existing files or not.

  • delete_before (bool) – Whether to delete the contents of destination before pushing.

Return type:

None
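
Choosing between a plain copy and a zip archive based on the destination's file extension can be done with shutil. The following is a minimal sketch under that assumption. copy_project is a hypothetical helper and ignores the replace and delete_before flags.

```python
import shutil
import tempfile
import zipfile
from pathlib import Path


def copy_project(source: Path, destination: Path) -> None:
    """Copy a directory, or archive it when the destination ends in .zip."""
    if destination.suffix == ".zip":
        # make_archive appends the extension itself, so pass the name without it.
        shutil.make_archive(str(destination.with_suffix("")), "zip", root_dir=str(source))
    else:
        shutil.copytree(source, destination, dirs_exist_ok=True)


with tempfile.TemporaryDirectory() as tmp:
    source = Path(tmp, "project")
    source.mkdir()
    (source / "dbt_project.yml").write_text("name: demo\n")

    copy_project(source, Path(tmp, "packed.zip"))
    with zipfile.ZipFile(Path(tmp, "packed.zip")) as archive:
        zip_names = archive.namelist()

    copy_project(source, Path(tmp, "copy"))
    copied = Path(tmp, "copy", "dbt_project.yml").read_text()
```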

copy_one(source, destination, replace=False)[source]

Pull many files from local path.

If the file already exists, it will be ignored if replace is False (the default).

Parameters:
  • source (URL) – A local path to a directory containing the files to pull.

  • destination (URL) – A destination path where to pull the file to.

  • replace (bool) – A bool flag to indicate whether to replace existing files.

Return type:

None

get_url(url)[source]

Return a URL relative to this hook’s basepath.

If the given URL is absolute, simply return it. If it is None, return a URL made from basepath.

Parameters:

url (URL | None)

Return type:

URL
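
The three cases (None, absolute, relative) can be sketched with pathlib. resolve_against_basepath is a hypothetical function working on plain POSIX paths rather than the library's URL type, and joining a relative path onto basepath is an assumption based on the summary line.

```python
from pathlib import PurePosixPath
from typing import Optional


def resolve_against_basepath(basepath: str, url: Optional[str]) -> PurePosixPath:
    """Resolve url against basepath: None gives basepath, absolute paths pass through."""
    base = PurePosixPath(basepath)
    if url is None:
        return base
    candidate = PurePosixPath(url)
    if candidate.is_absolute():
        return candidate
    return base / candidate


from_none = resolve_against_basepath("/opt/dbt", None)
from_absolute = resolve_against_basepath("/opt/dbt", "/tmp/project")
from_relative = resolve_against_basepath("/opt/dbt", "project/models")
```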

S3

An implementation for an S3 remote for dbt.

class airflow_dbt_python.hooks.fs.s3.DbtS3FSHook(*args, **kwargs)[source]

A dbt remote implementation for S3.

This concrete remote class implements the DbtFSHook interface by using S3 as storage for uploading and downloading dbt files. The DbtS3FSHook subclasses Airflow’s S3Hook to interact with S3. A connection id may be passed to set the connection to use with S3.

download_s3_object(s3_object, destination)[source]

Download an S3 object into a local destination.

Parameters:

destination (URL)

Return type:

None

iter_url(source)[source]

Iterate over an S3 key given by a URL.

Parameters:

source (URL)

Return type:

Iterable[URL]

load_file_handle_replace_error(file_url, key, bucket_name=None, replace=False, encrypt=False, gzip=False, acl_policy=None)[source]

Calls S3Hook.load_file but handles ValueError when replacing existing keys.

Will also log a warning whenever attempting to replace an existing key with replace = False.

Returns:

True if no ValueError was raised, False otherwise.

Parameters:
  • file_url (URL | str | Path)

  • key (str)

  • bucket_name (str | None)

  • replace (bool)

  • encrypt (bool)

  • gzip (bool)

  • acl_policy (str | None)

Return type:

bool
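
The pattern of turning a ValueError from an upload call into a logged warning and a boolean result can be sketched as follows. upload_handling_replace_error and fake_upload are hypothetical: the fake uploader imitates an upload that raises ValueError when the key exists and replace is False.

```python
import logging

logger = logging.getLogger("dbt_fs_sketch")


def upload_handling_replace_error(upload, key, replace=False):
    """Call upload(key, replace); return False instead of raising on ValueError."""
    try:
        upload(key, replace)
    except ValueError:
        logger.warning("Key %s already exists and replace is False; skipping", key)
        return False
    return True


existing = {"dbt/project/dbt_project.yml"}


def fake_upload(key, replace):
    # Hypothetical uploader: refuses to overwrite unless replace is True.
    if key in existing and not replace:
        raise ValueError(f"The key {key} already exists.")
    existing.add(key)


skipped = upload_handling_replace_error(fake_upload, "dbt/project/dbt_project.yml")
replaced = upload_handling_replace_error(fake_upload, "dbt/project/dbt_project.yml", replace=True)
created = upload_handling_replace_error(fake_upload, "dbt/project/models/a.sql")
```

Returning a boolean lets a caller that uploads many files continue past keys that already exist instead of aborting the whole upload.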

GCS

An implementation for a GCS remote for dbt.

class airflow_dbt_python.hooks.fs.gcs.DbtGCSFSHook(*args, **kwargs)[source]

A dbt remote implementation for GCS.

This concrete remote class implements the DbtFSHook interface by using GCS as storage for uploading and downloading dbt files. The DbtGCSFSHook subclasses Airflow’s GCSHook to interact with GCS. A connection id may be passed to set the connection to use with GCS.

check_for_key(key, bucket_name)[source]

Check if the key exists in the bucket.

Parameters:
  • key (str)

  • bucket_name (str)

Return type:

bool

get_key(key, bucket_name)[source]

Get Blob object by key and bucket name.

Parameters:
  • key (str)

  • bucket_name (str)

Return type:

Blob

iter_url(source)[source]

Iterate over a GCS key given by a URL.

Parameters:

source (URL)

Return type:

Iterable[URL]

load_file(filename, key, bucket_name, replace=False, encrypt=False, gzip=False)[source]

Load a local file to GCS.

Parameters:
  • filename (Path | str) – path to the file to load.

  • key (str) – GCS key that will point to the file

  • bucket_name (str) – Name of the bucket in which to store the file

  • replace (bool) – A flag to decide whether or not to overwrite the key if it already exists. If replace is False and the key exists, an error will be raised.

  • encrypt (bool) – If True, the file will be encrypted on the server-side by GCS and will be stored in an encrypted form while at rest in GCS.

  • gzip (bool) – If True, the file will be compressed locally

Return type:

None

load_file_handle_replace_error(file_url, key, bucket_name=None, replace=False, encrypt=False, gzip=False)[source]

Calls GCSHook.load_file but handles ValueError when replacing existing keys.

Will also log a warning whenever attempting to replace an existing key with replace = False.

Returns:

True if no ValueError was raised, False otherwise.

Parameters:
  • file_url (URL | str | Path)

  • key (str)

  • bucket_name (str | None)

  • replace (bool)

  • encrypt (bool)

  • gzip (bool)

Return type:

bool