Python models
Note that only specific data platforms support dbt-py models.
We encourage you to:
- Read the original discussion that proposed this feature.
- Contribute to best practices for developing Python models in dbt.
- Share your thoughts and ideas on next steps for Python models.
- Join the #dbt-core-python-models channel in the dbt Community Slack.
Overview
dbt Python (dbt-py) models can help you solve use cases that can't be solved with SQL. You can perform analyses using tools available in the open-source Python ecosystem, including state-of-the-art packages for data science and statistics. Before, you would have needed separate infrastructure and orchestration to run Python transformations in production. Python transformations defined in dbt are models in your project with all the same capabilities around testing, documentation, and lineage.
import ...

def model(dbt, session):
    my_sql_model_df = dbt.ref("my_sql_model")
    final_df = ...  # stuff you can't write in SQL!
    return final_df
version: 2

models:
  - name: my_python_model
    # Document within the same codebase
    description: My transformation written in Python
    # Configure in ways that feel intuitive and familiar
    config:
      materialized: table
      tags: ['python']
    # Test the results of my Python transformation
    columns:
      - name: id
        # Standard validation for 'grain' of Python results
        tests:
          - unique
          - not_null
    tests:
      # Write your own validation logic (in SQL) for Python results
      - custom_generic_test
The prerequisites for dbt Python models include using an adapter for a data platform that supports a fully featured Python runtime. In a dbt Python model, all Python code is executed remotely on the platform. None of it is run by dbt locally. We believe in clearly separating model definition from model execution. In this and many other ways, you'll find that dbt's approach to Python models mirrors its longstanding approach to modeling data in SQL.
We've written this guide assuming that you have some familiarity with dbt. If you've never before written a dbt model, we encourage you to start by first reading dbt Models. Throughout, we'll be drawing connections between Python models and SQL models, as well as making clear their differences.
What is a Python model?
A dbt Python model is a function that reads in dbt sources or other models, applies a series of transformations, and returns a transformed dataset. DataFrame operations define the starting points, the end state, and each step along the way.
This is similar to the role of CTEs in dbt SQL models. We use CTEs to pull in upstream datasets, define (and name) a series of meaningful transformations, and end with a final select statement. You can run the compiled version of a dbt SQL model to see the data included in the resulting view or table. When you dbt run, dbt wraps that query in create view, create table, or more complex DDL to save its results in the database.
Instead of a final select statement, each Python model returns a final DataFrame. Each DataFrame operation is "lazily evaluated." In development, you can preview its data, using methods like .show() or .head(). When you run a Python model, the full result of the final DataFrame will be saved as a table in your data warehouse.
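To make "lazily evaluated" concrete, here is a minimal sketch in plain Python. The LazyFrame class is a hypothetical stand-in written for this illustration; it is not a dbt, Snowpark, or PySpark class, and real DataFrame libraries implement laziness quite differently. It only shows the idea that each operation records a step and that no work happens until the result is requested.

```python
class LazyFrame:
    """Hypothetical stand-in for a lazily evaluated DataFrame (illustration only)."""

    def __init__(self, rows, steps=()):
        self._rows = rows            # source data, untouched until collect()
        self._steps = tuple(steps)   # recorded transformations, in order

    def filter(self, predicate):
        # Record the step and return a new plan; nothing is computed yet.
        return LazyFrame(self._rows, self._steps + (("filter", predicate),))

    def with_column(self, name, fn):
        # Same idea: only the intent to add a column is recorded.
        return LazyFrame(self._rows, self._steps + (("with_column", (name, fn)),))

    def collect(self):
        # Only now is the recorded plan applied to the data.
        rows = [dict(r) for r in self._rows]
        for kind, arg in self._steps:
            if kind == "filter":
                rows = [r for r in rows if arg(r)]
            else:
                name, fn = arg
                rows = [{**r, name: fn(r)} for r in rows]
        return rows


calls = []

def is_warm(row):
    calls.append(row["degree"])  # track when the predicate actually runs
    return row["degree"] > 10

temps = LazyFrame([{"degree": 5}, {"degree": 15}, {"degree": 25}])
plan = temps.filter(is_warm).with_column("degree_plus_one", lambda r: r["degree"] + 1)
calls_before_collect = list(calls)  # snapshot taken before any evaluation
result = plan.collect()
```

Building the plan does not call is_warm at all; the predicate runs only inside collect(). In the same spirit, previewing or running a model is what triggers the actual computation.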
dbt Python models have access to almost all of the same configuration options as SQL models. You can test and document them, add tags and meta properties, and grant access to their results to other users. You can select them by their name, file path, configurations, whether they are upstream or downstream of another model, or if they have been modified compared to a previous project state.
Defining a Python model
Each Python model lives in a .py file in your models/ folder. It defines a function named model(), which takes two parameters:
- dbt: A class compiled by dbt Core, unique to each model, that enables you to run your Python code in the context of your dbt project and DAG.
- session: A class representing your data platform’s connection to the Python backend. The session is needed to read in tables as DataFrames, and to write DataFrames back to tables. In PySpark, by convention, the SparkSession is named spark, and available globally. For consistency across platforms, we always pass it into the model function as an explicit argument called session.
The model() function must return a single DataFrame. On Snowpark (Snowflake), this can be a Snowpark or pandas DataFrame. Via PySpark (Databricks + BigQuery), this can be a Spark, pandas, or pandas-on-Spark DataFrame. For more about choosing between pandas and native DataFrames, see DataFrame API + syntax.
When you dbt run --select python_model, dbt will prepare and pass in both arguments (dbt and session). All you have to do is define the function. This is how every single Python model should look:
def model(dbt, session):
    ...
    return final_df
Referencing other models
Python models participate fully in dbt's directed acyclic graph (DAG) of transformations. Use the dbt.ref() method within a Python model to read data from other models (SQL or Python). If you want to read directly from a raw source table, use dbt.source(). These methods return DataFrames pointing to the upstream source, model, seed, or snapshot.
def model(dbt, session):
    # DataFrame representing an upstream model
    upstream_model = dbt.ref("upstream_model_name")
    # DataFrame representing an upstream source
    upstream_source = dbt.source("upstream_source_name", "table_name")
    ...
Of course, you can ref() your Python model in downstream SQL models, too:
with upstream_python_model as (
    select * from {{ ref('my_python_model') }}
),
...
Referencing ephemeral models is currently not supported (see feature request).
Configuring Python models
Just like SQL models, there are three ways to configure Python models:
- In dbt_project.yml, where you can configure many models at once
- In a dedicated .yml file, within the models/ directory
- Within the model's .py file, using the dbt.config() method
Calling the dbt.config() method will set configurations for your model within your .py file, similar to the {{ config() }} macro in .sql model files:
def model(dbt, session):
    # setting configuration
    dbt.config(materialized="table")
There's a limit to how complex you can get with the dbt.config() method. It accepts only literal values (strings, booleans, and numeric types) and dynamic configuration. Passing another function or a more complex data structure is not possible. The reason is that dbt statically analyzes the arguments to config() while parsing your model without executing your Python code. If you need to set a more complex configuration, we recommend you define it using the config property in a YAML file.
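To illustrate why only literal values can be read without executing the code, here is a minimal sketch using Python's standard ast module. This is not dbt's actual parser. The helper name extract_literal_config and the sample source string are assumptions made for this example. It shows the general technique: walk the syntax tree, find dbt.config(...) calls, and evaluate each keyword argument only if it is a plain literal.

```python
import ast

MODEL_SOURCE = '''
def model(dbt, session):
    dbt.config(materialized="table", packages=["numpy"])
    return dbt.ref("upstream_model_name")
'''

def extract_literal_config(source):
    """Collect keyword arguments of dbt.config(...) calls without running the code."""
    found = {}
    for node in ast.walk(ast.parse(source)):
        is_config_call = (
            isinstance(node, ast.Call)
            and isinstance(node.func, ast.Attribute)
            and node.func.attr == "config"
            and isinstance(node.func.value, ast.Name)
            and node.func.value.id == "dbt"
        )
        if not is_config_call:
            continue
        for kw in node.keywords:
            if kw.arg is None:
                raise ValueError("**kwargs unpacking cannot be analyzed statically")
            try:
                # literal_eval accepts literals only; function calls and names are rejected
                found[kw.arg] = ast.literal_eval(kw.value)
            except ValueError:
                raise ValueError(f"config '{kw.arg}' is not a literal value") from None
    return found

config = extract_literal_config(MODEL_SOURCE)

# A value computed by a function call cannot be read statically.
# pick_materialization is a hypothetical name and is never executed here.
try:
    extract_literal_config("dbt.config(materialized=pick_materialization())")
    rejected = False
except ValueError:
    rejected = True
```

The first call recovers the materialized and packages values straight from the source text. The second is rejected because its value would only be known after running pick_materialization(), which a static pass never does.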
Accessing project context
dbt Python models don't use Jinja to render compiled code. Python models have limited access to global project contexts compared to SQL models. That context is made available from the dbt class, passed in as an argument to the model() function.
Out of the box, the dbt class supports:
- Returning DataFrames referencing the locations of other resources: dbt.ref() + dbt.source()
- Accessing the database location of the current model: dbt.this() (also: dbt.this.database, .schema, .identifier)
- Determining if the current model's run is incremental: dbt.is_incremental
You can extend this context by setting values in the model's config and then "getting" them with dbt.config.get(). Starting from dbt v1.8, the dbt.config.get() method supports dynamic access to configurations within Python models, enhancing flexibility in model logic. This includes inputs such as var, env_var, and target. If you want to use those values for conditional logic in your model, you must set them through a dedicated YAML file config:
version: 2

models:
  - name: my_python_model
    config:
      materialized: table
      target_name: "{{ target.name }}"
      specific_var: "{{ var('SPECIFIC_VAR') }}"
      specific_env_var: "{{ env_var('SPECIFIC_ENV_VAR') }}"
Then, within the model's Python code, use the dbt.config.get() function to access values of configurations that have been set:
def model(dbt, session):
    target_name = dbt.config.get("target_name")
    specific_var = dbt.config.get("specific_var")
    specific_env_var = dbt.config.get("specific_env_var")

    orders_df = dbt.ref("fct_orders")

    # limit data in dev
    if target_name == "dev":
        orders_df = orders_df.limit(500)

    return orders_df
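To see how the conditional above behaves for different targets, the following sketch calls a model function with hand-written stand-ins. StubConfig, StubFrame, and StubDbt are hypothetical classes invented for this illustration. The real dbt object is compiled by dbt and the real DataFrame comes from your data platform, so treat this as a way to reason about the logic, not as a description of dbt internals.

```python
class StubConfig:
    """Hypothetical stand-in for dbt.config (illustration only)."""

    def __init__(self, values):
        self._values = values

    def get(self, key, default=None):
        return self._values.get(key, default)


class StubFrame:
    """Hypothetical stand-in for a DataFrame that supports limit()."""

    def __init__(self, rows):
        self.rows = rows

    def limit(self, n):
        return StubFrame(self.rows[:n])


class StubDbt:
    """Hypothetical stand-in for the dbt argument passed to model()."""

    def __init__(self, config_values, tables):
        self.config = StubConfig(config_values)
        self._tables = tables

    def ref(self, name):
        return self._tables[name]


def model(dbt, session):
    target_name = dbt.config.get("target_name")
    orders_df = dbt.ref("fct_orders")
    # limit data in dev
    if target_name == "dev":
        orders_df = orders_df.limit(500)
    return orders_df


tables = {"fct_orders": StubFrame(list(range(1000)))}
dev_result = model(StubDbt({"target_name": "dev"}, tables), session=None)
prod_result = model(StubDbt({"target_name": "prod"}, tables), session=None)
```

With target_name set to "dev" the result is capped at 500 rows. With any other value the full 1000 rows pass through unchanged.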
Materializations
Python models support these materializations:
- table (default)
- incremental
Incremental Python models support all the same incremental strategies as their SQL counterparts. The specific strategies supported depend on your adapter. As an example, incremental models are supported on BigQuery with Dataproc for the merge incremental strategy; the insert_overwrite strategy is not yet supported.
Python models can't be materialized as view or ephemeral. Python isn't supported for non-model resource types (like tests and snapshots).
For incremental models, like SQL models, you need to filter incoming tables to only new rows of data:
Snowpark:
import snowflake.snowpark.functions as F

def model(dbt, session):
    dbt.config(materialized = "incremental")
    df = dbt.ref("upstream_table")

    if dbt.is_incremental:
        # only new rows compared to max in current table
        max_from_this = f"select max(updated_at) from {dbt.this}"
        df = df.filter(df.updated_at >= session.sql(max_from_this).collect()[0][0])

        # or only rows from the past 3 days
        df = df.filter(df.updated_at >= F.dateadd("day", F.lit(-3), F.current_timestamp()))

    ...

    return df
PySpark:
import pyspark.sql.functions as F

def model(dbt, session):
    dbt.config(materialized = "incremental")
    df = dbt.ref("upstream_table")

    if dbt.is_incremental:
        # only new rows compared to max in current table
        max_from_this = f"select max(updated_at) from {dbt.this}"
        df = df.filter(df.updated_at >= session.sql(max_from_this).collect()[0][0])

        # or only rows from the past 3 days
        df = df.filter(df.updated_at >= F.date_add(F.current_timestamp(), F.lit(-3)))

    ...

    return df
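The two filtering approaches in the incremental examples can be sketched in plain Python, independent of any platform. The helper names new_rows_only and past_days_only and the sample rows are assumptions made for this illustration. Lists of dictionaries stand in for the upstream table and for the already-built table that dbt.this points to.

```python
from datetime import datetime, timedelta

def new_rows_only(incoming, existing):
    """Keep incoming rows at or after the latest updated_at already stored."""
    if not existing:  # first run: nothing to compare against, keep everything
        return list(incoming)
    max_from_this = max(r["updated_at"] for r in existing)
    return [r for r in incoming if r["updated_at"] >= max_from_this]

def past_days_only(incoming, now, days=3):
    """Keep incoming rows updated within the last `days` days."""
    cutoff = now - timedelta(days=days)
    return [r for r in incoming if r["updated_at"] >= cutoff]

now = datetime(2024, 1, 10)
existing = [{"id": 1, "updated_at": datetime(2024, 1, 5)}]
incoming = [
    {"id": 1, "updated_at": datetime(2024, 1, 5)},
    {"id": 2, "updated_at": datetime(2024, 1, 8)},
    {"id": 3, "updated_at": datetime(2024, 1, 2)},
]

by_max = new_rows_only(incoming, existing)   # rows with ids 1 and 2
by_window = past_days_only(incoming, now)    # row with id 2 only
```

Because the comparison is >=, the row whose timestamp equals the current maximum (id 1 here) is selected again. Keep that in mind when choosing an incremental strategy.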
Python-specific functionality
Defining functions
In addition to defining a model function, the Python model can import other functions or define its own. Here's an example on Snowpark, defining a custom add_one function:
def add_one(x):
    return x + 1

def model(dbt, session):
    dbt.config(materialized="table")
    temps_df = dbt.ref("temperatures")

    # warm things up just a little
    df = temps_df.withColumn("degree_plus_one", add_one(temps_df["degree"]))
    return df
Currently, Python functions defined in one dbt model can't be imported and reused in other models. Refer to Code reuse for the potential patterns being considered.
Using PyPI packages
You can also define functions that depend on third-party packages so long as those packages are installed and available to the Python runtime on your data platform. See notes on "Installing Packages" for specific data platforms.
In this example, we use the holidays package to determine if a given date is a holiday in France. The code below uses the pandas API for simplicity and consistency across platforms. The exact syntax, and the need to refactor for multi-node processing, still vary.
Snowpark:
import holidays

def is_holiday(date_col):
    # Chez Jaffle
    french_holidays = holidays.France()
    is_holiday = (date_col in french_holidays)
    return is_holiday

def model(dbt, session):
    dbt.config(
        materialized = "table",
        packages = ["holidays"]
    )

    orders_df = dbt.ref("stg_orders")

    df = orders_df.to_pandas()

    # apply our function
    # (columns need to be in uppercase on Snowpark)
    df["IS_HOLIDAY"] = df["ORDER_DATE"].apply(is_holiday)
    # convert from Number/Long to tz-aware Datetime
    # (tz_localize returns a new Series, so assign the result back)
    df["ORDER_DATE"] = df["ORDER_DATE"].dt.tz_localize('UTC')

    # return final dataset (Pandas DataFrame)
    return df
PySpark:
import holidays

def is_holiday(date_col):
    # Chez Jaffle
    french_holidays = holidays.France()
    is_holiday = (date_col in french_holidays)
    return is_holiday

def model(dbt, session):
    dbt.config(
        materialized = "table",
        packages = ["holidays"]
    )

    orders_df = dbt.ref("stg_orders")

    df = orders_df.to_pandas_on_spark()  # Spark 3.2+
    # df = orders_df.toPandas() in earlier versions

    # apply our function
    df["is_holiday"] = df["order_date"].apply(is_holiday)

    # convert back to PySpark
    df = df.to_spark()  # Spark 3.2+
    # df = session.createDataFrame(df) in earlier versions

    # return final dataset (PySpark DataFrame)
    return df
Configuring packages
We encourage you to configure required packages and versions so dbt can track them in project metadata. This configuration is required for the implementation on some platforms. If you need specific versions of packages, specify them.
def model(dbt, session):
    dbt.config(
        packages = ["numpy==1.23.1", "scikit-learn"]
    )

version: 2

models:
  - name: my_python_model
    config:
      packages:
        - "numpy==1.23.1"
        - scikit-learn
User-defined functions (UDFs)
You can use the @udf decorator or udf function to define an "anonymous" function and call it within your model function's DataFrame transformation. This is a typical pattern for applying more complex functions as DataFrame operations, especially if those functions require inputs from third-party packages.
Snowpark:
import snowflake.snowpark.types as T
import snowflake.snowpark.functions as F
import numpy

def register_udf_add_random():
    add_random = F.udf(
        # use 'lambda' syntax, for simple functional behavior
        lambda x: x + numpy.random.normal(),
        return_type=T.FloatType(),
        input_types=[T.FloatType()]
    )
    return add_random

def model(dbt, session):
    dbt.config(
        materialized = "table",
        packages = ["numpy"]
    )

    temps_df = dbt.ref("temperatures")

    add_random = register_udf_add_random()

    # warm things up, who knows by how much
    df = temps_df.withColumn("degree_plus_random", add_random("degree"))
    return df
Note: Due to a Snowpark limitation, it is not currently possible to register complex named UDFs within stored procedures and, therefore, dbt Python models. We are looking to add native support for Python UDFs as a project/DAG resource type in a future release. For the time being, if you want to create a "vectorized" Python UDF via the Batch API, we recommend either:
- Writing create function inside a SQL macro, to run as a hook or run-operation
- Registering from a staged file within your Python model code
PySpark:
import pyspark.sql.types as T
import pyspark.sql.functions as F
import numpy

# use a 'decorator' for more readable code
@F.udf(returnType=T.DoubleType())
def add_random(x):
    random_number = numpy.random.normal()
    return x + random_number

def model(dbt, session):
    dbt.config(
        materialized = "table",
        packages = ["numpy"]
    )

    temps_df = dbt.ref("temperatures")

    # warm things up, who knows by how much
    df = temps_df.withColumn("degree_plus_random", add_random("degree"))
    return df