Add sources to your DAG
Using sources
Sources make it possible to name and describe the data loaded into your warehouse by your Extract and Load tools. By declaring these tables as sources in dbt, you can then:
- select from source tables in your models using the {{ source() }} function, helping define the lineage of your data
- test your assumptions about your source data
- calculate the freshness of your source data
Declaring a source
Sources are defined in .yml files nested under a sources: key.
version: 2

sources:
  - name: jaffle_shop
    database: raw
    schema: jaffle_shop
    tables:
      - name: orders
      - name: customers

  - name: stripe
    tables:
      - name: payments
*By default, schema will be the same as name. Add schema only if you want to use a source name that differs from the existing schema.
If you're not already familiar with these files, be sure to check out the documentation on properties.yml files before proceeding.
Selecting from a source
Once a source has been defined, it can be referenced from a model using the {{ source() }} function.
select
...
from {{ source('jaffle_shop', 'orders') }}
left join {{ source('jaffle_shop', 'customers') }} using (customer_id)
dbt will compile this to the full table name:
select
...
from raw.jaffle_shop.orders
left join raw.jaffle_shop.customers using (customer_id)
Using the {{ source() }} function also creates a dependency between the model and the source table.
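To make the compilation step concrete, here is a minimal Python sketch of how a source reference could be resolved to a fully qualified table name. It is an illustration only, not dbt's implementation: the SOURCES dictionary, the resolve_source function, and the "analytics" fallback database are all hypothetical names introduced for this example. The one rule taken from the text above is that schema defaults to the source name when it is not set.

```python
# Hypothetical in-memory copy of the source declarations from the YAML example.
SOURCES = {
    "jaffle_shop": {
        "database": "raw",
        "schema": "jaffle_shop",
        "tables": {"orders", "customers"},
    },
    "stripe": {"tables": {"payments"}},
}


def resolve_source(source_name, table_name, default_database="analytics"):
    """Return the fully qualified name for a declared source table.

    default_database is an assumption standing in for whatever database
    the project would otherwise target.
    """
    source = SOURCES[source_name]
    if table_name not in source["tables"]:
        raise KeyError(
            f"table '{table_name}' is not declared in source '{source_name}'"
        )
    database = source.get("database", default_database)
    # schema falls back to the source name when it is not set explicitly
    schema = source.get("schema", source_name)
    return f"{database}.{schema}.{table_name}"


print(resolve_source("jaffle_shop", "orders"))  # raw.jaffle_shop.orders
print(resolve_source("stripe", "payments"))     # analytics.stripe.payments
```

Referencing a table that was never declared raises an error in this sketch, which mirrors the idea that a model can only depend on sources that have been defined.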
Testing and documenting sources
You can also:
- Add data tests to sources
- Add descriptions to sources, which are rendered as part of your documentation site
These should be familiar concepts if you've already added tests and descriptions to your models (if not, check out the guides on testing and documentation).
version: 2

sources:
  - name: jaffle_shop
    description: This is a replica of the Postgres database used by our app
    tables:
      - name: orders
        description: >
          One record per order. Includes cancelled and deleted orders.
        columns:
          - name: id
            description: Primary key of the orders table
            tests:
              - unique
              - not_null
          - name: status
            description: Note that the status can change over time

      - name: ...

  - name: ...
You can find more details on the available properties for sources in the reference section.
Snapshotting source data freshness
With a couple of extra configs, dbt can optionally snapshot the "freshness" of the data in your source tables. This is useful for understanding if your data pipelines are in a healthy state, and is a critical component of defining SLAs for your warehouse.
Declaring source freshness
To configure sources to snapshot freshness information, add a freshness block to your source and loaded_at_field to your table declaration:
version: 2

sources:
  - name: jaffle_shop
    database: raw
    freshness: # default freshness
      warn_after: {count: 12, period: hour}
      error_after: {count: 24, period: hour}
    loaded_at_field: _etl_loaded_at

    tables:
      - name: orders
        freshness: # make this a little more strict
          warn_after: {count: 6, period: hour}
          error_after: {count: 12, period: hour}

      - name: customers # this inherits the default freshness defined in the jaffle_shop source block at the beginning

      - name: product_skus
        freshness: null # do not check freshness for this table
In the freshness block, one or both of warn_after and error_after can be provided. If neither is provided, then dbt will not calculate freshness snapshots for the tables in this source.
Additionally, the loaded_at_field is required to calculate freshness for a table. If a loaded_at_field is not provided, then dbt will not calculate freshness for the table.
These configs are applied hierarchically, so freshness and loaded_at_field values specified for a source will flow through to all of the tables defined in that source. This is useful when all of the tables in a source have the same loaded_at_field, as the config can just be specified once in the top-level source definition.
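The hierarchical behavior described above can be sketched in Python as follows. This is a simplified model, not dbt's own code: resolve_table_config and the dictionaries are hypothetical, and the sketch assumes that a table-level freshness block replaces the source-level block as a whole (dbt's actual merging may be finer-grained). It does show the two cases from the example: a table with no setting inherits from the source, and an explicit null switches freshness off.

```python
_UNSET = object()  # sentinel: distinguishes "key absent" from an explicit null


def resolve_table_config(source, table):
    """Resolve freshness and loaded_at_field for one table.

    A value set on the table wins, including an explicit None;
    otherwise the source-level value flows through.
    """
    resolved = {}
    for key in ("freshness", "loaded_at_field"):
        value = table.get(key, _UNSET)
        if value is _UNSET:
            value = source.get(key)
        resolved[key] = value
    return resolved


# Hypothetical dictionaries mirroring the YAML example above.
source = {
    "freshness": {
        "warn_after": {"count": 12, "period": "hour"},
        "error_after": {"count": 24, "period": "hour"},
    },
    "loaded_at_field": "_etl_loaded_at",
}
customers = {"name": "customers"}
product_skus = {"name": "product_skus", "freshness": None}

print(resolve_table_config(source, customers)["loaded_at_field"])  # _etl_loaded_at
print(resolve_table_config(source, product_skus)["freshness"])     # None
```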
Checking source freshness
To snapshot freshness information for your sources, use the dbt source freshness command (reference docs):
$ dbt source freshness
Behind the scenes, dbt uses the freshness properties to construct a select query, shown below. You can find this query in the query logs.
select
  max(_etl_loaded_at) as max_loaded_at,
  convert_timezone('UTC', current_timestamp()) as snapshotted_at
from raw.jaffle_shop.orders
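The results of this query are used to determine whether the source is fresh: dbt compares the age of the most recent record (snapshotted_at minus max_loaded_at) against the warn_after and error_after thresholds.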
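As a rough illustration of how these results might be evaluated, the Python sketch below classifies a source from the two timestamps returned by the query. The function names, the status labels ("pass", "warn", "error"), and the strict greater-than comparison are assumptions made for this example rather than a description of dbt's internals.

```python
from datetime import datetime, timedelta, timezone

# Seconds per period; the period names mirror those used in the freshness block.
PERIOD_SECONDS = {"minute": 60, "hour": 3600, "day": 86400}


def threshold(rule):
    """Convert a {count, period} rule into a timedelta, or None if unset."""
    if rule is None:
        return None
    return timedelta(seconds=rule["count"] * PERIOD_SECONDS[rule["period"]])


def freshness_status(max_loaded_at, snapshotted_at, warn_after=None, error_after=None):
    """Classify a source as 'pass', 'warn', or 'error' from the query results."""
    age = snapshotted_at - max_loaded_at
    error_limit = threshold(error_after)
    warn_limit = threshold(warn_after)
    # Check the stricter outcome first so an error is never reported as a warning.
    if error_limit is not None and age > error_limit:
        return "error"
    if warn_limit is not None and age > warn_limit:
        return "warn"
    return "pass"


# Example: the newest record is 8 hours old, checked against the orders thresholds.
now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
status = freshness_status(
    now - timedelta(hours=8),
    now,
    warn_after={"count": 6, "period": "hour"},
    error_after={"count": 12, "period": "hour"},
)
print(status)  # warn
```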
Filter
Some databases have tables that require a filter on certain columns in order to prevent a full table scan, which could be costly. To run a freshness check on such tables, add a filter argument to the configuration, e.g. filter: _etl_loaded_at >= date_sub(current_date(), interval 1 day). For the example above, the resulting query would look like:
select
  max(_etl_loaded_at) as max_loaded_at,
  convert_timezone('UTC', current_timestamp()) as snapshotted_at
from raw.jaffle_shop.orders
where _etl_loaded_at >= date_sub(current_date(), interval 1 day)
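The effect of the filter on the generated query can be sketched in Python. The build_freshness_query helper below is hypothetical and only assembles text in the shape of the queries shown above. It is not how dbt renders SQL, and the timestamp expression is copied from the example rather than adapted to any particular warehouse.

```python
def build_freshness_query(relation, loaded_at_field, filter_expr=None):
    """Assemble a freshness query string, adding a where clause only if a filter is given."""
    lines = [
        "select",
        f"  max({loaded_at_field}) as max_loaded_at,",
        "  convert_timezone('UTC', current_timestamp()) as snapshotted_at",
        f"from {relation}",
    ]
    if filter_expr:
        # The filter is passed through verbatim as the where condition.
        lines.append(f"where {filter_expr}")
    return "\n".join(lines)


print(
    build_freshness_query(
        "raw.jaffle_shop.orders",
        "_etl_loaded_at",
        "_etl_loaded_at >= date_sub(current_date(), interval 1 day)",
    )
)
```

Without a filter_expr, the helper returns the unfiltered query from the previous section.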